什么是消息队列?
所谓消息队列,就是一个以队列数据结构为基础的一个真实存在的实体,如数组,redis中的队列集合等等,都可以。

为什么要使用队列?

主要原因是由于在高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达MySQL,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too
many connections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。

比如说点赞这个功能,这个在高并发的情况下,很容易造成数据库连接数占满,到时整个网站响应缓慢,才是就是想到要解决数据库的压力问题,一般就是两种方案,一是提高数据库本身的能力(如增加连接数,读写分离等),但是数据库总是有极限的,到达了极限是没有办法在提升了的,此时就要考虑第二种方案,释放数据库的压力,将压力转移到缓存里面。就拿实际的点赞来说吧,用户的点赞请求到来,我只是将点赞请求投递到消息队列里面,后续的点赞请求可以将消息合并,即只更新点赞数,不产生新的任务,此时有个进行再不断的轮训消息队列,将点赞消息消耗,并将值更新到数据库里面,这样就有效的降低了数据库的压力,因为在缓存层将数个数据库更新请求合并成一个,大大提高了效率,降低了负载。

Redis实现的消息队列
Redis提供了两种方式来作消息队列。 生产者消费模式和发布订阅者模式。 
生产者消费模式会让一个或者多个客户端监听消息队列,一旦消息到达,消费者马上消费,谁先抢到算谁的,如果队列里没有消息,则消费者继续监听。
其实在生产者消费模式中生产者是一堆线程,消费者是另一堆线程,内存缓冲区可以使用List数组队列,数据类型只需要定义一个简单的类就好。关键是如何处理多线程之间的协作。

发布订阅者模式也是一个或多个客户端订阅消息频道,只要发布者发布消息,所有订阅者都能收到消息,订阅者都是平等的。

这里使用的是生产者消费模式。

基于Redis的消息队列实现的异步操作原理图如下: 







具体的实现代码如下:

1.首先定义事件的类型,使用枚举类,便于取出各种事件
package com.springboot.springboot.async; /** * @author WilsonSong * @date
2018/6/3 * 枚举类,就是事件的各种类型 */ public enum EventType { LIKE(0), COMMENT(1),
LOGIN(2),MAIL(3); private int value; EventType(int value){ this.value = value;
} public int getValue(){ return value; } }
2.定义事件的具体实现类

类里面很多的实现方法都是返回的是EventModel这个类,是为了以后点赞的时候能够链式的取出与这个事件相关的参数
package com.springboot.springboot.async; import java.util.HashMap; import
java.util.Map; /** * @author WilsonSong * @date 2018/6/3 * 不同的事件肯定是有不同的类型的 */
public class EventModel { //例如,有人评论了一个问题,那type就是评论, actorId就是谁评论的, //
entityId和entityType就是评论的是那个问题,entityOwnerId就是那个问题关联的对象 private EventType type;
//事件的类型 private int actorId; //事件的触发者 private int entityType; //触发事件的载体 private
int entityId; //和entityType组合成触发事件的载体 可以使任何一个实体的id,问题,评论,用户,站内信等等 private int
entityOwnerId;
//载体关联的对象,当我们给一个人点赞时,系统要给那个人(也就是entityOwnerId)发送一个站内信,通知那个人他被点赞了。 public
EventModel(){ } public EventModel(EventType type){ this.type = type; }
//定义可扩展的字段 private Map<String, String> exts = new HashMap<>(); public
EventModel setExts(String key, String value){ exts.put(key,value); return this;
} public String getExts(String key){ return exts.get(key); } public EventType
getType() { return type; } //为了能够实现链状的设置 public EventModel setType(EventType
type) { this.type = type; return this; //这个就是为了实现这个xxx.setType().setXX(); }
public int getActorId() { return actorId; } public EventModel setActorId(int
actorId) { this.actorId = actorId; return this; } public int getEntityType() {
return entityType; } public EventModel setEntityType(int entityType) {
this.entityType = entityType; return this; } public int getEntityId() { return
entityId; } public EventModel setEntityId(int entityId) { this.entityId =
entityId; return this; } public int getEntityOwnerId() { return entityOwnerId;
} public EventModel setEntityOwnerId(int entityOwnerId) { this.entityOwnerId =
entityOwnerId; return this; } public Map<String, String> getExts() { return
exts; } public EventModel setExts(Map<String, String> exts) { this.exts = exts;
return this; } }
3.EventProducer的实现--生产者,作用是把事件分发到队列中
package com.springboot.springboot.async; import
com.alibaba.fastjson.JSONObject; import
com.springboot.springboot.utils.JedisAdapter; import
com.springboot.springboot.utils.RedisKeyUtil; import
org.springframework.beans.factory.annotation.Autowired; import
org.springframework.stereotype.Service; /** * @author WilsonSong * @date
2018/6/3 * 事件的入口,用来统一分发事件,就是在队列中插入 */ @Service public class EventProducer {
@Autowired JedisAdapter jedisAdapter; //把事件分发出去 EventProducer public boolean
fireEvent(EventModel eventModel){ try{ //序列化,将EventModel 转换WieJSON的字符串 String
json = JSONObject.toJSONString(eventModel); String key =
RedisKeyUtil.getEventQueueKey(); jedisAdapter.lpush(key, json); return true;
}catch (Exception e){ return false; } } //事件的取出与消费 }
4、Redis的统一封装  --队列

因为这里是基于Redis的队列实现异步操作,需要对Redis的一些函数重新封装,并与redis缓存进行数据交互
package com.springboot.springboot.utils; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import
com.springboot.springboot.model.User; import org.slf4j.Logger; import
org.slf4j.LoggerFactory; import
org.springframework.beans.factory.InitializingBean; import
org.springframework.stereotype.Service; import
redis.clients.jedis.BinaryClient; import redis.clients.jedis.Jedis; import
redis.clients.jedis.JedisPool; import redis.clients.jedis.Tuple; import
java.util.List; /** * @author WilsonSong * @date 2018/6/1 */ @Service public
class JedisAdapter implements InitializingBean { private static final Logger
logger = LoggerFactory.getLogger(JedisAdapter.class); private JedisPool pool;
public static void print(int index, Object object) {
System.out.println(String.format("%d, %s", index, object.toString())); }
@Override public void afterPropertiesSet() throws Exception { pool = new
JedisPool("redis://localhost:6379/10"); } //增加 public long sadd(String key,
String value) { Jedis jedis = null; try { jedis = pool.getResource(); return
jedis.sadd(key, value); } catch (Exception e) { logger.error("Redis添加数据异常" +
e.getMessage()); } finally { if (jedis != null) { jedis.close(); } } return 0;
} public long srem(String key, String value){ Jedis jedis = null; try{ jedis =
pool.getResource(); return jedis.srem(key,value); }catch (Exception e){
logger.error("Redis删除数据异常"); }finally { if (jedis != null){ jedis.close(); } }
return 0; } //查询数量 public long scard(String key){ Jedis jedis = null; try{
jedis = pool.getResource(); return jedis.scard(key); }catch (Exception e){
logger.error("Redis统计数量异常" + e.getMessage()); }finally { if (jedis != null){
jedis.close(); } } return 0; } public boolean sismember(String key, String
value){ Jedis jedis = null; try{ jedis = pool.getResource(); return
jedis.sismember(key,value); }catch (Exception e){ logger.error("Redis查询异常" +
e.getMessage()); }finally { if (jedis != null){ jedis.close(); } } return
false; } public long lpush(String key, String value){ Jedis jedis = null; try {
jedis = pool.getResource(); return jedis.lpush(key,value); }catch (Exception
e){ logger.error("Redis队列添加异常"); }finally { if (jedis != null){ jedis.close();
} } return 0; } public List<String> brpop(int timeout, String key){ Jedis jedis
= null; try{ jedis = pool.getResource(); return jedis.brpop(timeout,key);
}catch (Exception e){ logger.error("Redis队列弹出数据异常"); }finally { if (jedis !=
null){ jedis.close(); } } return null; }
因为Redis是key--value的模式,每一个事件都应该有与其对应的key,为了统一管理并且不产生混淆,定义统一的key的生成
package com.springboot.springboot.utils; /** * @author WilsonSong * @date
2018/6/2 * 为了防止生成的key有冲突 */ public class RedisKeyUtil { private static String
SPLIT = ":"; private static String BIZ_LIKE = "LIKE"; private static String
BIZ_DISLIKE = "DISLIKE"; private static String BIZ_EVENTQUEUE = "EVENTQUEUE";
//获取点赞的key public static String getLikeKey(int entityType, int entityId){
return BIZ_LIKE + SPLIT + String.valueOf(entityType) + SPLIT
+String.valueOf(entityId); } //获取点踩的key public static String getDislikeKey(int
entityType, int entityId){ return BIZ_DISLIKE +SPLIT +
String.valueOf(entityType) + SPLIT + String.valueOf(entityId); } public static
String getEventQueueKey(){ return BIZ_EVENTQUEUE; } }
5.EventHandler接口

在消费者与事件之间写一个handler的接口,实现Consumer和handler之间的交互,因为消费者就是找到哪些EventHandler对当前的事件感兴趣
package com.springboot.springboot.async; import java.util.List; /** * @author
WilsonSong * @date 2018/6/3 * 用来处理事件的,谁关心这个事件,谁来做这个事件 */ public interface
EventHandler { void doHander(EventModel model); //谁来处理事件 List<EventType>
getSupportEventTypes(); //有哪些关心这些事件的 }
6. EventConsumer的实现---消费者

    创建一个类型为Map<EventType,
List<EventHandler>>的map,用于存放所有的Handler,然后将所有的事件注册到config中,即通过applicationContext获取实现了EventHandler接口的全部Handler。
   
启动线程去不断的去队列中查询事件并用brpop把事件拉出来,通过序列化和反序列化将取出的JSON转化为EventModel,寻找是否有能处理EventModel的Handler,调用每一个对该事件感兴趣的EventType的doHandle方法去处理事件

package com.springboot.springboot.async; import com.alibaba.fastjson.JSON;
import com.springboot.springboot.utils.JedisAdapter; import
com.springboot.springboot.utils.RedisKeyUtil; import org.slf4j.Logger; import
org.slf4j.LoggerFactory; import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean; import
org.springframework.beans.factory.annotation.Autowired; import
org.springframework.context.ApplicationContext; import
org.springframework.context.ApplicationContextAware; import
org.springframework.stereotype.Service; import java.util.ArrayList; import
java.util.HashMap; import java.util.List; import java.util.Map; /** * @author
WilsonSong * @date 2018/6/3 * 处理队列中的事件并与各个handler沟通 *
InitializingBean接口的作用在spring 初始化后,执行完所有属性设置方法(即setXxx)将 * 自动调用
afterPropertiesSet(), 在配置文件中无须特别的配置 */ @Service public class EventConsumer
implements InitializingBean,ApplicationContextAware{ private static final
Logger logger = LoggerFactory.getLogger(EventConsumer.class); private
Map<EventType, List<EventHandler>> config = new HashMap<>(); private
ApplicationContext applicationContext; //sping的上下文 @Autowired JedisAdapter
jedisAdapter; //这个方法将在所有的属性被初始化后调用 @Override public void afterPropertiesSet()
throws Exception { //获取现在有多少个eventHandler初始化了 Map<String, EventHandler> beans =
applicationContext.getBeansOfType(EventHandler.class); if (beans != null){ for
(Map.Entry<String,EventHandler> entry : beans.entrySet()){ List<EventType>
eventTypes = entry.getValue().getSupportEventTypes(); //找到那些handler对当前的事件感兴趣
for (EventType type : eventTypes){ if (!config.containsKey(type)){
//有可能是第一次注册这个事件,所以就可能初始的时候是null //把handler放到config中 config.put(type, new
ArrayList<EventHandler>()); //把event注册到config中 }
config.get(type).add(entry.getValue()); //把对这些event感兴趣的handler添加到config中 } } }
//打开线程去找队列中的事件 Thread thread = new Thread(new Runnable() { @Override public
void run() { while (true){ //一直取 String key = RedisKeyUtil.getEventQueueKey();
List<String> events = jedisAdapter.brpop(0,key); //若队列中没有这个事件的话就一直等待 for
(String message : events){ if (message.equals(key)){
//返回的第一个值可能是key,把他先过滤掉,取后面的event continue; } //通过JSon的方式反序列化 EventModel
eventModel = JSON.parseObject(message,EventModel.class); if
(!config.containsKey(eventModel.getType())){ //是不是有对这个事件有处理的handler
logger.error("不能识别的事件"); continue; } for (EventHandler handler :
config.get(eventModel.getType())){ handler.doHander(eventModel); } } } } });
thread.start(); } //将config中所有的配置的接口 @Override public void
setApplicationContext(ApplicationContext applicationContext) throws
BeansException { this.applicationContext = applicationContext; } }
7.处理具体事件的具体的XXXhandler

例如这里写的点赞的handler
package com.springboot.springboot.async.handler; import
com.springboot.springboot.async.EventHandler; import
com.springboot.springboot.async.EventModel; import
com.springboot.springboot.async.EventType; import
com.springboot.springboot.model.Message; import
com.springboot.springboot.model.User; import
com.springboot.springboot.service.MessageService; import
com.springboot.springboot.service.userService; import
com.springboot.springboot.utils.WendaUtil; import
org.springframework.beans.factory.annotation.Autowired; import
org.springframework.stereotype.Component; import java.util.Arrays; import
java.util.Date; import java.util.List; /** * @author WilsonSong * @date
2018/6/4 * 处理点赞事件的handler */ @Component //就是把普通的对象在spring容器中初始化 public class
LikeHandler implements EventHandler { @Autowired MessageService messageService;
@Autowired userService uService; @Override public void doHander(EventModel
model) { Message message = new Message();
message.setFrom_id(WendaUtil.SYSTEMCONTROLLER_userId); //以系统管理员的额身份给你发消息说谁给你点了赞
message.setTo_id(model.getEntityOwnerId()); //发给谁,就是那个entity拥有者的id
message.setCreated_date(new Date()); User user =
uService.getUser(model.getActorId()); //触发这个事件的用户id message.setContent("用户" +
user.getName() + "赞了你的评论,http://127.0.0.1:8080/question" +
model.getExts("questionId"));
message.setConversationId(message.getConversationId());
messageService.addMessage(message); } @Override public List<EventType>
getSupportEventTypes() { return Arrays.asList(EventType.LIKE); //只需要返回点赞的事件即可 }
}

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信