在开始之前,我们还是需要先看一下Maxwell官网 <http://maxwells-daemon.io/>,对Maxwell有一个简单的了解。


Maxwell通过canal解析binlog,并将其发送到Kafka,后续我们通过自己的业务逻辑,处理得到的binlog日志,就OK了。我之前在用的时候,是公司要实时同步业务库的数据到HBase中,然后实现一些实时的查询业务。如果有兴趣的朋友,可以看看canal的底层实现,在这里我就不多赘述了。

首先我们需要做的准备工作是:

 1、 首先我们需要开启MySQL的binlog,并且调整其为ROW模式;

        SHOW VARIABLES LIKE '%log_bin%';  

  2、开通MySQL高权限账号:

        GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '*******';
        GRANT SELECT, REPLICATION CLIENT,REPLICATION SLAVE ON *.* TO
'maxwell'@'%';

        FLUSH PRIVILEGES;

开通一个具有replication的庄户,以便后续通过这个账户去实时获取MySQL的binlog。

  3、现在我们开始安装部署Maxwell,

      ① 修改配置文件 config.properties

      ② 启动测试:

        bin/maxwell --host=192.168.167.210  --user=maxwell  --password=****** 
--producer=stuout

       bin/maxwell-bootstrap --config localhost.properties --database foobar
--table test --where "my_date >= '2017-01-07 00:00:00'" --log_level info

此时可以通过kafka启动一个消费者去消费你的topic,大功告成了。

4、遇到问题:
10:34:35,389 WARN MaxwellMetrics - Metrics will not be exposed:
metricsReportingType not configured. 10:34:36,531 ERROR TaskManager - cause:
java.lang.ClassCastException: [B cannot be cast to java.lang.Integer at
com.zendesk.maxwell.schema.columndef.IntColumnDef.toLong(IntColumnDef.java:32)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.schema.columndef.IntColumnDef.asJSON(IntColumnDef.java:48)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.replication.BinlogConnectorEvent.writeData(BinlogConnectorEvent.java:108)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.replication.BinlogConnectorEvent.buildRowMap(BinlogConnectorEvent.java:158)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.replication.BinlogConnectorEvent.jsonMaps(BinlogConnectorEvent.java:187)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.replication.BinlogConnectorReplicator.getTransactionRows(BinlogConnectorReplicator.java:387)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.replication.BinlogConnectorReplicator.getRow(BinlogConnectorReplicator.java:490)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.replication.BinlogConnectorReplicator.work(BinlogConnectorReplicator.java:150)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.util.RunLoopProcess.runLoop(RunLoopProcess.java:27)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.Maxwell.startInner(Maxwell.java:224)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.Maxwell.start(Maxwell.java:156)
~[maxwell-1.17.1.jar:1.17.1] at
com.zendesk.maxwell.Maxwell.main(Maxwell.java:245) ~[maxwell-1.17.1.jar:1.17.1]
java.lang.ClassCastException: [B cannot be cast to java.lang.Integer at
com.zendesk.maxwell.schema.columndef.IntColumnDef.toLong(IntColumnDef.java:32)
at
com.zendesk.maxwell.schema.columndef.IntColumnDef.asJSON(IntColumnDef.java:48)
at
com.zendesk.maxwell.replication.BinlogConnectorEvent.writeData(BinlogConnectorEvent.java:108)
at
com.zendesk.maxwell.replication.BinlogConnectorEvent.buildRowMap(BinlogConnectorEvent.java:158)
at
com.zendesk.maxwell.replication.BinlogConnectorEvent.jsonMaps(BinlogConnectorEvent.java:187)
at
com.zendesk.maxwell.replication.BinlogConnectorReplicator.getTransactionRows(BinlogConnectorReplicator.java:387)
at
com.zendesk.maxwell.replication.BinlogConnectorReplicator.getRow(BinlogConnectorReplicator.java:490)
at
com.zendesk.maxwell.replication.BinlogConnectorReplicator.work(BinlogConnectorReplicator.java:150)
at com.zendesk.maxwell.util.RunLoopProcess.runLoop(RunLoopProcess.java:27) at
com.zendesk.maxwell.Maxwell.startInner(Maxwell.java:224) at
com.zendesk.maxwell.Maxwell.start(Maxwell.java:156) at
com.zendesk.maxwell.Maxwell.main(Maxwell.java:245)

 原因:创建账号后,会在数据库中创建一个database,其中包含:bootstrap、columns、databases、heartbeats、positions、schemas、tables几张表,查找原因是因为positions和schemas中记录的binlog-partition不一致。

 

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信