代码基于thinkphp框架,如有错误请自行解决 :)
首先我们先创建一个生产者.也就是消息产生者的案例
名为publish.php
<?php //引入composer代码加载器 require 'vendor/autoload.php'; //引入链接类 use PhpAmqpLib\Connection\AMQPStreamConnection; //引入消息类 use PhpAmqpLib\Message\AMQPMessage; /* 开始链接 参数依次为 1. 主机 2. 端口 3. 账号 4. 密码 5. 虚拟主机 */ $connection = new AMQPStreamConnection('192.168.0.167', 5672, 'guest', 'guest', '/'); //通过链接获得一个新通道. $channel = $connection->channel(); //创建一个队列 名为TestQueue $channel->queue_declare('TestQueue', false, true, false, false); //创建一个名为TestExchange的交换机,类型为'direct' $channel->exchange_declare('TestExchange', 'direct', false, true, false); //进行通道的绑定 $channel->queue_bind('TestQueue','TestExchange'); //创建了一个消息 $message = new AMQPMessage("Msg:".time(), array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); //发送消息 $channel->basic_publish($message,'TestExchange'); //关闭通道 $channel->close(); //关闭链接 $connection->close();
接下来我们创建一个消费者 consume.php
<?php //引入composer代码加载器 require 'vendor/autoload.php'; //引入链接类 use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.167', 5672, 'guest', 'guest', '/'); //通过链接获得一个新通道. $channel = $connection->channel(); //创建一个队列 名为TestQueue $channel->queue_declare('TestQueue', false, true, false, false); //创建一个名为TestExchange的交换机,类型为'direct' $channel->exchange_declare('TestExchange', 'direct', false, true, false); //绑定通道 $channel->queue_bind('TestQueue','TestExchange'); //设立消费者 $channel->basic_consume("TestQueue", "", false, false, false, false, function ($message) { var_dump($message->body); //向MQ服务器发送确认消息 $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); } ); //如果这个通道还有消费者回调 while (count($channel->callbacks)) { $channel->wait(); }
执行结果如下
解释下这里看起来很疑惑的地方
- 虚拟主机可以理解成命名空间.因为rabbitMQ是有账号密码的概念,同时也能赋予对不同命名空间的访问权限.这样可以实现多账号安全的共用服务器资源.当然某一些账号共用一个vhost也是可以的.同时命名空间需要单独创建.
- 发送消息只能发送给交换机上,由交换机,发送给队列,而消费者是从具体的队列中取消息,所以需传一个队列名称.
-
因为消费者有可能先于生产者启动.所以双方编写了创建交换机,队列以及绑定处理.假设生产者会先去生成交换机和队列.那么
consume.php
中的如下代码都不在需要
//创建一个队列 名为TestQueue $channel->queue_declare('TestQueue', false, true, false, false); //创建一个名为TestExchange的交换机,类型为'direct' $channel->exchange_declare('TestExchange', 'direct', false, true, false); //绑定通道 $channel->queue_bind('TestQueue','TestExchange');
如需转载请注明: 转载自26点的博客
本文链接地址: rabbitMQ消息队列 – php代码示例
转载请注明:26点的博客 » rabbitMQ消息队列 – php代码示例