1. 引言
 
 事务大家都知道,就是相当于一个原子操作,要么全部执行,要么发生异常全部回滚。但事务只限于本地事务,即各个数据库操作必须在同一数据库下执行。拿我最近的接手的项目来说,各个模块全部部署于不同的服务器,都有自己独立的数据库。前端想要删除一个用户,先调用用户平台的删除用户接口,再调用权限平台的删除权限接口。起初觉得这样操作没什么问题,后来有几次数据异常后,发现有的用户信息没有,但权限信息还存在,导致数据不一致。此时,就想到了用分布式事物来解决。所谓分布式事物,我个人理解是为了解决数据一致性的问题。
2. kafka+本地事物表解决分布式事务
  
消息队列的产生是为了解决各系统间通信问题,因为Kafka用的比较多,此处就想到用Kafka+本地事物表去解决分布式事务问题。关于Kafka+zookeeper的搭建此处不做详解。
 
  上图是自己基于Kafka+本地事物表实现的基本流程(图自己画的,可能不太清楚)代码后文贴出,(上图箭头只代表流程,和下文的1.2.3无关)此处讲一下自己的思路。先申明,kafka只能保证最终一致性,并不是强一致性。我们最终目的是保证上图2个蓝色方块的任务执行。方便说明,假定2个系统A,B 
分别对应的2个数据库A库和B库。其中A库中的事务表叫做A事务表,B库中的事务表叫做B事务表。要执行的蓝色方块叫A业务和B业务。
  1. 在A系统中,启用A库的事物,执行如下2步操作。
    1)A系统执行A业务
    2)A系统在A库的A事物表中写一条状态为NEW的数据(此处数据的ID唯一)
    此处启用A库的事务,即2步操作要木全部执行,要木不执行。
  2. 
A系统中启用一个定时任务,5s中执行一次,轮训A库的A事物表,看是否有状态为NEW的数据,如果有,将此记录发送到Kafka消息队列中,并修改此条数据的状态为Published。此时A系统的操作全部执行完毕。
  3.
 B系统启用进程拉取kafka数据,如果发现有从A系统来的数据,将此数据记录到B系统的B事务表中,更新此数据在B系统的B事务表状态为NEW(因为ID唯一,此条数据的ID和存放在A库中的数据的ID相同,如果出现网络异常导致B系统重复收到数据,但看到自己库中已有此ID的数据,便会将重复消息弃用,此处是保证只执行一次),更新完成后,Kafka确认提交(此处要关闭Kafka的自动提交)
  4. B系统启用定时任务,5s执行一次,轮训B库的B事物表,看是否有状态为NEW的数据,如果有,执行如下2步操作。
    1)B系统执行B业务
    2)B系统更新B库的B事物表,将此条状态为New的数据改为状态为Published
    此处启用B库的事务,即2步操作要木全部执行,要木不执行。
3. 实现代码 
  相对于Kafka来说,A系统相当于消息生产者,B系统相当于消息消费者。下面为SQL建表语句。
-- A系统事务表 CREATE TABLE `kafka_event_publish` ( `id` bigint(20) unsigned NOT 
NULL AUTO_INCREMENT, `payload` varchar(2000) NOT NULL, `eventType` varchar(30) 
NOT NULL, `status` varchar(30) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB 
AUTO_INCREMENT=5 DEFAULT CHARSET=utf8; -- B系统事务表 CREATE TABLE 
`kafka_event_process` ( `id`bigint(20) unsigned NOT NULL AUTO_INCREMENT, 
`payload`varchar(2000) NOT NULL, `eventType` varchar(30) NOT NULL, `status` 
varchar(30) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=6 
DEFAULT CHARSET=utf8; 
 
  Kafka用来发送消息,接收消息,下面为Kafka的配置类。
package com.boot.util; // 消费者消息状态 public enum EventProcessStatus { NEW, 
PROCESSED;private EventProcessStatus() { } } 
--------------------------------------package com.boot.util; // 生产者消息状态 public 
enum EventPublishStatus { NEW, PUBLISHED; private EventPublishStatus() { } } 
---------------------------------------package com.boot.util; // Kafka主题 public 
enum EventType { USER_CREATED; private EventType() { } } package com.boot.util; 
import java.util.Arrays; import java.util.Iterator; import java.util.Properties;
import java.util.concurrent.ExecutionException; import 
java.util.function.Consumer;import 
org.apache.kafka.clients.consumer.CommitFailedException;import 
org.apache.kafka.clients.consumer.ConsumerRecord;import 
org.apache.kafka.clients.consumer.ConsumerRecords;import 
org.apache.kafka.clients.consumer.KafkaConsumer;import 
org.apache.kafka.clients.producer.KafkaProducer;import 
org.apache.kafka.clients.producer.Producer;import 
org.apache.kafka.clients.producer.ProducerRecord;// kafka工具类 public class 
KafkaUtil {private static Producer<String, String> producer; private static 
KafkaConsumer<String, String> consumer; public KafkaUtil() { } // 
Kafka发送消息,topic为主题,value为具体消息 public static void sendSync(String topic, String 
value)throws ExecutionException, InterruptedException { producer.send(new 
ProducerRecord(topic, value)).get(); }// Kafka接收消息 public static void 
consume(Consumer<String> c) { // 订阅主题为USER_CREATED的消息  
consumer.subscribe(Arrays.asList(EventType.USER_CREATED.name()));while(true) { 
ConsumerRecords<String, String> records = consumer.poll(100L); Iterator var2 = 
records.iterator();while(var2.hasNext()) { ConsumerRecord<String, String> 
record = (ConsumerRecord)var2.next(); System.out.println(record); 
c.accept(record.value()); }try { consumer.commitSync(); } catch 
(CommitFailedException var4) { System.out.println("Kafka消费者提交offset失败"); } } } 
// kafka基础配置 static { Properties producerProps = new Properties(); 
producerProps.put("bootstrap.servers", 
"10.113.56.68:9093,10.113.56.68:9094,10.113.56.68:9092"); producerProps.put(
"key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
producerProps.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer"); producer = new 
KafkaProducer(producerProps); Properties consumerProps= new Properties(); 
consumerProps.put("bootstrap.servers", 
"10.113.56.68:9093,10.113.56.68:9094,10.113.56.68:9092"); consumerProps.put(
"key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"
); consumerProps.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.put(
"group.id", "VoucherGroup"); consumerProps.put("enable.auto.commit", "false"); 
consumer= new KafkaConsumer(consumerProps); } } 
  A系统主要执行的操作有 
1)执行业务操作,2)插入New消息到数据库,3)定时任务轮训数据库为New的数据,4)发送到Kafka中,5)修改数据库消息状态为Published。此处1),2)步操作不贴代码。下面为A系统中(即生产者)代码。
import com.boot.kafka.transaction.EventPublishService; import 
org.springframework.scheduling.annotation.Scheduled;import 
org.springframework.stereotype.Component;import javax.annotation.Resource; /** 
* @Author xiabing5 * @Create 2019/8/2 10:13 * @Desc spring定时器,定时向kafka中发送事物消息 *
*/ @Component public class EventPublishSchedule { @Resource private 
EventPublishService eventPublishService;/* * 每N毫秒执行一次*/ @Scheduled(fixedRate = 
5000) private void publish() { eventPublishService.publish(); } } import 
com.boot.mapper.KafkaEventPublishMapper;import com.boot.pojo.KafkaEventPublish; 
import com.boot.util.EventPublishStatus; import com.boot.util.KafkaUtil; import 
org.springframework.stereotype.Service;import 
org.springframework.transaction.annotation.Transactional;import 
org.springframework.util.CollectionUtils;import javax.annotation.Resource; 
import java.util.*; /** * @Author xiabing5 * @Create 2019/8/2 9:34 * @Desc 
kafka解决分布式事物(消息发送端) **/ @Service public class EventPublishService { @Resource 
private KafkaEventPublishMapper eventPublishMapper; // 事务表的Mapper  
@Transactional(rollbackFor= Exception.class) public void publish() { // 
查询所有状态为NEW的事件 Map<String,Object> params = new HashMap<String,Object>(); 
params.put("status", EventPublishStatus.NEW.name()); List<KafkaEventPublish> 
eventPublishList = eventPublishMapper.selectEventPublish(params); if(!
CollectionUtils.isEmpty(eventPublishList)) {// 发送消息队列 List<Long> ids = 
sendEventPublish(eventPublishList);if (!CollectionUtils.isEmpty(ids)) { //
更新数据库状态为PUBLISHED  eventPublishMapper.updateEventStatus(ids, 
EventPublishStatus.PUBLISHED.name()); } } }/** * @Author xiabing5 * @Create 
2019/8/2 10:32 * @Desc 发送EventPublish对象集合 返回发送成功的EventPublish的ID集合 **/ private 
static List<Long> sendEventPublish(List<KafkaEventPublish> kafkaEventPublishes) 
{if(CollectionUtils.isEmpty(kafkaEventPublishes)) { return 
Collections.emptyList(); } List<Long> ids = new ArrayList<Long>(); for
(KafkaEventPublish kafkaEventPublish : kafkaEventPublishes) {try { 
KafkaUtil.sendSync(kafkaEventPublish.getEventType().name(),kafkaEventPublish.getPayload()); 
ids.add(kafkaEventPublish.getId()); System.out.println("发送kafka消息成功"); } catch 
(Exception e) { System.out.println("发送kafka消息失败 "+ kafkaEventPublish); } } 
return ids; } } 
  B系统主要执行的操作有,1)从kafka中拉取数据 ,2)将此数据放入数据库事务表,更新状态为New ,3) 
定时任务轮询状态为New的数据,执行相应业务操作,4)更新New数据状态为Complete 。下面为B系统中(即消费者)代码。
import com.boot.kafka.transaction.EventProcessService; import 
org.springframework.scheduling.annotation.Scheduled;import 
org.springframework.stereotype.Component;import javax.annotation.Resource; // 
消费者定时任务 @Component public class EventProcessSchedule { @Resource private 
EventProcessService eventProcessService; @Scheduled(fixedRate= 5000) private 
void process() { eventProcessService.process(); } } import 
com.boot.mapper.KafkaEventProcessMapper;import com.boot.pojo.KafkaEventProcess; 
import com.boot.util.EventProcessStatus; import com.boot.util.EventType; import 
com.boot.util.KafkaUtil;import 
com.google.common.util.concurrent.ThreadFactoryBuilder;import 
org.springframework.stereotype.Service;import 
org.springframework.transaction.annotation.Transactional;import 
javax.annotation.PostConstruct;import javax.annotation.Resource; import 
java.util.HashMap;import java.util.List; import java.util.Map; import 
java.util.concurrent.ExecutorService;import java.util.concurrent.Executors; 
import java.util.concurrent.ThreadFactory; import java.util.stream.Collectors; 
/** * @Author xiabing5 * @Create 2019/8/2 13:37 * @Desc 接收kafka消息service类 **/ 
@Servicepublic class EventProcessService { @Resource private 
KafkaEventProcessMapper kafkaEventProcessMapper;// 创建单一线程线程池  @PostConstruct 
public void init() { ThreadFactory threadFactory = new ThreadFactoryBuilder() 
.setNameFormat("MqMessageConsumerThread-%d") .setDaemon(true) .build(); 
ExecutorService executorService= 
Executors.newSingleThreadExecutor(threadFactory); executorService.execute(new 
MqMessageConsumerThread()); }// 自定义接收线程 private class MqMessageConsumerThread 
implements Runnable { @Override public void run() { 
KafkaUtil.consume(consumerRecord-> { KafkaEventProcess kafkaEventProcess = new 
KafkaEventProcess(); kafkaEventProcess .setPayload(consumerRecord); 
kafkaEventProcess .setEventType(EventType.USER_CREATED); kafkaEventProcess 
.setStatus(EventProcessStatus.NEW); 
kafkaEventProcessMapper.insertEventProcess(kafkaEventProcess); }); } }// 
执行业务逻辑操作 @Transactional(rollbackFor = Exception.class) public void process() { 
// 查询表中状态为new的事件 Map<String,Object> params = new HashMap<String,Object>(); 
params.put("status",EventProcessStatus.NEW.name()); List<KafkaEventProcess> 
kafkaEventProcessList = kafkaEventProcessMapper.selectEventProcess(params); for
(KafkaEventProcess kafkaEventProcess : kafkaEventProcessList) {// 执行业务操作 
System.out.println("删除你"); } List<Long> ids = 
kafkaEventProcessList.stream().map(item -> 
item.getId()).collect(Collectors.toList()); 
kafkaEventProcessMapper.updateEventStatus(ids,EventProcessStatus.PROCESSED.name()); 
} } 
  补充:此处没有贴事务表的sql语句(即Mapper.xml)无非是添加数据库记录,更新记录状态语句。此代码在我的实践中能运行。
4. 总结
  分布式问题一直是我最近比较棘手问题,如分布式锁,定时任务在集群下重复执行等。自己也是个小白,希望通过每次实践后,能总结出点东西,便于以后去遍历。
  
 
热门工具 换一换