Symfony使用RabbitMQ

  1. 我的DEMO

  2. 这里以在Symfony3.x中使用为例

  3. 主配置文件parameters.yml

     parameters:
         rabbitmq_host: 127.0.0.1
         rabbitmq_port: 5672
         rabbitmq_user: guest
         rabbitmq_pswd: guest
         rabbitmq_path: '/'
    
  4. 引入相关包(php-amqplib,版本 ^2.7)

     composer require php-amqplib/php-amqplib
    
  5. 命令行内实现

     生产者:
     <?php
    
     namespace TestBundle\Command;
    
     use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
     use Symfony\Component\Console\Input\InputInterface;
     use Symfony\Component\Console\Output\OutputInterface;
     use PhpAmqpLib\Message\AMQPMessage;
     use PhpAmqpLib\Connection\AMQPStreamConnection;
    
     /**
      * Console command that publishes a single test message to RabbitMQ.
      *
      * Connection settings are read from the container parameters
      * rabbitmq_host / rabbitmq_port / rabbitmq_user / rabbitmq_pswd / rabbitmq_path.
      * The command declares the "test" queue and the "test" direct exchange,
      * binds them, and publishes one persistent text message to the exchange.
      */
     class SendCommand extends ContainerAwareCommand
     {
         /**
          * Registers the command name and description.
          */
         protected function configure()
         {
             $this->setName('test:rabbitmq_send')
                 ->setDescription('rabbitmq发送消息测试');
         }

         /**
          * Declares the queue/exchange topology and publishes one message.
          *
          * @param InputInterface  $input  Console input (not used by this command).
          * @param OutputInterface $output Console output (not used by this command).
          *
          * @return int Process exit status; 0 on success.
          */
         protected function execute(InputInterface $input, OutputInterface $output)
         {
             $msg = '这是要发送的消息';

             $container = $this->getContainer();
             $rabbitmq_host = $container->getParameter('rabbitmq_host'); // broker host
             $rabbitmq_port = $container->getParameter('rabbitmq_port'); // broker port
             $rabbitmq_user = $container->getParameter('rabbitmq_user'); // login user
             $rabbitmq_pswd = $container->getParameter('rabbitmq_pswd'); // login password
             $rabbitmq_path = $container->getParameter('rabbitmq_path'); // virtual host

             $rabbitmq_excg = 'test'; // exchange name
             $rabbitmq_queu = 'test'; // queue name
             $exchange_type = 'direct';
             $passive = false;
             $durable = true;
             $exclusive = false;
             $auto_delete = false;

             $connection = new AMQPStreamConnection(
                 $rabbitmq_host,
                 $rabbitmq_port,
                 $rabbitmq_user,
                 $rabbitmq_pswd,
                 $rabbitmq_path
             );

             // try/finally so the socket is released even when declare/publish throws.
             try {
                 $channel = $connection->channel();
                 $channel->queue_declare($rabbitmq_queu, $passive, $durable, $exclusive, $auto_delete);
                 $channel->exchange_declare($rabbitmq_excg, $exchange_type, $passive, $durable, $auto_delete);
                 // Bound without a routing key; the publish below also uses none.
                 $channel->queue_bind($rabbitmq_queu, $rabbitmq_excg);

                 $properties = [
                     'content_type' => 'text/plain',
                     'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
                 ];
                 $message = new AMQPMessage($msg, $properties);
                 $channel->basic_publish($message, $rabbitmq_excg);

                 $channel->close();
             } finally {
                 $connection->close();
             }

             // Integer exit status instead of boolean true; the resulting exit code is still 0.
             return 0;
         }
     }
    
     消费者:
     <?php
    
     namespace TestBundle\Command;
    
     use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
     use Symfony\Component\Console\Input\InputInterface;
     use Symfony\Component\Console\Output\OutputInterface;
     use PhpAmqpLib\Connection\AMQPStreamConnection;
    
     /**
      * Console command that consumes messages from the RabbitMQ "test" queue.
      *
      * Connection settings are read from the container parameters
      * rabbitmq_host / rabbitmq_port / rabbitmq_user / rabbitmq_pswd / rabbitmq_path.
      * Each message body is echoed and then acknowledged; a message whose body is
      * exactly "quit" cancels the consumer, which ends the wait loop.
      */
     class GetMsgCommand extends ContainerAwareCommand
     {
         /**
          * Registers the command name and description.
          */
         protected function configure()
         {
             $this->setName('test:get_msg')
                 ->setDescription('rabbitmq接收消息测试');
         }

         /**
          * Declares the queue/exchange topology and consumes until the consumer is cancelled.
          *
          * @param InputInterface  $input  Console input (not used by this command).
          * @param OutputInterface $output Console output (not used by this command).
          *
          * @return int Process exit status; 0 on success.
          */
         protected function execute(InputInterface $input, OutputInterface $output)
         {
             $container = $this->getContainer();
             $rabbitmq_host = $container->getParameter('rabbitmq_host'); // broker host
             $rabbitmq_port = $container->getParameter('rabbitmq_port'); // broker port
             $rabbitmq_user = $container->getParameter('rabbitmq_user'); // login user
             $rabbitmq_pswd = $container->getParameter('rabbitmq_pswd'); // login password
             $rabbitmq_path = $container->getParameter('rabbitmq_path'); // virtual host

             $rabbitmq_excg = 'test'; // exchange name
             $rabbitmq_queu = 'test'; // queue name
             $exchange_type = 'direct';
             $consumer_tag = '';
             $passive = false;
             $durable = true;
             $exclusive = false;
             $auto_delete = false;
             $no_local = false;
             // Manual acknowledgement: getMsg() calls basic_ack itself. With no_ack = true
             // the broker auto-acks, and the explicit basic_ack would be a channel error.
             $no_ack = false;
             $nowait = false;

             $connection = new AMQPStreamConnection(
                 $rabbitmq_host,
                 $rabbitmq_port,
                 $rabbitmq_user,
                 $rabbitmq_pswd,
                 $rabbitmq_path
             );

             // try/finally so the socket is released even when consuming throws.
             try {
                 $channel = $connection->channel();
                 $channel->queue_declare($rabbitmq_queu, $passive, $durable, $exclusive, $auto_delete);
                 $channel->exchange_declare($rabbitmq_excg, $exchange_type, $passive, $durable, $auto_delete);
                 $channel->queue_bind($rabbitmq_queu, $rabbitmq_excg);
                 $channel->basic_consume(
                     $rabbitmq_queu,
                     $consumer_tag,
                     $no_local,
                     $no_ack,
                     $exclusive,
                     $nowait,
                     [$this, 'getMsg']
                 );

                 // Keep waiting while a consumer callback is still registered.
                 while (count($channel->callbacks)) {
                     $channel->wait();
                 }

                 $channel->close();
             } finally {
                 $connection->close();
             }

             return 0;
         }

         /**
          * Consumer callback: prints the message body, acknowledges the delivery,
          * and cancels the consumer when the body is exactly "quit".
          *
          * @param \PhpAmqpLib\Message\AMQPMessage $msg Delivered message.
          */
         public function getMsg($msg)
         {
             // Keep $msg as the message object; the body goes into its own variable
             // so delivery_info stays reachable below.
             $body = $msg->body;
             echo $body . PHP_EOL;

             $channel = $msg->delivery_info['channel'];
             $channel->basic_ack($msg->delivery_info['delivery_tag']);

             if ($body === 'quit') {
                 $channel->basic_cancel($msg->delivery_info['consumer_tag']);
             }
         }
     }
    
@耿志环 2012-∞ 冀ICP备17033181号, powered by Gitbook · 修订: 2019-07-08 16:49:59

results matching ""

    No results matching ""