基于flink+hudi+hive实现湖仓一体化实战

hivefans
20 min readSep 16, 2021

一、环境准备

hadoop 2.10.1

hive 2.3.6

hudi 0.9

flink 1.12.2

各组件的安装配置就不介绍了,可以google搜索下就能找到

二、flink+hudi实战步骤

  1. flink 单机或者集群配置好,下载hudi依赖包

wget https://repo.maven.apache.org/maven2/org/apache/hudi/hudi-flink-bundle_2.11/0.9.0/hudi-flink-bundle_2.11-0.9.0.jar

放到 $FLINK_HOME/lib目录下,注意scala版本对应,然后启动flink服务

启动flink sql客户端

#HADOOP_HOME是解压二进制包后的hadoop根目录。 
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
#启动flink单机集群
./bin/sql-client.sh embedded

2. 创建flink 批量模式表

CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/t1'
);

CREATE TABLE t3(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/t2',
'table.type' = 'MERGE_ON_READ',
'compaction.tasks'='5'
);

插入数据

INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');


#查询表数据,设置一下查询模式为tableau
set execution.result-mode=tableau;

3. 创建flink stream模式表

CREATE TABLE t2(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/t1',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '4'
);

这里将 table option read.streaming.enabled 设置为 true,表明通过 streaming 的方式读取表数据;
opiton read.streaming.check-interval 指定了 source 监控新的 commits 的间隔为 4s;
option table.type 设置表类型为 MERGE_ON_READ,目前只有 MERGE_ON_READ 表支持 streaming 读

从批模式写入一条数据

insert into t1 values ('id9','yangge',27,TIMESTAMP '1970-01-01 00:00:01','par5');

隔几秒后在流模式可以读取到一条新增的数据

三、flink+hudi+hive整合同步实战

  1. 编译hudi源码支持hive

hudi默认支持hive编译版本为2.3.1,因为版本不一致需要编译hudi

编辑hudi源码根目录中pom.xml,修改其中hive版本

<hive.version>2.3.6</hive.version>

编辑hudi源码目录中packaging/hudi-flink-bundle中的pom.xml,修改其中hive版本

<hive.version>2.3.6</hive.version>

执行以下命令编译Hudi

mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2

2. 配置hive环境

在 Hive 安装目录下创建 auxlib/ 文件夹,并把hudi 编译后packaging/hudi-hadoop-mr-bundle/target目录下的hudi-hadoop-mr-bundle-0.9.0.jar 放入其中

启动hive metastore与hive server2服务

nohup /opt/hive/bin/hive --service metastore & 

nohup /opt/hive/bin/hiveserver2 &

3. hive配置模板

Flink hive sync 现在支持两种 hive sync mode, 分别是 hms 和 jdbc 模式。 其中 hms 只需要配置 metastore uris;而 jdbc 模式需要同时配置 jdbc 属性 和 metastore uris,具体配置模版如下:

源表的建表语句:

1.批表的建表语句
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/t1'
);

2.流表的建表语句
CREATE TABLE t2(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://node1:8020/hudi/t2',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'read.streaming.start-commit' = '20210901151206' ,
'read.streaming.check-interval' = '4'
);

hms mode 配置

1.批模式hms模式的建表语句:
CREATE TABLE t11(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' ='hdfs://node1:8020/hudi/t1',
'table.type'='COPY_ON_WRITE', -- MERGE_ON_READ方式在没生成 parquet 文件前,hive不会有输出
'hive_sync.enable'='true', -- required,开启hive同步功能
'hive_sync.mode' = 'hms', -- required, 将hive sync mode设置为hms, 默认jdbc
'hive_sync.metastore.uris' = 'thrift://node1:9083', -- required, metastore的端口
'hive_sync.table'='t11', -- required, hive 新建的表名
'hive_sync.db'='default' -- required, hive 新建的数据库名
);

2.流模式的hms模式的建表语句并且指定了并行度:
CREATE TABLE t12(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' ='hdfs://node1:8020/hudi/t1',
'table.type'='COPY_ON_WRITE', -- MERGE_ON_READ方式在没生成 parquet 文件前,hive不会有输出
'hive_sync.enable'='true', -- required,开启hive同步功能
'hive_sync.mode' = 'hms', -- required, 将hive sync mode设置为hms, 默认jdbc
'read.streaming.enabled' = 'true',
'read.streaming.start-commit' = '20210901151206' ,
'read.streaming.check-interval' = '4',
'hive_sync.metastore.uris' = 'thrift://node1:9083' ,
'hive_sync.table'='t12', -- required, hive 新建的表名
'hive_sync.db'='default', -- required, hive 新建的数据库名
'write.tasks'='1',
'compaction.tasks'='1'
);

3.t2表的批模式hms模式的建表语句:
CREATE TABLE t13(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' ='hdfs://node1:8020/hudi/t2',
'table.type'='MERGE_ON_READ', -- MERGE_ON_READ方式在没生成 parquet 文件前,hive不会有输出
'hive_sync.enable'='true', -- required,开启hive同步功能
'hive_sync.mode' = 'hms', -- required, 将hive sync mode设置为hms, 默认jdbc
'hive_sync.metastore.uris' = 'thrift://node1:9083', -- required, metastore的端口
'hive_sync.table'='t13', -- required, hive 新建的表名
'hive_sync.db'='default' -- required, hive 新建的数据库名
);

4.t2表的流模式的hms模式的建表语句并且指定了并行度:
CREATE TABLE t14(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' ='hdfs://node1:8020/hudi/t2',
'table.type'='MERGE_ON_READ', -- MERGE_ON_READ方式在没生成 parquet 文件前,hive不会有输出
'hive_sync.enable'='true', -- required,开启hive同步功能
'hive_sync.mode' = 'hms', -- required, 将hive sync mode设置为hms, 默认jdbc
'read.streaming.enabled' = 'true',
'read.streaming.start-commit' = '20210901151206' ,
'read.streaming.check-interval' = '4',
'hive_sync.metastore.uris' = 'thrift://node1:9083' ,
'hive_sync.table'='t14', -- required, hive 新建的表名
'hive_sync.db'='default', -- required, hive 新建的数据库名
'write.tasks'='1',
'compaction.tasks'='1'
);

jdbc mode 配置

1.批模式 jdbc mode模式的建表语句:
CREATE TABLE t15(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' = 'hdfs://node1:8020/hudi/t1',
'table.type'='COPY_ON_WRITE', -- MERGE_ON_READ方式在没生成 parquet 文件前,hive不会有输出
'hive_sync.enable'='true', -- required,开启hive同步功能
'hive_sync.metastore.uris' = 'thrift://node1:9083', -- required, metastore的端口
'hive_sync.jdbc_url'='jdbc:hive2://node1:10000', -- required, hiveServer地址
'hive_sync.table'='t15', -- required, hive 新建的表名
'hive_sync.db'='default', -- required, hive 新建的数据库名
'hive_sync.username'='root', -- required, HMS 用户名
'hive_sync.password'='123456'
);

2.流模式 jdbc mode模式的建表语句带并行度:
CREATE TABLE t16(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' = 'hdfs://node1:8020/hudi/t1',
'table.type'='COPY_ON_WRITE', -- MERGE_ON_READ方式在没生成 parquet 文件前,hive不会有输出
'hive_sync.enable'='true', -- required,开启hive同步功能
'hive_sync.metastore.uris' = 'thrift://node1:9083', -- required, metastore的端口
'hive_sync.jdbc_url'='jdbc:hive2://node1:10000', -- required, hiveServer地址
'hive_sync.table'='t16', -- required, hive 新建的表名
'hive_sync.db'='default', -- required, hive 新建的数据库名
'hive_sync.username'='root', -- required, HMS 用户名
'hive_sync.password'='123456' ,
'read.streaming.enabled' = 'true',
'read.streaming.start-commit' = '20210901151206' ,
'read.streaming.check-interval' = '4' ,
'write.tasks'='1',
'compaction.tasks'='1' -- required, HMS 密码
);


3.关于t2的批模式 jdbc mode模式的建表语句:
CREATE TABLE t17(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' ='hdfs://node1:8020/hudi/t2',
'table.type'='MERGE_ON_READ', -- MERGE_ON_READ方式在没生成 parquet 文件前,hive不会有输出
'hive_sync.enable'='true', -- required,开启hive同步功能
'hive_sync.metastore.uris' = 'thrift://node1:9083', -- required, metastore的端口
'hive_sync.jdbc_url'='jdbc:hive2://node1:10000', -- required, hiveServer地址
'hive_sync.table'='t17', -- required, hive 新建的表名
'hive_sync.db'='default', -- required, hive 新建的数据库名
'hive_sync.username'='root', -- required, HMS 用户名
'hive_sync.password'='123456' ,
'write.tasks'='1',
'compaction.tasks'='1'
);

4.关于t2的流模式 jdbc mode模式的建表语句:
CREATE TABLE t18(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector'='hudi',
'path' ='hdfs://node1:8020/hudi/t2',
'table.type'='MERGE_ON_READ', -- MERGE_ON_READ方式在没生成 parquet 文件前,hive不会有输出
'hive_sync.enable'='true', -- required,开启hive同步功能
'hive_sync.metastore.uris' = 'thrift://node1:9083', -- required, metastore的端口
'hive_sync.jdbc_url'='jdbc:hive2://node1:10000', -- required, hiveServer地址
'hive_sync.table'='t18', -- required, hive 新建的表名
'hive_sync.db'='default', -- required, hive 新建的数据库名
'hive_sync.username'='root', -- required, HMS 用户名
'hive_sync.password'='123456' ,
'write.tasks'='1',
'compaction.tasks'='1',
'read.streaming.enabled' = 'true',
'read.streaming.start-commit' = '20210901151206' ,
'read.streaming.check-interval' = '4'
);

注意:在FlinkSQL建hive同步表,数据不会自动同步,还需要手动记性insert into插入,例如

insert into t15 select *  from t1;

使用beeline或者hive命令行查询,需要设置参数

set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

如果感觉这两种模式需要手动同步的话,可以使用hive外部表方式

创建表语句

CREATE EXTERNAL TABLE `test`(               
`_hoodie_commit_time` string,
`_hoodie_commit_seqno` string,
`_hoodie_record_key` string,
`_hoodie_partition_path` string,
`_hoodie_file_name` string,
`uuid` string,
`name` string,
`age` bigint,
`ts` bigint)
PARTITIONED BY (
`partition` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://data1:9000/hudi/t3/par2';

添加分区

alter table test add if not exists partition(`partition`='par2') location 'hdfs://data1:9000/hudi/t3/par2';

创建好外部表和增加分区后,可以查询到外部表数据

--

--