rabbitmq exchange 的四种模式与点对点消息队列、发布订阅消息队列的实现

2016-05-13 13:17:09   最后更新: 2019-03-04 15:55:54   访问数量:5369




在之前的日志中,我们介绍了 AMQP 协议所能实现的各种功能:

AMQP 消息服务应用协议

  1. 存储转发(多个消息发送者,单个消息接收者)
  2. 分布式事务(多个消息发送者,多个消息接收者)
  3. 发布订阅(多个消息发送者,多个消息接收者)
  4. 基于内容的路由(多个消息发送者,多个消息接收者)
  5. 文件传输队列(多个消息发送者,多个消息接收者)
  6. 点对点连接(单个消息发送者,单个消息接收者)

 

本文中,我们就来介绍一下 rabbitmq 的各种用法

本文以 php 为例,其他语言的用法非常类似

rabbitmq 的安装和监控可参看:

rabbitmq 的安装和监控

 

最基本的模式就是点对点模式,一个生产者向队列中投入消息,一个消费者循环从队列中取数据

 

 

php-amqplib

  • producer
<?php require_once __DIR__ . '/../composer/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); $msg = new AMQPMessage('Hello World!'); $channel->basic_publish($msg, '', 'hello'); echo " [x] Sent 'Hello World!'\n"; $channel->close(); $connection->close(); ?>

 

 

  • consumer
<?php require_once __DIR__ . '/../composer/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); echo ' [*] Waiting for messages. To exit press CTRL+C', "\n"; $callback = function($msg) { echo " [x] Received ", $msg->body, "\n"; }; $channel->basic_consume('hello', '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } ?>

 

这段代码中,producer 向名为 "hello" 的队列中放入消息 "Hello World",consumer 从其中取出消息,这是消息队列最简单的用法

basic_consume 方法的第一个参数标识队列名称,第四个参数标识是否自动 ack,第七个参数则是收到消息后执行的回调方法

 

Acknowledge

消息队列使用时,如果 consumer 意外退出,那么他没来得及处理的消息会如何处理呢?

AMQP 要求消费者需要向队列发送 ACK 消息表示消息已经处理,否则这条消息还会分发给其他 consumer 去处理,以防止消息的丢失

如果设置了 auto_ack,则 consumer 在收到消息后会立即自动发送 ACK 消息,这样在代码中无需手动发送 ack 消息,但是方便的同时带来了消息丢失的风险

下面是手动 ack 的 consumer 改进版本:

<?php require_once __DIR__ . '/../composer/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); echo ' [*] Waiting for messages. To exit press CTRL+C', "\n"; $callback = function($msg) { echo " [x] Received ", $msg->body, "\n"; sleep(3); echo " [x] Done".PHP_EOL; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_qos(null, 1, null); $channel->basic_consume('hello', '', false, false, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } ?>

 

 

basic_qos 设置了队列的 prefetch_count 属性,它限制了消费者同时能够接收的消息数,设置为 1 也就意味着,在 consumer 手动发送 ack 前,队列不会再将新的消息发送给他

 

这样,我们可以不再仅仅用一个 consumer 来进行消费了,我们可以同时启动多个 consumer 来实现队列消息的消费了

 

PHP AMQP 扩展

下面使用 PHP 官方提供的 AMQP 扩展实现上述功能

  • producer
<?php $conn_args = array('host' => 'localhost', 'port' => 5672, 'login' => 'guest', 'password' => 'guest'); $connection = new AMQPConnection($conn_args); if ($connection->connect()) { echo "Established a connection to the broker \n"; } else { echo "Cannot connect to the broker \n "; } $channel = new AMQPChannel($connection); $queue = new AMQPQueue($channel); $queue->setName('hello'); $ex = new AMQPExchange($channel); $ex->setName('helloexchange'); $ex->setType(AMQP_EX_TYPE_DIRECT); $ex->setFlags(AMQP_DURABLE); $ex->declareExchange(); $ex->publish('Hello World!', 'hello'); ?>

 

 

  • consumer
<?php $conn_args = array('host' => 'localhost', 'port' => 5672, 'login' => 'guest', 'password' => 'guest'); $connection = new AMQPConnection($conn_args); if ($connection->connect()) { echo "Established a connection to the broker \n"; } else { echo "Cannot connect to the broker \n "; } $channel = new AMQPChannel($connection); $queue = new AMQPQueue($channel); $queue->setName('hello'); $queue->bind('helloexchange', 'hello'); $queue->qos(0, 1); while (1) { if ($message = $queue->get()) { echo " [x] Received ", $message->getBody(), "\n"; sleep(3); echo $message->getBody().PHP_EOL; echo " [x] Done".PHP_EOL; $queue->ack($message->getDeliveryTag()); } } ?>

 

 

需要注意的是:

  1. AMQPQueue 对象的 get 方法如果以 AMQP_AUTOACK 为参数则会自动发送 ack,无参数版本则需要手动调用 ack 方法发送
  2. AMQPQueue 对象的 qos 方法与上面所说的 basic_qos 方法一样,设置了能够接收的消息大小和消息数,由于 rabbitmq 并没有实现对消息大小的限制,所以这里第一个参数并没有意义,我们设为了 0
  3. 这里涉及到 exchange 的相关概念,我们马上来了解

 

在上面的例子中,我们已经看到了 exchange 的创建和使用,此前,在 AMQP 的介绍中,我们也介绍了协议中的 Exchange:

AMQP 消息服务应用协议

正如 AMQP 协议中描述的,producer 是通过 exchange 将消息发送到队列的,exchange 通过消息中的 routing key 决定最终发往的队列

 

 

上面使用 php-amqplib 的例子中,并没有出现 exchange,是因为他自动使用了默认的 exchange amq.direct 实现点对点消息队列

事实上,producer 是不能将消息发送给队列的,他只能发送给 exchange,由 exchange 决定发送到哪个队列,exchange type 决定了消息的最终处理方式

Exchange 共有四种 type(模式)可供选择:

  1. direct
  2. fanout
  3. topic
  4. headers

 

direct

 

direct 方式是最常用也是最简单的方式,当 Exchange 收到消息后,会将消息转发到消息的 routing key 所指定的消息队列中

这种模式下,queue 需要执行 bind 操作绑定到 Exchange 上并提供绑定的 routing-key

如果在 vhost 中不存在指定的 routing-key,消息就会被丢弃

 

fanout

 

fanout 模式就是常用的发布/订阅模式,也称为“路由表”模式

在这种模式下,Exchange 收到的任何消息都会被转发到所有与该 Exchange 绑定的所有 Queue 上

因此,在这种模式下 Queue 必须 bind 到 Exchange 才会被通知,进而才能使用,同时,这种模式下不需要 routing-key

一个 Queue 可以绑定多个 Exchange,一个 Exchange 也可以绑定多个 Queue

如果 Exchange 并没有绑定任何 Queue,那么消息就会被丢弃

 

topic

 

这种模式比较复杂,简单的来说,就是 Exchange 会把收到的消息转发到所有关心 routing-key 的 queue 上,Exchange 通过对消息的 routing-key 进行模糊匹配查找到对应的队列

因此,与 fanout 一样,Queue 必须 bind 到 Exchange,同时与 direct 模式一样,必须指定 routing-key

当一个 queue 执行 bind 操作绑定到 exchange 时,需要提供他关心的 routing-key,这个 routing-key 字符串可以是一个模糊匹配字符串,# 表示 0 个或若干个关键词,* 表示一个关键词,如 log.* 可以匹配成功 log.warn 但不能匹配成功 log.warn.timeout,而 #.log.# 可以匹配上述两个

如果 Exchange 没有发现任何匹配的 Queue,消息就会被丢弃

 

headers

Headers 模式一般很少被用到,他根据消息 header 中的 “x-match” 属性匹配已经绑定的消息队列

 

使用上面介绍的 Fanout 模式的 Exchange 就可以实现发布订阅模式的消息队列了,如果使用 Topic 模式则可以实现更加灵活的发布/订阅消息队列实现

 

php-amqplib

  • producer
<?php require_once __DIR__ . '/../composer/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('access_log', false, false, false, false); $channel->queue_declare('error_log', false, false, false, false); $channel->queue_declare('warning_log', false, false, false, false); $channel->exchange_declare('logs', 'fanout', false, false, false); $channel->queue_bind('access_log', 'logs'); $channel->queue_bind('error_log', 'logs'); $channel->queue_bind('warning_log', 'logs'); for ($i = 0; $i < 6; $i++) { $msg = new AMQPMessage('Hello World!'); $channel->basic_publish($msg, 'logs'); echo " [x] Sent 'Hello World!'\n"; } $channel->close(); $connection->close(); ?>

 

这里我们声明了三个队列,并且全部通过 bind 操作绑定到了名为 "logs" 的 Exchange 上,然后发送了 6 条消息到 exchange,可以看到消息,与 logs exchange 绑定的三个队列都收到了 6 条消息

 

 

欢迎关注微信公众号,以技术为主,涉及历史、人文等多领域的学习与感悟,每周三到七篇推文,全部原创,只有干货没有鸡汤

 

 

Rabbitmq Tutorial -- http://www.rabbitmq.com/tutorials/tutorial-one-php.html

book.amqp.php -- http://php.net/manual/pl/book.amqp.php

Rabbitmq 消息队列在 PHP 下的应用 -- http://www.cnblogs.com/phpinfo/p/4104551.html

Rabbitmq 三种 Exchange 模式的性能比较 -- http://hwcrazy.com/34195c9068c811e38a44000d601c5586/group/free_open_source_project/

 






技术帖      技术分享      rabbitmq      消息队列      message queue      queue      exchange      routing      pub/sub     


京ICP备15018585号