RabbitMQ实战:理解消息通信

本系列是「RabbitMQ实战:高效部署分布式消息队列」书籍的总结笔记。

前段时间总结完了「深入浅出MyBatis」系列,对MyBatis有了更全面和深入的了解,在掘金社区也收到了一些博友的喜欢,很高兴。另外,短暂的陪产假就要结束了,小宝也二周了,下周二就要投入工作了,希望自己尽快调整过来,加油努力。

从本篇开始总结「RabbitMQ实战」系列的阅读笔记,RabbitMQ是一个开源的消息代理和队列服务器,可以通过基本协议在完全不同的应用之间共享数据,可以将作业排队以便让分布式服务进行处理。

本篇介绍下消息通信,首先介绍基础概念,将这些概念映射到AMQP协议,然后介绍消息持久化、发送方确认模式等消息可靠性保证。

通过本篇介绍,你会了解到:

  • 消息通信概念:消费者、生产者和代理
  • AMQP元素:队列、交换器、绑定
  • 虚拟主机
  • 消息持久化
  • 发送方确认模式

消息通信概念

此部分的介绍,会牵涉到AMQP的元素,如果之前没接触过的,可以结合下面的「AMQP元素」进行理解。

消息

消息是传输的主体,消息包括两部分:有效载荷(payload)和标签(label);有效载荷是要传输的数据,可以是任何内容,比如JSON串、二进制、自定义的数据协议等;标签描述了有效载荷,并且Rabbit用它来决定谁将获得消息的投递。

可以与HTTP协议类比,HTTP消息头部描述了消息体的类型、大小等,HTTP消息体是要传输的数据,HTTP服务端通过消息头部决定如何处理请求和数据。

生产者和消费者

生产者创建消息,然后发送到代理服务器(RabbitMQ Server),AMQP只会用标签表述这条消息(一个交换器名称和可选的主题标记),Rabbit服务器会根据标签把消息发送给订阅的消费者。

消费者消费消息,它会订阅到队列(queue)上,每当有消息到达RabbitMQ服务器时,会发送给消费者,消费者收到消息时,会进行处理。

注意:消费者收到的消息只包括有效载荷,所有不会知道是从哪里发来的。

连接和信道

要想发布或消费消息,必须先与RabbitMQ Server建立一条TCP连接,建立TCP连接之后,要创建一条信道,信道是建立在真实TCP连接的虚拟连接。

AMQP命令都是通过信道发送出去的,每条信道会被指派一个唯一的ID,为什么不直接通过TCP连接发送AMQP命令呢? 因为操作系统建立和销毁TCP会话是很昂贵的,而且创建的连接数也有限。 通过引入通道,可以在连接上建立通道,而且通道是私密的,相互不受影响。

通道的概念还是有点抽象,后面专门写一篇文章进行分析介绍,这里简单理解下吧。

AMQP元素

AMQP消息路由有三部分组成:队列、交换器和绑定,队列是存放消息的地方,交换器是决定不同的分发策略,绑定是队列和交换器的桥梁,定义匹配规则。

生产者发送消息到交换器,交换器根据自身类型和绑定规则,将消息存放在对应队列中,然后将消息发送到监听队列的消费者。

AMQP基本模型

如上图:P为生产者,X为交换器,交换器类型为direct,根据不同的绑定规则(orange、black、green),分发给不同的队列,C为消费者,从不同的队列介绍消息。

队列

消费者通过两种方式从特定的队列接收消息:

  • basic.consume,这样会将信道置为接收模式,直到取消对队列的订阅;
  • basic.get,主动让消费者接收队列中的下一条消息;

basic.get会影响性能,推荐使用basic.consume来实现高吞吐量,因为其处理过程是先订阅消息,获取单条消息,再取取消订阅。

如果队列拥有多个消费者时,队列收到的消息将以循环的方式发给消费者,即多个消费者平均消费这些消息。

另外,消费者接收到的每一条消息都要进行确认,必须通过basic.ack命令向rabbitmq服务端发送一个确认。 也可以设置auto_ack为true,只要消费者接收到消息,就自动视为确认,不过不建议这样,因为接收到不代表业务逻辑处理成功。 服务端接收到确认后,会从队列中删除对应消息。

还有一种场景,在接收到消息后,如果不想处理,可以通过下面方式处理:

  • 把消费者从RabbitMQ服务器断开连接,,这样RabbitMQ会自动将消息入队并发送给另外一个消费者;
  • 如果不想发送给其他消费者处理,就是想忽略这个消息,可以发送basic.reject命令;

最后来介绍下如何创建队列,首先明确下是生成者还是消费者创建,关键点是:生产者能否承担起丢失消息,因为发出去的消息如果路由到了不存在的队列,Rabbit会忽略它们。所以,建议生成者和消费者都尝试去创建队列,可以通过设置queue.declare的passive选项设置为ture来判断队列是否存在,如果不存在会返回一个错误。

通过queue.declare命令来创建队列,有一些选项说明下:

  • exclusive:如果设置true的化,队列将变成私有的,只有创建队列的应用程序才能够消费队列消息;
  • auto-delete:当最后一个消费者取消订阅的时候,队列会自动移除;
  • durable:是否要持久化;
1
2
3
4
queueDeclare(String queue, 
boolean durable,
boolean exclusive,
Map<String, Object> arguments);
交换器和绑定

交换器有四种类型:direct、fanout、topic、headers,其中headers匹配消息的header而非路由键,不太实用,就不详细介绍了。

第一种:direct交换器

direct交换器比较简单,如果和路由键 完全匹配 的话,就会投递到对应的队列:

1
2
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);

服务器默认包含一个空白字符串名称的默认路由器,当声明一个队列时,会自定绑定到默认交换器,并以队列名称作为路由键。

第二种:fanout交换器

fanout交换器,不处理路由键,只需要简单的将队列绑定到交换机上,为会每个消费者自动生成一个随机队列,所有的消费者都会收到所有消息。

fanout交换器

1
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

第三种:topic交换器

topic交换器,将路由键和某模式进行匹配,此时队列需要绑定要一个模式上。

tipic交换器

1
2
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");

关于模式,符号#匹配一个或多个词,符号匹配一个词,因此kfs.#能够匹配到kfs.session.message,但是audit.只会匹配到audit.session。

虚拟主机

每个RabbitMQ服务器都能创建虚拟消息服务器,称为虚拟主机(vhost),每个RabbitMQ本质上是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定,还有自己的权限机制。

连接时,必须制定vhost,rabbitmq包含了默认的vhost:”/“。当创建一个用户时,会被指派给至少一个vhsot,并且相互隔离。

vhost不能通过AMQP协议创建,需要使用rabbitmqctl工具创建。

消息持久化和发送方确认模式

如果没有持久化,重启rabbitmq后,队列、交换器都会消失,RabbitMQ提供了持久化的功能,需要满足以下三个条件:

  • 交换器设置为持久化,通过durable属性;
  • 队列设置为持久化,通过durable属性;
  • 消息投递模式delivery设置为2;

当发布一条持久化消息到持久化交换器上时,rabbit会在消息提交到日志文件后才会发送响应,所有会损失性能,所以,只对重要数据持久化即可。

考虑这种情况:由于发布消息后,不返回任何信息给生产者,如何只对服务器已经持久化到硬盘了呢,可能在传输过程中丢失,或者持久化前服务器宕机,导致消息丢失。

RabbitMQ通过「发送方确认模式」来解决上面的问题。首先,需要将信道设置成confirm模式,这样所有在信道上发布的消息都会被指派一个唯一的ID号,一旦消息被投递到所有匹配的队列或持久化到磁盘,会发送一个确认消息给生产者。

通过本篇的介绍,对Rabbit的消息模型有了整体了解,下一篇会写个DEMO,并介绍下运行和管理RabbitMQ。

欢迎扫描下方二维码,关注我的个人微信公众号 ~


情情说