Canal 将 MySQL 数据库变化传送至 RocketMQ 进行数据流转。
MySQL 相关
配置 BINLOG
修改配置文件 /etc/mysql/mysql.conf.d/mysqld.cnf
这是官方 5.7.42 版本配置文件路径,在 [mysqld]
标签下增加配置
log-bin = mysql-bin
binlog-format = ROW
server_id = 1
修改后配置文件大概这样:
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
log-error = /var/log/mysql/error.log
log-bin = mysql-bin
binlog-format = ROW
server_id = 1
bind-address = 127.0.0.1
symbolic-links = 0
重启数据库
sudo systemctl restart mysql
检查主从状态
检查配置结果
SHOW VARIABLES LIKE '%log_bin%';
+---------------------------------+--------------------------------+
| Variable_name | Value |
+---------------------------------+--------------------------------+
| log_bin | ON |
| log_bin_basename | /var/lib/mysql/mysql-bin |
| log_bin_index | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
| sql_log_bin | ON |
+---------------------------------+--------------------------------+
6 rows in set (0.00 sec)
Canal 相关
配置数据库
创建 Canal 主从角色
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
创建测试库
CREATE DATABASE analytic CHARACTER SET utf8mb4;
SHOW DATABASES;
创建测试表
CREATE TABLE `user` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '用户 ID',
`username` varchar(50) DEFAULT NULL COMMENT '用户名',
`password` varchar(50) DEFAULT NULL COMMENT '密码',
`email` varchar(45) DEFAULT NULL COMMENT '邮箱',
`phone` varchar(15) DEFAULT NULL COMMENT '手机号',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4;
插入测试数据
INSERT INTO `analytic`.`user` ( `id`, `username`, `password`, `email`, `phone` )
VALUES
( 1, '张三', '275&553/7', '3rb95f7kb98ws7nre@qq.com', '18124578941' ),
( 2, '李四', '148309#1=', '4fsdng27fp3q55au2@qq.com', '13197387591' ),
( 3, '王五', '77=0&6923', 'wf4189dbm7yfmqw73@qq.com', '18638390876' ),
( 4, '赵六', '0872_~191', 'je7h008x475klwnbp@qq.com', '13039809384' ),
( 5, '钱七', '264.4+939', '6n20g95qw2hswp4ep@qq.com', '18898052943' );
配置 Canal 对接 MQ
安装 Canal Deployer
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
sudo mkdir /opt/canal/deployer
sudo tar xf canal.deployer-1.1.6.tar.gz -C /opt/canal/deployer
sudo chown -R $USER:$USER /opt/canal/deployer
修改 Canal 配置文件 deployer/conf/canal.properties
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = tcp
改成 RocketMQ
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
一般来说只需改以下两行
rocketmq.producer.group = canal-producer
rocketmq.namesrv.addr = 127.0.0.1:9876
修改配置 deployer/conf/example/instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.mq.topic=canal-default-topic
小贴士:这里canal.mq.topic
为默认主题,如果启用动态主题的话,MQ会根据分类自动将不同表的消息写入不同的主题,如果没有匹配的会写入到默认主题,如果没有配置默认主题,任务会截止后报错,canal.mq.dynamicTopic
和canal.instance.filter.regex
配合
修正项目文件权限
find . -type d | xargs chmod 755
find . -type f | xargs chmod 644
chmod +x bin/*.sh
重启 Canal
bin/stop.sh
bin/startup.sh
然后查看堆栈
jps -ml
2403 com.alibaba.otter.canal.deployer.CanalLauncher
Canal 工作验证
使用 Dashboard 查看数据库更新
可以使用 WebUI 查看数据库更新的详情
主题
修改 MySQL 数据
UPDATE `analytic`.`user` SET `username` = '余罪' WHERE `id` = 5
如果安装了 RocketMQ-Dashboard 可以登录查看 Topic 中已经出现名为 canal-topic
主题。
Java 测试代码
尝试使用示例代码
git clone https://github.com/vndroid/java-canal-mq.git
然后修改文件 src/main/java/cn/butterfly/canal/listener/UserCanalListener.java
的 #19 为实际使用的主题名
diff --git a/src/main/java/cn/butterfly/canal/listener/UserCanalListener.java b/src/main/java/cn/butterfly/canal/listener/UserCanalListener.java
index 6c0359a..2203386 100644
--- a/src/main/java/cn/butterfly/canal/listener/UserCanalListener.java
+++ b/src/main/java/cn/butterfly/canal/listener/UserCanalListener.java
@@ -16,7 +16,7 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(
- topic = "canal_topic",
+ topic = "canal-topic",
consumerGroup = "canal_group"
)
public class UserCanalListener implements RocketMQListener<CanalMessage<User>> {
配置 MQ 地址 src/main/resources/application-dev.yml
然后构建项目
mvn clean package -Dmaven.test.skip=true
启动项目,监听在 10086 端口
java -jar -Dserver.port=10086 target/canal-0.0.1-SNAPSHOT.jar
另一边可以修改数据库数据,即时查看更新日志
2023-08-18 17:45:58.780 INFO 22428 --- [MessageThread_2] c.b.canal.listener.UserCanalListener :
====================
Database.table: analytic.user
Type of operation: INSERT
User(id=5, username=钱七, password=264.4+939, email=6n20g95qw2hswp4ep@qq.com, phone=18898052943)
Canal Admin
为了方便 Canal 程序管理,官方推出了一个 WebUI 项目,可以在网页中查看状态
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.admin-1.1.6.tar.gz
sudo mkdir /opt/canal/admin
sudo tar xf canal.admin-1.1.6.tar.gz -C /opt/canal/admin
sudo chown -R $USER:$USER /opt/canal/admin/
初始化数据库,使用 root 登录数据库,执行 conf/canal_manager.sql
初始化数据库。
CREATE DATABASE canal_manager;
GRANT ALL PRIVILEGES ON canal_manager.* TO 'canal'@'%';
FLUSH PRIVILEGES;
USE canal_manager;
source conf/canal_manager.sql;
附录
参考链接
- Canal 结合 RocketMQ 实现数据的增量同步 - CSDN
- canal RabbitMQ dynamicTopic 使用记录 - CSDN
- Canal dynamicTopic问题 - CSDN
本文由 柒 创作,采用 知识共享署名4.0
国际许可协议进行许可。
转载本站文章前请注明出处,文章作者保留所有权限。
最后编辑时间: 2023-08-21 13:23 PM