RabbitMQ

RabbitMQ官网

RabbitMQ is the most widely deployed open source message broker.

RabbitMQ 是部署最广泛的开源消息代理。

消息队列概述

消息队列概述可以参考视频What is RabbitMQ?和**RabbitMQ : Message Queues for beginners**。

什么是消息队列

消息(message)指的是两个应用间传递的数据。数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象。

“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列,谁从消息队列中取出数据处理,他不管。消费者只负责从消息队列中取出数据处理,他不管这是谁发送的数据。

为什么使用消息队列

主要有三个作用:

  1. 解耦。如下图所示。假设有系统B、C、D都需要系统A的数据,于是系统A调用三个方法发送数据到B、C、D。这时,系统D不需要了,那就需要在系统A把相关的代码删掉。假设这时有个新的系统E需要数据,这时系统A又要增加调用系统E的代码。为了降低这种强耦合,就可以使用MQ,系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可

  2. 异步。如下图所示。一个客户端请求发送进来,系统A会调用系统B、C、D三个系统,同步请求的话,响应时间就是系统A、B、C、D的总和,也就是800ms。如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能。对于一些非必要的业务,比如发送短信,发送邮件等等,就可以采用MQ。

  3. 削峰。如下图所示。这其实是MQ一个很重要的应用。假设系统A在某一段时间请求数暴增,有5000个请求发送过来,系统A这时就会发送5000条SQL进入MySQL进行执行,MySQL对于如此庞大的请求当然处理不过来,MySQL就会崩溃,导致系统瘫痪。如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃

RabbitMQ的特点

RabbitMQ是由Erlang语言开发的AMQP的开源实现。

AMQP:Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

RabbitMQ特点:

  • 可靠性(Reliablity):使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认。
  • 灵活的路由(Flexible Routing):在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的Exchange。
  • 消息集群(Clustering):多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 高可用(Highly Avaliable Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
  • 多种协议(Multi-protocol):支持多种消息队列协议,如STOMP、MQTT等。
  • 多种语言客户端(Many Clients):几乎支持所有常用语言,比如Java.NETRubyGo等。
  • 管理界面(Management UI):提供了易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。
  • 跟踪机制(Tracing):如果消息异常,RabbitMQ提供了消息的跟踪机制,使用者可以找出发生了什么。
  • 插件机制(Plugin System):提供了许多插件,来从多方面进行扩展,也可以编辑自己的插件。

常见消息队列对比

当前市面上mq的产品很多,比如RabbitMQ、Kafka、ActiveMQ、ZeroMQ和阿里巴巴捐献给Apache的RocketMQ。甚至连redis这种NoSQL都支持MQ的功能。

  • ActiveMQ:基于JMS,Apache
  • RocketMQ:(Rocket,火箭)阿里巴巴的产品,基于JMS,目前由Apache维护
  • Kafka:分布式消息系统,亮点:吞吐量超级高,每秒数十万的并发。
  • RabbitMQ:(Rabbit,兔子)由erlang语言开发,基于AMQP协议,在erlang语言特性的加持下,RabbitMQ稳定性要比其他的MQ产品好一些,而且erlang语言本身是面向高并发的编程的语言,所以RabbitMQ速度也非常快。且它基于AMQP协议,对分布式、微服务更友好。

MQ的选择:

  • 一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,不推荐使用;
  • 后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;
  • 不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。
  • 所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。
  • 如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

Docker安装RabbitMQ

这里我们以安装rabbitmq 3.8-management版本为例。

带有mangement的版本包含web管理页面

首先docker pull拉取镜像:

1
docker pull rabbitmq:3.8-management

接下来docker run启动容器:

1
2
3
4
5
6
7
8
docker run -d --name rabbitmq3.8 \
-p 5672:5672 -p 15672:15672 \
-v /home/root/dockerVolumes/rabbit/rabbit3.8/data:/var/lib/rabbitmq \
--hostname myrabbit \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
rabbitmq:3.8-management

参数说明:

  • -d:后台运行容器,并返回容器ID
  • --name rabbitmq3.8:为该容器指定名称为rabbitmq3.8
  • -p 5672:5672 -p 15672:15672:端口映射(主机(宿主)端口:容器端口)
    • 5672:应用访问端口
    • 15672:控制台Web端口号
  • -v /home/root/dockerVolumes/rabbit/rabbit3.8/data:/var/lib/rabbitmq:绑定数据卷(/宿主机绝对路径目录:/容器内目录),将容器内的/var/lib/rabbitmq挂载到宿主机的/home/root/dockerVolumes/rabbit/rabbit3.8/data文件夹下
  • --hostname myrabbit:指定主机名(RabbitMQ的一个重要注意事项是它根据所谓的节点名称存储数据,默认为主机名)
  • -e设置环境变量:
    • -e RABBITMQ_DEFAULT_VHOST=my_vhost:默认虚拟机名为my_vhost
    • -e RABBITMQ_DEFAULT_USER=admin:默认用户名为admin
    • -e RABBITMQ_DEFAULT_PASS=admin:默认用户admin的密码为admin

进入rabbit容器内:docker exec -it rabbitmq3.8 bash

运行后访问http://ip:15672/,进入rabbit的web管理界面。

Rabbitmq相关概念

“P” is our producer and “C” is our consumer. The box in the middle is a queue - a message buffer that RabbitMQ keeps on behalf of the consumer.

所有MQ产品从模型抽象来说,都是一样的过程:

  • 消费者(consumer)订阅某个队列。
  • 生产者(producter)创建消息,然后发布到队列中(queue),最终将消息发送到监听的消费者。

当然这只是最简单抽象的描述,具体到RabbitMQ则有更详细的概念需要解释。

(参考:RabbitMQ介绍 · Go语言中文文档

  • Broker:标识消息队列服务器实体。
  • Virtual Host:虚拟主机。标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /
  • Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
  • Banding:绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
  • Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟链接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过Channel完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
  • Connection:网络连接,比如一个TCP连接。
  • Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
  • Consumer:消息的消费者,表示一个从一个消息队列中取得消息的客户端应用程序。
  • Message:消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。

RabbitMQ的六种工作模式

RabbitMQ的六种工作模式

具体要多看官网:RabbitMQ Tutorials

Simple

RabbitMQ tutorial - “Hello world!”

simple简单模式:

  • 生产者(producter)将消息放入队列(queen)
  • 消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉。消息被拿走后,自动从队列中删除(隐患:消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)。应用场景:聊天(中间有一个过度的服务器;p端,c端)

做simple简单模式之前首先我们新建一个Virtual Host并且给他分配一个用户名,用来隔离数据,根据自己需要自行创建。

在web管理页面上具体操作步骤参考:Simple模式

Work queues

RabbitMQ tutorial - Work Queues

Work queues(工作队列模式):

在 Work queues 工作模式中,不需要设置交换器(RabbitMQ 会使用内部默认交换器进行消息转换),需要指定唯一的消息队列进行消息传递,并且可以有多个消息消费者。在这种模式下,多个消息消费者通过轮询的方式依次接收消息队列中存储的消息,一旦消息被某一个消费者接收,消息队列会将消息移除,而接收并处理消息的消费者必须在消费完一条消息后再准备接收下一条消息。

Work queues 工作模式适用于那些较为繁重,并且可以进行拆分处理的业务,这种情况下可以分派给多个消费者轮流处理业务。

特点:

  • 一条消息只会被一个消费端接收
  • 队列采用轮询的方式将消息是平均发送给消费者的
  • 消费者在处理完某条消息后,才会收到下一条消息

消生产者将消息放入队列,消费者可以有多个:消费者1、消费者2同时监听同一个队列,C1、C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样) 保证一条消息只能被一个消费者使用)

应用场景:

  • 红包
  • 大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)

Publish/Subscribe

RabbitMQ tutorial - Publish/Subscribe

Publish/Subscribe(发布订阅模式):

在 Publish/Subscribe 工作模式中,必须先配置一个 fanout 类型的交换器,不需要指定对应的路由键(Routing key),同时会将消息路由到每一个消息队列上,然后每个消息队列都可以对相同的消息进行接收存储,进而由各自消息队列关联的消费者进行消费。

Publish/Subscribe 工作模式适用于进行相同业务功能处理的场合。例如,用户注册成功后,需要同时发送邮件通知和短信通知,那么邮件服务消费者和短信服务消费者需要共同消费 “用户注册成功” 这一条消息。

特点:

  • 每个消费者监听自己的队列
  • 生产者将消息发给 broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

X代表交换机(rabbitMQ内部组件),生产者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费。

应用场景:

  • 邮件群发
  • 群聊天
  • 广播(广告)

Routing

RabbitMQ tutorial - Routing

Routing(路由模式):

在 Routing 工作模式中,必须先配置一个 direct 类型的交换器,并指定不同的路由键值(Routing key)将对应的消息从交换器路由到不同的消息队列进行存储,由消费者进行各自消费。

Routing 工作模式适用于进行不同类型消息分类处理的场合。例如,日志收集处理,用户可以配置不同的路由键值分别对不同级别的日志信息进行分类处理。

特点:

  • 每个消费者监听自己的队列,并且设置 routingkey
  • 生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列

生产者将消息发送给交换机,按照路由判断(路由是字符串(info) ),当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只有匹配上路由key对应的消息队列,对应的消费者才能消费消息。

特点:

  • 每个消费者监听自己的队列,并且设置 routingkey
  • 生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列

Topics

RabbitMQ tutorial - Topics

Topics(通配符模式):

在 Topics 工作模式中,必须先配置一个 topic 类型的交换器,并指定不同的路由键值(Routing key)将对应的消息从交换器路由到不同的消息队列进行存储,然后由消费者进行各自消费。

Topics 模式与 Routing 模式的主要不同在于:Topics 模式设置的路由键是包含通配符的,其中,# 匹配多个字符,* 匹配一个字符,然后与其他字符一起使用.进行连接,从而组成动态路由键,在发送消息时可以根据需求设置不同的路由键从而将消息路由到不同的消息队列。

Topics 工作模式适用于根据不同需求动态传递处理业务的场合。例如,一些订阅客户喜欢只接收邮件消息,一些订阅客户只喜欢接收短信消息,而有些订阅可以都接收,那么可以根据客户需求进行动态路由匹配,从而将订阅消息分发到不同的消息队列中。

特点:

  1. 每个消费者监听自己的队列,并且设置带统配符的 routingkey
  2. 生产者将消息发给 broker,由交换机根据 routingkey 来转发消息到指定的队列

RPC

RabbitMQ tutorial - Remote procedure call (RPC)

RPC:

RPC 工作模式与 Work queues 工作模式主体流程相似,都不需要设置交换器,需要指定唯一的消息队列进行消息传递。

RPC 模式与 Work queues 模式的主要不同在于:RPC 模式是一个回环结构,主要针对分布式架构的消息传递业务,客户端 Client 先发送消息到消息队列,远程服务端 Server 获取消息,然后再写入另一个消息队列,向原始客户端 Client 响应消息处理结果。

RPC 工作模式适用于远程服务调用的业务处理场合。例如,在分布式架构中必须考虑的分布式事务管理问题。

RabbitMQ消息流

RabbitMQ消息流

AMQP

AMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。AMQP 是一个二进制协议,拥有一些现代化特点:多信道、协商式,异步,安全,扩平台,中立,高效。

RabbitMQ是AMQP协议的Erlang的实现。

概念 说明
连接 Connection 一个网络连接,比如 TCP/IP 套接字连接。
会话 Session 端点之间的命名对话。在一个会话上下文中,保证 “恰好传递一次”。
信道 Channel 多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。
客户端 Client AMQP 连接或者会话的发起者。AMQP 是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。
服务节点 Broker 消息中间件的服务节点;一般情况下可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。
端点 AMQP 对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。
消费者 Consumer 一个从消息队列里请求消息的客户端程序。
生产者 Producer 一个向交换机发布消息的客户端应用程序。

RabbitMQ运转流程

生产者发送消息:

  1. 生产者创建连接(Connection),开启一个信道(Channel),连接到 RabbitMQ Broker
  2. 声明队列并设置属性;如是否排它,是否持久化,是否自动删除
  3. 将路由键(空字符串)与队列绑定起来
  4. 发送消息至 RabbitMQ Broker
  5. 关闭信道
  6. 关闭连接

消费者接收消息:

  1. 消费者创建连接(Connection),开启一个信道(Channel),连接到 RabbitMQ Broker
  2. 向 Broker 请求消费相应队列中的消息,设置相应的回调函数
  3. 等待 Broker 回应闭关投递响应队列中的消息,消费者接收消息
  4. 确认(ack,自动确认)接收到的消息
  5. RabbitMQ 从队列中删除相应已经被确认的消息
  6. 关闭信道
  7. 关闭连接

生产者流转过程说明:

  1. 客户端与代理服务器 Broker 建立连接。会调用 newConnection() 方法,这个方法会进一步封装 Protocol Header 0-9-1 的报文头发送给 Broker ,以此通知 Broker 本次交互采用的是 AMQPO-9-1 协议,紧接着 Broker 返回 Connection.Start 来建立连接,在连接的过程中涉及 Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/.Open-Ok 这 6 个命令的交互。
  2. 客户端调用 connection.createChannel 方法。此方法开启信道,其包装的 channel.open 命令发送给 Broker,等待 channel.basicPublish 方法,对应的 AMQP 命令为 Basic.Publish,这个命令包含了 content Header 和 content Body()。content Header 包含了消息体的属性,例如:投递模式,优先级等,content Body 包含了消息体本身。
  3. 客户端发送完消息需要关闭资源时,涉及到 Channel.Close 和 Channl.Close-Ok 与 Connetion.Close 和 Connection.Close-Ok 的命令交互。

消费者流转过程说明:

  1. 消费者客户端与代理服务器 Broker 建立连接。会调用 newConnection() 方法,这个方法会进一步封装 Protocol Header 0-9-1 的报文头发送给 Broker,以此通知 Broker 本次交互采用的是 AMQPO-9-1 协议,紧接着 Broker 返回 Connection.Start 来建立连接,在连接的过程中涉及 Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/.Open-Ok 这 6 个命令的交互。
  2. 消费者客户端调用 connection.createChannel 方法。和生产者客户端一样,协议涉及 Channel.Open/Open-Ok 命令。
  3. 在真正消费之前,消费者客户端需要向 Broker 发送 Basic.Consume 命令(即调用 channel.basicConsume 方法〉将 Channel 置为接收模式,之后 Broker 回执 Basic.Consume-Ok 以告诉消费者客户端准备好消费消息。
  4. Broker 向消费者客户端推送(Push) 消息,即 Basic.Deliver 命令,这个命令和 Basic.Publish 命令一样会携带 Content Header 和 Content Body。
  5. 消费者接收到消息并正确消费之后,向 Broker 发送确认, 即Basic.Ack 命令。
  6. 客户端发送完消息需要关闭资源时,涉及到 Channel.Close 和 Channl.Close-Ok 与 Connetion.Close 和 Connection.Close-Ok 的命令交互。

RabbitMQ常用命令

RabbitMQ常用命令

插件管理:

  • 查看插件打开情况:rabbitmq-plugins list
  • 启动监控管理器:rabbitmq-plugins enable rabbitmq_management
  • 关闭监控管理器:rabbitmq-plugins disable rabbitmq_management

服务器管理:

  • 服务器启动:service rabbitmq-server start
  • 服务器关闭:service rabbitmq-server stop
  • 服务器重启:service rabbitmq-server restart

服务管理:

  • 启动rabbitmq:rabbitmq-service start
  • 关闭rabbitmq:rabbitmq-service stop

应用管理:

  • 关闭应用:rabbitmqctl stop_app
  • 启动应用:rabbitmqctl start_app
  • 查看状态:rabbitmqctl status
  • 多应用使用:rabbitmqctl -n rabbit_ceilometer

队列管理:

  • 查看所有的队列:rabbitmqctl list_queues
  • 清除所有的队列:rabbitmqctl reset

用户管理:

  • 新增用户:rabbitmqctl add_user admin admin
  • 删除用户:rabbitmqctl delete_user admin
  • 修改用户:rabbitmqctl change_password admin admin123
  • 查看用户列表:rabbitmqctl list_users
  • 设置用户角色:rabbitmqctl set_user_tags admin administrator monitoring policymaker management

权限管理:

  • 设置用户权限:rabbitmqctl set_permissions -p VHostPath admin ConfP WriteP ReadP
  • 查询所有权限:rabbitmqctl list_permissions [-p VHostPath]
  • 指定用户权限:rabbitmqctl list_user_permissions admin
  • 清除用户权限:rabbitmqctl clear_permissions [-p VHostPath] admin

集群管理:

  • 加入集群:有host1 和 host2,在 host2 上操作:
    • 先停止:rabbitmqctl -n rabbit stop_app
    • 加入:rabbitmqctl -n rabbit join_cluster rabbit@$rabbit_hostname1
    • 再启动:rabbitmqctl -n rabbit start_app
  • 查看集群状态:rabbitmqctl cluster_status

RabbitMQ添加用户

在web管理页面上新建一个qingbo用户参考:Simple模式

RabbitMQ添加用户

角色说明

RabbitMQ中的角色有如下几种:

  1. 超级管理员(administrator):可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
  2. 监控者(monitoring):可登陆管理控制台,同时可以查看 rabbitmq 节点的相关信息(进程数,内存使用情况,磁盘使用情况等)。
  3. 策略制定者(policymaker):可登陆管理控制台, 同时可以对 policy 进行管理。但无法查看节点的相关信息。
  4. 普通管理者(management):仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
  5. 其他:无法登陆管理控制台,通常就是普通的生产者和消费者。

Virtual Hosts

就像Mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限,RabbitMQ也有类似的权限管理。

在 RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个Virtual Host之间是相互隔离的。exchange、queue、message 不能互通。 相当于mysql的db。Virtual Name 一般以 / 开头。

RabbitMQ高级