前言 最近在写项目的时候有个服务模块用到了canal,发现在配置环境的时候还是有不少问题的,于是就有了这篇文章。 (PS:由于本文是临时起意写的,在内容上会比较简洁,如果没有帮到你,我很抱歉QAQ)
MYSQL配置 首先需要拉取mysql镜像,进入容器内部的etc文件下找到my.cnf文件,将文件复制到宿主机的指定目录下用于挂载。 在my.cnf文件中添加如下内容:
1 2 3 4 [mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
创建一个config.sql文件用于授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant,内容如下所示:
1 2 3 4 CREATE USER canal IDENTIFIED BY 'canal' ; GRANT SELECT , REPLICATION SLAVE, REPLICATION CLIENT ON * .* TO 'canal' @'%' ;FLUSH PRIVILEGES;
MYSQL的docker-compose:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 mysql: container_name: mysql image: mysql:latest restart: always command: --default-authentication-plugin=caching_sha2_password environment: - MYSQL_ROOT_PASSWORD=xxx - MYSQL_DATABASE=xxx - MYSQL_USER=xxx - MYSQL_PASSWORD=xxx - TZ=Asia/Shanghai volumes: - ./config/sql:/docker-entrypoint-initdb.d/ - ./.data/mysql:/var/lib/mysql - ./config/mysql/my.cnf:/etc/my.cnf ports: - "3306:3306" networks: - xxx
ZOOKEEPER+KAFKA配置 由于kafka是依赖于zookeeper的,所以这两个就放在一起了。 docker-compose如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 zookeeper: container_name: zookeeper image: zookeeper:latest restart: always ports: - 2181 :2181 volumes: - ./.data/zookeeper/data:/data - ./.data/zookeeper/datalog:/datalog - ./.data/zookeeper/logs:/logs networks: - xxx kafka: container_name: kafka image: wurstmeister/kafka:latest restart: always ports: - 9092 :9092 environment: - KAFKA_BROKER_ID=0 - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 volumes: - ./.data/kafka:/kafka depends_on: - zookeeper networks: - xxx
这边需要重点留意一下kafka环境的ADVERTISED_LISTENERS与LISTENERS两个的区别。(PS:本人在编写docker-compose文件的时候由于没有注意导致canal连接kafka一直失败,会在canal的log里面输出:Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.)
kafka启动后会在zookeeper的/brokers/ids下注册监听协议,包括IP和端口号,客户端连接的时候,会取得这个IP和端口号。 而advertised.listeners是会写入到zookeeper节点中的真正暴露给外部使用的连接地址。也就是说advertised.listeners才是真正的对外代理地址,我们canal要连接的就是这个地址,而listeners的作用仅仅只是监听。我们在这边直接设置advertised.listeners为Kafka的地址。
CANAL配置 重头戏来了。 使用同样的操作获取/home/admin/canal-server/conf/example路径下的instance.properties以及/home/admin/canal-server/conf路径下的canal.properties文件进行挂载。 修改instance.properties文件的配置,如下所示(注意这些是分散开的):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ## mysql serverId , v1.0.26+ will autoGen canal.instance.mysql.slaveId=2 # 不要与my.cnf的server_id重复即可 # position info canal.instance.master.address=127.0.0.1:3306 # username/password canal.instance.dbUsername=canal canal.instance.dbPassword=canal # mq config canal.mq.topic=xxx #你要用的主题名 canal.mq.partitionsNum=1 #库名.表名: 唯一主键,多个表之间用逗号分隔 canal.mq.partitionHash=库名.表名:唯一主键
修改canal.properties文件的配置,如下所示(注意这些是分散开的):
1 2 3 4 5 6 7 8 9 10 # tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ canal.serverMode = kafka # auto scan instance dir add/remove and start/stop instance canal.auto.scan = false ################################################## ######### Kafka ############# ################################################## kafka.bootstrap.servers = 127.0.0.1:9092
canal的docker-compose如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 canal-server: image: canal/canal-server:latest container_name: canal-server restart: always environment: - canal.instance.master.address=mysql:3306 - canal.instance.dbUsername=canal - canal.instance.dbPassword=canal volumes: - ./config/canal-server/instance.properties:/home/admin/canal-server/conf/example/instance.properties - ./config/canal-server/canal.properties:/home/admin/canal-server/conf/canal.properties - ./config/canal-server/startup.sh:/home/admin/canal-server/bin/startup.sh ports: - 11111 :11111 links: - mysql - kafka networks: - xxx
测试
docker-compose up
启动容器,确定启动无误后继续。
执行docker exec -it 容器ID /bin/bash
进入kafka容器内部
执行如下命令: 1 2 3 cd /opt/kafka/bin./kafka-topics.sh --create --topic xxxxx --bootstrap-server localhost:9092 ./kafka-console-consumer.sh --topic xxxxx --from-beginning --bootstrap-server localhost:9092
同理进入 MYSQL 容器内部,use 目标数据库,对目标表执行 insert 语句。
查看 kafka ,出现如下结果则配置成功