PySpark: read from HBase and convert the result to a DataFrame

  1. Edit the spark-defaults.conf file and add the following configuration:
# coding=utf8
import sys
import time
import json,requests
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark import SparkContext,SparkConf
from pyspark.sql import Row
import pyspark.sql.functions as F

# Build the Spark context for this job; the app name shows up in the Spark UI.
conf = SparkConf()
conf.setAppName("spark read hbase")
sc = SparkContext(conf=conf)

# HBase scan configuration. Each entry needs a trailing comma and the dict
# must be closed (both were missing in the original); renamed from `conf`
# so it no longer shadows the SparkConf instance above.
hbase_conf = {
    "hbase.zookeeper.quorum": "node1,node2,node3",
    "hbase.mapreduce.inputtable": "test",
    "hbase.mapreduce.scan.row.start": "1541692800",
    "hbase.mapreduce.scan.row.stop": "1541692900",
}

# Example converters shipped with Spark: they turn the HBase row key
# (ImmutableBytesWritable) and the Result into plain strings for Python.
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

# newAPIHadoopRDD(inputFormatClass, keyClass, valueClass, ...).
# The original passed "" for the key class and never supplied conf=; the key
# class must be ImmutableBytesWritable and the scan settings go in conf=.
hbaserdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv,
    valueConverter=valueConv,
    conf=hbase_conf,
)

# A SparkSession is required for toDF() in rdd_to_df below.
spark = SparkSession(sc)

def call_transfor(jsonstr):
    """Fold a list of JSON cell strings into one {qualifier: value} dict.

    Each element of *jsonstr* is a JSON document with at least the keys
    'qualifier' (HBase column name) and 'value' (cell value), as emitted by
    HBaseResultToStringConverter. If the same qualifier appears more than
    once, the last occurrence wins.

    The original body had lost all indentation (a SyntaxError); this is the
    reconstructed, working version.
    """
    fdc = {}
    for col in (json.loads(s) for s in jsonstr):
        fdc[col['qualifier']] = col['value']
    return fdc

def rdd_to_df(hbase_rdd):
    """Convert an HBase scan RDD of (row_key, cells_str) into a DataFrame.

    *hbase_rdd* pairs each row key with the newline-joined JSON strings of
    that row's cells (one JSON document per cell). Returns a DataFrame whose
    first column is 'row_key' followed by one column per qualifier.

    The scraping of the original article stripped every `hbase_rdd.map(lambda`
    fragment; the calls below are reconstructed from the surviving text.
    """
    # Split each row's value into the individual JSON cell strings.
    fdc_split = hbase_rdd.map(lambda x: (x[0], x[1].split('\n')))
    # Collapse the cell list into a {qualifier: value} dict per row.
    fdc_cols = fdc_split.map(lambda x: (x[0], call_transfor(x[1])))
    # Column names: row key first, then the first row's qualifiers.
    # NOTE(review): assumes every row has the same qualifiers as the first
    # row, and relies on dict insertion order — confirm for your table.
    colnames = ['row_key'] + fdc_cols.map(lambda x: [i for i in x[1]]).take(1)[0]
    # Flatten each row to [row_key, value1, value2, ...] and build the frame.
    fdc_dataframe = fdc_cols.map(lambda x: [x[0]] + [x[1][i] for i in x[1]]).toDF(colnames)
    return fdc_dataframe

# Materialize the HBase scan as a DataFrame (requires the SparkSession above).
fdc_data = rdd_to_df(hbaserdd)



Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store