SSE 服务端消息推送

SSE(Server-sent events)

SSE 它是基于 HTTP 协议的,一般意义上的 HTTP 协议是无法做到服务端主动向客户端推送消息的。有一种变通方法,就是服务器向客户端声明,发送的是流信息,本质上,这种通信就是以流信息的方式。

SSE 在服务器和客户端之间打开一个单向通道,服务端响应的不再是一次性的数据包而是 text/event-stream 类型的数据流信息,在有数据变更时从服务器流式传输到客户端。

SSE 与 WebSocket 作用相似,都可以建立服务端与浏览器之间的通信,实现服务端向客户端推送消息,两者区别:

  • SSE 是基于 HTTP 协议的,不需要特殊的协议或服务器实现即可工作,WebSocket 需单独服务器来处理协议;

  • SSE 单向通信,只能由服务端向客户端单向通信,webSocket 全双工通信,即通信的双方可以同时发送和接受信息。

  • SSE 实现简单开发成本低,无需引入其他组件,WebSocket 传输数据需做二次解析,开发门槛高一些。

  • SSE 默认支持断线重连,WebSocket 则需要自己实现。

  • SSE 只能传送文本消息,二进制数据需要经过编码后传送,WebSocket 默认支持传送二进制数据。

SSE 具有 WebSockets 在设计上缺乏的多种功能,例如:自动重新连接、事件 ID 和发送任意事件的能力。

编码

1.SseEmitterUtils

package com.demo.utils;  import cn.hutool.core.map.MapUtil; import lombok.extern.slf4j.Slf4j; import org.Springframework.http.MediaType; import org.springframework.stereotype.Component; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;  import Java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer;  /**  * @ClassName:SseEmitterUtils.java  * @ClassPath:com.demo.utils.SseEmitterUtils.java  * @Description:SSE 服务器发送事件  * @Author:tanyp  * @Date:2022/9/13 11:03  **/ @Slf4j @Component public class SseEmitterUtils {      // 当前连接数     private static AtomicInteger count = new AtomicInteger(0);     // 存储 SseEmitter 信息     private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();      /**      * @MonthName:connect      * @Description: 创建用户连接并返回 SseEmitter      * @Author:tanyp      * @Date:2022/9/13 11:09      * @Param: [userId]      * @return:org.springframework.web.servlet.mvc.method.annotation.SseEmitter      **/     public static SseEmitter connect(String key) {         if (sseEmitterMap.containsKey(key)) {             return sseEmitterMap.get(key);         }          try {             // 设置超时时间,0表示不过期。默认30秒             SseEmitter sseEmitter = new SseEmitter(0L);             // 注册回调             sseEmitter.onCompletion(completionCallBack(key));             sseEmitter.onError(errorCallBack(key));             sseEmitter.onTimeout(timeoutCallBack(key));             sseEmitterMap.put(key, sseEmitter);             // 数量+1             count.getAndIncrement();             return sseEmitter;         } catch (Exception e) {             log.info("创建新的SSE连接异常,当前连接Key为:{}", key);         }         return null;     }      /**      * @MonthName:sendMessage      * @Description: 给指定用户发送消息      * @Author:tanyp      * @Date:2022/9/13 11:10      * @Param: [userId, message]      * @return:void      **/     public static void sendMessage(String key, String message) {         if (sseEmitterMap.containsKey(key)) {             try {                 sseEmitterMap.get(key).send(message);             } catch (IOException e) {                 log.error("用户[{}]推送异常:{}", key, e.getMessage());                 remove(key);             }         }     }      /**      * @MonthName:groupSendMessage      * @Description: 向同组人发布消息,要求:key + groupId      * @Author:tanyp      * @Date:2022/9/13 11:15      * @Param: [groupId, message]      * @return:void      **/     public static void groupSendMessage(String groupId, String message) {         if (MapUtils.isNotEmpty(sseEmitterMap)) {             sseEmitterMap.forEach((k, v) -> {                 try {                     if (k.startsWith(groupId)) {                         v.send(message, MediaType.APPLICATION_JSON);                     }                 } catch (IOException e) {                     log.error("用户[{}]推送异常:{}", k, e.getMessage());                     remove(k);                 }             });         }     }      /**      * @MonthName:batchSendMessage      * @Description: 广播群发消息      * @Author:tanyp      * @Date:2022/9/13 11:15      * @Param: [message]      * @return:void      **/     public static void batchSendMessage(String message) {         sseEmitterMap.forEach((k, v) -> {             try {                 v.send(message, MediaType.APPLICATION_JSON);             } catch (IOException e) {                 log.error("用户[{}]推送异常:{}", k, e.getMessage());                 remove(k);             }         });     }      /**      * @MonthName:batchSendMessage      * @Description: 群发消息      * @Author:tanyp      * @Date:2022/9/13 11:16      * @Param: [message, ids]      * @return:void      **/     public static void batchSendMessage(String message, Set<String> ids) {         ids.forEach(userId -> sendMessage(userId, message));     }      /**      * @MonthName:remove      * @Description: 移除连接      * @Author:tanyp      * @Date:2022/9/13 11:17      * @Param: [userId]      * @return:void      **/     public static void remove(String key) {         sseEmitterMap.remove(key);         // 数量-1         count.getAndDecrement();         log.info("移除连接:{}", key);     }      /**      * @MonthName:getIds      * @Description: 获取当前连接信息      * @Author:tanyp      * @Date:2022/9/13 11:17      * @Param: []      * @return:java.util.List<java.lang.String>      **/     public static List<String> getIds() {         return new ArrayList<>(sseEmitterMap.keySet());     }      /**      * @MonthName:getUserCount      * @Description: 获取当前连接数量      * @Author:tanyp      * @Date:2022/9/13 11:18      * @Param: []      * @return:int      **/     public static int getCount() {         return count.intValue();     }      private static Runnable completionCallBack(String key) {         return () -> {             log.info("结束连接:{}", key);             remove(key);         };     }      private static Runnable timeoutCallBack(String key) {         return () -> {             log.info("连接超时:{}", key);             remove(key);         };     }      private static Consumer<Throwable> errorCallBack(String key) {         return throwable -> {             log.info("连接异常:{}", key);             remove(key);         };     }  }  

2.服务端

package com.demo.controller;  import com.demo.utils.SseEmitterUtils; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;  import javax.servlet.http.HttpServletRequest;  /**  * @ClassName:SSEController.java  * @ClassPath:com.demo.controller.SSEController.java  * @Description:SSE消息推送  * @Author:tanyp  * @Date:2022/9/13 11:29  **/ @Slf4j @RestController @RequestMapping("/sse") @Api(value = "sse", tags = "SSE消息推送") public class SSEController {      @ApiOperation(value = "订阅消息", notes = "订阅消息")     @GetMapping(path = "subscribe/{id}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})     public SseEmitter subscribe(@PathVariable String id) {         return SseEmitterUtils.connect(id);     }      @ApiOperation(value = "发布消息", notes = "发布消息")     @GetMapping(path = "push")     public void push(String id, String content) {         SseEmitterUtils.sendMessage(id, content);     }      @ApiOperation(value = "清除连接", notes = "清除连接")     @GetMapping(path = "close")     public void close(String id, HttpServletRequest request) {         request.startAsync();         SseEmitterUtils.remove(id);     }  } 

3.浏览器端

<!DOCTYPE html> <html lang="en">     <head>         <title>SSE</title>         <meta charset="UTF-8">         <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js" type="text/JavaScript"></script>         <script>             let source = null;             const id = "k000001";             if (window.EventSource) {                 // 建立连接                 source = new EventSource('http://localhost:8000/sse/subscribe/' + id);                 setMessageInnerHTML("连接key:" + id);                 /**                  * 连接一旦建立,就会触发open事件                  * 另一种写法:source.onopen = function (event) {}                  */                 source.addEventListener('open', function (e) {                     setMessageInnerHTML("建立连接。。。");                 }, false);                      /**                  * 客户端收到服务器发来的数据                  * 另一种写法:source.onmessage = function (event) {}                  */                 source.addEventListener('message', function (e) {                     setMessageInnerHTML(e.data);                 });                      /**                  * 如果发生通信错误(比如连接中断),就会触发error事件                  * 另一种写法:source.onerror = function (event) {}                  */                 source.addEventListener('error', function (e) {                     if (e.readyState === EventSource.CLOSED) {                         setMessageInnerHTML("连接关闭");                     } else {                         console.log(e);                     }                 }, false);             } else {                 setMessageInnerHTML("浏览器不支持SSE");             }                  // 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据             window.onbeforeunload = function () {                 source.close();                 const httpRequest = new XMLHttpRequest();                 httpRequest.open('GET', 'http://localhost:8000/sse/close/' + id, true);                 httpRequest.send();                 console.log("close");             };                  // 将消息显示在网页上             function setMessageInnerHTML(innerHTML) {                 $("#contentDiv").append("<br/>" + innerHTML);             }         </script>     </head>          <body>         <div>             <div>                 <div id="contentDiv" style="height:800px; width:1000px; overflow:scroll; background:#ccc;">                 </div>             </div>         </div>     </body> </html> 

注:SSE 是基于 HTTP 协议,目前除了 IE/Edge,其他浏览器都支持。

关注了解更多

SSE 服务端消息推送

SSE 服务端消息推送


原文始发于微信公众号(全栈客):SSE 服务端消息推送

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/162612.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!