[TOC]
本项目代码,可用于运行在 Amazon Kinesis Data Analytics 中。
通过FlinkCDC实时采集Postgresql的数据,写入Kafka。
通过FlinkSQL,将mysql数据实时摄入 Iceberg。
由于KDA与Iceberg集成存在问题,会遇到类似 java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
这样的异常错误,可参见 #3044。本项目提供 workaround 解决该问题。
解决方案
- 参考 pom.xml 文件,通过 relocation 将冲突的类替换。
- 重写 HadoopUtils 类
消费Kafka的数据,以Hudi格式写入S3。MSK 先用无认证的模式。
执行:
-
编译后将target目录下的 jar文件 上传至S3目录下。
-
创建Kinesis Data Application项目,Flink 选择 1.5 版本。
-
VPC 选择 MSK 所在的VPC
-
Runtime properties 配置如下
Group Key Value FlinkApplicationProperties brokers MSK Boostrap Server FlinkApplicationProperties kafka-topic 需要消费的topic name FlinkApplicationProperties s3Path Hudi写入的S3目录(用s3a:例如s3a://[your bucket name]/data) FlinkApplicationProperties hivemetastore 用于同步hive元数据的thriftserver
保存配置后,点击【Run】