使用协程分片广播并校验连接状态,结合心跳机制与消息队列解耦,可实现Swoole平滑推送。示例中每10秒将客户端分批(50个/组),通过go协程并发调用$server->push发送,避免阻塞;利用$server->isEstablished($fd)跳过无效连接,防止异常;推荐用Redis或SwooleTable管理在线状态,Worker仅转发消息,提升扩展性;需定义onClose回调清理资源,万级连接时可引入分布式架构优化性能。核心为:分批协程+状态校验+心跳+解耦。
在 Swoole 中实现平滑地给所有客户端发送消息,关键在于避免阻塞主进程、处理连接状态异常,并保证广播效率。直接遍历所有连接发消息容易因个别客户端卡顿导致整体延迟,因此需要结合协程、连接验证和异步机制来优化。
使用协程分片广播,避免单次阻塞
Swoole 的 foreach + coroutine 可以并发发送消息,减少总耗时。通过 red”>SwooleCoroutineWaitGroup 或并发协程控制,将客户端连接分批处理,防止一次性创建过多协程影响性能。
示例代码:
use SwooleCoroutine; use SwooleHttpServer; $server = new Server("0.0.0.0", 9501); $server->set(['worker_num' => 1, 'enable_coroutine' => true]); $server->on('workerStart', function () use ($server) { go(function () use ($server) { while (true) { // 模拟定时广播 co::sleep(10); $clients = $server->connections; $chunks = array_chunk($clients, 50); // 每批50个 foreach ($chunks as $chunk) { go(function () use ($chunk, $server) { foreach ($chunk as $fd) { // 确保连接有效且为 WebSocket 客户端 if ($server->isEstablished($fd)) { $server->push($fd, "来自服务器的广播消息"); } } }); } } }); });
检查连接状态,跳过无效客户端
发送前必须调用 $server->isEstablished($fd) 判断连接是否正常。对于已断开但未及时清理的 $fd,跳过可避免 write error 或异常中断。
也可以结合心跳机制,在客户端定期 ping,服务端标记活跃状态,只向活跃连接推送。
利用中间件解耦消息分发逻辑
实际项目中,广播通常由业务触发(如订单通知)。建议将消息推送交给独立的“推送服务”或“消息队列”,Worker 进程只负责转发,不直接操作连接。
例如:使用 Redis Pub/Sub 或 SwooleTable 记录在线用户,另一个协程监听频道并执行广播,实现解耦与扩展性。
注意内存与连接管理
长时间运行下,$server->connections 可能包含已关闭的连接(仅在 onClose 回调后移除)。确保定义 onClose 回调,及时清理资源。
若客户端量大(万级),考虑引入分布式方案或多进程共享 SwooleTable 存储 fd 与用户映射,提升查找效率。
基本上就这些。核心是:分批协程发送 + 连接状态校验 + 心跳维护 + 架构解耦,才能做到真正平滑广播。
swoole redis go websocket ai red swoole 架构 分布式 中间件 foreach Error 并发 异步 table redis