开心一笑: 会买水果的狗狗

    关注微信公众号

    QQ群:831045818

    app下载

    当前位置:首页> java > 技术文档 > 正文
    SSE与WebSocket
    发布时间:2022-12-05 16:45:06.0 浏览次数:
    public class SseEmitterServer {
        private static final Logger logger = LoggerFactory.getLogger(SseEmitterServer.class);
    
        /**
         * 当前连接数
         */
        private static AtomicInteger count = new AtomicInteger(0);
    
        /**
         * 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面
         */
        private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
    
        /**
         * 创建用户连接并返回 SseEmitter
         *
         * @param userId 用户ID
         * @return SseEmitter
         */
        public static SseEmitter connect(String userId) {
            // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
            SseEmitter sseEmitter = new SseEmitter(0L);
            // 注册回调
            sseEmitter.onCompletion(completionCallBack(userId));
            sseEmitter.onError(errorCallBack(userId));
            sseEmitter.onTimeout(timeoutCallBack(userId));
            sseEmitterMap.put(userId, sseEmitter);
            // 数量+1
            count.getAndIncrement();
            logger.info("创建新的sse连接,当前用户:{}", userId);
            return sseEmitter;
        }
    
        /**
         * 给指定用户发送信息  -- 单播
         */
        public static void sendMessage(String userId, String message) {
            if (sseEmitterMap.containsKey(userId)) {
                try {
                    // sseEmitterMap.get(userId).send(message, MediaType.APPLICATION_JSON);
                    System.out.println(userId + "==" + message);
                    sseEmitterMap.get(userId).send(message);
                } catch (IOException e) {
                    logger.error("用户[{}]推送异常:{}", userId, e.getMessage());
                    removeUser(userId);
                }
            }
        }
    
        /**
         * 向多人发布消息   -- 组播
         * @param groupId 开头标识
         * @param message 消息内容
         */
        public static void groupSendMessage(String groupId, String message) {
            if (MapUtil.isNotEmpty(sseEmitterMap)) {
                /*Set<String> ids = sseEmitterMap.keySet().stream().filter(m -> m.startsWith(groupId)).collect(Collectors.toSet());
                batchSendMessage(message, ids);*/
                sseEmitterMap.forEach((k, v) -> {
                    try {
                        if (k.startsWith(groupId)) {
                            v.send(message, MediaType.APPLICATION_JSON);
                        }
                    } catch (IOException e) {
                        logger.error("用户[{}]推送异常:{}", k, e.getMessage());
                        removeUser(k);
                    }
                });
            }
        }
    
        /**
         * 群发所有人   -- 广播
         */
        public static void batchSendMessage(String message) {
            sseEmitterMap.forEach((k, v) -> {
                try {
                    v.send(message, MediaType.APPLICATION_JSON);
                } catch (IOException e) {
                    logger.error("用户[{}]推送异常:{}", k, e.getMessage());
                    removeUser(k);
                }
            });
        }
    
        /**
         * 群发消息
         */
        public static void batchSendMessage(String message, Set<String> ids) {
            ids.forEach(userId -> sendMessage(userId, message));
        }
    
        /**
         * 移除用户连接
         */
        public static void removeUser(String userId) {
            sseEmitterMap.remove(userId);
            // 数量-1
            count.getAndDecrement();
            logger.info("移除用户:{}", userId);
        }
    
        /**
         * 获取当前连接信息
         */
        public static List<String> getIds() {
            return new ArrayList<>(sseEmitterMap.keySet());
        }
    
        /**
         * 获取当前连接数量
         */
        public static int getUserCount() {
            return count.intValue();
        }
    
        private static Runnable completionCallBack(String userId) {
            return () -> {
                logger.info("结束连接:{}", userId);
                removeUser(userId);
            };
        }
    
        private static Runnable timeoutCallBack(String userId) {
            return () -> {
                logger.info("连接超时:{}", userId);
                removeUser(userId);
            };
        }
    
        private static Consumer<Throwable> errorCallBack(String userId) {
            return throwable -> {
                logger.info("连接异常:{}", userId);
                removeUser(userId);
            };
        }
    }
    
    
    web接口
    //连接
    @GetMapping("/online/{userId}")
    public SseEmitter online(@PathVariable String userId) throws IOException {
        return SseEmitterServer.connect(userId);
    }
    //关闭
    @GetMapping("/close/{userId}")
    public SseEmitter close(@PathVariable String userId) throws IOException {
        SseEmitterServer.removeUser(userId);
        return null;
    }
    //模拟触发,连接后,访问这个,可以看到online推出的数据
    @GetMapping("/process")
    public void sendMessage() throws InterruptedException {
        for(int i=0; i<=100; i++){
            if(i>50&&i<70){
                Thread.sleep(500L);
            }else{
                Thread.sleep(100L);
            }
            SseEmitterServer.batchSendMessage(String.valueOf(i));
        }
    }
    <!DOCTYPE html>
    <html lang="en">
    
    <head>
        <meta charset="UTF-8">
        <title>SseEmitter</title>
    </head>
    
    <body>
    <button onclick="closeSse()">关闭连接</button>
    <div id="message"></div>
    </body>
    <script>
        let source = null;
    
        // 用时间戳模拟登录用户
        const userId = new Date().getTime();
    
        if (window.EventSource) {
    
            // 建立连接
                    source = new EventSource('http://127.0.0.1/online/' + userId);
    		setMessageInnerHTML("连接用户=" + userId);
            /**
             * 连接一旦建立,就会触发open事件
             * 另一种写法:source.onopen = function (event) {}
             */
            source.addEventListener('open', function(e) {
                setMessageInnerHTML("建立连接。。。");
            }, false);
    
            /**
             * 客户端收到服务器发来的数据
             * 另一种写法:source.onmessage = function (event) {}
             */
            source.addEventListener('message', function(e) {
                setMessageInnerHTML(e.data);
            });
    
    
            /**
             * 如果发生通信错误(比如连接中断),就会触发error事件
             * 或者:
             * 另一种写法:source.onerror = function (event) {}
             */
            source.addEventListener('error', function(e) {
                if (e.readyState === EventSource.CLOSED) {
                    setMessageInnerHTML("连接关闭");
                } else {
                    console.log(e);
                }
            }, false);
    
        } else {
            setMessageInnerHTML("你的浏览器不支持SSE");
        }
    
        // 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
        window.onbeforeunload = function() {
            closeSse();
        };
    
        // 关闭Sse连接
        function closeSse() {
            source.close();
            const httpRequest = new XMLHttpRequest();
            httpRequest.open('GET', ''http://127.0.0.1/close/' + userId, true);
            httpRequest.send();
            console.log("close");
        }
    
        // 将消息显示在网页上
        function setMessageInnerHTML(innerHTML) {
            document.getElementById('message').innerHTML += innerHTML + '<br/>';
        }
    </script>
    
    </html>


    关注"都市百货" 了解南陵

    微信咨询wanglf2r(不拉群 发广告者勿加)

    0
    0
    上一篇:mysql 返回插入自增id 上一篇:必须参数检查

    评论已有0

    提交评论

    热门评论

    南陵新闻
    公示公告
    常用查询
    风光南陵
    走出南陵
    友情链接