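Canal Study Notes
Canal's main purpose is to parse MySQL incremental logs (the binlog) and provide incremental data subscription and consumption.
Use cases built on log-based incremental subscription and consumption include:
- Database mirroring
- Real-time database backup
- Index building and real-time maintenance (splitting heterogeneous indexes, inverted indexes, etc.)
- Business cache refresh
- Incremental data processing with business logic
canal currently supports source MySQL versions 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.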
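How canal works
canal builds on MySQL master-slave replication, which works as follows:
- The MySQL master writes data changes to its binary log (the records are called binary log events and can be viewed with show binlog events)
- The MySQL slave copies the master's binary log events to its relay log
- The MySQL slave replays the events in the relay log, applying the data changes to its own data
canal plugs into this flow in three steps:
- canal emulates the MySQL slave interaction protocol, presents itself as a MySQL slave, and sends a dump request to the MySQL master
- The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal)
- canal parses the binary log objects (originally a byte stream)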
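Example
Environment setup
MySQL configuration
The canal quick-start guide that this setup follows lists support for MySQL 5.7 and below; the version list above also includes 8.0.x.
canal relies on the MySQL binlog, so binlog writing must be enabled in MySQL and the binlog format set to ROW.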
[mysqld]
log-bin=mysql-bin
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not be the same as canal's slaveId
After restarting the database, run a quick check that the my.cnf settings took effect:
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
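Because canal works by presenting itself as a MySQL slave, the account it uses must have the privileges a MySQL slave needs.
mysql -uroot -proot
# Create the account (user: canal; password: canal)
CREATE USER canal IDENTIFIED BY 'canal';
# Grant privileges
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# Reload the privileges so they take effect
FLUSH PRIVILEGES;
# For an existing account, check its privileges with SHOW GRANTS:
show grants for 'canal';
Import the initialization SQL canal/canal_admin/canal_manager.sql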
Canal configuration
Deploying Canal with docker-compose
docker-compose-canal.yml
version: '3'
networks:
  canal:
services:
  canal_admin:
    image: canal/canal-admin:v1.1.5
    container_name: canal_admin
    restart: unless-stopped
    volumes:
      - "./canal/canal-admin/conf/application.yml:/home/admin/canal-admin/conf/application.yml"
      # To control JVM memory, uncomment the line below and adjust the `JAVA_OPTS` parameter
      # - "./canal/canal-admin/bin/startup.sh:/home/admin/canal-admin/bin/startup.sh"
      - "./canal/canal-admin/logs:/home/admin/canal-admin/logs"
    environment:
      TZ: Asia/Shanghai
      LANG: en_US.UTF-8
      canal.adminUser: admin
      canal.adminPasswd: xxx
      spring.datasource.address: xxx:13306
      spring.datasource.database: canal_manager
      spring.datasource.username: xxx
      spring.datasource.password: xxx
    ports:
      - "8089:8089"
    networks:
      - canal
  canal_server:
    image: canal/canal-server:v1.1.5
    container_name: canal_server
    restart: unless-stopped
    volumes:
      - "./canal/canal-server/conf/example/instance.properties:/home/admin/canal-server/conf/example/instance.properties"
      - "./canal/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties"
      - "./canal/canal-server/conf/canal_local.properties:/home/admin/canal-server/conf/canal_local.properties"
      - "./canal/canal-server/logs:/home/admin/canal-server/logs"
    environment:
      TZ: Asia/Shanghai
      LANG: en_US.UTF-8
      canal.register.ip: 81.68.218.181
      canal.admin.manager: canal_admin:8089
      canal.admin.port: 11110
      canal.admin.user: admin
      canal.admin.passwd: 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
      # canal.admin.register.cluster: online
    ports:
      - "11110:11110"
      - "11111:11111"
      - "11112:11112"
    depends_on:
      - canal_admin
    links:
      - canal_admin
    networks:
      - canal
canal.properties
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ
rabbitmq.host = xxx:5672
rabbitmq.virtual.host = /
rabbitmq.exchange = canal.exchange
rabbitmq.username = admin
rabbitmq.password = admin
example/instance.properties
canal.instance.master.address=xxx:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# table regex: only sync the t_user table in the test database
canal.instance.filter.regex=test\\.t_user
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# mq config
canal.mq.topic=canal_routing_key
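To illustrate what the two filter expressions above select, here is a minimal sketch using java.util.regex. It assumes that each comma-separated expression is matched against the full schema.table name; the class and method names are hypothetical, and canal's own filter implementation may differ in details. Note that `\\.` in a properties file is read as the regex `\.`, the same regex the Java string literals below produce.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Hypothetical helper, not part of canal: approximates how a
// comma-separated table filter could be evaluated.
final class TableFilterSketch {

    // Returns true if "schema.table" fully matches any of the expressions.
    static boolean accepts(String filterRegex, String schema, String table) {
        String fullName = schema + "." + table;
        return Arrays.stream(filterRegex.split(","))
                .map(String::trim)
                .filter(expr -> !expr.isEmpty())
                .anyMatch(expr -> Pattern.matches(expr, fullName));
    }

    public static void main(String[] args) {
        String whiteList = "test\\.t_user";     // regex: test\.t_user
        String blackList = "mysql\\.slave_.*";  // regex: mysql\.slave_.*

        System.out.println(accepts(whiteList, "test", "t_user"));   // true
        System.out.println(accepts(whiteList, "test", "t_order"));  // false
        System.out.println(accepts(blackList, "mysql", "slave_master_info")); // true
    }
}
```

Escaping the dot matters: an unescaped `.` would match any character, so `test.t_user` would also accept a name such as `testXt_user`.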
Integrating canal with Spring Boot
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# RabbitMQ configuration
spring:
  rabbitmq:
    addresses: xxx:5672 # address(es) of the server the client connects to; separate multiple addresses with commas
    username: admin
    password: admin
    virtual-host: /
@Slf4j
@Component
public class CanalRabbitMqListener {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = MqConstant.CANAL_QUEUE, durable = "true"),
exchange = @Exchange(value = MqConstant.CANAL_EXCHANGE),
key = MqConstant.CANAL_ROUTING_KEY
)
})
public void handleCanalDataChange(String message) {
log.info("[canal] received message: {}", JSON.toJSONString(message));
}
}
public interface MqConstant {
String CANAL_EXCHANGE = "canal.exchange";
String CANAL_QUEUE = "canal_queue";
String CANAL_ROUTING_KEY = "canal_routing_key";
}
canal-spring-boot-starter
tips: see https://github.com/NormanGyllenhaal/canal-client for reference. This approach requires changing the canal.serverMode property in the canal.properties configuration file to tcp.
<!-- https://mvnrepository.com/artifact/top.javatool/canal-spring-boot-starter -->
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
canal:
  server: xxx:11111
  # an instance configuration named example must exist under Instance Management in canal-admin
  destination: example
@Slf4j
@Component
@CanalTable(value = "t_user")
public class UserHandler implements EntryHandler<User> {
@Override
public void insert(User user) {
log.info("insert message {}", user);
}
@Override
public void update(User before, User after) {
log.info("update before {} ", before);
log.info("update after {}", after);
}
@Override
public void delete(User user) {
log.info("delete {}", user);
}
}
@Data
@Table(name = "t_user")
public class User implements Serializable {
/**
* Primary key
*/
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "user_id")
private Integer userId;
/**
* Username
*/
@Column(name = "username")
private String username;
/**
* Password
*/
@Column(name = "password")
private String password;
/**
* Sex
*/
@Column(name = "sex")
private Integer sex;
/**
* Remark
*/
private String remark;
/**
* Date
*/
private Date date;
}
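Testing showed that this jar has some bugs. For example, when a table column that was originally NULL is updated to a non-null value, an error is thrown if the corresponding Java field is of a non-String type.
Testing canal
Configuration
URL: http://<ip-address>:8089
Default login account/password: admin/123456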
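The canal.admin.passwd value used in the configuration files in these notes is not the plain-text password. It appears to be the default password 123456 hashed in the style of MySQL's PASSWORD() function: SHA-1 applied twice, written as upper-case hex without the leading `*`. A minimal sketch under that assumption (the class and method names are hypothetical):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: derives a canal.admin.passwd value from a
// plain-text password, assuming the SHA1(SHA1(password)) scheme.
final class CanalAdminPasswdSketch {

    static String hash(String plainText) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            // digest() resets the instance, so it can be reused for the second pass.
            byte[] first = sha1.digest(plainText.getBytes(StandardCharsets.UTF_8));
            byte[] second = sha1.digest(first);
            StringBuilder hex = new StringBuilder();
            for (byte b : second) {
                hex.append(String.format("%02X", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is required on every Java platform, so this is not expected.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Prints 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
        System.out.println(hash("123456"));
    }
}
```

If the admin password is changed, the canal.admin.passwd entries on the canal-server side would need to be regenerated the same way so that both sides agree.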
1. canal.properties
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ
rabbitmq.host = 81.68.218.181:5672
rabbitmq.virtual.host = dev
rabbitmq.exchange = canal.exchange
rabbitmq.username = admin
rabbitmq.password = admin
2. test/instance.properties
canal.instance.master.address=81.68.218.181:13306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# table regex: only sync the t_user table in the test database
canal.instance.filter.regex=test\\.t_user
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# mq config
canal.mq.topic=canal_routing_key
tips: do not use the default instance here; create a new instance named test instead.
Listening for canal messages over MQ
@Slf4j
@Component
public class CanalRabbitMqListener {
private static final String CANAL_EXCHANGE = "canal.exchange";
private static final String CANAL_QUEUE = "canal_queue";
private static final String CANAL_ROUTING_KEY = "canal_routing_key";
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = CANAL_QUEUE, durable = "true"),
exchange = @Exchange(value = CANAL_EXCHANGE),
key = CANAL_ROUTING_KEY
)
})
public void handleCanalDataChange(String message) {
log.info("[canal] received message: {}", JsonUtils.obj2json(message));
}
}
Table t_user in the test database
1. Insert a row
Message body for the insert
{
"data":[
{
"id":"1",
"user_key":"新增",
"user_value":"add"
}
],
"database":"test",
"es":1693715762000,
"id":2,
"isDdl":false,
"mysqlType":{
"id":"int",
"user_key":"varchar(32)",
"user_value":"varchar(64)"
},
"old":null,
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":4,
"user_key":12,
"user_value":12
},
"table":"t_user",
"ts":1693715763019,
"type":"INSERT"
}
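The message above carries two millisecond timestamps. Assuming that `es` is the time the change was executed on MySQL and `ts` is the time canal built the message, their difference gives a rough measure of how far canal lags behind the database. A minimal sketch with a hypothetical helper class:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: estimates canal's lag from the es and ts fields.
final class CanalLagSketch {

    // es: execution time on MySQL (ms); ts: time canal built the message (ms).
    static Duration lag(long es, long ts) {
        return Duration.ofMillis(ts - es);
    }

    public static void main(String[] args) {
        long es = 1693715762000L; // values taken from the INSERT message above
        long ts = 1693715763019L;

        System.out.println("executed at: " + Instant.ofEpochMilli(es));
        System.out.println("lag in ms:   " + lag(es, ts).toMillis()); // 1019
    }
}
```

The `es` value here ends in 000, so the computed lag is only accurate to about one second.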
2. Update a row
{
"data":[
{
"id":"2",
"user_key":"修改",
"user_value":"update"
}
],
"database":"test",
"es":1693717247000,
"id":4,
"isDdl":false,
"mysqlType":{
"id":"int",
"user_key":"varchar(32)",
"user_value":"varchar(64)"
},
"old":[
{
"user_key":"新增2",
"user_value":"add2"
}
],
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":4,
"user_key":12,
"user_value":12
},
"table":"t_user",
"ts":1693717247148,
"type":"UPDATE"
}
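In the UPDATE message above, `data` holds the full row after the change, while `old` holds only the columns that changed (user_key and user_value, but not id) together with their previous values. The sketch below pairs the two to list each changed column with its before and after value. It assumes the message has already been parsed into maps; the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: derives a per-column diff from one entry of
// "old" and the matching entry of "data" in an UPDATE message.
final class UpdateDiffSketch {

    // Maps each changed column to a two-element array {before, after}.
    static Map<String, String[]> changedColumns(Map<String, String> data,
                                                Map<String, String> old) {
        Map<String, String[]> diff = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : old.entrySet()) {
            String column = entry.getKey();
            diff.put(column, new String[] {entry.getValue(), data.get(column)});
        }
        return diff;
    }

    public static void main(String[] args) {
        // Values from the message above (user_key omitted for brevity).
        Map<String, String> data = Map.of("id", "2", "user_value", "update");
        Map<String, String> old = Map.of("user_value", "add2");

        changedColumns(data, old).forEach((column, values) ->
                System.out.println(column + ": " + values[0] + " -> " + values[1]));
        // prints: user_value: add2 -> update
    }
}
```

Since `data` and `old` are parallel arrays in the message, a batch update would call this once per index.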
3. Delete a row
{
"data":[
{
"id":"2",
"user_key":"修改",
"user_value":"update"
}
],
"database":"test",
"es":1693717401000,
"id":5,
"isDdl":false,
"mysqlType":{
"id":"int",
"user_key":"varchar(32)",
"user_value":"varchar(64)"
},
"old":null,
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":4,
"user_key":12,
"user_value":12
},
"table":"t_user",
"ts":1693717401178,
"type":"DELETE"
}
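The three messages share one shape and differ mainly in `type` and `old`, which makes it straightforward to drive one of the use cases listed at the start, business cache refresh. The sketch below is one possible mapping, not something canal prescribes: it treats INSERT and UPDATE as a cache put, DELETE as an eviction, and builds a cache key from `database`, `table`, and `pkNames`. All names and the key format are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: decides what a cache should do for a canal message.
final class CacheRefreshSketch {

    enum Action { PUT, EVICT, IGNORE }

    static Action actionFor(String type, boolean isDdl) {
        if (isDdl) {
            return Action.IGNORE; // schema changes carry no row data to cache
        }
        switch (type) {
            case "INSERT":
            case "UPDATE":
                return Action.PUT;
            case "DELETE":
                return Action.EVICT;
            default:
                return Action.IGNORE;
        }
    }

    // Builds a key such as "test.t_user:2" from the primary-key columns.
    static String cacheKey(String database, String table,
                           List<String> pkNames, Map<String, String> row) {
        String pk = pkNames.stream().map(row::get).collect(Collectors.joining("_"));
        return database + "." + table + ":" + pk;
    }

    public static void main(String[] args) {
        // Values from the DELETE message above (user_key omitted for brevity).
        Map<String, String> row = Map.of("id", "2", "user_value", "update");

        System.out.println(actionFor("DELETE", false));                        // EVICT
        System.out.println(cacheKey("test", "t_user", List.of("id"), row));    // test.t_user:2
    }
}
```

For a DELETE, `data` still contains the row as it was before deletion, so the key can be built from it even though `old` is null.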