在处理实时数据时,需要即时地获得 数据库 <http://www.codercto.com/category/database.html>
表中数据的变化,然后将数据变化发送到Kafka中。这篇文章将介绍如何使用Kafka
Connector完成这一工作。当获取实时数据时,数据源需要支持对数据变化进行反馈。不同的数据源采用了不同的技术和方法实现该功能,因为我们的业务数据库是MS
SQL Server,因此这篇文章采用MSQL作为数据源。
调研
ETL之增量抽取方式:https://www.cnblogs.com/fjhh/p/5370891.html
<https://www.cnblogs.com/fjhh/p/5370891.html>
1、触发器方式
2、时间戳方式
3、全表删除插入方式
4、全表比对方式
5、日志表方式
6、系统日志分析方式
7.1 ORACLE改变数据捕获
7.2 ORACLE闪回查询方式
8、比较和分析
ODBC数据管理器 SqlServer实时数据同步到MySql
安装安装mysqlconnector
配置mysqlconnector
新建链接服务器
创建连接mysql数据库的账号及密码
建立允许远程访问连接操作
建立LOOPBACK 服务器链接
设置服务器链接选项,阻止SQL Server 由于远过程调用而将本地事务提升为分布事务(重点)
编写触发器和存储过程
最终选择Connector
首先需要选择Connector,不同的数据源有不同的Connector,例如ActiveMQ Connector、MySql Connector、MSSQL
Connector等。即便是同一数据源,也可能有不同的第三方提供。我一共尝试了下面两个MSSQL Connector:
* https://debezium.io/docs/connectors/sqlserver/
<https://debezium.io/docs/connectors/sqlserver/>
* https://docs.confluent.io/current/connect/kafka-connect-cdc-mssql/index.html
<https://docs.confluent.io/current/connect/kafka-connect-cdc-mssql/index.html>
比较遗憾的是:这两个Connector,debezium的是Alpha版本,confluent的是Preview版本,反正都不是正式版,而 MySql
<http://www.codercto.com/category/mysql.html> 都已经有正式版本了,可见开源社区对MS真的不友好呀 >_<、
它们两个一个是使用MSSQL Server的 Change Data Capture
<https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-data-capture-sql-server?view=sql-server-2017>
获取数据变更,一个是使用 Change Tracking
<https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-tracking-sql-server?view=sql-server-2017>
。
因为Change Tracking相比Change Data
Capture来说,更轻量一些,因此我选用了confluent的Connector。其下载地址是: https://www.confluent.io/hub/
<https://www.confluent.io/hub/>
安装Connector
下载后,将其解压缩至 $KAFKA_HOME/connectors 文件夹下,如下图所示:
$KAFKA_HOME是你的kafka安装目录,如果是集群,要安装在集群下每台机器的connectors目录下(注意,这个文件夹你的下面可能没有,但是你自己新建一个就行)。
配置Connector
接下来要对Connector进行配置,此时可以回顾一下 Kafka Connect 基本概念
<http://www.tracefact.net/tech/086.html>
。Connector是一组独立的集群,并且是作为Kafka集群的客户端,我们首先需要对Connector进行配置,配置文件位于
$KAFKA_HOME/config/connect-distributed.properties(说实在的,直接粘贴把你的替换掉就行):
# kafka集群地址 bootstrap.servers=hserver1:9092,hserver2:9092,hserver3:9092 #
Connector集群的名称,同一集群内的Connector需要保持此group.id一致 group.id=connect-cluster #
存储到kafka的数据格式 key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false value.converter.schemas.enable=false #
内部转换器的格式,针对offsets、config和status,一般不需要修改
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false #
用于保存offsets的topic,应该有多个partitions,并且拥有副本(replication) # Kafka
Connect会自动创建这个topic,但是你可以根据需要自行创建 #
如果kafka单机运行,replication.factor设置为1;当kafka为集群时,可以设置不大于集群中主机数 #
因为我这里的环境是3主机的集群,因此设为2 offset.storage.topic=connect-offsets
offset.storage.replication.factor=2 offset.storage.partitions=12 #
保存connector和task的配置,应该只有1个partition,并且有多个副本
config.storage.topic=connect-configs config.storage.replication.factor=2 #
用于保存状态,可以拥有多个partition和replication status.storage.topic=connect-status
status.storage.replication.factor=2 status.storage.partitions=6 # Flush much
faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000 # RESET主机名,默认为本机 #rest.host.name= # REST端口号
rest.port=18083 # The Hostname & Port that will be given out to other workers
to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name= #rest.advertised.port= # 保存connectors的路径 #
plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/home/kafka/kafka_2.12-2.0.0/connectors
注意到connect-distributed.properties中的distributed。Kafka
Connector有两种运行模式,单机(Standalone)和分布式(Distrubited)。因为单机通常作为测试运行,因此这篇文章只演示分布式运行模式。在config文件夹下,还有一个单机运行的配置文件,叫做connect-standalone.properties,内容大同小异。
创建Topic
尽管首次运行Kafka
connector时,会自动创建上面的topic,但是如果创建出错,那么Connector就会启动失败。保险起见,可以在运行Connector之前,手动创建好上面的三个特殊topic。
个人工具用的比较6:https://blog.csdn.net/singgel/article/details/83417907
<https://blog.csdn.net/singgel/article/details/83417907>(这个里面有kafka的webUI安装)
当然,命令也准备的有:
# bin/kafka-topics.sh --zookeeper hserver1:2181/kafka --create --topic
connect-offsets --replication-factor 2 --partitions 12 # bin/kafka-topics.sh
--zookeeper hserver1:2181/kafka --create --topic connect-configs
--replication-factor 2 --partitions 1 # bin/kafka-topics.sh --zookeeper
hserver1:2181/kafka --create --topic connect-status --replication-factor 2
--partitions 6
运行Connector
接下来就可以运行Connctor了,此时还没有涉及到任何业务或者数据库相关的配置和操作(即 Kafka Connect 基本概念
<http://www.tracefact.net/tech/086.html> 中提到的用户配置)。
执行下面的代码以运行Connector:
# bin/connect-distributed.sh config/connect-distributed.properties
上面这样是前台运行,当退出shell后进程也就结束了,前台运行的好处就是在开始运行时便于调试。如果想要后台运行,则需加上-daemon选项:
# bin/connect-distributed.sh -daemon config/connect-distributed.properties
运行connect时,会看到不停地涌现大量INFO信息,此时可以修改一下connect-log4j.properties,只显示WARN信息。
# vim config/connect-log4j.properties log4j.rootLogger=WARN, stdout
开启MSSQL数据库的Change Tracking
在继续进行之前,我们在数据库中创建表test_table,并且开启Change
Tracking功能(下面的db_name是数据库名,test_table是表名,这个CHANGE_TRACKING的权限是可以粒度到表上的):
Go CREATE TABLE [dbo].[test_table]( [Id] [int] IDENTITY(1,1) NOT NULL,
[UserName] [varchar](50) NOT NULL, [IsOnline] [bit] NOT NULL, [LastLogin] [int]
NOT NULL, CONSTRAINT [PK_test_table] PRIMARY KEY CLUSTERED ( [Id] ASC ) Go
ALTER DATABASE db_name SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS,
AUTO_CLEANUP = ON) Go ALTER TABLE [db_name].dbo.[test_table] ENABLE
CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON)
Kafka Connector REST API
当Kafka Connector运行起来以后,它就开启了REST
API端口,像我们上面配置的是:18083。如果我们需要运行Task,比如实时捕捉数据库数据变化并写入Kafka,那么就需要像这个REST
API提交用户配置(User Config)。在提交用户配置之前,我们先看看Kafka Connector REST API都包含哪些常见功能:
获取Worker的信息
因为我的kafka(主机名分别为hserver1、hserver2、hserver3)和kafka
connector集群是共用主机的,因此可以使用下面的命令获取(你需要将下面的hserver1改成ip或者相应的主机名):
# curl -s hserver1:18083/ | jq { "version": "1.1.0", "commit":
"fdcf75ea326b8e07", "kafka_cluster_id": "N93UISCxTS-SYZPfM8p1sQ" }
是的,要是你是可以web访问的话,http://hserver1:18083/ <http://192.168.70.4:18083/>:
获取Worker上已经安装的Connector
此时的Connector是静态概念,即上面第一节安装的Confluent MSSQL
Connector,从下面的显示可以看到,我安装了好几个Connector:
# curl -s hserver1:18083/connector-plugins | jq [ { "class":
"io.confluent.connect.cdc.mssql.MsSqlSourceConnector", "type": "source",
"version": "0.0.1.9" }, { "class":
"org.apache.kafka.connect.file.FileStreamSinkConnector", "type": "sink",
"version": "1.1.0" }, { "class":
"org.apache.kafka.connect.file.FileStreamSourceConnector", "type": "source",
"version": "1.1.0" } ]
对于你来说,可能就只有io.confluent.connect.cdc.mssql.MsSqlSourceConnector这一个connector。
列出当前运行的connector(task)
# curl -s hserver1:18083/connectors | jq []
因为我们当前Connector中没有提交过任何的用户配置(即没有启动Task),因此上面返回空数组。
提交Connector用户配置
当提交用户配置时,就会启动一个Connector Task,Connector Task执行实际的作业。用户配置是一个 Json
<http://www.codercto.com/category/json.html> 文件,同样通过REST
API提交(下面的server相关信息不要忘记换成你自己的,dabase.server.name没有的话,就使用机器IP):
# curl -s -X POST -H "Content-Type: application/json" --data '{ "name":
"connector-mssql-online", "config": { "connector.class":
"io.confluent.connect.cdc.mssql.MsSqlSourceConnector", "tasks.max": 1,
"server.name": "127.0.0.1", "server.port" : "1433", "username": "user_id",
"password": "your_password", "initial.database": "db_name",
"database.server.name": "localhost", "change.tracking.tables": "dbo.test_table"
} }' http://hserver1:18083/connectors | jq
注意上面的配置要修改成你的本地配置。提交完成后,再次执行上一小节的命令,会看到已经有一个connector在运行了,其名称为connector-mssql-online:
# curl -s hserver1:18083/connectors | jq [ "connector-mssql-online" ]
查看connector的信息
# curl -s hserver1:18083/connectors/connector-mssql-online | jq { "name":
"connector-mssql-online", "config": { "connector.class":
"io.confluent.connect.cdc.mssql.MsSqlSourceConnector", "password":
"your_password", "initial.database": "db_name", "server.name": "127.0.0.1",
"tasks.max": "1", "server.port": "1433", "name": "connector-mssql-online",
"database.server.name": "localhost", "change.tracking.tables":
"dbo.test_table", "username": "user_id" }, "tasks": [ { "connector":
"connector-mssql-online", "task": 0 } ], "type": "source" }
上面task:0,不是说有0个task,是task的id是0。
查看connector下运行的task信息
使用下面的命令,可以查看connector下运行的task的信息:
# curl -s hserver1:18083/connectors/connector-mssql-online/tasks | jq [ {
"id": { "connector": "connector-mssql-online", "task": 0 }, "config": {
"connector.class": "io.confluent.connect.cdc.mssql.MsSqlSourceConnector",
"password": "your_password", "initial.database": "db_name", "task.class":
"io.confluent.connect.cdc.mssql.MsSqlSourceTask", "server.name": "127.0.0.1",
"tasks.max": "1", "server.port": "1433", "name": "connector-mssql-online",
"database.server.name": "localhost", "change.tracking.tables":
"dbo.test_table", "username": "user_id" } } ]
这里task的配置信息继承自connector的配置。
查看connector当前状态
# curl -s hserver1:18083/connectors/connector-mssql-online/status | jq {
"name": "connector-mssql-online", "connector": { "state": "RUNNING",
"worker_id": "192.168.70.3:18083" }, "tasks": [ { "state": "RUNNING", "id": 0,
"worker_id": "192.168.70.3:18083" } ], "type": "source" }
暂停/重启 Connector
# curl -s -X PUT hserver1:18083/connectors/connector-mssql-online/pause # curl
-s -X PUT hserver1:18083/connectors/connector-mssql-online/resume
删除 Connector
# curl -s -X DELETE hserver1:18083/connectors/connector-mssql-online
从Kafka中读取变动数据
默认情况下,MSSQL Connector会将表的变动写入到:${databaseName}.${tableName}
这个topic中,这个topic的名称可以通过 topic.format
这个用户配置参数中进行设置,因为我们并没有配置,因此,topic的名称为db_name.test_online。
运行下面的控制台脚本,从Kafka中实时读取topic的内容:
# bin/kafka-console-consumer.sh --bootstrap-server hserver1:9092 --topic
db_name.test_table --from-beginning
此时因为没有任何数据,因此控制台会阻塞。
对test_table表进行修改
依次执行下面的增删改语句,对test_online表进行修改:
insert into [test_table](UserName,IsOnline,LastLogin) values('测试', 1,
DATEDIFF(s, '19700101',GETDATE())) update test_online Set UserName='tom' where
UserName='测试' Delete test_online Where UserName='tom'
现在查看Kafka读取端控制台,可以看到以 Json
<http://www.codercto.com/jiaocheng/json/json-tutorial.html> 格式实时收到了数据库变动的消息:
# bin/kafka-console-consumer.sh --bootstrap-server hserver1:9092 --topic
tgstat_ddztest.test_online
{"Id":5,"UserName":"测试","IsOnline":true,"LastLogin":1540635666,"_cdc_metadata":{"sys_change_operation":"I","sys_change_creation_version":"13","sys_change_version":"13","databaseName":"tgstat_ddztest","schemaName":"dbo","tableName":"test_online"}}
{"Id":5,"UserName":"tom","IsOnline":true,"LastLogin":1540635666,"_cdc_metadata":{"sys_change_operation":"U","sys_change_creation_version":"0","sys_change_version":"14","databaseName":"tgstat_ddztest","schemaName":"dbo","tableName":"test_online"}}
null
从上面的消息可以看到,对于delete操作,收到了null。对于insert和update操作,收到了详细的变动信息。
至此,我们就配置完了Kafka Connector,并且实时获取到了数据库变更的消息。后续可以使用Spark
Stream连接至此Topic,进行实时的数据运算和分析。以后有机会再进行掩饰。
感谢阅读,希望这篇文章能给你带来帮助!
热门工具 换一换