Hands-on lakehouse integration with Flink + Hudi + Hive

  1. Set up Flink (standalone or a cluster) and download the Hudi dependency bundle
# HADOOP_HOME is the root directory of the extracted Hadoop binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
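# Before launching the SQL client, make sure the Hudi Flink bundle jar is on Flink's classpath
# and that the standalone cluster is running. The jar name and version below are assumptions
# (here: Hudi 0.9.0 built for Scala 2.11); adjust them to the bundle you actually downloaded or built.
cp hudi-flink-bundle_2.11-0.9.0.jar $FLINK_HOME/lib/
./bin/start-cluster.sh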
# launch the Flink SQL client
./bin/sql-client.sh embedded
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/t1'
);

CREATE TABLE t3(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/t2',
'table.type' = 'MERGE_ON_READ',
'compaction.tasks'='5'
);
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');


# query the table data; first set the result mode to tableau
set execution.result-mode=tableau;
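
With the result mode set, a plain batch query shows the rows that were just written (a minimal sketch; the column names follow the t1 DDL above):

-- batch read of the COPY_ON_WRITE table t1
select * from t1;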
CREATE TABLE t2(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/t1',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '4'
);

Here the table option read.streaming.enabled is set to true, which means the table is read in streaming mode;
the option read.streaming.check-interval sets the interval at which the source checks for new commits to 4 seconds;
the option table.type sets the table type to MERGE_ON_READ; at the moment only MERGE_ON_READ tables support streaming reads.
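
To start the streaming read, run an ordinary query against t2; it keeps running and emits new rows as new commits are detected (a minimal sketch based on the DDL above). Leave it running and write more data from another session to watch the rows arrive:

-- continuous streaming read of the MERGE_ON_READ table
select * from t2;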

Write one more record in batch mode:

insert into t1 values ('id9','yangge',27,TIMESTAMP '1970-01-01 00:00:01','par5');
  2. Build the Hudi source with Hive support
mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2
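
After the build finishes, the relevant bundle jars need to be placed on the right classpaths before starting the Hive services. A minimal sketch, assuming Hudi 0.9.0 and the /opt/hive install path used below (jar names, versions and the auxlib directory are assumptions to adapt):

# the rebuilt Flink bundle, now shaded with Hive 2 support, goes into Flink's lib directory
cp packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.11-0.9.0.jar $FLINK_HOME/lib/
# the Hadoop MR bundle lets Hive read the synced Hudi tables
cp packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.9.0.jar /opt/hive/auxlib/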
# start the Hive metastore service
nohup /opt/hive/bin/hive --service metastore &

# start HiveServer2
nohup /opt/hive/bin/hiveserver2 &

DDL for the source tables:

1. DDL for the batch table
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/t1'
);

2. DDL for the streaming table
CREATE TABLE t2(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/t2',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'read.streaming.start-commit' = '20210901151206' ,
'read.streaming.check-interval' = '4'
);

hms mode configuration

1. DDL for the batch table in hms mode:
CREATE TABLE t11(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' ='hdfs://node1:8020/hudi/t1',
'table.type'='COPY_ON_WRITE', -- with MERGE_ON_READ, Hive sees no output until parquet files have been generated
'hive_sync.enable'='true', -- required, enable Hive sync
'hive_sync.mode' = 'hms', -- required, set the hive sync mode to hms; default is jdbc
'hive_sync.metastore.uris' = 'thrift://node1:9083', -- required, metastore URI
'hive_sync.table'='t11', -- required, name of the table created in Hive
'hive_sync.db'='default' -- required, name of the database created in Hive
);
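
Once t11 is created, writing to it from Flink triggers the metastore sync and the table becomes visible on the Hive side. A minimal sketch of checking this (table and database names are taken from the DDL above):

-- in the Flink SQL client: copy the sample rows from t1 into the synced table
insert into t11 select * from t1;

-- in Hive (beeline): the synced table should now show up under the default database
show tables;
select * from t11;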

2. DDL for the streaming table in hms mode, with parallelism specified:
CREATE TABLE t12(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' ='hdfs://node1:8020/hudi/t1',
'table.type'='COPY_ON_WRITE', -- with MERGE_ON_READ, Hive sees no output until parquet files have been generated
'hive_sync.enable'='true', -- required, enable Hive sync
'hive_sync.mode' = 'hms', -- required, set the hive sync mode to hms; default is jdbc
'read.streaming.enabled' = 'true',
'read.streaming.start-commit' = '20210901151206',
'read.streaming.check-interval' = '4',
'hive_sync.metastore.uris' = 'thrift://node1:9083', -- required, metastore URI
'hive_sync.table'='t12', -- required, name of the table created in Hive
'hive_sync.db'='default', -- required, name of the database created in Hive
'write.tasks'='1',
'compaction.tasks'='1'
);

3. DDL for the batch table over t2 in hms mode:
CREATE TABLE t13(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' ='hdfs://node1:8020/hudi/t2',
'table.type'='MERGE_ON_READ', -- with MERGE_ON_READ, Hive sees no output until parquet files have been generated
'hive_sync.enable'='true', -- required, enable Hive sync
'hive_sync.mode' = 'hms', -- required, set the hive sync mode to hms; default is jdbc
'hive_sync.metastore.uris' = 'thrift://node1:9083', -- required, metastore URI
'hive_sync.table'='t13', -- required, name of the table created in Hive
'hive_sync.db'='default' -- required, name of the database created in Hive
);
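
For a MERGE_ON_READ table such as t13, the Hive sync normally registers two tables: a read-optimized view suffixed _ro and a real-time view suffixed _rt. A minimal sketch of querying them from Hive (the suffixed names are an assumption based on that convention):

-- the read-optimized view only reflects data that has been compacted into parquet files
select * from t13_ro;
-- the real-time view merges the latest log files on read
select * from t13_rt;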

4. DDL for the streaming table over t2 in hms mode, with parallelism specified:
CREATE TABLE t14(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' ='hdfs://node1:8020/hudi/t2',
'table.type'='MERGE_ON_READ', -- with MERGE_ON_READ, Hive sees no output until parquet files have been generated
'hive_sync.enable'='true', -- required, enable Hive sync
'hive_sync.mode' = 'hms', -- required, set the hive sync mode to hms; default is jdbc
'read.streaming.enabled' = 'true',
'read.streaming.start-commit' = '20210901151206',
'read.streaming.check-interval' = '4',
'hive_sync.metastore.uris' = 'thrift://node1:9083', -- required, metastore URI
'hive_sync.table'='t14', -- required, name of the table created in Hive
'hive_sync.db'='default', -- required, name of the database created in Hive
'write.tasks'='1',
'compaction.tasks'='1'
);

jdbc mode configuration

1. DDL for the batch table in jdbc mode:
CREATE TABLE t15(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' = 'hdfs://node1:8020/hudi/t1',
'table.type'='COPY_ON_WRITE', -- with MERGE_ON_READ, Hive sees no output until parquet files have been generated
'hive_sync.enable'='true', -- required, enable Hive sync
'hive_sync.metastore.uris' = 'thrift://node1:9083', -- required, metastore URI
'hive_sync.jdbc_url'='jdbc:hive2://node1:10000', -- required, HiveServer2 address
'hive_sync.table'='t15', -- required, name of the table created in Hive
'hive_sync.db'='default', -- required, name of the database created in Hive
'hive_sync.username'='root', -- required, Hive user name
'hive_sync.password'='123456' -- required, Hive password
);

2. DDL for the streaming table in jdbc mode, with parallelism specified:
CREATE TABLE t16(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' = 'hdfs://node1:8020/hudi/t1',
'table.type'='COPY_ON_WRITE', -- with MERGE_ON_READ, Hive sees no output until parquet files have been generated
'hive_sync.enable'='true', -- required, enable Hive sync
'hive_sync.metastore.uris' = 'thrift://node1:9083', -- required, metastore URI
'hive_sync.jdbc_url'='jdbc:hive2://node1:10000', -- required, HiveServer2 address
'hive_sync.table'='t16', -- required, name of the table created in Hive
'hive_sync.db'='default', -- required, name of the database created in Hive
'hive_sync.username'='root', -- required, Hive user name
'hive_sync.password'='123456', -- required, Hive password
'read.streaming.enabled' = 'true',
'read.streaming.start-commit' = '20210901151206',
'read.streaming.check-interval' = '4',
'write.tasks'='1',
'compaction.tasks'='1'
);


3. DDL for the batch table over t2 in jdbc mode:
CREATE TABLE t17(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' ='hdfs://node1:8020/hudi/t2',
'table.type'='MERGE_ON_READ', -- with MERGE_ON_READ, Hive sees no output until parquet files have been generated
'hive_sync.enable'='true', -- required, enable Hive sync
'hive_sync.metastore.uris' = 'thrift://node1:9083', -- required, metastore URI
'hive_sync.jdbc_url'='jdbc:hive2://node1:10000', -- required, HiveServer2 address
'hive_sync.table'='t17', -- required, name of the table created in Hive
'hive_sync.db'='default', -- required, name of the database created in Hive
'hive_sync.username'='root', -- required, Hive user name
'hive_sync.password'='123456', -- required, Hive password
'write.tasks'='1',
'compaction.tasks'='1'
);

4. DDL for the streaming table over t2 in jdbc mode:
CREATE TABLE t18(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector'='hudi',
'path' ='hdfs://node1:8020/hudi/t2',
'table.type'='MERGE_ON_READ', -- with MERGE_ON_READ, Hive sees no output until parquet files have been generated
'hive_sync.enable'='true', -- required, enable Hive sync
'hive_sync.metastore.uris' = 'thrift://node1:9083', -- required, metastore URI
'hive_sync.jdbc_url'='jdbc:hive2://node1:10000', -- required, HiveServer2 address
'hive_sync.table'='t18', -- required, name of the table created in Hive
'hive_sync.db'='default', -- required, name of the database created in Hive
'hive_sync.username'='root', -- required, Hive user name
'hive_sync.password'='123456', -- required, Hive password
'write.tasks'='1',
'compaction.tasks'='1',
'read.streaming.enabled' = 'true',
'read.streaming.start-commit' = '20210901151206' ,
'read.streaming.check-interval' = '4'
);
Write the rows from t1 into t15 in batch mode, which triggers the Hive sync:

insert into t15 select * from t1;

On the Hive side, set the input format before querying the synced Hudi table:

set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

As an alternative to the automatic sync, a Hive external table can also be defined manually on top of the Hudi files and its partitions added by hand:
CREATE EXTERNAL TABLE `test`(               
`_hoodie_commit_time` string,
`_hoodie_commit_seqno` string,
`_hoodie_record_key` string,
`_hoodie_partition_path` string,
`_hoodie_file_name` string,
`uuid` string,
`name` string,
`age` bigint,
`ts` bigint)
PARTITIONED BY (
`partition` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://data1:9000/hudi/t3/par2';
alter table test add if not exists partition(`partition`='par2') location 'hdfs://data1:9000/hudi/t3/par2';
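
With the partition registered, the manually defined external table can be queried from Hive like any other table; a minimal sketch (run in the same session where hive.input.format was set above):

select `_hoodie_commit_time`, uuid, name, age from test where `partition`='par2';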
