Getting Started with Flink CDC, a Mainstream Open-Source Change Data Capture Technology
Source: cnblogs  Author: itxiaoshen  Date: 2023-06-28 08:50


Overview

Definition

flink-cdc-connectors project home: https://github.com/ververica/flink-cdc-connectors (latest source release at the time of writing: 2.4.0)

flink-cdc-connectors documentation: https://ververica.github.io/flink-cdc-connectors/master/

flink-cdc-connectors source code: https://github.com/ververica/flink-cdc-connectors

CDC Connectors for Apache Flink is a set of source connectors for Apache Flink that ingest changes from different databases using change data capture (CDC). It integrates Debezium as the engine for capturing data changes, so it can take full advantage of Debezium's capabilities.

Flink CDC is the source component of flink-cdc-connectors, developed by the Flink community. Built on log-based Change Data Capture, it provides unified full-snapshot and incremental reads from databases such as MySQL and PostgreSQL. Leveraging Flink's excellent pipelining capabilities and rich upstream and downstream ecosystem, it can capture changes from many databases and synchronize those changes to downstream storage in real time.

What is CDC?

Briefly: CDC is short for Change Data Capture. The core idea is to monitor and capture changes to a database (inserts, updates, and deletes of rows or tables), record those changes completely in the order they occur, and write them to a message broker for other services to subscribe to and consume.

Categories of CDC

CDC implementations fall into two main categories, query-based CDC and binlog-based (log-based) CDC, with the following key differences:

[Figure: comparison of query-based CDC vs. binlog-based CDC]
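To make the difference concrete: query-based CDC polls the source on a schedule using a watermark column, so it can miss intermediate states and physical deletes, while log-based CDC reads every change from the transaction log. Below is a minimal sketch of the polling approach, with an in-memory list standing in for a table; all class and column names here are illustrative, not from the article:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PollingCdcSketch {

    // a table row with an update timestamp (the polling "watermark" column)
    static class Row {
        final int id;
        final String name;
        final long updateTime;
        Row(int id, String name, long updateTime) {
            this.id = id; this.name = name; this.updateTime = updateTime;
        }
    }

    // query-based CDC: equivalent to running
    //   SELECT * FROM account WHERE update_time > :lastWatermark
    // on a schedule. Rows updated twice between two polls surface only with
    // their final state, and physically deleted rows are never seen at all.
    static List<Row> poll(List<Row> table, long lastWatermark) {
        List<Row> changed = new ArrayList<>();
        for (Row r : table) {
            if (r.updateTime > lastWatermark) {
                changed.add(r);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        List<Row> table = Arrays.asList(
                new Row(1, "alice", 100L),
                new Row(2, "bob", 250L));
        // only the row modified after the watermark is returned
        System.out.println(poll(table, 200L).size()); // prints 1
    }
}
```

Binlog-based CDC avoids these gaps because the log records every individual change event, which is why tools such as Debezium (and Flink CDC on top of it) read the log instead of polling.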

Features

  • Supports reading a database snapshot and then continuing to read the transaction log, with exactly-once processing even when failures occur.
  • CDC connectors for the DataStream API let users consume changes from multiple databases and tables in a single job, without deploying Debezium or Kafka.
  • CDC connectors for the Table/SQL API let users create a CDC source with SQL DDL to monitor changes on a single table.

Application Scenarios

  • Data distribution: fan one data source out to multiple downstream consumers; commonly used to decouple services in microservice architectures.
  • Data integration: consolidate scattered, heterogeneous data sources into a data warehouse, eliminating data silos for later analysis.
  • Data migration: commonly used for database backup and disaster recovery.

Supported Data Sources

CDC Connectors for Apache Flink can ingest both snapshot data and real-time changes from a variety of databases into Flink, then transform the data and sink it into a range of downstream systems.

[Figure: Flink CDC architecture — source databases flowing through Flink into downstream systems]

The supported data sources include the following:

[Figure: table of supported data sources and connector versions]

Hands-On

Here we use MySQL as the example data source and capture data changes with flink-connector-mysql-cdc. First prepare a MySQL environment; this reuses the environment from the earlier article <<Hands-On with Maxwell, a Lightweight Tool for Real-Time MySQL Data Collection>>. There are two databases, my_maxwell_01 and my_maxwell_02, and each contains identical account and product tables. Add the dependencies to the pom file:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itxs.flink</groupId>
    <artifactId>flink-cdc-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.17.1</flink.version>
        <flink.cdc.version>2.4.0</flink.cdc.version>
        <mysql.client.version>8.0.29</mysql.client.version>
        <fastjson.version>1.2.83</fastjson.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.client.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink.cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

Create DataStreamDemo.java:

```java
package cn.itxs.cdc;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamDemo {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("mysqlserver")
                .port(3306)
                .databaseList("my_maxwell_01,my_maxwell_02")
                .tableList("my_maxwell_01.*,my_maxwell_02.product")
                .username("root")
                .password("12345678")
                .deserializer(new JsonDebeziumDeserializationSchema()) // convert SourceRecord to a JSON string
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpointing
        env.enableCheckpointing(3000);

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set the source parallelism to 4
                .setParallelism(4)
                .print().setParallelism(1); // use parallelism 1 for the print sink to keep message order

        env.execute("Print MySQL Snapshot + Binlog");
    }
}
```

Because the Flink dependencies above use provided scope, when launching from IDEA you need to check the option highlighted in red below (include provided-scope dependencies on the classpath):

[Screenshot: IDEA run configuration with the provided-scope classpath option checked]

Start the program. The log shows the current full snapshot being read from MySQL, and that only the product table is read from my_maxwell_02:

[Screenshot: program log showing the full snapshot read]

After modifying tables in both databases, the corresponding change messages appear, which also confirms that no change data is read for account in my_maxwell_02:

[Screenshot: program log showing the change messages]

```json
{"before":{"id":7,"name":"李丹","age":44},"after":{"id":7,"name":"李丹","age":48},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856595000,"snapshot":"false","db":"my_maxwell_01","sequence":null,"table":"account","server_id":1,"gtid":null,"file":"binlog.000025","pos":2798,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856598620,"transaction":null}
{"before":{"id":1,"name":"iphone13","type":1},"after":{"id":1,"name":"iphone14","type":1},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856605000,"snapshot":"false","db":"my_maxwell_01","sequence":null,"table":"product","server_id":1,"gtid":null,"file":"binlog.000025","pos":3140,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856608748,"transaction":null}
{"before":{"id":1,"name":"iphone13","type":1},"after":{"id":1,"name":"iphone14","type":1},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856628000,"snapshot":"false","db":"my_maxwell_02","sequence":null,"table":"product","server_id":1,"gtid":null,"file":"binlog.000025","pos":3486,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856631643,"transaction":null}
```
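Since the pom already pulls in fastjson, downstream logic can parse these Debezium-format events and route on the op field ("c" create, "u" update, "d" delete, "r" snapshot read). A minimal sketch; the class and method names here are illustrative, not part of the article's code:

```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class ChangeEventParser {

    // turns one Debezium-format event into "db.table op afterImage"
    public static String summarize(String eventJson) {
        JSONObject event = JSON.parseObject(eventJson);
        JSONObject source = event.getJSONObject("source");
        String op = event.getString("op");               // c / u / d / r
        JSONObject after = event.getJSONObject("after"); // null for deletes
        return source.getString("db") + "." + source.getString("table") + " " + op
                + " " + (after == null ? "{}" : after.toJSONString());
    }

    public static void main(String[] args) {
        String event = "{\"before\":{\"id\":1,\"name\":\"iphone13\"},"
                + "\"after\":{\"id\":1,\"name\":\"iphone14\"},"
                + "\"source\":{\"db\":\"my_maxwell_01\",\"table\":\"product\"},"
                + "\"op\":\"u\"}";
        System.out.println(summarize(event)); // my_maxwell_01.product u {...}
    }
}
```

In a real job this kind of logic would sit in a map or process function between the CDC source and the sink, replacing the plain print used above.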

Package the jar, copy it to the cluster, and run:

```shell
bin/flink run -m hadoop1:8081 -c cn.itxs.cdc.DataStreamDemo ./lib/flink-cdc-demo-1.0-SNAPSHOT.jar
```

[Screenshot: job submission output]

The log again shows both the full snapshot of the tables and the incremental changes just made:

[Screenshot: TaskManager log with snapshot and change records]

To resume from where a job left off, configure checkpoint storage on a state backend:

```java
// requires: import java.util.concurrent.TimeUnit;
//           import org.apache.flink.streaming.api.environment.CheckpointConfig;
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// store checkpoints on HDFS so a restarted job can resume from the last one
checkpointConfig.setCheckpointStorage("hdfs://hadoop111:9000/checkpoints/flink/cdc");
// leave at least 2 seconds between the end of one checkpoint and the start of the next
checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(2));
// tolerate up to 5 checkpoint failures before failing the job
checkpointConfig.setTolerableCheckpointFailureNumber(5);
// abort any checkpoint that takes longer than 1 minute
checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
// keep externalized checkpoints when the job is cancelled
checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```
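With checkpoints retained on cancellation, a stopped or failed job can be resubmitted from its last checkpoint via the -s flag. The job id and chk-&lt;n&gt; directory below are hypothetical placeholders under the storage path configured above:

```shell
# resume the job from a retained checkpoint instead of starting a fresh snapshot
bin/flink run -m hadoop1:8081 \
  -s hdfs://hadoop111:9000/checkpoints/flink/cdc/<job-id>/chk-<n> \
  -c cn.itxs.cdc.DataStreamDemo ./lib/flink-cdc-demo-1.0-SNAPSHOT.jar
```

On restore, the MySQL CDC source continues reading the binlog from the offsets recorded in the checkpoint rather than re-reading the full snapshot.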

Flink SQL Example

Create SqlDemo.java:

```java
package cn.itxs.cdc;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SqlDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // register a CDC source table over my_maxwell_01.account via SQL DDL
        tableEnv.executeSql("CREATE TABLE account (\n" +
                "  id INT NOT NULL,\n" +
                "  name STRING,\n" +
                "  age INT,\n" +
                "  PRIMARY KEY(id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'mysqlserver',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '12345678',\n" +
                "  'database-name' = 'my_maxwell_01',\n" +
                "  'table-name' = 'account'\n" +
                ")");
        Table table = tableEnv.sqlQuery("select * from account");
        // convert the table to a changelog stream and print each change row
        DataStream<Row> rowDataStream = tableEnv.toChangelogStream(table);
        rowDataStream.print("account_binlog====");
        env.execute();
    }
}
```

Start the program. The log shows the full snapshot of the account table in my_maxwell_01 being read, and modifying table data confirms that changes are picked up as well:

[Screenshot: changelog output for the account table]

  • The author's blog: IT Xiaoshen, www.itxiaoshen.com

Original article: https://www.cnblogs.com/itxiaoshen/p/17510226.html
