如何监听 MySQL 实现数据变化后的实时通知

大家好,我是一安,之前介绍《实际开发中,如何保证数据库和缓存双写一致性》一文中,最后有提到使用Canal监听MySQL,今天就完整的介绍一下。

前言

在日常的工作中有没有遇到这样的场景,很多时候业务数据有变更需要及时加载到缓存、ES 或者发送到消息队列中通知下游服务。

一般遇到这种情况下,在实时性要求不高的场景我们有两种处理模式,一种是写任务定时推送数据同步到缓存中,另一个是下游服务定时自动拉取。这两种模式都依赖服务自己的定时周期时间,很多时候不好设定具体要多久执行一次,定时时间太短在数据没有变化的时候会有很多无效的操作,如果定时时间太长可能很多时候数据的延迟会比较大,某些时候影响也不好。

canal简介

如何监听 MySQL 实现数据变化后的实时通知Canal是阿里巴巴旗下的一款开源项目,纯Java开发,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

工作原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

环境准备

小编这里演示canal并推送mysql消息到rabbitmq,有兴趣的可以参考,修改接收方,比如kafka、ES

mysql和rabbitmq安装

安装mysql和rabbitmq,之前有专门介绍两个安装

如何快速搭建一套生产级MYSQL

如何快速搭建一套生产级RabbitMQ

这里直接使用之前搭的环境,使用过程中遇到一点小问题,下面会提到

  • 开启mysql的bin-log日志

小编之前的搭的主从复制,所以是小编这里是开启的,如果你是自己搭的环境,检查是否开启,没有的话开启即可,命令如下:

1.查看mysql是否开启bin-log日志
SHOW VARIABLES LIKE '%log_bin%'
2.没有开启的话,增加如下配置重启mysql
server-id=1
log-bin=mysql-bin
binlog-format=ROW
  • 创建canal用户获取bin-log日志

进入mysql执行如下命令即可:

grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by "canal";
flush privileges;
  • rabbitmq新建一个topic类型交换机canal.topic,然后新增队列:canal.topic, 绑定canal.topic交换机, RoutingKey:canal.topic

canal安装

Docker pull canal/canal-server:v1.1.5
# 创建一个容器
docker run --name canal -p 11111:11111  -v /mydata/canal-server/logs:/home/admin/canal-server/logs -d canal/canal-server:v1.1.5
# 复制容器中的配置文件到本地
docker cp canal:/home/admin/canal-server/conf/canal.properties /mydata/canal-server/conf/
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /mydata/canal-server/conf/

注意:

  • 容器里配置复制到外面是为了后面挂载,以后修改配置不需要每次都进入容器内部
  • 小编这里一开始未复制出容器里配置,而且直接外面建了两个空的配置文件,然后追加了配置信息,启动一直报空指针
2022-08-23 11:06:51.844 [main] ERROR com.alibaba.otter.canal.deployer.CanalLauncher - ## S>>omething goes wrong when starting up the canal Server:
java.lang.NullPointerException: null
       at com.alibaba.otter.canal.deployer.CanalStarter.start(CanalStarter.java:68) ~[can>>al.deployer-1.1.5.jar:na]
       at com.alibaba.otter.canal.deployer.CanalLauncher.main(CanalLauncher.java:117) ~[canal.deployer-1.1.5.jar:na]
  • 查看源码部分后,启动需要加载很多配置,而小编使用的自己建的配置文件,只有部分配置

修改配置

instance.properties

# 不能和mysql重复
canal.instance.mysql.slaveId=2
# 使用mysql的虚拟ip和端口
canal.instance.master.address=192.168.5.128:3307
# binlog日志名称(非必选)
canal.instance.master.journal.name=mysql-bin.000006
# mysql主库链接时起始的binlog偏移量(非必选
canal.instance.master.position=4

# 使用已创建的canal用户
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

# rabbitmq中配置的绑定的 routingkey
canal.mq.topic=canal.topic

canal.properties

目前rabbitMQ没有支持端口配置,默认是5672

canal.serverMode = rabbitMQ
 
rabbitmq.host = 192.168.5.128
rabbitmq.virtual.host = /
# rabbitmq中新建的Exchange
rabbitmq.exchange = canal.topic
rabbitmq.username = guest
rabbitmq.password = guest
#exchange的模式
rabbitmq.deliveryMode = topic

重新创建canal容器并挂在配置文件

1.先停掉原来启动的
  docker stop canal
2.删除原来的容器
  docker rm canal
3.重新创建容器并挂载配置文件
  docker run --name canal -p 11111:11111 -v /mydata/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /mydata/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties -v /mydata/canal-server/logs:/home/admin/canal-server/logs -d canal/canal-server:v1.1.5

至此,环境准备完成,开始测试。

演示

新建数据库

  • 新建测试数据库:demo
create database demo;

问题出现了,mysql,rabbitmq,canal都启动正常,但rabbitmq收不到消息,小编这里找了一个小时终于找到了,是之前搭建mysql环境配置的问题,原来配置主从复制时指定了需要同步的数据库,注释掉就可以了(自己刨的坑自己填,大家要注意自己的环境)

#需要同步的数据库
#binlog-do-db=demo_0
#binlog-do-db=demo_1

看一下rabbitmq如何监听 MySQL 实现数据变化后的实时通知

{"data":null,"database":"","es":1661232453000,"id":4,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE DATABASE `demo` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'","sqlType":null,"table":"","ts":1661232453785,"type":"QUERY"}

新增表

CREATE TABLE `user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(30) NOT NULL DEFAULT '',
  `address` varchar(200) DEFAULT NULL,
  `email` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

看一下rabbitmq如何监听 MySQL 实现数据变化后的实时通知

{"data":null,"database":"demo","es":1661232589000,"id":5,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `user` (n  `id` int(11) NOT NULL AUTO_INCREMENT,n  `name` varchar(30) NOT NULL DEFAULT '',n  `address` varchar(200) DEFAULT NULL,n  `email` varchar(100) DEFAULT NULL,n  PRIMARY KEY (`id`)n) ENGINE=InnoDB DEFAULT CHARSET=utf8","sqlType":null,"table":"user","ts":1661232589258,"type":"CREATE"}

新增数据记录

insert into user (name,address,email) values ('lisi','beijing','1234');

看一下rabbitmq如何监听 MySQL 实现数据变化后的实时通知

{"data":[{"id":"1","name":"lisi","address":"beijing","email":"1234"}],"database":"demo","es":1661232730000,"id":6,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(30)","address":"varchar(200)","email":"varchar(100)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"address":12,"email":12},"table":"user","ts":1661232730878,"type":"INSERT"}

数据修改,删除,字段变更就不一一测试了,留给大家一些操作空间如何监听 MySQL 实现数据变化后的实时通知

Java客户端连接

注意:客户端连接需要修改配置canal.properties中canal.serverMode = tcp

1.引入依赖

       <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.5</version>
        </dependency>

        <!-- Message、CanalEntry.Entry等来自此安装包 -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.5</version>
        </dependency>

2.代码示例

  public static void main(String[] args){
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.5.128", 11111), "example""""");
        canalConnector.connect();
        //订阅所有消息
        canalConnector.subscribe(".*\..*");
        // 只订阅test数据库下的所有表
        //canalConnector.subscribe("test");
        //恢复到之前同步的那个位置
        canalConnector.rollback();

        for(;;){
            //获取指定数量的数据,但是不做确认标记,下一次取还会取到这些信息。 注:不会阻塞,若不够100,则有多少返回多少
            Message message = canalConnector.getWithoutAck(100);
            //获取消息id
            long batchId = message.getId();
            if(batchId != -1){
                printEnity(message.getEntries());
                //提交确认
                //canalConnector.ack(batchId);
                //处理失败,回滚数据
                //canalConnector.rollback(batchId);
            }
        }
    }
    private static void printEnity(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if(entry.getEntryType() != CanalEntry.EntryType.ROWDATA){
                continue;
            }
            try{
                // 序列化数据
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    System.out.println(rowChange.getEventType());
                    switch (rowChange.getEventType()){
                        //如果希望监听多种事件,可以手动增加case
                        case INSERT:
                            // 表名
                            String tableName = entry.getHeader().getTableName();
                            System.out.println("表名:"+tableName);
                            //测试users表进行映射处
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for(CanalEntry.Column c:afterColumnsList){
                             System.out.println("字段:"+c.getName()+",值:"+c.getValue());
                            }
                            System.out.println("插入的数据是:" + afterColumnsList);
                            break;
                        case UPDATE:
                            List<CanalEntry.Column> afterColumnsList2 = rowData.getAfterColumnsList();
                            System.out.println("更新的数据是:" + afterColumnsList2);
                            break;
                        case DELETE:
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            System.out.println("被删除的数据是:" + beforeColumnsList);
                            break;
                        default:
                    }
                }
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        }
    }

3.输出

INSERT
表名:user
字段:id,值:2
字段:name,值:lisi
字段:address,值:beijing
字段:email,值:1234
插入的数据是:
[index: 0 sqlType: 4 name: "id" isKey: true updated: true isNull: false value: "2" mysqlType: "int(11)"
, index: 1 sqlType: 12 name: "name" isKey: false updated: true isNull: false value: "lisi" mysqlType: "varchar(30)"
, index: 2 sqlType: 12 name: "address" isKey: false updated: true isNull: false value: "beijing" mysqlType: "varchar(200)"
, index: 3 sqlType: 12 name: "email" isKey: false updated: true isNull: false value: "1234" mysqlType: "varchar(100)"]

号外!号外!

如果这篇文章对你有所帮助,或者有所启发的话,帮忙点赞、在看、转发、收藏,你的支持就是我坚持下去的最大动力!

如何监听 MySQL 实现数据变化后的实时通知

面试官:MQ 消息丢失、重复、积压问题,如何解决?

Spring Boot 实现跨域的 5 种方式,总有一种适合你

别用Date了,Java8新特性之日期处理,现在学会也不迟!

如何监听 MySQL 实现数据变化后的实时通知


原文始发于微信公众号(一安未来):如何监听 MySQL 实现数据变化后的实时通知

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/44937.html

(1)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!