最新文章专题视频专题问答1问答10问答100问答1000问答2000关键字专题1关键字专题50关键字专题500关键字专题1500TAG最新视频文章推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37视频文章20视频文章30视频文章40视频文章50视频文章60 视频文章70视频文章80视频文章90视频文章100视频文章120视频文章140 视频2关键字专题关键字专题tag2tag3文章专题文章专题2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章专题3
问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501
当前位置: 首页 - 科技 - 知识百科 - 正文

如何基于Hyperf实现RabbitMQ+WebSocket消息推送

来源:懂视网 责编:小采 时间:2020-11-03 12:31:42
文档

如何基于Hyperf实现RabbitMQ+WebSocket消息推送

如何基于Hyperf实现RabbitMQ+WebSocket消息推送:介绍基于 Hyperf+ WebSocket +RabbitMQ 实现的一个简单大屏幕的消息推送。思路利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。WebSocket 服务c
推荐度:
导读如何基于Hyperf实现RabbitMQ+WebSocket消息推送:介绍基于 Hyperf+ WebSocket +RabbitMQ 实现的一个简单大屏幕的消息推送。思路利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。WebSocket 服务c
介绍

基于 Hyperf+ WebSocket +RabbitMQ 实现的一个简单大屏幕的消息推送。

思路

利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,

保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。

WebSocket 服务

composer require hyperf/websocket-server

配置文件 [config/autoload/server.php]

<?php
return [
 'mode' => SWOOLE_PROCESS,
 'servers' => [
 [
 'name' => 'http',
 'type' => Server::SERVER_HTTP,
 'host' => '0.0.0.0',
 'port' => 11111,
 'sock_type' => SWOOLE_SOCK_TCP,
 'callbacks' => [
 SwooleEvent::ON_REQUEST => [HyperfHttpServerServer::class, 'onRequest'],
 ],
 ],
 [
 'name' => 'ws',
 'type' => Server::SERVER_WEBSOCKET,
 'host' => '0.0.0.0',
 'port' => 12222,
 'sock_type' => SWOOLE_SOCK_TCP,
 'callbacks' => [
 SwooleEvent::ON_HAND_SHAKE => [HyperfWebSocketServerServer::class, 'onHandShake'],
 SwooleEvent::ON_MESSAGE => [HyperfWebSocketServerServer::class, 'onMessage'],
 SwooleEvent::ON_CLOSE => [HyperfWebSocketServerServer::class, 'onClose'],
 ],
 ],
 ],

WebSocket 服务器端代码示例

<?php
declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link https://www.hyperf.io
 * @document https://doc.hyperf.io
 * @contact group@hyperf.io
 * @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
 */
namespace AppController;
use HyperfContractOnCloseInterface;
use HyperfContractOnMessageInterface;
use HyperfContractOnOpenInterface;
use SwooleHttpRequest;
use SwooleServer;
use SwooleWebsocketFrame;
use SwooleWebSocketServer as WebSocketServer;
class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
 /**
 * 发送消息
 * @param WebSocketServer $server
 * @param Frame $frame
 */
 public function onMessage(WebSocketServer $server, Frame $frame): void
 {
 //心跳刷新缓存
 $redis = $this->container->get(Redis::class);
 //获取所有的客户端id
 $fdList = $redis->sMembers('websocket_sjd_1');
 //如果当前客户端在客户端集合中,就刷新
 if (in_array($frame->fd, $fdList)) {
 $redis->sAdd('websocket_sjd_1', $frame->fd);
 $redis->expire('websocket_sjd_1', 7200);
 }
 $server->push($frame->fd, 'Recv: ' . $frame->data);
 }
 /**
 * 客户端失去链接
 * @param Server $server
 * @param int $fd
 * @param int $reactorId
 */
 public function onClose(Server $server, int $fd, int $reactorId): void
 {
 //删掉客户端id
 $redis = $this->container->get(Redis::class);
 //移除集合中指定的value
 $redis->sRem('websocket_sjd_1', $fd);
 var_dump('closed');
 }
 /**
 * 客户端链接
 * @param WebSocketServer $server
 * @param Request $request
 */
 public function onOpen(WebSocketServer $server, Request $request): void
 {
 //保存客户端id
 $redis = $this->container->get(Redis::class);
 $res1 = $redis->sAdd('websocket_sjd_1', $request->fd);
 var_dump($res1);
 $res = $redis->expire('websocket_sjd_1', 7200);
 var_dump($res);
 $server->push($request->fd, 'Opened');
 }
}

WebSocket 前端代码

 function WebSocketTest() {
 if ("WebSocket" in window) {
 console.log("您的浏览器支持 WebSocket!");
 var num = 0
 // 打开一个 web socket
 var ws = new WebSocket("ws://127.0.0.1:12222");
 ws.onopen = function () {
 // Web Socket 已连接上,使用 send() 方法发送数据
 //alert("数据发送中...");
 //ws.send("发送数据");
 };
 window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开
 var ping = {"type": "ping"};
 ws.send(JSON.stringify(ping));
 }, 5000);
 ws.onmessage = function (evt) {
 var d = JSON.parse(evt.data);
 console.log(d);
 if (d.code == 300) {
 $(".address").text(d.address)
 }
 if (d.code == 200) {
 var v = d.data
 console.log(v);
 num++
 var str = `<div class="item">
 <p>${v.recordOutTime}</p>
 <p>${v.userOutName}</p>
 <p>${v.userOutNum}</p>
 <p>${v.doorOutName}</p>
 </div>`
 $(".tableHead").after(str)
 if (num > 7) {
 num--
 $(".table .item:nth-last-child(1)").remove()
 }
 }
 };
 ws.error = function (e) {
 console.log(e)
 alert(e)
 }
 ws.onclose = function () {
 // 关闭 websocket
 alert("连接已关闭...");
 };
 } else {
 alert("您的浏览器不支持 WebSocket!");
 }
 }

AMQP 组件

composer require hyperf/amqp

配置文件 [config/autoload/amqp.php]

<?php
return [
 'default' => [
 'host' => 'localhost',
 'port' => 5672,
 'user' => 'guest',
 'password' => 'guest',
 'vhost' => '/',
 'pool' => [
 'min_connections' => 1,
 'max_connections' => 10,
 'connect_timeout' => 10.0,
 'wait_timeout' => 3.0,
 'heartbeat' => -1,
 ],
 'params' => [
 'insist' => false,
 'login_method' => 'AMQPLAIN',
 'login_response' => null,
 'locale' => 'en_US',
 'connection_timeout' => 3.0,
 'read_write_timeout' => 6.0,
 'context' => null,
 'keepalive' => false,
 'heartbeat' => 3,
 ],
 ],
];

MQ 消费者代码

<?php
declare(strict_types=1);
namespace AppAmqpConsumer;
use HyperfAmqpAnnotationConsumer;
use HyperfAmqpMessageConsumerMessage;
use HyperfAmqpResult;
use HyperfServerServer;
use HyperfServerServerFactory;
/**
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
 */
class DemoConsumer extends ConsumerMessage
{
 /**
 * rabbmitMQ消费端代码
 * @param $data
 * @return string
 */
 public function consume($data): string
 {
 print_r($data);
 //获取集合中所有的value
 $redis = $this->container->get(Redis::class);
 $fdList=$redis->sMembers('websocket_sjd_1');
 $server=$this->container->get(ServerFactory::class)->getServer()->getServer();
 foreach($fdList as $key=>$v){
 if(!empty($v)){
 $server->push((int)$v, $data);
 }
 }
 return Result::ACK;
 }
}

控制器代码

 /**
 * test
 * @return array
 */
 public function test()
 {
 $data = array(
 'code' => 200,
 'data' => [
 'userOutName' => 'ccflow',
 'userOutNum' => '9999',
 'recordOutTime' => date("Y-m-d H:i:s", time()),
 'doorOutName' => '教师公寓',
 ]
 );
 $data = GuzzleHttpjson_encode($data);
 $message = new DemoProducer($data);
 $producer = ApplicationContext::getContainer()->get(Producer::class);
 $result = $producer->produce($message);
 var_dump($result);
 $user = $this->request->input('user', 'Hyperf');
 $method = $this->request->getMethod();
 return [
 'method' => $method,
 'message' => "{$user}.",
 ];
 }

最终效果

ab7e49780093484c182c1baf0dbedce.png

推荐:《PHP教程》

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。TEL:177 7030 7066 E-MAIL:11247931@qq.com

文档

如何基于Hyperf实现RabbitMQ+WebSocket消息推送

如何基于Hyperf实现RabbitMQ+WebSocket消息推送:介绍基于 Hyperf+ WebSocket +RabbitMQ 实现的一个简单大屏幕的消息推送。思路利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。WebSocket 服务c
推荐度:
  • 热门焦点

最新推荐

猜你喜欢

热门推荐

专题
Top