在开始之前,我们还是需要先看一下Maxwell官网 <http://maxwells-daemon.io/>,对Maxwell有一个简单的了解。
Maxwell通过canal解析binlog,并将其发送到Kafka,后续我们通过自己的业务逻辑,处理得到的binlog日志,就OK了。我之前在用的时候,是公司要实时同步业务库的数据到HBase中,然后实现一些实时的查询业务。如果有兴趣的朋友,可以看看canal的底层实现,在这里我就不多赘述了。
首先我们需要做的准备工作是:
1、 首先我们需要开启MySQL的binlog,并且调整其为ROW模式;
SHOW VARIABLES LIKE '%log_bin%';
2、开通MySQL高权限账号:
GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '*******';
GRANT SELECT, REPLICATION CLIENT,REPLICATION SLAVE ON *.* TO
'maxwell'@'%';
FLUSH PRIVILEGES;
开通一个具有replication的庄户,以便后续通过这个账户去实时获取MySQL的binlog。
3、现在我们开始安装部署Maxwell,
① 修改配置文件 config.properties
② 启动测试:
bin/maxwell --host=192.168.167.210 --user=maxwell --password=******
--producer=stuout
bin/maxwell-bootstrap --config localhost.properties --database foobar
--table test --where "my_date >= '2017-01-07 00:00:00'" --log_level info
此时可以通过kafka启动一个消费者去消费你的topic,大功告成了。
4、遇到问题:
10:34:35,389 WARN MaxwellMetrics - Metrics will not be exposed:
metricsReportingType not configured. 10:34:36,531 ERROR TaskManager - cause:
java.lang.ClassCastException: [B cannot be cast to java.lang.Integer at
com.zendesk.maxwell.schema.columndef.IntColumnDef.toLong(IntColumnDef.java:32)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.schema.columndef.IntColumnDef.asJSON(IntColumnDef.java:48)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.replication.BinlogConnectorEvent.writeData(BinlogConnectorEvent.java:108)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.replication.BinlogConnectorEvent.buildRowMap(BinlogConnectorEvent.java:158)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.replication.BinlogConnectorEvent.jsonMaps(BinlogConnectorEvent.java:187)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.replication.BinlogConnectorReplicator.getTransactionRows(BinlogConnectorReplicator.java:387)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.replication.BinlogConnectorReplicator.getRow(BinlogConnectorReplicator.java:490)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.replication.BinlogConnectorReplicator.work(BinlogConnectorReplicator.java:150)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.util.RunLoopProcess.runLoop(RunLoopProcess.java:27)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.Maxwell.startInner(Maxwell.java:224)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.Maxwell.start(Maxwell.java:156)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.Maxwell.main(Maxwell.java:245) ~[maxwell-1.17.1.jar:1.17.1]
java.lang.ClassCastException: [B cannot be cast to java.lang.Integer at
com.zendesk.maxwell.schema.columndef.IntColumnDef.toLong(IntColumnDef.java:32)
at
com.zendesk.maxwell.schema.columndef.IntColumnDef.asJSON(IntColumnDef.java:48)
at
com.zendesk.maxwell.replication.BinlogConnectorEvent.writeData(BinlogConnectorEvent.java:108)
at
com.zendesk.maxwell.replication.BinlogConnectorEvent.buildRowMap(BinlogConnectorEvent.java:158)
at
com.zendesk.maxwell.replication.BinlogConnectorEvent.jsonMaps(BinlogConnectorEvent.java:187)
at
com.zendesk.maxwell.replication.BinlogConnectorReplicator.getTransactionRows(BinlogConnectorReplicator.java:387)
at
com.zendesk.maxwell.replication.BinlogConnectorReplicator.getRow(BinlogConnectorReplicator.java:490)
at
com.zendesk.maxwell.replication.BinlogConnectorReplicator.work(BinlogConnectorReplicator.java:150)
at com.zendesk.maxwell.util.RunLoopProcess.runLoop(RunLoopProcess.java:27) at
com.zendesk.maxwell.Maxwell.startInner(Maxwell.java:224) at
com.zendesk.maxwell.Maxwell.start(Maxwell.java:156) at
com.zendesk.maxwell.Maxwell.main(Maxwell.java:245)
原因:创建账号后,会在数据库中创建一个database,其中包含:bootstrap、columns、databases、heartbeats、positions、schemas、tables几张表,查找原因是因为positions和schemas中记录的binlog-partition不一致。
热门工具 换一换