先决条件
已安装PHP,Erlang和RabbitMQ。
安装PHP环境下使用的RabbitMQ第三方库——php-amqplib
使用composer安装php-amqplib库。
生产者代码
- <?php
- require 'vendor/autoload.php';
- use PhpAmqpLib\Connection\AMQPStreamConnection;
- use PhpAmqpLib\Message\AMQPMessage;
- $conf = [
- 'host' => 'localhost',
- 'port' => 5672,
- 'user' => 'admin',
- 'password' => 'admin',
- 'vhost' => '/',
- ];
- $exchangeName = 'testExch'; //交换机名称
- $queueName = 'testQue'; //队列名称
- $routingKey = 'testRoute'; //路由关键字(也可以省略)
- //建立生产者与mq之间的连接
- $conn = new AMQPStreamConnection($conf['host'], $conf['port'], $conf['user'], $conf['password'], $conf['vhost']);
- $channel = $conn->channel(); //在已连接基础上建立生产者与mq之间的通道
- $channel->exchange_declare($exchangeName, 'direct', false, true, false); //声明初始化交换机
- $channel->queue_declare($queueName, false, true, false, false); //声明初始化一条队列
- $channel->queue_bind($queueName, $exchangeName, $routingKey); //将队列与某个交换机进行绑定,并使用路由关键字
-
- for ($i=1; $i<=20; $i++){
- $msgBody = json_encode(["name" => "WCW", "no" => $i]);
- $msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]); //构建消息
- $ret = $channel->basic_publish($msg, $exchangeName, $routingKey); //发布消息到某个交换机
- }
- $channel->close();
- $conn->close();
消费者代码
- <?php
- require 'vendor/autoload.php';
- use PhpAmqpLib\Connection\AMQPStreamConnection;
- $conf = [
- 'host' => 'localhost',
- 'port' => 5672,
- 'user' => 'admin',
- 'password' => 'admin',
- 'vhost' => '/',
- ];
- $exchangeName = 'testExch'; //交换机名
- $queueName = 'testQue'; //队列名称
- $routingKey = 'testRoute'; //路由关键字(也可以省略)
- //建立生产者与mq之间的连接
- $conn = new AMQPStreamConnection($conf['host'], $conf['port'], $conf['user'], $conf['password'], $conf['vhost']);
- $channel = $conn->channel(); //在已连接基础上建立生产者与mq之间的通道
- $channel->queue_declare($queueName, false, true, false, false); //声明初始化一条队列
- //回调函数,数据处理
- $callback = function($msg) {
- echo " Received: ", $msg->body, "\n";
- };
- $channel->basic_consume($queueName, '', false, true, false, false, $callback); //消费接收消息
- //监听消息,一有消息,立马就处理
- while(count($channel->callbacks)) {
- $channel->wait();
- }
效果测试
执行生产者程序:

执行消费者程序:

消息接收成功!
至此。转载请注明出处,记得扫码打赏支持哦,谢谢!
