前言

最近在写项目的时候有个服务模块用到了canal,发现在配置环境的时候还是有不少问题的,于是就有了这篇文章。
(PS:由于本文是临时起意写的,在内容上会比较简洁,如果没有帮到你,我很抱歉QAQ)

MYSQL配置

首先需要拉取mysql镜像,进入容器内部的etc文件下找到my.cnf文件,将文件复制到宿主机的指定目录下用于挂载。
在my.cnf文件中添加如下内容:

1
2
3
4
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

创建一个config.sql文件用于授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant,内容如下所示:

1
2
3
4
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

MYSQL的docker-compose:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
mysql:
container_name: mysql
image: mysql:latest
restart: always
command:
--default-authentication-plugin=caching_sha2_password
environment:
- MYSQL_ROOT_PASSWORD=xxx
- MYSQL_DATABASE=xxx
- MYSQL_USER=xxx
- MYSQL_PASSWORD=xxx
- TZ=Asia/Shanghai
volumes:
- ./config/sql:/docker-entrypoint-initdb.d/
- ./.data/mysql:/var/lib/mysql
- ./config/mysql/my.cnf:/etc/my.cnf # 挂载my.cnf文件
ports:
- "3306:3306"
networks:
- xxx

ZOOKEEPER+KAFKA配置

由于kafka是依赖于zookeeper的,所以这两个就放在一起了。
docker-compose如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
zookeeper:
container_name: zookeeper
image: zookeeper:latest
restart: always
ports:
- 2181:2181
volumes:
- ./.data/zookeeper/data:/data
- ./.data/zookeeper/datalog:/datalog
- ./.data/zookeeper/logs:/logs
networks:
- xxx

kafka:
container_name: kafka
image: wurstmeister/kafka:latest
restart: always
ports:
- 9092:9092
environment:
- KAFKA_BROKER_ID=0
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
- KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
volumes:
- ./.data/kafka:/kafka
depends_on:
- zookeeper
networks:
- xxx

这边需要重点留意一下kafka环境的ADVERTISED_LISTENERS与LISTENERS两个的区别。(PS:本人在编写docker-compose文件的时候由于没有注意导致canal连接kafka一直失败,会在canal的log里面输出:Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.)
kafka启动后会在zookeeper的/brokers/ids下注册监听协议,包括IP和端口号,客户端连接的时候,会取得这个IP和端口号。
而advertised.listeners是会写入到zookeeper节点中的真正暴露给外部使用的连接地址。也就是说advertised.listeners才是真正的对外代理地址,我们canal要连接的就是这个地址,而listeners的作用仅仅只是监听。我们在这边直接设置advertised.listeners为Kafka的地址。

CANAL配置

重头戏来了。
使用同样的操作获取/home/admin/canal-server/conf/example路径下的instance.properties以及/home/admin/canal-server/conf路径下的canal.properties文件进行挂载。
修改instance.properties文件的配置,如下所示(注意这些是分散开的):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=2 # 不要与my.cnf的server_id重复即可

# position info
canal.instance.master.address=127.0.0.1:3306

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

# mq config
canal.mq.topic=xxx #你要用的主题名
canal.mq.partitionsNum=1
#库名.表名: 唯一主键,多个表之间用逗号分隔
canal.mq.partitionHash=库名.表名:唯一主键

修改canal.properties文件的配置,如下所示(注意这些是分散开的):

1
2
3
4
5
6
7
8
9
10
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = kafka

# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = false

##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092

canal的docker-compose如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
canal-server:
image: canal/canal-server:latest
container_name: canal-server
restart: always
environment:
- canal.instance.master.address=mysql:3306 # MySQL容器的地址和端口
- canal.instance.dbUsername=canal # canal的MySQL用户名
- canal.instance.dbPassword=canal # canal的MySQL密码
volumes:
- ./config/canal-server/instance.properties:/home/admin/canal-server/conf/example/instance.properties
- ./config/canal-server/canal.properties:/home/admin/canal-server/conf/canal.properties
- ./config/canal-server/startup.sh:/home/admin/canal-server/bin/startup.sh
ports:
- 11111:11111
links:
- mysql
- kafka
networks:
- xxx

测试

  • docker-compose up启动容器,确定启动无误后继续。
  • 执行docker exec -it 容器ID /bin/bash进入kafka容器内部
  • 执行如下命令:
    1
    2
    3
    cd /opt/kafka/bin
    ./kafka-topics.sh --create --topic xxxxx --bootstrap-server localhost:9092 # 创建目标主题
    ./kafka-console-consumer.sh --topic xxxxx --from-beginning --bootstrap-server localhost:9092 # 启动该主题的消费者
  • 同理进入 MYSQL 容器内部,use 目标数据库,对目标表执行 insert 语句。
  • 查看 kafka ,出现如下结果则配置成功
    QQ20240112-224320.png