Canal

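Canal parses the incremental logs (binlog) of a MySQL database and provides incremental data subscription and consumption.

Typical uses of log-based incremental subscription and consumption include:

  • Database mirroring
  • Real-time database backup
  • Index building and real-time maintenance (split heterogeneous indexes, inverted indexes, and so on)
  • Business cache refresh
  • Incremental data processing with business logic

canal currently supports source MySQL versions 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.

Official documentation: https://github.com/alibaba/canal/wiki/QuickStart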

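How canal works

  • canal emulates the MySQL slave interaction protocol: it poses as a MySQL slave and sends a dump request to the MySQL master.

    For reference, regular MySQL master-slave replication works as follows:

    • The MySQL master writes data changes to its binary log. The records are called binary log events and can be inspected with show binlog events.
    • The MySQL slave copies the master's binary log events to its relay log.
    • The MySQL slave replays the events in the relay log, applying the data changes to its own data.
  • The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal).

  • canal parses the binary log objects (originally a byte stream).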
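
The flow above can be pictured with a toy model: a master appends events to a log, and a subscriber that remembers its position asks for everything from that position onward. This is only an illustrative sketch. The names ToyBinlog and dumpFrom are hypothetical and do not correspond to the MySQL replication protocol or to canal's API.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of position-based log subscription; not the real MySQL dump protocol. */
final class ToyBinlog {
    private final List<String> events = new ArrayList<>();

    /** The "master" appends one change event to the end of its log. */
    void append(String event) {
        events.add(event);
    }

    /** Returns a copy of every event at or after the given position, as a dump request would. */
    List<String> dumpFrom(int position) {
        return new ArrayList<>(events.subList(position, events.size()));
    }

    public static void main(String[] args) {
        ToyBinlog master = new ToyBinlog();
        master.append("INSERT t_user id=1");
        master.append("UPDATE t_user id=1");

        int position = 0; // the subscriber (canal's role) tracks how far it has consumed
        List<String> batch = master.dumpFrom(position);
        position += batch.size();
        System.out.println(batch + " next position=" + position);

        master.append("DELETE t_user id=1");
        System.out.println(master.dumpFrom(position)); // only the event added after the first batch
    }
}
```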

Example

Setting up the Canal environment

MySQL configuration

As listed above, the current open-source canal supports source MySQL versions up to 8.0.x.

canal is built on the MySQL binlog, so binlog writing must be enabled on the MySQL server and the binlog format must be set to ROW.

properties
[mysqld]  
log-bin=mysql-bin 
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not be the same as canal's slaveId
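
As a rough aid, the three requirements above (binlog enabled, ROW format, a server_id that differs from canal's slave id) could be checked against a my.cnf fragment before restarting the database. The sketch below is an assumption-laden helper, not part of canal: the class MyCnfCheck and its methods are hypothetical, it treats any `#` as the start of a comment, and it ignores options written without a value.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: checks a my.cnf fragment for the binlog settings canal relies on. */
final class MyCnfCheck {

    /** Parses key=value lines, dropping '#' comments and treating '-' and '_' in keys as equal. */
    static Map<String, String> parse(String cnf) {
        Map<String, String> options = new HashMap<>();
        for (String line : cnf.split("\\R")) {
            int hash = line.indexOf('#');
            String content = (hash >= 0 ? line.substring(0, hash) : line).trim();
            int eq = content.indexOf('=');
            if (eq <= 0) {
                continue; // section headers such as [mysqld], blank lines, options without a value
            }
            String key = content.substring(0, eq).trim().replace('-', '_');
            options.put(key, content.substring(eq + 1).trim());
        }
        return options;
    }

    /** True when log-bin is set, the format is ROW, and server_id differs from canal's slave id. */
    static boolean readyForCanal(String cnf, long canalSlaveId) {
        Map<String, String> o = parse(cnf);
        String serverId = o.get("server_id");
        return o.containsKey("log_bin")
                && "ROW".equalsIgnoreCase(o.get("binlog_format"))
                && serverId != null
                && Long.parseLong(serverId) != canalSlaveId;
    }

    public static void main(String[] args) {
        String cnf = String.join("\n",
                "[mysqld]",
                "log-bin=mysql-bin",
                "binlog-format=ROW # use ROW mode",
                "server_id=1");
        System.out.println(readyForCanal(cnf, 1234)); // true
        System.out.println(readyForCanal(cnf, 1));    // false: the ids collide
    }
}
```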

After restarting the database, run a quick check that the my.cnf settings took effect:

mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+

Because canal works by posing as a MySQL slave, the account it connects with must hold the privileges a MySQL slave needs.

mysql -uroot -proot
# Create the account (user: canal; password: canal)
CREATE USER canal IDENTIFIED BY 'canal';
# Grant privileges
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# Flush so the privileges take effect
FLUSH PRIVILEGES;
# For an existing account, check its privileges with SHOW GRANTS:
SHOW GRANTS FOR 'canal';

Import the initialization SQL script canal/canal_admin/canal_manager.sql.

Canal configuration

Deploying Canal with docker-compose

docker-compose-canal.yml

yaml
version: '3'
networks:
  canal:
services:
  canal_admin:
    image: canal/canal-admin:v1.1.5
    container_name: canal_admin             
    restart: unless-stopped
    volumes:                               
      - "./canal/canal-admin/conf/application.yml:/home/admin/canal-admin/conf/application.yml"
      # To control JVM memory, uncomment the line below and adjust the `JAVA_OPTS` parameter
#      - "./canal/canal-admin/bin/startup.sh:/home/admin/canal-admin/bin/startup.sh"
      - "./canal/canal-admin/logs:/home/admin/canal-admin/logs"
    environment: 
      TZ: Asia/Shanghai
      LANG: en_US.UTF-8
      canal.adminUser: admin
      canal.adminPasswd: xxx
      spring.datasource.address: xxx:13306
      spring.datasource.database: canal_manager
      spring.datasource.username: xxx
      spring.datasource.password: xxx
    ports:
      - "8089:8089"
    networks:
      - canal
  canal_server:
    image: canal/canal-server:v1.1.5
    container_name: canal_server            
    restart: unless-stopped                   
    volumes:                                 
      - "./canal/canal-server/conf/example/instance.properties:/home/admin/canal-server/conf/example/instance.properties"
      - "./canal/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties"
      - "./canal/canal-server/conf/canal_local.properties:/home/admin/canal-server/conf/canal_local.properties"
      - "./canal/canal-server/logs:/home/admin/canal-server/logs"
    environment:                           
      TZ: Asia/Shanghai
      LANG: en_US.UTF-8
      canal.register.ip: 81.68.218.181
      canal.admin.manager: canal_admin:8089
      canal.admin.port: 11110
      canal.admin.user: admin
      canal.admin.passwd: 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
#      canal.admin.register.cluster: online
    ports:
      - "11110:11110"
      - "11111:11111"
      - "11112:11112"
    depends_on:
      - canal_admin
    links:
      - canal_admin
    networks:
      - canal
canal.properties
properties
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9

# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ

rabbitmq.host = xxx:5672
rabbitmq.virtual.host = /
rabbitmq.exchange = canal.exchange
rabbitmq.username = admin
rabbitmq.password = admin
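
The canal.admin.passwd value above is not a plain-text password. The value shown is consistent with MySQL's legacy PASSWORD() scheme (SHA-1 applied twice, upper-case hex, without the leading `*`) applied to 123456. The sketch below shows how such a value could be recomputed if the admin password changes; the class name AdminPasswordHash is hypothetical, and whether a given canal release expects exactly this format should be checked against its documentation.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Sketch: double SHA-1 in upper-case hex, the scheme behind MySQL's legacy PASSWORD(). */
final class AdminPasswordHash {

    static String doubleSha1Hex(String plain) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] first = sha1.digest(plain.getBytes(StandardCharsets.UTF_8));
            byte[] second = sha1.digest(first); // digest() resets the instance, so it can be reused
            StringBuilder hex = new StringBuilder();
            for (byte b : second) {
                hex.append(String.format("%02X", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 should be available on every Java platform", e);
        }
    }

    public static void main(String[] args) {
        // Prints 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9, the value used in canal.properties above.
        System.out.println(doubleSha1Hex("123456"));
    }
}
```
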
example/instance.properties
properties
canal.instance.master.address=xxx:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8

# table regex: only sync the t_user table in the test database
canal.instance.filter.regex=test\\.t_user
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*

# mq config
canal.mq.topic=canal_routing_key
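
The doubled backslash in the filter values is properties-file escaping: after loading, `test\\.t_user` becomes the regular expression `test\.t_user`, in which the dot is literal. The sketch below illustrates this with java.util.Properties and java.util.regex. It assumes the filter is a comma-separated list of patterns matched against schema.table; the class FilterRegexDemo is hypothetical and canal's own matcher may differ in details such as case handling.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Pattern;

/** Sketch: how the escaped filter value is unescaped by Properties and then used as a regex. */
final class FilterRegexDemo {

    /** Loads properties text and returns the value stored under the given key. */
    static String load(String propertiesText, String key) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return p.getProperty(key);
    }

    /** True if any comma-separated pattern fully matches "schema.table". */
    static boolean accepts(String filter, String schemaDotTable) {
        for (String regex : filter.split(",")) {
            if (Pattern.matches(regex.trim(), schemaDotTable)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // In Java source, each of the two backslashes in the file is itself written as "\\".
        String line = "canal.instance.filter.regex=test\\\\.t_user";
        String filter = load(line, "canal.instance.filter.regex");
        System.out.println(filter);                          // test\.t_user
        System.out.println(accepts(filter, "test.t_user"));  // true
        System.out.println(accepts(filter, "test.t_order")); // false
        System.out.println(accepts(filter, "testXt_user"));  // false: the dot is literal
    }
}
```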

Integrating canal with Spring Boot

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yaml
# RabbitMQ configuration
spring:
  rabbitmq:
    addresses: xxx:5672 # address(es) of the server the client connects to; separate multiple entries with commas
    username: admin
    password: admin
    virtual-host: /
java
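import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CanalRabbitMqListener {

    // Binds the durable queue to the canal exchange using the routing key configured in canal.mq.topic.
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = MqConstant.CANAL_QUEUE, durable = "true"),
                    exchange = @Exchange(value = MqConstant.CANAL_EXCHANGE),
                    key = MqConstant.CANAL_ROUTING_KEY
            )
    })
    public void handleCanalDataChange(String message) {
        // The payload is already a JSON string, so it is logged as-is.
        log.info("[canal] received message: {}", message);
    }
}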
java
public interface MqConstant {
    String CANAL_EXCHANGE = "canal.exchange";
    String CANAL_QUEUE = "canal_queue";
    String CANAL_ROUTING_KEY = "canal_routing_key";
}

canal-spring-boot-starter

Tip: see https://github.com/NormanGyllenhaal/canal-client. This approach requires changing the canal.serverMode property in canal.properties to tcp.

xml
<!-- https://mvnrepository.com/artifact/top.javatool/canal-spring-boot-starter -->
<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>
yaml
canal:
  server: xxx:11111
  # the example instance configuration must exist under Instance management in canal-admin
  destination: example
java
@Slf4j
@Component
@CanalTable(value = "t_user")
public class UserHandler implements EntryHandler<User> {

    @Override
    public void insert(User user) {
        log.info("insert message  {}", user);
    }

    @Override
    public void update(User before, User after) {
        log.info("update before {} ", before);
        log.info("update after {}", after);
    }

    @Override
    public void delete(User user) {
        log.info("delete  {}", user);
    }
}
java
@Data
@Table(name = "t_user")
public class User implements Serializable {

    /** Primary key */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "user_id")
    private Integer userId;

    /** User name */
    @Column(name = "username")
    private String username;

    /** Password */
    @Column(name = "password")
    private String password;

    /** Sex */
    @Column(name = "sex")
    private Integer sex;

    /** Remark */
    private String remark;

    /** Date */
    private Date date;
}

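Testing showed that this jar has some bugs. For example, when a column that was originally NULL is updated to a non-null value, an error is thrown if the Java side receives it in a field whose type is not String.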

Testing canal in practice

Configuration

Admin console address: http://<ip address>:8089. Default login (username/password): admin/123456

1. canal.properties
properties
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9

# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ

rabbitmq.host = 81.68.218.181:5672
rabbitmq.virtual.host = dev
rabbitmq.exchange = canal.exchange
rabbitmq.username = admin
rabbitmq.password = admin
2. test/instance.properties

properties
canal.instance.master.address=81.68.218.181:13306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8

# table regex: only sync the t_user table in the test database
canal.instance.filter.regex=test\\.t_user
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*

# mq config
canal.mq.topic=canal_routing_key

Tip: do not use the default instance here; create a new instance named test instead.

Listening for canal messages over MQ

java
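import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CanalRabbitMqListener {

    // Annotation attributes need compile-time constants, so these are declared static final.
    private static final String CANAL_EXCHANGE = "canal.exchange";
    private static final String CANAL_QUEUE = "canal_queue";
    private static final String CANAL_ROUTING_KEY = "canal_routing_key";

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = CANAL_QUEUE, durable = "true"),
                    exchange = @Exchange(value = CANAL_EXCHANGE),
                    key = CANAL_ROUTING_KEY
            )
    })
    public void handleCanalDataChange(String message) {
        // The payload is already a JSON string, so it is logged as-is.
        log.info("[canal] received message: {}", message);
    }
}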
1. Insert a row into the t_user table of the test database

Message body for the inserted row

json
{
    "data":[
        {
            "id":"1",
            "user_key":"新增",
            "user_value":"add"
        }
    ],
    "database":"test",
    "es":1693715762000,
    "id":2,
    "isDdl":false,
    "mysqlType":{
        "id":"int",
        "user_key":"varchar(32)",
        "user_value":"varchar(64)"
    },
    "old":null,
    "pkNames":[
        "id"
    ],
    "sql":"",
    "sqlType":{
        "id":4,
        "user_key":12,
        "user_value":12
    },
    "table":"t_user",
    "ts":1693715763019,
    "type":"INSERT"
}
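
In this message every value under data is a string, including the int column id, while sqlType carries numeric codes. The codes 4 and 12 are the java.sql.Types constants for INTEGER and VARCHAR. A consumer that wants typed values could convert them along the lines of the sketch below, which assumes the message has already been deserialized into maps; the class SqlTypeDecoder is hypothetical and handles only a few integer types.

```java
import java.sql.JDBCType;
import java.sql.Types;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: turn the string values in "data" into typed values using the codes in "sqlType". */
final class SqlTypeDecoder {

    /** Converts one column value; null stays null, unhandled types stay strings. */
    static Object convert(String raw, int sqlTypeCode) {
        if (raw == null) {
            return null;
        }
        switch (JDBCType.valueOf(sqlTypeCode)) {
            case INTEGER:
            case SMALLINT:
            case TINYINT:
                return Integer.valueOf(raw);
            case BIGINT:
                return Long.valueOf(raw);
            default:
                return raw;
        }
    }

    /** Converts a whole row, keeping column order; columns without a code are treated as VARCHAR. */
    static Map<String, Object> convertRow(Map<String, String> row, Map<String, Integer> sqlType) {
        Map<String, Object> typed = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : row.entrySet()) {
            int code = sqlType.getOrDefault(e.getKey(), Types.VARCHAR);
            typed.put(e.getKey(), convert(e.getValue(), code));
        }
        return typed;
    }

    public static void main(String[] args) {
        // Excerpt of the sample message above (two of its three columns).
        Map<String, String> row = new LinkedHashMap<>();
        row.put("id", "1");
        row.put("user_value", "add");
        Map<String, Integer> sqlType = Map.of("id", 4, "user_value", 12);

        Map<String, Object> typed = convertRow(row, sqlType);
        System.out.println(JDBCType.valueOf(4) + " " + JDBCType.valueOf(12)); // INTEGER VARCHAR
        System.out.println(typed.get("id") instanceof Integer);               // true
    }
}
```
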
2. Update a row

json
{
    "data":[
        {
            "id":"2",
            "user_key":"修改",
            "user_value":"update"
        }
    ],
    "database":"test",
    "es":1693717247000,
    "id":4,
    "isDdl":false,
    "mysqlType":{
        "id":"int",
        "user_key":"varchar(32)",
        "user_value":"varchar(64)"
    },
    "old":[
        {
            "user_key":"新增2",
            "user_value":"add2"
        }
    ],
    "pkNames":[
        "id"
    ],
    "sql":"",
    "sqlType":{
        "id":4,
        "user_key":12,
        "user_value":12
    },
    "table":"t_user",
    "ts":1693717247148,
    "type":"UPDATE"
}
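
In this UPDATE message, data holds the row after the change and old holds only the columns that changed, with their previous values (id is absent from old because it did not change). The row as it looked before the update can therefore be rebuilt by overlaying old onto data. The sketch below assumes the message has already been deserialized into maps; the class BeforeImage is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: rebuild the pre-update row from the "data" and "old" entries of an UPDATE message. */
final class BeforeImage {

    /** Copies the after-image and overwrites the changed columns with their old values. */
    static Map<String, String> before(Map<String, String> data, Map<String, String> old) {
        Map<String, String> before = new LinkedHashMap<>(data);
        if (old != null) { // "old" is null for INSERT and DELETE messages
            before.putAll(old);
        }
        return before;
    }

    public static void main(String[] args) {
        // Excerpt of the sample message above (two of its three columns).
        Map<String, String> data = new LinkedHashMap<>();
        data.put("id", "2");
        data.put("user_value", "update");
        Map<String, String> old = Map.of("user_value", "add2");

        System.out.println(before(data, old)); // {id=2, user_value=add2}
    }
}
```
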
3. Delete a row
json
{
    "data":[
        {
            "id":"2",
            "user_key":"修改",
            "user_value":"update"
        }
    ],
    "database":"test",
    "es":1693717401000,
    "id":5,
    "isDdl":false,
    "mysqlType":{
        "id":"int",
        "user_key":"varchar(32)",
        "user_value":"varchar(64)"
    },
    "old":null,
    "pkNames":[
        "id"
    ],
    "sql":"",
    "sqlType":{
        "id":4,
        "user_key":12,
        "user_value":12
    },
    "table":"t_user",
    "ts":1693717401178,
    "type":"DELETE"
}
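
Taken together, the three message types are enough to keep a local copy of the table in step with the database, which is the cache-refresh use case mentioned at the start. The sketch below is one possible shape under stated assumptions: the message is already deserialized, the table has a single-column primary key (pkNames has one entry), and data carries the full row. The class RowCache and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: keep an in-memory copy of t_user in step with INSERT / UPDATE / DELETE messages. */
final class RowCache {
    private final Map<String, Map<String, String>> rowsByPk = new HashMap<>();

    /** Applies one message; the arguments mirror its "type", "pkNames[0]" and "data" fields. */
    void apply(String type, String pkName, List<Map<String, String>> rows) {
        for (Map<String, String> row : rows) {
            String pk = row.get(pkName);
            switch (type) {
                case "INSERT":
                case "UPDATE":
                    rowsByPk.put(pk, new HashMap<>(row)); // store the row as it is after the change
                    break;
                case "DELETE":
                    rowsByPk.remove(pk); // data describes the row that was removed
                    break;
                default:
                    break; // other event types are ignored in this sketch
            }
        }
    }

    Map<String, String> get(String pk) {
        return rowsByPk.get(pk);
    }

    int size() {
        return rowsByPk.size();
    }

    public static void main(String[] args) {
        RowCache cache = new RowCache();
        cache.apply("INSERT", "id", List.of(Map.of("id", "2", "user_value", "add2")));
        cache.apply("UPDATE", "id", List.of(Map.of("id", "2", "user_value", "update")));
        System.out.println(cache.get("2").get("user_value")); // update
        cache.apply("DELETE", "id", List.of(Map.of("id", "2", "user_value", "update")));
        System.out.println(cache.size()); // 0
    }
}
```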