您好,欢迎来到测品娱乐。
搜索
您的当前位置:首页swoole如何做消息通知

swoole如何做消息通知

来源:测品娱乐


基于swoole、redis做一个消息通知功能

利用swoole开启常驻进程,需要几个按自己的情况来定,swoole进程数最好是和服务器cpu核数相等 (推荐学习: swoole视频教程)

利用swoole启动的常驻进程不断的去探测redis队列里面的值,可以按键值来做一个快中慢这样的权重处理,需要急需处理,数据量大的可以用多几个进程,一般的可以分配不同的进程数来执行。

下面上代码:

swoole启动代码

function run()
{
 try {
 $swoole = new swoole_server(127.0.0.1, 9999);
 $swoole->set([
 'daemonize' => 1, //是否开启守护进程
 'worker_num' => 8, //实际需要去设定
 'log_file' => __APP_LOGS_PATH__ . '/swoole.log'
 ]);
 $swoole->on('WorkerStart', 'onWorkerStart');
 $swoole->on('Receive', 'onReceive');
 $swoole->start();
 } catch (Exception $e) {
 logs(['err_code' => $e->getCode(), 'err_msg' => $e->getMessage()], 'error');
 }
}

swoole实时监测redis队列里的数据,根据键值进行权重排比

代码

function onWorkerStart(swoole_server $swoole, $worker_id)
{
 $chQuick = [0, 1, 2, 3];
 $chNormal = [4, 5];
 $chSlow = [6];
 for ($i = 1; $i <= 3000; $i++) {
 $redis = connectRedis();//断线重连redis
 $queueData = $keys = [];
 if (in_array($worker_id, $chQuick)) {
 if ($redis->llen(QUEUE_QUICK))
 $keys[] = QUEUE_QUICK;
 if ($keys)
 $queueData = $redis->brpop(QUEUE_QUICK, 5);
 } elseif (in_array($worker_id, $chNormal)) {
 if ($redis->llen(QUEUE_NORMAL))
 $keys[] = QUEUE_NORMAL;
 if ($redis->llen(QUEUE_QUICK))
 $keys[] = QUEUE_QUICK;
 if ($keys)
 $queueData = $redis->brpop(QUEUE_NORMAL, QUEUE_QUICK, 5);
 } elseif (in_array($worker_id, $chSlow)) {
 if ($redis->llen(QUEUE_SLOW))
 $keys[] = QUEUE_SLOW;
 if ($redis->llen(QUEUE_NORMAL))
 $keys[] = QUEUE_NORMAL;
 if ($redis->llen(QUEUE_QUICK))
 $keys[] = QUEUE_QUICK;
 if ($keys)
 $queueData = $redis->brpop(QUEUE_SLOW, QUEUE_QUICK, QUEUE_NORMAL, 5);
 } else {
 if ($redis->llen(QUEUE_FAIL))
 $keys[] = QUEUE_FAIL;
 if ($redis->llen(QUEUE_SLOW))
 $keys[] = QUEUE_SLOW;
 if ($redis->llen(QUEUE_NORMAL))
 $keys[] = QUEUE_NORMAL;
 if ($redis->llen(QUEUE_QUICK))
 $keys[] = QUEUE_QUICK;
 if ($keys)
 $queueData = $redis->brpop(QUEUE_FAIL, QUEUE_QUICK, QUEUE_NORMAL, QUEUE_SLOW, 5);
 }
 logs('test'.$keys.'%%'.$queueData);
 if ($queueData) {
 $queueName = $queueData[0];
 $message = $queueData[1];
 if ($worker_id == QUEUE_FAIL_WORKER_ID && $queueName == QUEUE_FAIL) {
 call_user_func_array('retryPostMessage', [&$message, &$redis]);
 } else {
 call_user_func_array('postMessage', [&$message, &$redis]);
 }
 }
 else
 {
 sleep(5);
 }
 }
sleep(10);
 $redis->close();
 unset($redis);
 method_exists($swoole, 'stop') ? $swoole->stop() : @exit;
}

里面的for循环是为了配合sleep函数来使用,三次失败的可以记入失败,可以手动去处理。

Copyright © 2019- cepb.cn 版权所有 湘ICP备2022005869号-7

违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务