经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » MySQL » 查看文章
canal
来源:cnblogs  作者:废物大师兄  时间:2022/1/17 11:00:49  对本文有异议

1.  canal 简介

canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

canal 工作原理:

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

官网地址:https://github.com/alibaba/canal

2.  canal 服务端

 (1)修改 /etc/my.cnf 文件,增加以下配置(PS:改完后别忘了重启数据库 systemctl restart mysqld )

  1. [mysqld]
  2. log-bin=mysql-bin # 开启 binlog
  3. binlog-format=ROW # 选择 ROW 模式
  4. server_id=1 # 配置 MySQL replication 需要定义,不要和 canal 的 slaveId 重复

备忘录

  1. show variables like 'log_bin';
  2. show binary logs;
  3. show master status;
  4. select * from mysql.`user`;

(2)授权 canal 连接 MySQL 的账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

  1. CREATE USER canal IDENTIFIED BY 'Canal@123456';
  2. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
  3. -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
  4. FLUSH PRIVILEGES;

 (3)下载canal,并解压缩

https://github.com/alibaba/canal/releases

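(4)修改配置(主要是 canal.instance.master.address、canal.instance.dbUsername、canal.instance.dbPassword,需与上面创建的 MySQL 账号保持一致)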

  1. vim conf/example/instance.properties

启动

  1. sh bin/startup.sh

查看日志

  1. # 查看 server 日志
  2. vi logs/canal/canal.log
  3. # 查看 instance 的日志
  4. vi logs/example/example.log
  5. # 关闭
  6. sh bin/stop.sh

3. canal 客户端

依赖

  1. <dependency>
  2. <groupId>com.alibaba.otter</groupId>
  3. <artifactId>canal.client</artifactId>
  4. <version>1.1.5</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.alibaba.otter</groupId>
  8. <artifactId>canal.protocol</artifactId>
  9. <version>1.1.5</version>
  10. </dependency>

配置

  1. canal.server.host=192.168.28.32
  2. canal.server.port=11111
  3. canal.server.destination=example
  4. canal.server.username=canal
  5. canal.server.password=canal
  6. spring.kafka.bootstrap-servers=192.168.28.32:9092

logback.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  <configuration scan="true" scanPeriod="30 seconds" debug="false">
      <!-- Shared settings referenced by both appenders below. -->
      <property name="log.charset" value="utf-8" />
      <property name="log.pattern" value="%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n" />
      <property name="log.dir" value="./logs" />

      <!-- Console output -->
      <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
          <encoder>
              <pattern>${log.pattern}</pattern>
              <charset>${log.charset}</charset>
          </encoder>
      </appender>

      <!-- Daily rolling file output: keeps 30 days of history, capped at 3GB in total. -->
      <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
          <file>${log.dir}/my-canal-client.log</file>
          <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
              <fileNamePattern>${log.dir}/my-canal-client.%d{yyyy-MM-dd}.log</fileNamePattern>
              <maxHistory>30</maxHistory>
              <totalSizeCap>3GB</totalSizeCap>
          </rollingPolicy>
          <encoder>
              <pattern>${log.pattern}</pattern>
              <!-- Same explicit charset as the console appender, so the file does not
                   fall back to the platform default encoding. -->
              <charset>${log.charset}</charset>
          </encoder>
      </appender>

      <root level="info">
          <appender-ref ref="console" />
          <appender-ref ref="file" />
      </root>
  </configuration>

测试代码

  1. package com.my.component.canal;
  2. import com.alibaba.otter.canal.client.CanalConnector;
  3. import com.alibaba.otter.canal.protocol.CanalEntry;
  4. import com.alibaba.otter.canal.protocol.Message;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.util.Assert;
  7. import java.util.List;
  8. /**
  9. * @Author ChengJianSheng
  10. * @Date 2022/1/13
  11. */
  12. @Slf4j
  13. public class Demo {
  14. private volatile boolean running = false;
  15. private CanalConnector connector;
  16. private Thread thread = null;
  17. public Demo(CanalConnector connector) {
  18. this.connector = connector;
  19. }
  20. public void start() {
  21. Assert.notNull(connector, "connector is null");
  22. thread = new Thread(this::process);
  23. running = true;
  24. thread.start();
  25. }
  26. public void stop() {
  27. if (!running) {
  28. return;
  29. }
  30. running = false;
  31. if (thread != null) {
  32. try {
  33. thread.join();
  34. } catch (InterruptedException e) {
  35. // ignore
  36. }
  37. }
  38. }
  39. private void process() {
  40. int batchSize = 5 * 1024;
  41. while (running) {
  42. try {
  43. connector.connect();
  44. connector.subscribe(".*\\..*");
  45. while (running) {
  46. Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
  47. long batchId = message.getId();
  48. int size = message.getEntries().size();
  49. if (batchId == -1 || size == 0) {
  50. // try {
  51. // Thread.sleep(1000);
  52. // } catch (InterruptedException e) {
  53. // }
  54. } else {
  55. printEntry(message.getEntries());
  56. }
  57. if (batchId != -1) {
  58. connector.ack(batchId); // 提交确认
  59. }
  60. }
  61. } catch (Throwable e) {
  62. log.error("process error!", e);
  63. try {
  64. Thread.sleep(1000L);
  65. } catch (InterruptedException e1) {
  66. // ignore
  67. }
  68. connector.rollback(); // 处理失败, 回滚数据
  69. } finally {
  70. connector.disconnect();
  71. }
  72. }
  73. }
  74. private void printEntry(List<CanalEntry.Entry> entrys) {
  75. for (CanalEntry.Entry entry : entrys) {
  76. if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
  77. continue;
  78. }
  79. CanalEntry.RowChange rowChage = null;
  80. try {
  81. rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
  82. } catch (Exception e) {
  83. throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
  84. }
  85. CanalEntry.EventType eventType = rowChage.getEventType();
  86. System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
  87. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
  88. entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
  89. eventType));
  90. for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
  91. if (eventType == CanalEntry.EventType.DELETE) {
  92. printColumn(rowData.getBeforeColumnsList());
  93. } else if (eventType == CanalEntry.EventType.INSERT) {
  94. printColumn(rowData.getAfterColumnsList());
  95. } else {
  96. System.out.println("-------> before");
  97. printColumn(rowData.getBeforeColumnsList());
  98. System.out.println("-------> after");
  99. printColumn(rowData.getAfterColumnsList());
  100. }
  101. }
  102. }
  103. }
  104. private void printColumn(List<CanalEntry.Column> columns) {
  105. for (CanalEntry.Column column : columns) {
  106. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
  107. }
  108. }
  109. }

启动

  1. package com.my.component.canal;
  2. import com.alibaba.otter.canal.client.CanalConnector;
  3. import com.alibaba.otter.canal.client.CanalConnectors;
  4. import com.alibaba.otter.canal.common.utils.AddressUtils;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.boot.CommandLineRunner;
  8. import org.springframework.kafka.core.KafkaTemplate;
  9. import org.springframework.stereotype.Component;
  10. import java.net.InetSocketAddress;
  11. import java.net.SocketAddress;
  12. /**
  13. * @Author ChengJianSheng
  14. * @Date 2022/1/4
  15. */
  16. @Slf4j
  17. @Component
  18. public class MyCommandLineRunner implements CommandLineRunner {
  19. @Autowired
  20. private CanalServerProperties canalServerProperties;
  21. @Autowired
  22. private KafkaTemplate<String, String> kafkaTemplate;
  23. @Override
  24. public void run(String... args) throws Exception {
  25. String destination = "example";
  26. // String ip = AddressUtils.getHostIp();
  27. // String ip = canalServerProperties.getHost();
  28. String ip = "192.168.28.32";
  29. SocketAddress address = new InetSocketAddress(ip, 11111);
  30. CanalConnector connector = CanalConnectors.newSingleConnector(address, destination, "canal", "canal");
  31. Demo demo = new Demo(connector);
  32. demo.start();
  33. Runtime.getRuntime().addShutdownHook(new Thread(() -> {
  34. try {
  35. log.info("## stop the canal client");
  36. demo.stop();
  37. } catch (Throwable e) {
  38. log.warn("##something goes wrong when stopping canal:", e);
  39. } finally {
  40. log.info("## canal client is down.");
  41. }
  42. }));
  43. }
  44. }

 

原文链接:http://www.cnblogs.com/cjsblog/p/15799448.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号