* @copyright walkor * @link http://www.workerman.net/ * @license http://www.opensource.org/licenses/mit-license.php MIT License */ namespace Workerman\RedisQueue; use RuntimeException; use Workerman\Timer; use Workerman\Redis\Client as Redis; use Psr\Log\LoggerInterface; /** * Class Client * @package Workerman\RedisQueue */ #[\AllowDynamicProperties] class Client { /** * Queue waiting for consumption */ const QUEUE_WAITING = '{redis-queue}-waiting'; /** * Queue with delayed consumption */ const QUEUE_DELAYED = '{redis-queue}-delayed'; /** * Queue with consumption failure */ const QUEUE_FAILED = '{redis-queue}-failed'; /** * @var Redis */ protected $_redisSubscribe; /** * @var Redis */ protected $_redisSend; /** * @var array */ protected $_subscribeQueues = []; /** * @var LoggerInterface */ protected $_logger = null; /** * consume failure callback * @var callable */ protected $_consumeFailure = null; /** * @var array */ protected $_options = [ 'retry_seconds' => 5, 'max_attempts' => 5, 'auth' => '', 'db' => 0, 'prefix' => '', ]; /** * Client constructor. * @param $address * @param array $options */ public function __construct($address, $options = []) { $this->_redisSubscribe = new Redis($address, $options); $this->_redisSubscribe->brPoping = 0; $this->_redisSend = new Redis($address, $options); if (isset($options['auth']) && $options['auth'] !== '') { $this->_redisSubscribe->auth($options['auth']); $this->_redisSend->auth($options['auth']); } if (isset($options['db'])) { $this->_redisSubscribe->select($options['db']); $this->_redisSend->select($options['db']); } $this->_options = array_merge($this->_options, $options); } /** * Send. * * @param $queue * @param $data * @param int $delay * @param callable $cb */ public function send($queue, $data, $delay = 0, $cb = null) { static $_id = 0; $id = \microtime(true) . '.' . (++$_id); $now = time(); $package_str = \json_encode([ 'id' => $id, 'time' => $now, 'delay' => $delay, 'attempts' => 0, 'queue' => $queue, 'data' => $data ]); if (\is_callable($delay)) { $cb = $delay; $delay = 0; } if ($cb) { $cb = function ($ret) use ($cb) { $cb((bool)$ret); }; if ($delay == 0) { $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_WAITING . $queue, $package_str, $cb); } else { $this->_redisSend->zAdd($this->_options['prefix'] . static::QUEUE_DELAYED, $now + $delay, $package_str, $cb); } return; } if ($delay == 0) { $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_WAITING . $queue, $package_str); } else { $this->_redisSend->zAdd($this->_options['prefix'] . static::QUEUE_DELAYED, $now + $delay, $package_str); } } /** * Set the consume failure callback. * * @param callable $callback */ public function onConsumeFailure(callable $callback) { $this->_consumeFailure = $callback; } /** * Subscribe. * * @param string|array $queue * @param callable $callback */ public function subscribe($queue, callable $callback) { $queue = (array)$queue; foreach ($queue as $q) { $redis_key = $this->_options['prefix'] . static::QUEUE_WAITING . $q; $this->_subscribeQueues[$redis_key] = $callback; } $this->pull(); } /** * Unsubscribe. * * @param string|array $queue * @return void */ public function unsubscribe($queue) { $queue = (array)$queue; foreach ($queue as $q) { $redis_key = $this->_options['prefix'] . static::QUEUE_WAITING . $q; unset($this->_subscribeQueues[$redis_key]); } } /** * tryToPullDelayQueue. */ protected function tryToPullDelayQueue() { static $retry_timer = 0; if ($retry_timer) { return; } $retry_timer = Timer::add(1, function () { $now = time(); $options = ['LIMIT', 0, 128]; $this->_redisSend->zrevrangebyscore($this->_options['prefix'] . static::QUEUE_DELAYED, $now, '-inf', $options, function ($items) { if ($items === false) { throw new RuntimeException($this->_redisSend->error()); } foreach ($items as $package_str) { $this->_redisSend->zRem($this->_options['prefix'] . static::QUEUE_DELAYED, $package_str, function ($result) use ($package_str) { if ($result !== 1) { return; } $package = \json_decode($package_str, true); if (!$package) { $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_FAILED, $package_str); return; } $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_WAITING . $package['queue'], $package_str); }); } }); }); } /** * pull. */ public function pull() { $this->tryToPullDelayQueue(); if (!$this->_subscribeQueues || $this->_redisSubscribe->brPoping) { return; } $cb = function ($data) use (&$cb) { if ($data) { $this->_redisSubscribe->brPoping = 0; $redis_key = $data[0]; $package_str = $data[1]; $package = json_decode($package_str, true); if (!$package) { $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_FAILED, $package_str); } else { if (!isset($this->_subscribeQueues[$redis_key])) { // 取消订阅,放回队列 $this->_redisSend->rPush($redis_key, $package_str); } else { $callback = $this->_subscribeQueues[$redis_key]; try { \call_user_func($callback, $package['data']); } catch (\Throwable $e) { $this->log((string)$e); $package['max_attempts'] = $this->_options['max_attempts']; $package['error'] = $e->getMessage(); $package_modified = null; if ($this->_consumeFailure) { try { $package_modified = \call_user_func($this->_consumeFailure, $e, $package); } catch (\Throwable $ta) { $this->log((string)$ta); } } if (is_array($package_modified)) { $package['data'] = $package_modified['data'] ?? $package['data']; $package['attempts'] = $package_modified['attempts'] ?? $package['attempts']; $package['max_attempts'] = $package_modified['max_attempts'] ?? $package['max_attempts']; $package['error'] = $package_modified['error'] ?? $package['error']; } if (++$package['attempts'] > $package['max_attempts']) { $this->fail($package); } else { $this->retry($package); } } } } } if ($this->_subscribeQueues) { $this->_redisSubscribe->brPoping = 1; Timer::add(0.000001, [$this->_redisSubscribe, 'brPop'], [\array_keys($this->_subscribeQueues), 1, $cb], false); } }; $this->_redisSubscribe->brPoping = 1; $this->_redisSubscribe->brPop(\array_keys($this->_subscribeQueues), 1, $cb); } /** * @param $package */ protected function retry($package) { $delay = time() + $this->_options['retry_seconds'] * ($package['attempts']); $this->_redisSend->zAdd($this->_options['prefix'] . static::QUEUE_DELAYED, $delay, \json_encode($package, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT)); } /** * @param $package */ protected function fail($package) { $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_FAILED, \json_encode($package, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT)); } /** * @param $message * @return void */ protected function log($message) { if ($this->_logger) { $this->_logger->info($message); return; } echo $message . PHP_EOL; } /** * @param $logger * @return mixed|LoggerInterface */ public function logger($logger = null) { if ($logger) { $this->_logger = $logger; } return $this->_logger; } }