###实战模拟
(一般教程都是单独讲,这里就一步KO..)
-
程序监控,会监控.PHP+MYSQL的错误信息
消息中含有关键点.有警告性错误warning,严重错误error,错误的类别,是PHP.还是MYSQL -
要求中间可以加入任何一个错误处理程序,对错误进行处理.实现可扩展(要求能可以接入新消费者)
-
要求对MYSQL的ERROR错误发钉钉(获得固定值)
-
要求独立统计MYSQL所有错误和警告(获得MYSQL)
-
要求对所有错误信息.进行一个归档存储 (获得全部)
-
系统还写了另外一个服务器监控程序.相关消息也需要做归档存储.
需求图看起来应该是这样子的.
接下来做代码演练,令人欣慰的一点,rabbitMQ支持中文的队列和交换机名.
首先在回忆下交换机三种类型.
类型 | 功能 | 效率 |
---|---|---|
Fanout | 群发 | 高 |
Direct | 完全匹配 | 中 |
Topic | 通配匹配 | 低 |
-
首先我们要创建程序监控,和服务器监控.两个交换机,类型为Fanout
这样就能够让所有绑定进来的队列收到消息.其他参数默认
$channel->exchange_declare('程序监控' , 'fanout'); $channel->exchange_declare('服务器监控', 'fanout');
- 接下来创建通配交换机,因为通配交换机只接收程序监控交换机的数据,即需要设置内部使用.防止乱入
$channel->exchange_declare('程序监控通配转发, 'Topic',false,false,true,true);
- 两个交换机绑定,这样程序监控的消息会发给程序通配转发交换机一份.
$channel->exchange_bing('程序监控通配转发, '程序监控');
- 接下来创建队列
$channel->queue_declare('归档存储'); $channel->queue_declare('MYSQL错误统计'); $channel->queue_declare('钉钉提示');
- 队列绑定
//归档存储需要获取所有错误,所以直接绑定程序监控 $channel->queue_bind('归档存储','程序监控'); //一个队列是可以绑定多个交换机的消息 $channel->queue_bind('归档存储','服务器监控'); //通配方式.所有MYSQL.开头的,其中#代表必然存在,如果发送的路由名是MYSQL而不是MYSQL.ERROR会收不到,如果想收到可以使用MYSQL.* $channel->queue_bind('MYSQL错误统计','程序监控通配转发','MYSQL.#'); //完全匹配处理.如果所有的绑定都是完全匹配的,则应该使用Direct交换机提高效率 $channel->queue_bind('钉钉提示','程序监控通配转发','MYSQL.ERROR');
完整的生产者代码.并生产出消息.
<?php //引入composer代码加载器 require 'vendor/autoload.php'; //引入链接类 use PhpAmqpLib\Connection\AMQPStreamConnection; //引入消息类 use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('192.168.0.167', 5672, 'guest', 'guest', '/'); //通过链接获得一个新通道. $channel = $connection->channel(); $channel->exchange_declare('程序监控' , 'fanout'); $channel->exchange_declare('服务器监控', 'fanout'); $channel->exchange_declare('程序监控通配转发', 'topic',false,false,true,true); $channel->exchange_bind('程序监控通配转发', '程序监控'); $channel->queue_declare('归档存储'); $channel->queue_declare('MYSQL错误统计'); $channel->queue_declare('钉钉提示'); $channel->queue_bind('归档存储','程序监控'); $channel->queue_bind('归档存储','服务器监控'); $channel->queue_bind('MYSQL错误统计','程序监控通配转发','MYSQL.#'); $channel->queue_bind('钉钉提示','程序监控通配转发','MYSQL.ERROR'); $channel->basic_publish(new AMQPMessage('PHP警告'),'程序监控','PHP.WARNING'); $channel->basic_publish(new AMQPMessage('PHP错误'),'程序监控','PHP.ERROR'); $channel->basic_publish(new AMQPMessage('MYSQL警告'),'程序监控','MYSQL.WARNING'); $channel->basic_publish(new AMQPMessage('MYSQL错误'),'程序监控','MYSQL.ERROR'); $channel->basic_publish(new AMQPMessage('服务器错误'),'服务器监控','SERVER.ERROR'); //关闭通道 $channel->close(); //关闭链接 $connection->close();
消费者
<?php //引入composer代码加载器 require 'vendor/autoload.php'; //引入链接类 use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.167', 5672, 'guest', 'guest', '/'); //通过链接获得一个新通道. $channel = $connection->channel(); $channel->exchange_declare('程序监控' , 'fanout'); $channel->exchange_declare('服务器监控', 'fanout'); $channel->exchange_declare('程序监控通配转发', 'topic',false,false,true,true); $channel->exchange_bind('程序监控通配转发', '程序监控'); $channel->queue_declare('归档存储'); $channel->queue_declare('MYSQL错误统计'); $channel->queue_declare('钉钉提示'); $channel->queue_bind('归档存储','程序监控'); $channel->queue_bind('归档存储','服务器监控'); $channel->queue_bind('MYSQL错误统计','程序监控通配转发','MYSQL.#'); $channel->queue_bind('钉钉提示','程序监控通配转发','MYSQL.ERROR'); $channel->basic_consume("归档存储", "", false, false, false, false, function ($message) { var_dump(iconv('utf-8','gbk','归档存储'.$message->body)); } ); $channel->basic_consume("MYSQL错误统计", "", false, false, false, false, function ($message) { var_dump(iconv('utf-8','gbk','MYSQL错误统计'.$message->body)); } ); $channel->basic_consume("钉钉提示", "", false, false, false, false, function ($message) { var_dump(iconv('utf-8','gbk','钉钉提示'.$message->body)); } ); while (count($channel->callbacks)) { $channel->wait(); } //关闭通道 $channel->close(); //关闭链接 $connection->close();
执行结果如下.
##精简代码
因为代码定义交换机,队列以及绑定.默认都是auto_delete并且是并且不是持久的.所以客户端和服务端的代码.都需要增加上整个交换过程的定义,如果系统确定了固定的队列结构.个人认为可以单独编写安装代码创建结构.并持久化
代码如下
install.php
<?php //引入composer代码加载器 require 'vendor/autoload.php'; //引入链接类 use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('192.168.0.167', 5672, 'guest', 'guest', '/'); //通过链接获得一个新通道. $channel = $connection->channel(); //$channel->exchange_declare('程序监控' , 'fanout',false,true,false); //$channel->exchange_declare('服务器监控', 'fanout',false,true,false); //$channel->exchange_declare('程序监控通配转发', 'topic',false,true,false,true); //$channel->exchange_bind('程序监控通配转发', '程序监控'); //关闭通道 $channel->close(); //关闭链接 $connection->close();
我们执行看一下效果
注意:因为之前创建的是自动删除.而生产者发送消息后就断开连接了.而消费者断开连接之后,整个消息队列就没有任何访问者.就开始了整个自动删除掉,所以创建可以成功.如果要是队列或者交换机存在的情况下,必须先将其删除在进行创建.
>[warning] 注意:因为之前创建的是自动删除.而生产者发送消息后就断开连接了.而消费者断开连接之后,整个消息队列就没有任何访问者.就开始了整个自动删除掉,所以创建可以成功.如果要是队列或者交换机存在的情况下,必须先将其删除在进行创建,貌似队列中有消息没有消费.也不会被自动删除,所以可能需要手动删下.
同时删除掉消费者和生产者相关的定义与绑定的代码.
如需转载请注明: 转载自26点的博客
本文链接地址: rabbitMQ消息队列 – php案例
转载请注明:26点的博客 » rabbitMQ消息队列 – php案例