程序员scholar 程序员scholar
首页
  • Java 基础

    • JavaSE
    • JavaIO
    • JavaAPI速查
  • Java 高级

    • JUC
    • JVM
    • Java新特性
    • 设计模式
  • Web 开发

    • Servlet
    • Java网络编程
  • Web 标准

    • HTML
    • CSS
    • JavaScript
  • 前端框架

    • Vue2
    • Vue3
    • Vue3 + TS
    • 微信小程序
    • uni-app
  • 工具与库

    • jQuery
    • Ajax
    • Axios
    • Webpack
    • Vuex
    • WebSocket
    • 第三方登录
  • 后端与语言扩展

    • ES6
    • Typescript
    • node.js
  • Element-UI
  • Apache ECharts
  • 数据结构
  • HTTP协议
  • HTTPS协议
  • 计算机网络
  • Linux常用命令
  • Windows常用命令
  • SQL数据库

    • MySQL
    • MySQL速查
  • NoSQL数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • RabbitMQ
  • 服务器

    • Nginx
  • Spring框架

    • Spring6
    • SpringMVC
    • SpringBoot
    • SpringSecurity
  • SpringCould微服务

    • SpringCloud基础
    • 微服务之DDD架构思想
  • 日常必备

    • 开发常用工具包
    • Hutoll工具包
    • IDEA常用配置
    • 开发笔记
    • 日常记录
    • 项目部署
    • 网站导航
    • 产品学习
    • 英语学习
  • 代码管理

    • Maven
    • Git教程
    • Git小乌龟教程
  • 运维工具

    • Docker
    • Jenkins
    • Kubernetes
  • 算法笔记

    • 算法思想
    • 刷题笔记
  • 面试问题常见

    • 十大经典排序算法
    • 面试常见问题集锦
关于
GitHub (opens new window)
首页
  • Java 基础

    • JavaSE
    • JavaIO
    • JavaAPI速查
  • Java 高级

    • JUC
    • JVM
    • Java新特性
    • 设计模式
  • Web 开发

    • Servlet
    • Java网络编程
  • Web 标准

    • HTML
    • CSS
    • JavaScript
  • 前端框架

    • Vue2
    • Vue3
    • Vue3 + TS
    • 微信小程序
    • uni-app
  • 工具与库

    • jQuery
    • Ajax
    • Axios
    • Webpack
    • Vuex
    • WebSocket
    • 第三方登录
  • 后端与语言扩展

    • ES6
    • Typescript
    • node.js
  • Element-UI
  • Apache ECharts
  • 数据结构
  • HTTP协议
  • HTTPS协议
  • 计算机网络
  • Linux常用命令
  • Windows常用命令
  • SQL数据库

    • MySQL
    • MySQL速查
  • NoSQL数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • RabbitMQ
  • 服务器

    • Nginx
  • Spring框架

    • Spring6
    • SpringMVC
    • SpringBoot
    • SpringSecurity
  • SpringCould微服务

    • SpringCloud基础
    • 微服务之DDD架构思想
  • 日常必备

    • 开发常用工具包
    • Hutoll工具包
    • IDEA常用配置
    • 开发笔记
    • 日常记录
    • 项目部署
    • 网站导航
    • 产品学习
    • 英语学习
  • 代码管理

    • Maven
    • Git教程
    • Git小乌龟教程
  • 运维工具

    • Docker
    • Jenkins
    • Kubernetes
  • 算法笔记

    • 算法思想
    • 刷题笔记
  • 面试问题常见

    • 十大经典排序算法
    • 面试常见问题集锦
关于
GitHub (opens new window)
npm

(进入注册为作者充电)

  • WebSocket

    • WebSocket 基础篇
    • WebSocket 原理篇
    • J2EE WebSocket 实现方式
    • Spring WebSocket 实现方式
    • Spring WebSocket 高级实现方式
      • 1. 添加依赖
      • 2. WebSocket 常量定义
      • 3. 配置类 WebSocketProperties
      • 4. WebSocket 消息上下文 WebSocketMessageContext
      • 5. 会话管理器 WebSocketSessionManager
      • 6. 握手拦截器 WebSocketInterceptor
      • 7. WebSocket 处理器 WebSocketHandler
      • 8. WebSocket 工具类 WebSocketHelper
      • 9. Redis 消息监听器 WebSocketTopicListener
      • 10. WebSocket 容器装配 WebSocketConfiguration
    • 在线聊天(单聊)案例
  • WebSocket
  • WebSocket
scholar
2024-09-18
目录

Spring WebSocket 高级实现方式

# Spring WebSocket 高级实现方式

本节实现通过整合 Spring Boot 和 Redis,提供了一个适合分布式系统的 WebSocket 解决方案。除了基础的 WebSocket 功能外,还结合 Redis 的发布/订阅机制,实现了多服务之间的 WebSocket 消息同步。此实现特别适合于多实例部署的场景,比如多个服务需要共享 WebSocket 会话信息,并进行跨服务的消息推送。

通过以下几个主要模块进行功能划分:

  1. 依赖配置:包括 WebSocket、Redis 和 Lombok 等必要依赖。
  2. 核心常量管理:定义了 WebSocket 使用的常量和消息主题。
  3. 配置类:提供灵活的 WebSocket 路径和跨域配置。
  4. 消息上下文:用于封装 WebSocket 消息的发送对象,支持多用户会话的消息发送。
  5. 会话管理器:管理当前所有在线的 WebSocket 会话,实现会话的存储、获取和移除。
  6. 握手拦截器:在 WebSocket 连接握手阶段进行身份验证,确保连接的安全性。
  7. WebSocket 处理器:负责处理 WebSocket 连接生命周期中的事件,比如建立连接、接收消息、关闭连接等。
  8. 消息工具类:提供方法供服务端主动向客户端推送消息,支持消息的发布、订阅和广播。
  9. Redis 消息监听器:通过 Redis 实现跨服务的 WebSocket 消息同步,监听 Redis 消息并分发给指定的 WebSocket 会话。
  10. 容器装配:通过自动配置类实现 WebSocket 和 Redis 的无缝集成。

实现步骤:

# 1. 添加依赖

首先,需要在 Spring Boot 项目中引入 WebSocket 和 Redis 相关依赖。这些依赖为项目提供了 WebSocket 支持、Redis 发布订阅能力和简化的 POJO 开发(通过 Lombok 注解)。

<dependencies>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.32</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
  </dependency>
</dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

说明:

  • spring-boot-starter-websocket 提供了 WebSocket 的核心支持。
  • spring-boot-starter-data-redis 用于实现 Redis 发布订阅功能。
  • lombok 简化 Java Bean 的编写,自动生成 getter、setter 等常用方法。

# 2. WebSocket 常量定义

定义常量接口 WebSocketConstant,用于存放与 WebSocket 相关的常量值。这样可以在项目中避免硬编码,提高代码的可读性和维护性。

public interface WebSocketConstant {
    /**
     * WebSocketSession 中的用户标识 key
     */
    String LOGIN_USER_KEY = "loginUser";

    /**
     * WebSocket 消息订阅的频道
     */
    String WEB_SOCKET_TOPIC = "work:websocket";
}
1
2
3
4
5
6
7
8
9
10
11

说明:

  • LOGIN_USER_KEY 用于在 WebSocketSession 中存储用户登录信息。
  • WEB_SOCKET_TOPIC 是 Redis 发布订阅的频道名称,消息会通过该频道进行分发。

# 3. 配置类 WebSocketProperties

通过 WebSocketProperties 提供了灵活的配置项,允许通过 application.yml 配置 WebSocket 的路径和跨域设置。

@Data
@ConfigurationProperties("websocket")
public class WebSocketProperties {

    /**
     * 是否启用 WebSocket
     */
    private Boolean enabled;

    /**
     * WebSocket 连接的路径
     */
    private String path = "/websocket";

    /**
     * 设置允许的跨域源地址
     */
    private String allowedOrigins = "*";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

说明:

  • enabled 决定 WebSocket 是否启用,通过配置文件进行管理。
  • path 定义 WebSocket 端点的路径,默认为 /websocket。
  • allowedOrigins 设置跨域支持,默认为允许所有来源,可以根据需求配置具体的跨域策略。

# 4. WebSocket 消息上下文 WebSocketMessageContext

封装了 WebSocket 消息的上下文信息,包含消息接收者列表和消息内容。

@Data
public class WebSocketMessageContext implements Serializable {
    @Serial
    private static final long serialVersionUID = 1L;

    /**
     * 需要推送到的 WebSocket 会话标识列表
     */
    private List<String> sessionKeys;

    /**
     * 需要发送的消息内容
     */
    private String message;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

说明:

  • sessionKeys 是需要推送消息的 WebSocket 会话标识,支持群发消息。
  • message 是消息的具体内容。

# 5. 会话管理器 WebSocketSessionManager

WebSocketSessionManager 负责管理 WebSocket 的会话信息,使用 ConcurrentHashMap 存储会话,保证线程安全。

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketSessionManager {
    
    private static final Map<String, WebSocketSession> USER_SESSION_MAP = new ConcurrentHashMap<>();

    /**
     * 将 WebSocket 会话添加到用户会话 Map 中
     *
     * @param sessionKey 会话键,用于检索会话
     * @param session    要添加的 WebSocket 会话
     */
    public static void addSession(String sessionKey, WebSocketSession session) {
        USER_SESSION_MAP.put(sessionKey, session);
    }

    /**
     * 根据会话键从用户会话 Map 中获取 WebSocket 会话
     *
     * @param sessionKey 要获取的会话键
     * @return 与给定会话键对应的 WebSocket 会话,如果不存在则返回 null
     */
    public static WebSocketSession getSessions(String sessionKey) {
        return USER_SESSION_MAP.get(sessionKey);
    }

    /**
     * 获取所有 WebSocket 会话的会话键集合
     *
     * @return 所有 WebSocket 会话的会话键集合
     */
    public static Set<String> getSessionsAll() {
        return USER_SESSION_MAP.keySet();
    }

    /**
     * 从用户会话 Map 中移除指定会话键对应的 WebSocket 会话
     *
     * @param sessionKey 要移除的会话键
     */
    public static void removeSession(String sessionKey) {
        USER_SESSION_MAP.remove(sessionKey);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

说明:

  • USER_SESSION_MAP 是存储所有在线 WebSocket 会话的集合,通过用户标识作为 key,WebSocket 会话作为 value。
  • 提供了添加、获取、移除会话的方法,支持通过用户标识查找对应的 WebSocket 连接。

# 6. 握手拦截器 WebSocketInterceptor

通过 WebSocketInterceptor 实现 WebSocket 握手前后的预处理,通常用于验证用户身份。

目的:在 WebSocket 建立连接之前,服务器需要对连接进行身份验证或者其他预处理操作。Spring 提供的 HandshakeInterceptor 接口允许开发者在 WebSocket 握手过程中进行自定义处理。该接口提供两个方法:

  • beforeHandshake: 在 WebSocket 握手请求到达时执行。通常用于身份验证,获取客户端请求中的用户信息,验证其合法性。
  • afterHandshake: 在 WebSocket 握手完成后执行。可以记录日志或做其他后续操作
public class WebSocketInterceptor implements HandshakeInterceptor {

    /**
     * WebSocket 握手之前执行的前置处理方法
     *
     * @param request    WebSocket握手请求
     * @param response   WebSocket握手响应
     * @param wsHandler  WebSocket处理程序
     * @param attributes 与 WebSocket 会话关联的属性
     * @return 是否允许握手继续进行
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        // 从请求中获取用户信息
        LoginUser loginUser = LoginHelper.getLoginUser();
        // 将用户信息存储到 WebSocketSession 的属性中
        attributes.put(WebSocketConstant.LOGIN_USER_KEY, loginUser);
        return true;
    }

    /**
     * WebSocket 握手成功后执行的后置处理方法
     *
     * @param request   WebSocket握手请求
     * @param response  WebSocket握手响应
     * @param wsHandler WebSocket处理程序
     * @param exception 握手过程中出现的异常
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
        // 握手后可以执行后续操作,比如记录日志
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

说明:

  • beforeHandshake: 在握手前验证用户信息,并将用户信息存入 WebSocket 会话的属性中。
  • afterHandshake: 握手成功后执行一些必要的操作,比如日志记录。

# 7. WebSocket 处理器 WebSocketHandler

通过继承 AbstractWebSocketHandler,实现 WebSocket 连接的建立、消息接收和连接关闭等功能。

目的:AbstractWebSocketHandler 是 Spring 提供的一个抽象类,允许开发者在 WebSocket 生命周期中的各个关键点上自定义行为,包括:

  • afterConnectionEstablished: 当 WebSocket 连接成功时触发。通常用于初始化 WebSocket 会话,将其存储到服务器端的会话管理器中。
  • handleTextMessage: 当服务器接收到来自客户端的消息时触发。在这个方法中,我们可以处理接收到的消息,并决定是否向客户端发送响应。
  • afterConnectionClosed: 当 WebSocket 连接关闭时触发。通常用于清理资源,移除服务器端保存的 WebSocket 会话。
@Slf4j
public class WebSocketHandler extends AbstractWebSocketHandler {

    /**
     * 当 WebSocket 连接成功后触发
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstant.LOGIN_USER_KEY);
        WebSocketSessionManager.addSession(loginUser.getUserId(), session);
        log.info("[connect] sessionId: {}, userId:{}, username:{}", session.getId(), loginUser.getUserId(), loginUser.getUsername());
    }

    /**
     * 处理接收到的文本消息
     *
     * @param session WebSocket 会话
     * @param message 接收到的文本消息
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) {
        LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstant.LOGIN_USER_KEY);

        // 创建 WebSocket 消息上下文
        WebSocketMessageContext webSocketMessageContext = new WebSocketMessageContext();
        webSocketMessageContext.setSessionKeys(List.of(loginUser.getUserId()));
        webSocketMessageContext.setMessage(message.getPayload());

        // 发布消息
        WebSocketHelper.publishMessage(webSocketMessageContext);
    }

    /**
     * 在 WebSocket 连接关闭后执行
     *
     * @param session WebSocket 会话
     * @param status  连接关闭状态
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstant.LOGIN_USER_KEY);
        WebSocketSessionManager.removeSession(loginUser.getUserId());
        log.info("[disconnect] sessionId: {},userId:{},username:{}", session.getId(), loginUser.getUserId(), loginUser.getUsername());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

说明:

  • afterConnectionEstablished: 连接建立成功后,存储会话信息。
  • handleTextMessage: 处理客户端发送的文本消息,生成消息上下文并通过工具类推送消息。
  • afterConnectionClosed: 连接关闭后,移除会话信息。

# 8. WebSocket 工具类 WebSocketHelper

WebSocketHelper 提供了服务端主动向客户端推送消息的功能,支持单用户消息、群发消息和 Redis 发布订阅模式。

目的:WebSocketHelper 工具类用于简化 WebSocket 消息的发送。服务端可以通过该类主动向指定客户端或所有客户端推送消息。该工具类可以通过 WebSocketSessionManager 来查找客户端的 WebSocket 会话,并通过会话将消息发送给客户端。

实现:WebSocketHelper 提供了几种消息推送的方法:

  • 向单个用户发送消息:通过用户标识在 WebSocketSessionManager 中查找对应的 WebSocket 会话,并发送消息。
  • 向多个用户发送消息:遍历多个用户标识,依次查找 WebSocket 会话并发送消息。
  • 向所有在线用户广播消息:通过遍历会话管理器中的所有会话,将消息推送给所有在线用户。
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketHelper {

    /**
     * 向指定的 WebSocket 会话发送消息
     *
     * @param sessionKey 要发送消息的用户标识
     * @param message    要发送的消息内容
     */
    public static void sendMessage(String sessionKey, String message) {
        WebSocketSession session = WebSocketSessionManager.getSessions(sessionKey);
        sendMessage(session, message);
    }

    /**
     * 发布 WebSocket 消息到 Redis 频道
     *
     * @param webSocketMessage 要发布的 WebSocket 消息对象
     */
    public static void publishMessage(WebSocketMessageContext webSocketMessage) {
        // 遍历目标用户,逐个发送消息
        webSocketMessage.getSessionKeys().forEach(sessionKey -> {
            WebSocketSession session = WebSocketSessionManager.getSessions(sessionKey);
            sendMessage(session, webSocketMessage.getMessage());
        });
        // 如果部分用户不在当前服务内,则通过 Redis 发布消息
        RedisUtil.publish(WebSocketConstant.WEB_SOCKET_TOPIC, webSocketMessage);
    }

    /**
     * 向所有的 WebSocket 会话广播消息
     *
     * @param message 要广播的消息内容
     */
    public static void publishAll(String message) {
        WebSocketMessageContext context = new WebSocketMessageContext();
        context.setMessage(message);
        RedisUtil.publish(WebSocketConstant.WEB_SOCKET_TOPIC, context);
    }

    /**
     * 向指定的 WebSocket 会话发送消息
     *
     * @param session WebSocket 会话
     * @param message 要发送的消息内容
     */
    private static void sendMessage(WebSocketSession session, String message) {
        if (session != null && session.isOpen()) {
            try {
                session.sendMessage(new TextMessage(message));
            } catch (IOException e) {
                log.error("发送消息失败", e);
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

说明:

  • 提供单用户消息推送和群发消息推送功能。
  • 通过 Redis 发布消息实现跨服务的消息同步。

# 9. Redis 消息监听器 WebSocketTopicListener

该监听器负责从 Redis 中接收消息,并将消息推送到指定的 WebSocket 会话。

@Slf4j
@RequiredArgsConstructor
public class WebSocketTopicListener implements MessageListener {

    private final RedisTemplate<String, Object> redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel());
        if (WebSocketConstant.WEB_SOCKET_TOPIC.equals(channel)) {
            // 反序列化消息
            WebSocketMessageContext context = (WebSocketMessageContext) redisTemplate.getValueSerializer().deserialize(message.getBody());
            if (context != null) {
                context.getSessionKeys().forEach(sessionKey -> {
                    WebSocketHelper.sendMessage(sessionKey, context.getMessage());
                });
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

说明:

  • 监听 Redis 频道 WEB_SOCKET_TOPIC,当有消息发布时,将消息推送给指定的 WebSocket 会话。

# 10. WebSocket 容器装配 WebSocketConfiguration

通过自动配置类,将 WebSocket 和 Redis 相关的组件注入 Spring 容器。

@AutoConfiguration
@ConditionalOnProperty(value = "websocket.enabled", havingValue = "true")
@EnableConfigurationProperties(WebSocketProperties.class)
@EnableWebSocket
public class WebSocketConfiguration {

    @Bean
    public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor, WebSocketHandler webSocketHandler, WebSocketProperties webSocketProperties) {
        return registry -> registry
                .addHandler(webSocketHandler, webSocketProperties.getPath())
                .addInterceptors(handshakeInterceptor)
                .setAllowedOrigins(webSocketProperties.getAllowedOrigins());
    }

    @Bean
    public HandshakeInterceptor handshakeInterceptor() {
        return new WebSocketInterceptor();
    }

    @Bean
    public WebSocketHandler webSocketHandler() {
        return new WebSocketHandler();
    }

    @Bean
    public WebSocketTopicListener webSocketTopicListener(RedisTemplate<String, Object> redisTemplate) {
        return new WebSocketTopicListener(redisTemplate);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

说明:

  • 自动装配 WebSocket 配置,并通过 Redis 实现跨服务的 WebSocket 消息分发。

总结

本实现基于 Spring Boot WebSocket 和 Redis 发布订阅机制,支持多服务之间的 WebSocket 消息同步和跨域通信。通过集成 WebSocket 处理器、握手拦截器、会话管理器等模块,提供了一个灵活、扩展性强的 WebSocket 解决方案。

  • WebSocket 会话管理:通过 WebSocketSessionManager 管理所有的 WebSocket 会话,支持跨服务的会话管理。
  • Redis 发布订阅:利用 Redis 的发布订阅机制,实现了分布式系统中的消息同步。
  • 自动配置与灵活性:通过配置类,实现 WebSocket 路径、跨域策略等灵活配置,方便项目集成和扩展。

这种架构非常适合分布式系统中的 WebSocket 应用,比如在线聊天、实时推送等场景。

编辑此页 (opens new window)
上次更新: 2024/12/28, 18:32:08
Spring WebSocket 实现方式
在线聊天(单聊)案例

← Spring WebSocket 实现方式 在线聊天(单聊)案例→

Theme by Vdoing | Copyright © 2019-2025 程序员scholar
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式