2. RabbitMQ - Using RabbitMQ from PHP - Work Queues (Worker Mode)

Author: 温新

Category: PHP RabbitMQ

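Date: 2023-02-27 07:29:46

Hi, I'm 温新, a PHPer.

Versions: erlang-25.2.1, rabbitmq_server-3.11.9

OS: Rocky Linux 9.1

Goal: use RabbitMQ work queues (worker mode) from PHP

This article draws on the official documentation and on other online material. It is not entirely original, but it reflects my own understanding. If you repost it, please include this article's URL. Writing is hard and writing consistently is harder. Thank you!

This series uses Laravel 10 and "php-amqplib/php-amqplib": "^3.5" for its demos.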

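In this tutorial we create a Work Queue that distributes time-consuming tasks among multiple workers.

Work queues (also called Task Queues) exist so that we do not have to wait for resource-intensive or time-consuming operations to finish. We send a task to the queue as a message, and a worker process running in the background takes the task off the queue and processes it. When you run several workers, the tasks are shared among them.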

[Figure: 3-3 工作模式.png (work mode)]

Work queues: automatic acknowledgment

Message producer

```php
// MqProducerController.php
// Requires at the top of the file:
//   use Illuminate\Support\Str;
//   use PhpAmqpLib\Message\AMQPMessage;

// Work queue mode
public function workerMp()
{
    // Declare the same durable queue that the messages are routed to below
    // (the queue name must match the routing key and the consumer's queue).
    $this->channel->queue_declare('worker_task', false, true, false, false);
    for ($i = 0; $i < 10; $i++) {
        // Note: uses the Str facade
        $data = $i .  ' hi, 我是王美丽 ' . Str::random();
        // Mark the message as persistent
        $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
        // Publish through the default exchange, routed to the worker_task queue
        $this->channel->basic_publish($msg, '', 'worker_task');
    }
    $this->channel->close();
    $this->mqConnection->close();
}
```

Don't be puzzled: the message producer really has not changed.

Message consumer

```php
<?php
// App\Console\Commands\WorkerMq.php
namespace App\Console\Commands;

use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;

// Work queue mode, message consumer
class WorkerMq extends Command
{
    protected $signature = 'mq:worker-client';

    protected $description = 'Command description';

    public function handle(): void
    {
        // host, port, user, password, vhost
        $connection = new AMQPStreamConnection('localhost', 5672, 'test', '123456', 'sms');
        $channel    = $connection->channel();
        $channel->queue_declare('worker_task', false, true, false, false);

        $callback = function ($msg) {
            // Simulate a slow task
            sleep(1);
            // Print the consumed message
            echo $msg->body . PHP_EOL;
        };

        // Automatic acknowledgment: the fourth argument (no_ack) is true
        $channel->basic_consume('worker_task', '', false, true, false, false, $callback);

        while ($channel->is_open()) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }
}
```

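Don't be puzzled: the consumer has not changed either. Now let's see what happens with several consumers. Open a few terminal windows and run the following command in each: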

```shell
# Run on the command line
php artisan mq:worker-client
```

Output

```text
# Window 1
$ php artisan  mq:worker-client
0 hi, 我是王美丽 rlPdYsygYapGP65e
2 hi, 我是王美丽 5ORjr9dq4orhjIgq
4 hi, 我是王美丽 P2ANiGvXZo8kgoaY
6 hi, 我是王美丽 vNKXeqo4mYOuRrXt
8 hi, 我是王美丽 ncdlldEH5R4tQX4f

# Window 2
$ php artisan  mq:worker-client

1 hi, 我是王美丽 jSGhGqXwJOhH3HsB
3 hi, 我是王美丽 7FdmlO7EfzlfmSSt
5 hi, 我是王美丽 nT2zTAjYxyWHYkBJ
7 hi, 我是王美丽 XnhaYEkZheb3xI0l
9 hi, 我是王美丽 f5X8EQ4ElOlZlf94
```

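At first glance this is no different from the simple mode, except that we opened a few more windows.

Now for the catch. We seem to have done nothing special, and that is because the consumer uses automatic acknowledgment: the fourth argument of basic_consume (no_ack) is true. The next example switches to manual acknowledgment.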

```php
// Automatic acknowledgment: the fourth argument (no_ack) is true
$channel->basic_consume('worker_task', '', false, true, false, false, $callback);
```
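Because basic_consume takes a long run of positional booleans, it is easy to miscount which one controls acknowledgment. The sketch below uses a hypothetical helper, labelConsumeArgs (not part of php-amqplib), that pairs each positional value with the name php-amqplib gives the first seven parameters. It only illustrates the argument order and never talks to a broker.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of php-amqplib): pairs the positional
 * arguments of basic_consume() with their parameter names, in order.
 * Accepts at most seven values, one per name listed below.
 */
function labelConsumeArgs(array $args): array
{
    $names = ['queue', 'consumer_tag', 'no_local', 'no_ack', 'exclusive', 'nowait', 'callback'];

    // Label only as many names as there are arguments supplied.
    return array_combine(array_slice($names, 0, count($args)), $args);
}

// The argument list used by the auto-ack consumer (callback omitted).
$labels = labelConsumeArgs(['worker_task', '', false, true, false, false]);

foreach ($labels as $name => $value) {
    echo str_pad($name, 13) . var_export($value, true) . PHP_EOL;
}
```

The no_ack flag lands in the fourth position. With PHP 8 named arguments the call could probably be written as basic_consume(queue: 'worker_task', no_ack: true, callback: $callback), assuming the installed php-amqplib version keeps these parameter names.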

Work queues: manual ack

Switching to manual acknowledgment does not require big changes, so only the relevant fragment is shown.

```php
// App\Console\Commands\WorkerMq.php
public function handle(): void
{
    // Unchanged, omitted

    $callback = function ($msg) {
        sleep(1);
        // Print the consumed message
        echo $msg->body . PHP_EOL;
        // Step 3: acknowledge the message manually
        $msg->ack();
    };

    // Step 2: deliver the next message only after the previous one is acked.
    // Set this before basic_consume so it applies to the consumer registered below.
    $channel->basic_qos(null, 1, null);
    // Step 1: switch to manual acknowledgment (fourth argument, no_ack, is false)
    $channel->basic_consume('worker_task', '', false, false, false, false, $callback);

    // Unchanged, omitted
}
```

With these changes the consumer uses manual acknowledgment. The intermediate states are worth understanding, though, so observe them step by step:

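Step 1: after switching to manual acknowledgment, run the producer and the consumer once, then check the queue in the web management UI; the messages are shown as Unacked;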

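Step 2: basic_qos(null, 1, null); makes RabbitMQ deliver the next message only after the previous one has been acknowledged. Run the producer and the consumer again and only one message is printed, because nothing has been acknowledged yet. basic_qos should be called before basic_consume, as in the official RabbitMQ tutorial, so that the limit applies to the consumer being registered;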

Step 3: acknowledge the message with $msg->ack();. Manual acknowledgment is now complete.

About dispatching

One advantage of a work queue is that tasks can be processed in parallel. If a backlog of tasks builds up, we only need to add more workers, so scaling is easy.

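The output of the example shows a regular pattern. By default, RabbitMQ sends each message to the next consumer in sequence, so on average every consumer receives the same number of messages. This way of distributing messages is called round-robin.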
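The round-robin pattern described above can be sketched in plain PHP. This is a toy model rather than RabbitMQ code: the function name `roundRobin` and the worker and message names are made up for illustration.

```php
<?php
// Toy model of round-robin dispatch; it does not talk to RabbitMQ.
// roundRobin(), the message names and the worker names are hypothetical.

/**
 * Assign each message to a consumer in strict rotation.
 *
 * @param string[] $messages  messages in publish order
 * @param string[] $consumers consumer names, must not be empty
 * @return array<string, string[]> consumer name => messages it receives
 */
function roundRobin(array $messages, array $consumers): array
{
    if ($consumers === []) {
        throw new InvalidArgumentException('At least one consumer is required.');
    }

    $consumers = array_values($consumers);
    $assigned = array_fill_keys($consumers, []);

    foreach (array_values($messages) as $i => $message) {
        // The n-th message goes to consumer number (n mod consumer count).
        $consumer = $consumers[$i % count($consumers)];
        $assigned[$consumer][] = $message;
    }

    return $assigned;
}

$result = roundRobin(['m1', 'm2', 'm3', 'm4', 'm5', 'm6'], ['worker_a', 'worker_b']);
print_r($result);
```

In this model each of the two workers ends up with three messages, regardless of how long any individual message takes to process.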

About message acknowledgment (ack)

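With automatic acknowledgment, RabbitMQ removes a message from memory as soon as it has been sent to a consumer. So if the consumer suddenly dies, the message is lost.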

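We certainly do not want messages to be lost; when a worker dies, we want its task to be redelivered to another worker. To prevent message loss, RabbitMQ provides message acknowledgments. A consumer sends an ack to tell RabbitMQ that a message has been received and processed, and RabbitMQ is then free to delete it.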

If a consumer dies without sending an ack, RabbitMQ concludes that the message was not fully processed and redelivers it to another consumer.

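RabbitMQ does not treat a message as failed merely because processing is slow; it redelivers a message when the worker's connection is closed without an ack. Long-running tasks are therefore generally safe, with one caveat: recent RabbitMQ versions, including the 3.11 series used here, enforce a delivery acknowledgement timeout (30 minutes by default), and a consumer that holds a message unacknowledged for longer has its channel closed and the message requeued.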
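The ack and redelivery behaviour described above can be modelled with a small in-memory class. This is only a sketch of the idea, not how RabbitMQ is implemented: `ToyQueue` and all of its methods are hypothetical names.

```php
<?php
// Toy in-memory model of manual acknowledgments; it does not talk to RabbitMQ.
// ToyQueue and its methods are hypothetical names used only for illustration.
final class ToyQueue
{
    /** @var string[] messages waiting to be delivered */
    private array $ready = [];

    /** @var array<string, string[]> consumer => delivered but unacked messages */
    private array $unacked = [];

    public function publish(string $message): void
    {
        $this->ready[] = $message;
    }

    /** Deliver the next ready message and remember it as unacked. */
    public function deliver(string $consumer): ?string
    {
        $message = array_shift($this->ready);
        if ($message !== null) {
            $this->unacked[$consumer][] = $message;
        }
        return $message;
    }

    /** An ack lets the queue forget the message for good. */
    public function ack(string $consumer, string $message): void
    {
        $index = array_search($message, $this->unacked[$consumer] ?? [], true);
        if ($index !== false) {
            array_splice($this->unacked[$consumer], $index, 1);
        }
    }

    /** A consumer that disconnects gets its unacked messages requeued. */
    public function disconnect(string $consumer): void
    {
        $this->ready = array_merge($this->unacked[$consumer] ?? [], $this->ready);
        unset($this->unacked[$consumer]);
    }

    /** @return string[] */
    public function ready(): array
    {
        return $this->ready;
    }
}

$queue = new ToyQueue();
$queue->publish('task-1');
$queue->publish('task-2');

$first = $queue->deliver('worker_a');  // task-1 is now unacked
$queue->ack('worker_a', $first);       // task-1 is processed and forgotten

$queue->deliver('worker_a');           // task-2 is now unacked
$queue->disconnect('worker_a');        // worker_a dies before acking task-2

$again = $queue->deliver('worker_b');  // task-2 is handed to another worker
echo $again . PHP_EOL;
```

The point of the model is that a message leaves the queue only through an ack, never through delivery alone.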

Message persistence

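Without persistence, all queues and messages are lost if RabbitMQ crashes. To prevent this, two things are required: both the queue and the messages must be marked as persistent.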

Queue persistence: set the third parameter of queue_declare (durable) to true

<span style="box-sizing: border-box;color: rgb(171, 178, 191);padding-right: 0.1px"><span style="box-sizing: border-box;color: rgb(98, 151, 85) !important">#</span></span><br></br><span style="box-sizing: border-box;color: rgb(171, 178, 191);padding-right: 0.1px"><span style="box-sizing: border-box;color: rgb(224, 108, 117) !important">$channel</span><span style="box-sizing: border-box;color: rgb(86, 182, 194) !important">-></span><span style="box-sizing: border-box;color: rgb(209, 154, 102) !important">queue_declare</span>(<span style="box-sizing: border-box;color: rgb(152, 195, 121) !important">'sms'</span>, <span style="box-sizing: border-box;color: rgb(209, 154, 102) !important">false</span>, <span style="box-sizing: border-box;color: rgb(209, 154, 102) !important">true</span>, <span style="box-sizing: border-box;color: rgb(209, 154, 102) !important">false</span>, <span style="box-sizing: border-box;color: rgb(209, 154, 102) !important">false</span>);</span>
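The declaration above makes only the queue durable. For the messages themselves, a minimal sketch with php-amqplib might look like the following; the helper name `publishPersistent` is hypothetical, and the code assumes an already opened channel and the Composer autoloader that Laravel provides.

```php
<?php
// Sketch only: needs php-amqplib and an open channel to a reachable broker.
// publishPersistent() is a hypothetical helper, not part of php-amqplib.
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;

function publishPersistent(AMQPChannel $channel, string $queue, string $body): void
{
    // Third argument true: the queue definition survives a broker restart.
    $channel->queue_declare($queue, false, true, false, false);

    // delivery_mode 2 asks RabbitMQ to write the message to disk.
    $message = new AMQPMessage($body, [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    ]);

    // Publish through the default exchange, routed by queue name.
    $channel->basic_publish($message, '', $queue);
}
```

A call such as `publishPersistent($channel, 'sms', 'hello')` would then cover both halves of the requirement. Marking a message as persistent is not a hard guarantee: there is a short window between RabbitMQ accepting a message and writing it to disk.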

Fair dispatch

<span style="box-sizing: border-box;color: rgb(171, 178, 191);padding-right: 0.1px"><span style="box-sizing: border-box;color: rgb(224, 108, 117) !important">$channel</span><span style="box-sizing: border-box;color: rgb(86, 182, 194) !important">-></span><span style="box-sizing: border-box;color: rgb(209, 154, 102) !important">basic_qos</span>(<span style="box-sizing: border-box;color: rgb(209, 154, 102) !important">null</span>, <span style="box-sizing: border-box;color: rgb(209, 154, 102) !important">1</span>, <span style="box-sizing: border-box;color: rgb(209, 154, 102) !important">null</span>);</span>

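basic_qos tells RabbitMQ not to give a worker more than one message at a time; a worker receives a new message only after it has processed the previous one and acknowledged it.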
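To see why this matters, here is a rough plain PHP simulation of dispatch with a prefetch count of 1, where each task goes to whichever worker becomes free first. It is a simplified model under stated assumptions, not RabbitMQ's actual scheduler: `fairDispatch`, the task names and the durations are invented for illustration.

```php
<?php
// Toy model of fair dispatch (prefetch count 1); it does not talk to RabbitMQ.
// fairDispatch(), the task names and the durations are hypothetical.

/**
 * Hand each task to the worker that becomes idle first.
 *
 * @param array<string, int> $durations task name => processing time in seconds
 * @param string[]           $workers   worker names, must not be empty
 * @return array<string, string[]> worker name => tasks it processes
 */
function fairDispatch(array $durations, array $workers): array
{
    if ($workers === []) {
        throw new InvalidArgumentException('At least one worker is required.');
    }

    $freeAt = array_fill_keys($workers, 0);    // time at which each worker is idle
    $assigned = array_fill_keys($workers, []);

    foreach ($durations as $task => $seconds) {
        // A worker is eligible only after acking its previous task,
        // so the next task goes to the one that is free earliest.
        $worker = array_keys($freeAt, min($freeAt), true)[0];
        $assigned[$worker][] = $task;
        $freeAt[$worker] += $seconds;
    }

    return $assigned;
}

$plan = fairDispatch(['t1' => 5, 't2' => 1, 't3' => 1, 't4' => 1], ['worker_a', 'worker_b']);
print_r($plan);
```

In this model worker_a spends its time on the slow task t1 while worker_b handles the three short tasks, whereas strict round-robin would have queued t3 behind t1 on worker_a.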
