引言

  熟悉TPL Dataflow <https://www.cnblogs.com/JulianHuang/p/11177766.html>
博文的朋友可能记得这是个单体程序,使用TPL Dataflow 处理工作流任务, 在使用Docker部署的过程中, 有一个问题一直无法回避:

       在单体程序部署的瞬间(服务不可用)会有少量流量无法处理;更糟糕的情况下,迭代部署的这个版本有问题,上线后无法运作, 更多的流量没有得到处理。

      背负神圣使命(巨大压力)的程序猿心生一计, 为何不将单体程序改成分布式:服务A只接受数据,服务B只处理数据。

 

知识储备

    消息队列和订阅发布作为老生常谈的两个知识点被反复提及,按照JMS的规范, 官方称为点对点(point to point, queue) 和
订阅发布(publish/subscribe,topic ),

点对点

  消息生产者生产消息发送到queue中,然后消费者从queue中取出并且消费消息。

注意:

消息被消费以后,queue中不再有存储,所以消费者不可能消费到已经被消费的消息。

Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

当没有消费者可用时,这个消息会被保存直到有 一个可用的消费者。

发布/订阅

  消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。

注意:

发布者将消息发布到通道中,而不用知道订阅者是谁(不关注是否存在订阅者);订阅者可收听自己感兴趣的多个通道, 也不需要知道发布者是谁(不关注是哪个发布者)。

故如果没有消费者,发布的消息将得不到处理;



头脑风暴 

本次采用的消息队列模型:

*    解耦业务:  新建Receiver程序作为生产者,专注于接收并发送到队列;原有的webapp作为消费者专注数据处理。
*    起到削峰填谷的作用, 若建立多个消费者webapp容器,还能形成负载均衡的效果。 
Redis 原生支持发布/订阅 <https://redis.io/commands/publish>模型,内置的List数据结构
<https://redis.io/commands/lpush>亦能形成轻量级MQ的效果。

    需要关注Redis 两个命令( 左进右出,右进左出同理):

    LPUSH  &  RPOP/BRPOP <https://redis.io/commands/brpop>

Brpop 中的B 表示 “Block”,
是一个rpop命令的阻塞版本:若指定List没有新元素,在给定时间内,该命令会阻塞当前redis客户端连接,直到超时返回nil

编程实践

本次使用 ASPNetCore 完成RedisMQ的实践,引入Redis国产第三方开源库CSRedisCore.

不使用著名的StackExchange.Redis 组件库的原因:

*
之前一直使用StackExchange.Redis, 参考了很多资料,做了很多优化,并未完全解决RedisTimeoutException问题 

*
StackExchange.Redis基于其多路复用的连接机制,不支持阻塞式命令
<https://stackexchange.github.io/StackExchange.Redis/PipelinesMultiplexers>,
故采用了 CSRedisCore,该库强调了API 与Redis官方命令一致,很容易上手

生产者Receiver

 生产者使用LPush 命令向Redis List数据结构写入消息。
------------------截取自Startup.cs-------------------------
public void ConfigureServices(IServiceCollection services)
{
  // Redis客户端要定义成单例, 不然在大流量并发收数的时候, 会造成redis client来不及释放。另一方面也确认api控制器不是单例模式,
  var csredis = new
CSRedisClient(Configuration.GetConnectionString("redis")+",name=receiver");
  RedisHelper.Initialization(csredis);
  services.AddSingleton(csredis);
services.AddMvc();
}
------------------截取自数据接收Controller------------------- [Route("batch")]
[HttpPost]public async Task BatchPutEqidAndProfileIds([FromBody]List<EqidPair>
eqidPairs) {   if (!ModelState.IsValid)   throw new ArgumentException("Http
Body Payload Error.");   var redisKey = $"{DateTime.Now.ToString("yyyyMMdd")}";
eqidPairs= await EqidExtractor.EqidExtractAsync(eqidPairs); if (eqidPairs !=
null && eqidPairs.Any())     RedisHelper.LPush(redisKey, eqidPairs.ToArray());
    await Task.CompletedTask; }
 消费者webapp

     根据以上RedisMQ思路,事件消费方式是拉取pull,故需要轮询Redis  List数据结构,这里使用ASPNetCore内置的
BackgroundService后台服务类实现后台轮询消费任务。
public class BackgroundJob : BackgroundService { private readonly
IEqidPairHandler _eqidPairHandler;private readonly CSRedisClient[]
_cSRedisClients;private readonly IConfiguration _conf; private readonly ILogger
_logger;public BackgroundJob(IEqidPairHandler eqidPairHandler, CSRedisClient[]
csRedisClients,IConfiguration conf,ILoggerFactory loggerFactory) {
_eqidPairHandler= eqidPairHandler; _cSRedisClients = csRedisClients; _conf =
conf; _logger= loggerFactory.CreateLogger(nameof(BackgroundJob)); } protected
override async Task ExecuteAsync(CancellationToken stoppingToken) {
_logger.LogInformation("Service starting"); if (_cSRedisClients[0] == null) {
_cSRedisClients[0] = new CSRedisClient(_conf.GetConnectionString("redis") + "
,defaultDatabase=" + 0); } RedisHelper.Initialization(_cSRedisClients[0]); while
(!stoppingToken.IsCancellationRequested) { var key = $"
eqidpair:{DateTime.Now.ToString("yyyyMMdd")}"; var eqidpair = RedisHelper.BRPop(
5, key); if (eqidpair != null) await
_eqidPairHandler.AcceptEqidParamAsync(JsonConvert.DeserializeObject<EqidPair>
(eqidpair));// 强烈建议无论如何休眠一段时间,防止突发大流量导致webApp进程CPU满载,自行根据场景设置合理休眠时间 await
Task.Delay(10, stoppingToken); } _logger.LogInformation("Service stopping"); } }
 

最后依照引言中的部署原理图,将Nginx,Receiver, WebApp使用docker-compose工具容器化

根据docker-compsoe up <https://docs.docker.com/compose/reference/up/>
命令的用法,若容器正在运行且对应的Service Configuration或Image并未改变,该容器不会被ReCreate;

docker-compose  up  命令默认只会停止Service或Image变更的容器并重建。

If there are existing containers for a service, and the service’s
configuration or image was changed after the container’s creation, 
docker-compose up picks up the changes by stopping and recreating the
containers (preserving mounted volumes). To prevent Compose from picking up
changes, use the --no-recreate flag.

做一次上线测试验证,修改docker-compose.yml文件Web app的容器服务,docker-compose up;

仅数据处理程序WebApp容器被重建:



 Nice,分布式改造上线,效果很明显,现在可以放心安全的迭代Web App数据处理程序。
作者:JulianHuang <https://www.cnblogs.com/JulianHuang/>
码甲拙见,如有问题请下方留言大胆斧正;码字+Visio制图,均为原创,看官请不吝好评+关注,  ~。。~

本文欢迎转载,请转载页面明显位置注明原作者及原文链接。

 

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信