前言
最近在忙一个高考项目,看着系统顺利完成了这次高考,终于可以松口气了。看到那些即将参加高考的学生,也想起当年高三的自己。
下面分享下RabbitMQ实战经验,希望对大家有所帮助:
一、生产消息
关于RabbitMQ的基础使用,这里不再介绍了,项目中使用的是Exchange中的topic模式。
先上发消息的代码
private bool MarkErrorSend(string[] lstMsg) { try { var factory = new
ConnectionFactory() { UserName= "guest",//用户名 Password = "guest",//密码 HostName =
"localhost",//ConfigurationManager.AppSettings["sHostName"], }; //创建连接 var
connection = factory.CreateConnection(); //创建通道 var channel =
connection.CreateModel();try { //定义一个Direct类型交换机 channel.ExchangeDeclare(
exchange:"TestTopicChange", //exchange名称 type: ExchangeType.Topic, //
Topic模式,采用路由匹配 durable: true,//exchange持久化 autoDelete: false,//是否自动删除,一般设成false
arguments:null//一些结构化参数,比如:alternate-exchange ); //定义测试队列
channel.QueueDeclare( queue:"Test_Queue", //队列名称 durable: true, //
队列磁盘持久化(要和消息持久化一起使用才有效) exclusive: false,//
是否排他的,false。如果一个队列声明为排他队列,该队列首次声明它的连接可见,并在连接断开时自动删除 autoDelete: false,//
是否自动删除,一般设成false arguments: null ); //将队列绑定到交换机 string routeKey = "
TestRouteKey.*";//*匹配一个单词 channel.QueueBind( queue: "Test_Queue", exchange: "
TestTopicChange", routingKey: routeKey, arguments: null ); //
消息磁盘持久化,把DeliveryMode设成2(要和队列持久化一起使用才有效) IBasicProperties properties =
channel.CreateBasicProperties(); properties.DeliveryMode= 2;
channel.ConfirmSelect();//发送确认机制 foreach (var itemMsg in lstMsg) { byte[]
sendBytes = Encoding.UTF8.GetBytes(itemMsg); //发布消息 channel.BasicPublish(
exchange:"TestTopicChange", routingKey: "TestRouteKey.one", basicProperties:
properties, body: sendBytes ); }bool isAllPublished = channel.WaitForConfirms();
//通道(channel)里所有消息均发送才返回true return isAllPublished; } catch (Exception ex) { //
写错误日志 return false; } finally { channel.Close(); connection.Close(); } } catch {
//RabbitMQ.Client.Exceptions.BrokerUnreachableException: //When the configured
hostname was not reachable. return false; } }
发消息没啥特别的。关于消息持久化的介绍这里也不再介绍,不懂的可以看上篇文章
<https://www.cnblogs.com/FireworksEasyCool/p/10330225.html>
。发消息需要注意的地方是,可以选择多条消息一起发送,最后才确定消息发送成功,这样效率比较高;此外,需要尽量精简每条消息的长度(楼主在这里吃过亏),不然会因消息过长从而增加发送时间。在实际项目中一次发了4万多条数据没有出现问题。
二、接收消息
接下来说下消费消息的过程,我使用的是单个连接多个channel,每个channel每次只取一条消息方法。有人会问单个TCP连接,多个channel会不会影响通信效率。这个理论上肯定会有影响的,看影响大不大而已。我开的channel数一般去到30左右,并没有觉得影响效率,有可能是因为我每个channel是拿一条消息的原因。通过单个连接多个channel的方法,可以少开了很多连接。至于我为什么选每个channel每次只取一条消息,这是外界因素限制了,具体看自己需求。
接下接收消息的过程,首先定义一个RabbitMQHelper类,里面有个全局的conn连接变量,此外还有创建连接、关闭连接和验证连接是否打开等方法。程序运行一个定时器,当
检测到连接未打开的情况下,主动创建连接处理消息。
public class RabbitMQHelper { public IConnection conn = null; /// <summary> ///
创建RabbitMQ消息中间件连接/// </summary> /// <returns>返回连接对象</returns> public
IConnection RabbitConnection(string sHostName, ushort nChannelMax) { try { if
(conn ==null) { var factory = new ConnectionFactory() { UserName = "guest",//用户名
Password ="guest",//密码 HostName = sHostName,//
ConfigurationManager.AppSettings["MQIP"], AutomaticRecoveryEnabled = false,//
取消自动重连,改用定时器定时检测连接是否存在 RequestedConnectionTimeout = 10000,//请求超时时间设成10秒,默认的为30秒
RequestedChannelMax = nChannelMax//与开的线程数保持一致 }; //创建连接 conn =
factory.CreateConnection(); Console.WriteLine("RabbitMQ连接已创建!"); } return conn;
}catch { Console.WriteLine("创建连接失败,请检查RabbitMQ是否正常运行!"); return null; } } ///
<summary> /// 关闭RabbitMQ连接 /// </summary> public void Close() { try { if (conn
!=null) { if (conn.IsOpen) conn.Close(); conn = null; Console.WriteLine("
RabbitMQ连接已关闭!"); } } catch { } } /// <summary> /// 判断RabbitMQ连接是否打开 ///
</summary> /// <returns></returns> public bool IsOpen() { try { if (conn != null
) {if (conn.IsOpen) return true; } return false; } catch { return false; } } }
接下来我们看具体如何接收消息。
private static AutoResetEvent myEvent = new AutoResetEvent(false); private
RabbitMQHelper rabbit =new RabbitMQHelper(); private ushort nChannel = 10;//
一个连接的最大通道数和所开的线程数一致
首先初始化一个rabbit实例,然后通过RabbitConnection方法创建RabbitMQ连接。
当连接打开时候,用线程池运行接收消息的方法。注意了,这里开的线程必须和开的channel数量一致,不然会有问题(具体问题是,设了RabbitMQ连接超时时间为10秒,有时候不管用,原因未查明。RabbitMQ创建连接默认超时时间为30秒,假如在这个时间内再去调用创建的话,就有可能得到两倍的channel;)
/// <summary> /// 单个RabbitMQ连接开多个线程,每个线程开一个channel接受消息 /// </summary> private
void CreateConnecttion() { try { rabbit.RabbitConnection("localhost", nChannel);
if (rabbit.conn != null) { ThreadPool.SetMinThreads(1, 1);
ThreadPool.SetMaxThreads(100, 100); for (int i = 1; i <= nChannel; i++) {
ThreadPool.QueueUserWorkItem(new WaitCallback(ReceiveMsg), ""); }
myEvent.WaitOne();//等待所有线程工作完成后,才能关闭连接 rabbit.Close(); } } catch (Exception
ex) { rabbit.Close(); Console.WriteLine(ex.Message); } }
接着就是接收消息的方法,处理消息的过程省略了。
/// <summary> /// 接收并处理消息,在一个连接中创建多个通道(channel),避免创建多个连接 /// </summary> ///
<param name="con">RabbitMQ连接</param> private void ReceiveMsg(object obj) {
IModel channel= null; try { #region 创建通道,定义中转站和队列 channel =
rabbit.conn.CreateModel(); channel.ExchangeDeclare( exchange:"TestTopicChange",
//exchange名称 type: ExchangeType.Topic, //Topic模式,采用路由匹配 durable: true,//
exchange持久化 autoDelete: false,//是否自动删除,一般设成false arguments: null//
一些结构化参数,比如:alternate-exchange ); //定义阅卷队列 channel.QueueDeclare( queue: "
Test_Queue", //队列名称 durable: true, //队列磁盘持久化(要和消息持久化一起使用才有效) exclusive: false,//
是否排他的,false。如果一个队列声明为排他队列,该队列首次声明它的连接可见,并在连接断开时自动删除 autoDelete: false,
arguments:null ); #endregion channel.BasicQos(0, 1, false);//每次只接收一条消息
channel.QueueBind(queue:"Test_Queue", exchange: "TestTopicChange", routingKey: "
TestRouteKey.*"); var consumer = new EventingBasicConsumer(channel);
consumer.Received+= (model, ea) => { var body = ea.Body; var message =
Encoding.UTF8.GetString(body);var routingKey = ea.RoutingKey; //处理消息方法 try {
bool isMark = AutoMark(message); if (isMark) { //Function.writeMarkLog(message);
//确认该消息已被消费,发消息给RabbitMQ队列 channel.BasicAck(ea.DeliveryTag, false); } else { if
(MarkErrorSend(message))//把错误消息推到错误消息队列 channel.BasicReject(ea.DeliveryTag,
false); else //消费消息失败,拒绝此消息,重回队列,让它可以继续发送到其他消费者
channel.BasicReject(ea.DeliveryTag,true); } } catch (Exception ex) { try {
Console.WriteLine(ex.Message);if (channel != null && channel.IsOpen)//
处理RabbitMQ停止重启而自动评阅崩溃的问题 { //消费消息失败,拒绝此消息,重回队列,让它可以继续发送到其他消费者
channel.BasicReject(ea.DeliveryTag,true); } } catch { } } }; //手动确认消息
channel.BasicConsume(queue:"Test_Queue", autoAck: false, consumer: consumer); }
catch (Exception ex) { try { Console.WriteLine("接收消息方法出错:" + ex.Message); if
(channel !=null && channel.IsOpen)//关闭通道 channel.Close(); if (rabbit.conn !=
null)//处理RabbitMQ突然停止的问题 rabbit.Close(); } catch { } } }
三、处理错误消息
把处理失败的消息放到“错误队列”,然后把原队列的消息删除(这里主要解决问题是,存在多个处理失败或处理不了的消息时,如果把这些消息都放回原队列,它们会继续分发到其他线程的channel,但结果还是处理不了,就会造成一个死循环,导致后面的消息无法处理)。把第一次处理不了的消息放到“错误队列”后,重新再开一个新的连接去处理“错误队列”的消息。
/// <summary> /// 把处理错误的消息发送到“错误消息队列” /// </summary> /// <param
name="msg"></param> /// <returns></returns> private bool MarkErrorSend(string
msg) { RabbitMQHelper MQ= new RabbitMQHelper(); MQ.RabbitConnection("localhost",
1); //创建通道 var channel = MQ.conn.CreateModel(); try { //定义一个Direct类型交换机
channel.ExchangeDeclare( exchange:"ErrorTopicChange", //exchange名称 type:
ExchangeType.Topic,//Topic模式,采用路由匹配 durable: true,//exchange持久化 autoDelete:
false,//是否自动删除,一般设成false arguments: null//一些结构化参数,比如:alternate-exchange ); //
定义阅卷队列 channel.QueueDeclare( queue: "Error_Queue", //队列名称 durable: true, //
队列磁盘持久化(要和消息持久化一起使用才有效) exclusive: false,//
是否排他的,false。如果一个队列声明为排他队列,该队列首次声明它的连接可见,并在连接断开时自动删除 autoDelete: false,//
是否自动删除,一般设成false arguments: null ); //将队列绑定到交换机 string routeKey = "
ErrorRouteKey.*";//*匹配一个单词 channel.QueueBind( queue: "Error_Queue", exchange: "
ErrorTopicChange", routingKey: routeKey, arguments: null ); //
消息磁盘持久化,把DeliveryMode设成2(要和队列持久化一起使用才有效) IBasicProperties properties =
channel.CreateBasicProperties(); properties.DeliveryMode= 2;
channel.ConfirmSelect();//发送确认机制 byte[] sendBytes = Encoding.UTF8.GetBytes(msg);
//发布消息 channel.BasicPublish( exchange: "ErrorTopicChange", routingKey: "
ErrorRouteKey.one", basicProperties: properties, body: sendBytes ); bool
isAllPublished = channel.WaitForConfirms();//通道(channel)里所有消息均发送才返回true return
isAllPublished; }catch (Exception ex) { //写错误日志 return false; } finally {
channel.Close(); MQ.conn.Close(); } }
总结:RabbitMQ本身已经很稳定了,而且性能也很好,所有不稳定的因素都在我们处理消息的过程,所以可以放心使用。
Demo源码地址:https://github.com/Bingjian-Zhu/RabbitMQHelper
热门工具 换一换