1.开启mysql binlog日志 安装路径下的my.ini文件添加配置
log-bin=mysql-bin #开启日志
binlog-format=ROW #选择row模式
server_id=1
开启日志需要重启mysql服务后生效
2.下载canal 地址:https://github.com/alibaba/canal/releases
修改配置conf/canal.properties 全局配置,设置自己的数据库
#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password,需要改成自己的数据库信息
canal.instance.dbUsername = root
canal.instance.dbPassword = root
canal.instance.defaultDatabaseName = canneltest
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
#################################################
配置slave,slave可以配置成订阅单表日志,也可以配置订阅多表,或所有的表的日志,也可以配置多个slave,只需修改
canal.instance.filter.regex 参数,我这里订阅了所有的表日志
conf\example路径下的
#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =canneltest
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex =
#################################################
然后用程序需要订阅解析日志,引入相应版本的jar,我这里是1.0.24
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client
</artifactId> <version>1.0.24</version> </dependency>public static void main
(String[] args)throws InterruptedException { // 链接canal CanalConnector
connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1"
,11111), "example", "", ""); connector.connect(); // 开启订阅日志
connector.subscribe(); // 循环订阅 while (true) { try { // 每次读取 1000 条 Message
message = connector.getWithoutAck(1000); System.out.println(message); long
batchID = message.getId(); int size = message.getEntries().size(); if
(batchID == -1 || size == 0) { Thread.sleep(1000); // 没有数据 } else { System.out
.println("数据进入===>"+message); } connector.ack(batchID); } catch (Exception e) {
//TODO: handle exception } finally { Thread.sleep(1000); } } }
可以打印解析message生成sql,集成kafka提供生产,供订阅同步数据
热门工具 换一换