1 RabbitMQ简介

       RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现,官网地址:
http://www.rabbitmq.com <http://www.rabbitmq.com>
。RabbitMQ作为一个消息代理,主要负责接收、存储和转发消息,它提供了可靠的消息机制和灵活的消息路由,并支持消息集群和分布式部署,常用于应用解耦,耗时任务队列,流量削锋等场景。本系列文章将系统介绍RabbitMQ的工作机制,代码驱动和集群配置,本篇主要介绍RabbitMQ中一些基本概念,常用的RabbitMQ
Control命令,最后写一个C#驱动的简单栗子。先看一下RabbitMQ的基本结构:




  上图是RabbitMQ的一个基本结构,生产者Producer和消费者Consumer都是RabbitMQ的客户端,Producer负责发送消息,Consumer负责消费消息。

接下来我们结合这张图来理解RabbitMQ的一些概念:

  Broker(Server):接受客户端连接,实现AMQP消息队列和路由功能的进程,我们可以把Broker叫做RabbitMQ服务器。

  Virtual Host:一个虚拟概念,一个Virtual
Host里面可以有若干个Exchange和Queue,主要用于权限控制,隔离应用。如应用程序A使用VhostA,应用程序B使用VhostB,那么我们在VhostA中只存放应用程序A的exchange,queue和消息,应用程序A的用户只能访问VhostA,不能访问VhostB中的数据。

  Exchange
:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有Direct、Fanout、Topic和Header四种,不同类型的Exchange路由规则是不一样的(这些以后会详细介绍)。

  Queue:消息队列,用于存储还未被消费者消费的消息,队列是先进先出的,默认情况下先存储的消息先被处理。

  Message:就是消息,由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message
Queue接受、优先级是多少等,Body是真正传输的数据,内容格式为byte[]。

  Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。

  Channel
:信道,仅仅创建了客户端到Broker之间的连接Connection后,客户端还是不能发送消息的。需要在Connection的基础上创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令,一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的。

2 RabbitMQ安装

  因为RabbitMQ是用erlang语言开发的,所以我们在安装RabbitMQ前必须要安装erlang支持。

1 Windows平台安装

1 安装erlang

  首先下载erlang <http://www.erlang.org/downloads>,直接下载最新版本,当前下载的是 OTP 21.3 Windows
64-bit Binary File ,下载完成后一直下一步安装即可。

2 安装RabbitMQ

  下载Windows平台的RabbtMQ <https://www.rabbitmq.com/install-windows.html>,当前下载的是 
rabbitmq-server-3.7.14.exe ,下载完成后一直下一步安装即可。

3 安装Web管理插件

  打开RabbitMQ Command Prompt,执行命令 rabbitmq-plugins enable rabbitmq_management
 即可完成Web监控插件的安装。 

  安装完成后,打开浏览器输入 http://127.0.0.1:15672/ 使用默认账号[ name:guest / password:guest
]登录后界面如下,使用这个UI插件我们可以轻松的查看RabbitMQ中的交换机(exchange),队列(queue)等内容,也可以对exchange,queue,用户等进行添加、修改、删除操作。



  到这一步Windows平台安装RabbitMQ完成了。 打开服务管理器,RabbitMQ已经在正常运行了,如下:



2 Centos安装RabbitMQ

1 安装RabbitMQ

  这里虚拟机系统为Centos7,采用的安装方式是yum安装,为了简单,这里直接使用官方提供的erlang和RabbitMQ-server的自动安装脚本(
官方安装文档 <https://www.rabbitmq.com/install-rpm.html>
),逐行执行下边的代码就可以安装完成erlang和RabbitMQ。
#安装socat yum install socat #安装erlang   curl -s
https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh |
sudo bash   yum-y install erlang #安装rabbitmq-server   curl -s
https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh
| sudo bash   yum -y install rabbitmq-server #启动rabbitmq服务   systemctl start
rabbitmq-server #添加web管理插件   rabbitmq-plugins enable rabbitmq_management
补充:如果安装完成后,执行RabbitMQ执行命令特别慢,或者出现报错【rabbitmq unable to perform an operation on
node  xxx@xxx】,解决方法:

  编辑hosts,执行命令 vim /etc/hosts ,添加本机IP(或者虚拟机IP)



  命令执行结束后,使用浏览器访问 http://127.0.0.1:15672/
 也会出现web管理界面。通过上边的安装步骤安装的RabbitMQ会生成Unit文件,所以我们可以使用Systemd管理RabbitMQ服务,以下是几条常用的命令:
#启动RabbitMQ服务   systemctl start rabbitmq-server #停止RabbitMQ服务   systemctl stop
rabbitmq-server #查看RabbitMQ运行状态   systemctl status rabbitmq-server #重启RabbitMQ服务
  systemctl restart rabbitmq-server
2 RabbitMQ Control工具


  使用Web管理界面可以实现RabbitMQ的大部分常用功能,但是有些功能WebUI是做不到的,如:开启/关闭RabbitMQ应用程序和集群的管理等。RabbitMQ
Control是RabbitMQ的命令行管理工具,可以调用所有的RabbitMQ内置功能,主命令是rabbitmqctl
,下边是一个查询用户列表的命令,注意需要切换到sbin目录下执行:



  为了方便的使用RabbitMQ Control工具,我们最好添加一个环境变量,Windows默认安装时在PATH中添加一条: C:\Program
Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin
 ,不是默认安装的话找到对应的安装目录添加PATH。按照上的安装方法,Centos可以直接使用RabbitMQ
Control工具,不需要多余的配置。如果想详细了解RabbitMQ Control工具,可以参考RabbitMQ Control的官方文档
<https://www.rabbitmq.com/rabbitmqctl.8.html#cluster_status>。

  这里总结了一些最常用到的RabbitMQ Controll命令,有兴趣的小伙伴可以试着运行一下这些命令,如在命令行工具中使用命令 rabbitmqctl
add_user <username> <password>  添加一个新用户。

1 基本控制命令

  基本控制命令主要用于启动、停止应用程序、runtime等
#停止rabbitmq和runtime   rabbitmqctl shutdown #停止erlang节点   rabbitmqctl stop #
启用rabbitmq   rabbitmqctl start_app #停止rabbitmq   rabbitmqctl stop_app #查看状态   
rabbitmqctl status#查看环境   rabbitmqctl environment #
rabbitmq恢复最初状态,内部的exchange和queue都清除   rabbitmqctl reset
2 服务状态管理

  这些命令主要用于用于查看exchang、channel、binding、queue、consumers:
#返回queue的信息   list_queues [-p <vhostpath>] [<queueinfoitem> ...] #返回exchange的信息
  list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...] #返回绑定信息
  list_bindings [-p <vhostpath>] [<bindinginfoitem> ...] #返回链接信息
  list_connections [<connectioninfoitem> ...] #返回目前所有的channels   list_channels
[<channelinfoitem> ...] #返回consumers   list_consumers [-p <vhostpath>]
3 用户管理命令

  这些命令主要用于添加、修改、删除用户及管理用户权限
#在rabbitmq的内部数据库添加用户   add_user <username> <password> #删除一个用户   delete_user
<username>#改变用户密码   change_password <username> <newpassword> #清除用户密码,禁止用户登录
  clear_password <username>#设置用户tags,就是设置用户角色   set_user_tags <username> <tag> #
查看用户列表   list_users #创建一个vhost   add_vhost <vhostpath> #删除一个vhosts
  delete_vhost <vhostpath>#列出vhosts   list_vhosts [<vhostinfoitem> ...] #
针对一个vhosts 给用户赋予相关权限   set_permissions [-p <vhostpath>] <user> <conf> <write>
<read>#清除一个用户对vhost的权限   clear_permissions [-p <vhostpath>] <username> #
列出所有用户对某一vhost的权限   list_permissions [-p <vhostpath>] #列出某用户的访问权限
  list_user_permissions <username>
4 集群管理命令
#clusternode表示node名称,--ram表示node以ram node加入集群中。默认node以disc
node加入集群,在一个node加入cluster之前,必须先停止该node的rabbitmq应用,即先执行stop_app。   join_cluster
<clusternode> [--ram] #显示cluster中的所有node   cluster_status #设置集群名字
  set_cluster_name <clustername>#修改集群名字   rename_cluster_node <oldname>
<newname>#改变一个cluster中node的模式,该节点在转换前必须先停止,不能把一个集群中唯一的disk node转化为ram node
  change_cluster_node_type <disc | ram>#远程删除一个节点,删除前必须该节点必须先停止   rabbitmqctl
forget_cluster_node rabbit@rabbit1 #同步镜像队列   sync_queue <queuename> #取消同步队列
  cancel_sync_queue <queuename>#清空队列中所有消息   purge_queue [-p vhost] <queuename>
  这里列举的很多命令是现阶段用不到的,如集群控制相关的命令,这些命令的用法会在以后的章节中逐渐理解。

3  C#驱动RabbitMQ

1 一个简单的栗子

  作为开发者,我们最在意的还是怎么在代码中使用RabbitMQ,可以通过官方RabbitMQ开发文档
<https://www.rabbitmq.com/clients.html>
来学习RabbitMQ的使用,这里以.NET为例演示一下RabbitMQ的最基本用法。创建两个Console应用,一个作为发送消息的生产者(Producer),一个作为接受消息的消费者(Consumer),生产者向队列写入消息,消费者接受这条消息,结构如下:



 

  两个控制台应用都要添加RabbitMQ.Client包,命令如下:
Install-Package RabbitMQ.Client
 生产者(Producer)代码:
class Program { static void Main(string[] args) { var factory = new
ConnectionFactory() {//rabbitmq-server所在设备ip,这里就是本机 HostName = "127.0.0.1",
UserName= "wyy",//用户名 Password = "123321"//密码 }; //第一步:创建连接connection using (
var connection = factory.CreateConnection()) { //第二步:创建通道channel using (var
channel = connection.CreateModel()) { //第三步:声明交换机exchang
channel.ExchangeDeclare(exchange:"myexchange", type: ExchangeType.Direct,
durable:true, autoDelete: false, arguments: null); //第四步:声明队列queue
channel.QueueDeclare(queue:"myqueue", durable: true, exclusive: false,
autoDelete:false, arguments: null); Console.WriteLine("生产者准备就绪...."); //
第五步:绑定队列到交互机 channel.QueueBind(queue:"myqueue", exchange:"myexchange",
routingKey:"mykey"); string message = ""; //第六步:发送消息 //在控制台输入消息,按enter键发送消息
while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) {
message= Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); //基本发布
channel.BasicPublish(exchange:"myexchange", routingKey: "mykey",
basicProperties:null, body: body); Console.WriteLine($"消息【{message}】已发送到队列"); }
} } Console.ReadKey(); } }
 消费者(Consumer)代码: 
class Program { static void Main(string[] args) { var factory = new
ConnectionFactory() {//rabbitmq-server所在设备ip,这里就是本机 HostName = "127.0.0.1",
UserName= "wyy",//用户名 Password = "123321"//密码 }; //第一步:创建连接connection using (
var connection = factory.CreateConnection()) { //第二步:创建通道channel using (var
channel = connection.CreateModel()) { //第三步:声明队列queue
channel.QueueDeclare(queue:"myqueue", durable: true, exclusive: false,
autoDelete:false, arguments: null); //第四步:定义消费者 var consumer = new
EventingBasicConsumer(channel); consumer.Received+= (model, ea) => { var body =
ea.Body;var message = Encoding.UTF8.GetString(body); Console.WriteLine($"
接受到消息【{message}】"); }; Console.WriteLine("消费者准备就绪...."); //第五步:处理消息
channel.BasicConsume(queue:"myqueue", autoAck: true, consumer: consumer);
Console.ReadLine(); } } } } 
  依次运行Producer和Consumer两个应用程序,运行结果如下:



  注意
:上边的代码在生产者和消费者的代码中都声明了exchange和queue,这主要是为了让这两个程序可以按任意顺序启动,如:我们只在生产者代码中定义了exchange和queue,却先启动消费者,这会让造成消费者找不到自己需要的exhange和queue(出现404错误)。实际开发中创建exchange/queue、绑定队列以及设置routingKey这些工作,都可以通过WebUI管理界面或者使用Rabbitmq
Control工具完成。


  QueueDeclare方法用于声明队列,ExchangeDeclare用于声明交换机,我们在使用这两个方法声明时,可以设置队列和交换机的属性,如queue的名字,长度限制,exchange是否持久化、交换机类型等。

2 QueueDeclare方法详解

  在上边的栗子中我们使用了声明队列的方法  QueueDeclareOk QueueDeclare(string queue, bool durable,
bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
 ,该方法通过参数设置队列的特性。这里介绍一下该方法 中几个参数的作用,先看一个完整的声明队列的栗子:
          //声明队列newsQueue channel.QueueDeclare(queue: "myqueue", durable:
false, exclusive: false, autoDelete: false, arguments: new Dictionary<string,
object>() { //队列中消息的过期时间是8s { "x-message-ttl",1000*8 }, //队列60s没有被使用,则删除该队列 {"
x-expires",1000*60 }, //队列最多保存100条消息 {"x-max-length",100 }, //
队列中ready类型消息总共不能超过1000字节 {"x-max-length-bytes",1000 }, //当队列消息满了时,丢弃传来后续消息 {"
x-overflow","reject-publish" }, //丢弃的消息发送到deadExchange交换机 {"
x-dead-letter-exchange","deadExchange" }, //丢弃的消息发送到deadExchange交换机时的RoutingKey
{"x-dead-letter-routing-key","deadKey" }, //
队列中最大的优先级等级为10(在Publish消息时对每条消息设置优先级) {"x-max-priority",10 }, //设置队列默认为lazy {"
x-queue-mode","lazy" } });
QueueDeclare方法的参数如下:

  queue:队列名字;

  durable
:是否持久化。设置为true时,队列信息保存在rabbitmq的内置数据库中,服务器重启时队列也会恢复(注意:重启后队列内部的消息不会恢复,怎么实现消息持久化以后会详细介绍);

  exclusive
:是否排外。设置为true时只有首次声明该队列的Connection可以访问,其他Connection不能访问该队列;且在Connection断开时,队列会被删除(即使durable设置为true也会被删除);

  autoDelete:是否自动删除。设置为true时,表示在最后一条使用该队列的连接(Connection)断开时,将自动删除这个队列;

  arguments:设置队列的一些其它属性,为Dictionary<string,object>类型,下表总结了arguments中可以设置的常用属性。

参数名                                              作用 示例
Message TTL 设置队列中消息的有效时间 { "x-message-ttl",1000*8 },设置队列中的所有消息的有效期为8s;
Auto expire 自动删除队列。一定的时间内队列没有被使用,则自动删除队列 {"x-expires",1000*60
},设置队列的过期时长为60s,如果60s没有队列被访问,则删除队列;
Max length 队列能保存消息的最大条数 {"x-max-length",100 },设置队列最多保存100条消息;
Max length bytes 队列中ready类型消息的总字节数  {"x-max-length-bytes",1000
}, 设置队列中ready类型消息总共不能超过1000字节;
Overflow behaviour  当队列消息满了时,再接收消息时的处理方法。有两种处理方案:默认为"drop-head"模式,表示从队列头部丢弃消息;"
reject-publish"表示不接收后续的消息 {"x-overflow","reject-publish" },设置当队列消息满了时,丢弃传来后续消息;
Dead letter exchange  用于存储被丢弃的消息的交换机名。Overflow behaviour
的两种处理方案中丢弃的消息都会发送到这个交换机 {"x-dead-letter-exchange","beiyongExchange"
},设置丢弃的消息发送到名字位beiyongExchange的交换机;
Dead letter routing key  被丢弃的消息发送到Dead letter exchange时的使用的routing Key
{"x-dead-letter-routing-key","deadKey"
},设置丢弃的消息发送到beiyongExchange交换机时的RoutingKey值是"deadKey";
Maximum priority  设置队列中消息优先级的最大等级,在publish消息时可以设置单条消息的优先级等级
{"x-max-priority",10 },设置中消息优先级的最大等级为10;
Lazy mode  设置队列的模式,如果设置为Lazy表示队列中消息尽可能存放在磁盘中,以减少内存占用;不设置时消息都存放在队列中,用以尽可能快的处理消息
{"x-queue-mode","lazy"},3.6以后版本可用,设置队列中消息尽可能存放在磁盘中,以减少内存占用。在消息拥堵时和消息持久化配置使用可以减少内存占用。
3 ExchangeDeclare方法详解

   声明交换机的方法 void ExchangeDeclare(string exchange, string type, bool durable,
bool autoDelete, IDictionary<string, object> arguments)
 可以设置交换机的特性,这里简单介绍一下这个方法的几个参数:
channel.ExchangeDeclare(exchange: "myexchange", type: ExchangeType.Direct,
durable:true, autoDelete: false,                arguments: new Dictionary<string
,object> {
                        {"alternate-exchange","BeiyongExchange" }//
如果消息不能路由到该交换机,就把消息路由到备用交换机BeiyongExchange上
               });
  exchange:交换机名字。

  type:交换机类型。exchange有direct、fanout、topic、header四种类型,在下一篇会详细介绍;

  durable:是否持久化。设置为true时,交换机信息保存在rabbitmq的内置数据库中,服务器重启时交换机信息也会恢复;

  autoDelete:是否自动删除。设置为true时,表示在最后一条使用该交换机的连接(Connection)断开时,自动删除这个队列;

  arguments:其他的一些参数,类型为Dictionary<string,object> 。

小结

  本节主要介绍了RabbitMQ的基本概念,在Windows和Centos上的安装方法,及RabbitMQ
Control工具的基本使用,最后演示了一个C#驱动RabbitMQ的栗子,并详细介绍了声明queue和exchange的方法。通过这一节我们大概了解了RabbitMQ的基本使用。以后的章节会逐渐介绍RabbitMQ的四种exchange、两种Consumer的特点和使用场景,以及消息确认、优先级、持久化等,最后搭建一个高可用的RabbitMQ集群,如果文中有错误的话,希望大家可以指出,我会及时修改,谢谢!

 

参考文章

【1】 https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html
<https://www.cnblogs.com/wyy1234/p/%20https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html>

【2】https://www.cnblogs.com/operationhome/p/10483840.html
<https://www.cnblogs.com/wyy1234/p/%20https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html>

 

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信