count = 1; // 当收到客户端发来的数据后返回hello $data给客户端 $ws_worker->onMessage = function (TcpConnection $connection, $data) { // 向客户端发送hello $data $connection->send('hello ' . $data); }; $ws_worker->onConnect = function ($connection) { global $ws_worker; $count = count($ws_worker->connections); loginfo("ws_worker->onConnect 当前在线设备数: $count"); }; $ws_worker->onClose = function ($connection) { global $ws_worker; $count = count($ws_worker->connections); loginfo("ws_worker->onClose 当前在线设备数: $count"); }; function sendMsg($data) { global $ws_worker; $count = count($ws_worker->connections); loginfo("sendMsg 当前在线设备数: $count"); foreach ($ws_worker->connections as $conn) { $conn->send($data); } } // 这个逻辑必须在 $ws_worker所在的进程中实现 // 否则拿到的 $ws_worker 变量不是监听 40002 端口的 $ws_worker 对象 $ws_worker->onWorkerStart = function () { $client = new Client('redis://127.0.0.1:6379'); // 订阅 $client->subscribe('package-queue', function ($data) { loginfo("package-queue " . $data); sendMsg($data); }); // 其他订阅... // 消费失败触发的回调(可选) $client->onConsumeFailure(function (\Throwable $exception, $package) { echo "队列 " . $package['queue'] . " 消费失败\n"; echo $exception->getMessage(), "\n"; var_export($package); }); }; // 运行worker Worker::runAll();