站長(zhǎng)資訊網(wǎng)
最全最豐富的資訊網(wǎng)站

php中如何使用簡(jiǎn)單可靠的rabbitmq組件包

在項(xiàng)目中rabbitmq得到了廣泛的時(shí)候,這里對(duì)rabbitmq的常規(guī)功能做了一個(gè)簡(jiǎn)單的總結(jié),并封裝成了composer包,composer包地址、github地址,歡迎fork,由于水平有限,難免存在bug,歡迎提出寶貴意見。

php中如何使用簡(jiǎn)單可靠的rabbitmq組件包

easy-rabbitmq 包簡(jiǎn)介

對(duì)php-amqplib/php-amqplib包的二次封裝,為常見功能提供一套開箱即用的生產(chǎn)解決方案。目前支持的功能列表如下:

  • 推送消息到直連交換機(jī)(含延遲消息)

  • 推送消息到扇形交換機(jī)(含延遲消息)

  • 推送消息到主題交換機(jī)(含延遲消息)

  • 訂閱模式下的可靠消費(fèi), 消費(fèi)者消費(fèi)失敗后將會(huì)嘗試?yán)^續(xù)消費(fèi),最多嘗試5次。

  • 拉取模式下的可靠消費(fèi), 消費(fèi)者消費(fèi)失敗后將會(huì)嘗試?yán)^續(xù)消費(fèi),最多嘗試5次。

如果還有其它場(chǎng)景,歡迎繼續(xù)補(bǔ)充,隨后進(jìn)行迭代!!

要求

安裝包對(duì)PHP版本對(duì)要求主要取決于php-amqplib/php-amqplib包本身對(duì)要求,這里為了兼顧php5.0的使用者,我們使用了php-amqplib/php-amqplib包V2.9.0的版本。

具體的要求參照這里。

不過(guò)筆者推薦使用php7.0及其以上版本, 這個(gè)開發(fā)包也是在7.0這個(gè)版本上面開發(fā)完成的!

安裝

      composer require maweibinguo/easyrabbitmq

使用

在這里我們推薦php腳本+supervisor結(jié)合使用,用以保證消費(fèi)進(jìn)程的可靠性、增強(qiáng)worker的消費(fèi)能力! 如果你還沒(méi)有聽說(shuō)過(guò)supervisor,可以點(diǎn)擊這里了解.

1、推送消息

1-1、推送消息到直連交換機(jī)

      $config = [           "host" => "127.0.0.1",             "port" => "5672",             "user" => "guest",             "password" => "guest",             "vhost" => "/",             "channel_max_num" => 10,       ];           $instance = RabbitMq::getInstance($config);              //延遲消息,30 秒中后才會(huì)到達(dá)指定的交換機(jī)       $instance->pushToDirect(                         $msg = time(), //消息體內(nèi)容                         $exchange = "easy_direct_exchange", //交換機(jī)名稱                         $routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致                         $delaySec = 30 //延遲秒數(shù)       );        //無(wú)延遲,推入到指定到直鏈交換機(jī)       $instance->pushToDirect(                         $msg = time(), //消息體內(nèi)容                         $exchange = "easy_direct_exchange", //交換機(jī)名稱                         $routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致       );

1-2、推送消息到扇形交換機(jī)

      $config = [           "host" => "127.0.0.1",             "port" => "5672",             "user" => "guest",             "password" => "guest",             "vhost" => "/",             "channel_max_num" => 10,       ];           $instance = RabbitMq::getInstance($config);              //延遲消息,30 秒中后才會(huì)到達(dá)指定的交換機(jī)       $instance->pushToFanout(                         $msg = time(), //消息體內(nèi)容                         $exchange = "easy_fanout_exchange", //交換機(jī)名稱                         $delaySec = 30 //延遲秒數(shù)       );        //無(wú)延遲,推入到指定到直鏈交換機(jī)       $instance->pushToFanout(                         $msg = time(), //消息體內(nèi)容                         $exchange = "easy_fanout_exchange" //交換機(jī)名稱       );

1-3、推送消息到主題交換機(jī)

      $config = [           "host" => "127.0.0.1",             "port" => "5672",             "user" => "guest",             "password" => "guest",             "vhost" => "/",             "channel_max_num" => 10,       ];           $instance = RabbitMq::getInstance($config);              //延遲消息,30 秒中后才會(huì)到達(dá)指定的交換機(jī)       $instance->pushToTopic(                         $msg = time(), //消息體內(nèi)容                         $exchange = "easy_topic_exchange", //交換機(jī)名稱                         /**                          * routingKey 要同consum(get)方法的bindingKey相匹配                          * bindingKey支持兩種特殊的字符"*"、“#”,用作模糊匹配, 其中"*"用于匹配一個(gè)單詞、“#”用于匹配多個(gè)單詞(也可以是0個(gè))                          * 無(wú)論是bindingKey還是routingKey, 被"."分隔開的每一段獨(dú)立的字符串就是一個(gè)單詞, easy.topic.queue, 包含三個(gè)單詞easy、topic、queue                          */                         $routingKey = "easy.topic.queue",                         $delaySec = 30 //延遲秒數(shù)       );        //無(wú)延遲,推入到指定到直鏈交換機(jī)       $instance->pushToTopic(                         $msg = time(), //消息體內(nèi)容                         $exchange = "easy_topic_exchange", //交換機(jī)名稱                         $routingKey = "easy.topic.queue"           );

2、消費(fèi)消息

消費(fèi)支持自動(dòng)重試,最多嘗試重試5次,每次消費(fèi)失敗后該消息將會(huì)被重新投入到消費(fèi)隊(duì)列中。重新的時(shí)間將會(huì)隨著失敗的次數(shù)增多逐漸推移,本客戶端支持的推移策略如下:

失敗1次(1秒鐘后會(huì)再被投遞), 失敗2次(2秒鐘后會(huì)再被投遞), 失敗3次(4秒鐘后會(huì)再被投遞), 失敗4次(8秒鐘后會(huì)再被投遞), 失敗5次(16秒鐘后會(huì)再被投遞)

2-1、訂閱模式

訂閱模式下的可靠消費(fèi)
      $config = [           "host" => "127.0.0.1",             "port" => "5672",             "user" => "guest",             "password" => "guest",             "vhost" => "/",             "channel_max_num" => 10,       ];           $instance = RabbitMq::getInstance($config);       $instance->consume(             $queueName = "direct_test_queue",//訂閱的隊(duì)列名稱             $consumerTag = "c1",//消費(fèi)標(biāo)記             $exchange = "easy_direct_exchange",//交換機(jī)名稱             $bindingKey = "direct_test_queue",//bindingkey,如果是直鏈交換機(jī)需要同routingKey保持一致             $callback = function($msg){                 $body = $msg->body;                 file_put_contents("./test.log", "time => " . time() . "t" . " body => " . $body . PHP_EOL , FILE_APPEND);                 //如果返回結(jié)果不絕對(duì)等于(===)true,那么將觸發(fā)消息重試機(jī)制                 return false;             },             //5次消費(fèi)消費(fèi)失敗后,失敗消息將會(huì)投遞到的隊(duì)列名稱             $failedQueue = "easymq@failed"       );

2-2、拉取模式

拉取模式下的可靠消費(fèi)
      $config = [           "host" => "127.0.0.1",             "port" => "5672",             "user" => "guest",             "password" => "guest",             "vhost" => "/",             "channel_max_num" => 10,       ];           $instance = RabbitMq::getInstance($config);       $instance->get(             $queue = "get_queue",             $exchange = "easy_fanout_exchange",             $bindingKey = "",             $callback = function($msg){                 $body = $msg->body;                 file_put_contents("./test.log", "time => " . time() . "t" . " body => " . $body . PHP_EOL , FILE_APPEND);                 //如果返回結(jié)果不絕對(duì)等于(===)true,那么將觸發(fā)消息重試機(jī)制                 return false;             },             //5次消費(fèi)消費(fèi)失敗后,失敗消息將會(huì)投遞到的隊(duì)列名稱             $failedQueue = 'easymq@failed'       );

推薦學(xué)習(xí):php視頻教程

贊(0)
分享到: 更多 (0)
網(wǎng)站地圖   滬ICP備18035694號(hào)-2    滬公網(wǎng)安備31011702889846號(hào)
日韩在线观看免费完整版视频| 精品国产99久久久久久麻豆| 日韩精品一区二区三区视频 | 日韩亚洲精品福利| 日本精品视频一视频高清| 91精品国产网曝事件门| 久热青青青在线视频精品| 国自产偷精品不卡在线| 精品无码国产污污污免费网站国产 | 久久久精品人妻一区二区三区 | 亚洲精品线路一在线观看| 亚洲av无码日韩av无码网站冲| 午夜精品久久久久久久99| 国产精品黄大片在线播放| 国内精品videofree720| 精品人妻少妇一区二区三区不卡| 97精品一区二区视频在线观看| 91久久精品无码一区二区毛片| 久久精品国产亚洲AV高清热| 久久国产乱子伦精品免费一| 亚洲AV永久无码精品一百度影院 | 最新69国产成人精品免费视频动漫| 亚洲中文精品久久久久久不卡| 国产精品久久久久久久久鸭| 亚洲午夜国产精品无卡| 69国产成人精品午夜福中文| 久久精品人人做人人妻人人玩| 亚洲国产成人久久精品动漫 | 久久久久se色偷偷亚洲精品av| 久久精品www人人爽人人| 99re6在线精品视频免费播放| 久久久无码精品亚洲日韩蜜臀浪潮| 婷婷国产成人精品视频| 99视频精品全部在线| 97精品伊人久久久大香线焦| 91精品91久久久久久| 91国内揄拍国内精品情侣对白| 亚洲精品伊人久久久久| 久久99热这里只频精品6| 国产精品第一区揄拍| 久久国产精品99国产精|