Environment configuration
1. Edit the spark-defaults.conf file and add the following configuration:
spark.executor.extraClassPath=/home/work/hbase/lib/*
spark.driver.extraClassPath=/home/work/hbase/lib/*
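If you prefer not to edit spark-defaults.conf, the same classpath can be supplied per job on the spark-submit command line instead (a sketch assuming the same /home/work/hbase/lib/ directory; note that the pythonconverters used in step 2 ship in the Spark examples jar, so that jar generally needs to be on the classpath as well):
spark-submit \
  --conf spark.driver.extraClassPath=/home/work/hbase/lib/* \
  --conf spark.executor.extraClassPath=/home/work/hbase/lib/* \
  hbase_df.py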
2. Save the following code as hbase_df.py:
#!/usr/bin/python
# coding=utf8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')  # Python 2 idiom so HBase cell values decode as UTF-8
import json
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = SparkConf().setAppName("spark read hbase")
sc = SparkContext(conf=conf)

# HBase scan parameters for TableInputFormat: ZooKeeper quorum, table name and row key range
hbase_conf = {
    "hbase.zookeeper.quorum": "node1,node2,node3",
    "hbase.mapreduce.inputtable": "test",
    "hbase.mapreduce.scan.row.start": "1541692800",
    "hbase.mapreduce.scan.row.stop": "1541692900"
}
# Converters that turn HBase's ImmutableBytesWritable/Result objects into strings for Python
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

# Read the table as an RDD of (row key, newline-separated JSON strings, one per cell)
hbaserdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv,
    valueConverter=valueConv,
    conf=hbase_conf)

# A SparkSession is needed so that toDF() can be called on the RDD below
spark = SparkSession(sc)
def call_transfor(jsonstr):
    # Each element is a JSON string describing one cell (qualifier, value, ...);
    # collect them into a single {qualifier: value} dict for the row
    jsonlist = [json.loads(i) for i in jsonstr]
    fdc = {}
    for col in jsonlist:
        colname = col['qualifier']
        value = col['value']
        fdc[colname] = value
    return fdc
def rdd_to_df(hbase_rdd):
    # Split the newline-separated cell strings, then build a {qualifier: value} dict per row
    fdc_split = hbase_rdd.map(lambda x: (x[0], x[1].split('\n')))
    fdc_cols = fdc_split.map(lambda x: (x[0], call_transfor(x[1])))
    # Take the column names from the first row; this assumes every row has the same qualifiers
    colnames = ['row_key'] + fdc_cols.map(lambda x: [i for i in x[1]]).take(1)[0]
    fdc_dataframe = fdc_cols.map(lambda x: [x[0]] + [x[1][i] for i in x[1]]).toDF(colnames)
    return fdc_dataframe
fdc_data = rdd_to_df(hbaserdd)
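# Optional sanity check (a sketch, not required by the rest of the walkthrough):
# assuming the scan returned at least one row, print the schema and a few rows.
fdc_data.printSchema()
fdc_data.show(5, truncate=False)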
3. Run hbase_df.py:
spark-submit hbase_df.py
With this approach we can read an HBase table directly into a Spark DataFrame, with no additional project or custom connector required.
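For example, the DataFrame can be registered as a temporary view and queried with Spark SQL like any other table (a minimal sketch; hbase_test is an arbitrary view name and row_key is the column added by rdd_to_df above):
fdc_data.createOrReplaceTempView("hbase_test")
spark.sql("SELECT row_key FROM hbase_test LIMIT 10").show()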