Spring WebSocket 高级实现方式
# Spring WebSocket 高级实现方式
本节实现通过整合 Spring Boot 和 Redis,提供了一个适合分布式系统的 WebSocket 解决方案。除了基础的 WebSocket 功能外,还结合 Redis 的发布/订阅机制,实现了多服务之间的 WebSocket 消息同步。此实现特别适合于多实例部署的场景,比如多个服务需要共享 WebSocket 会话信息,并进行跨服务的消息推送。
通过以下几个主要模块进行功能划分:
- 依赖配置:包括 WebSocket、Redis 和 Lombok 等必要依赖。
- 核心常量管理:定义了 WebSocket 使用的常量和消息主题。
- 配置类:提供灵活的 WebSocket 路径和跨域配置。
- 消息上下文:用于封装 WebSocket 消息的发送对象,支持多用户会话的消息发送。
- 会话管理器:管理当前所有在线的 WebSocket 会话,实现会话的存储、获取和移除。
- 握手拦截器:在 WebSocket 连接握手阶段进行身份验证,确保连接的安全性。
- WebSocket 处理器:负责处理 WebSocket 连接生命周期中的事件,比如建立连接、接收消息、关闭连接等。
- 消息工具类:提供方法供服务端主动向客户端推送消息,支持消息的发布、订阅和广播。
- Redis 消息监听器:通过 Redis 实现跨服务的 WebSocket 消息同步,监听 Redis 消息并分发给指定的 WebSocket 会话。
- 容器装配:通过自动配置类实现 WebSocket 和 Redis 的无缝集成。
实现步骤:
# 1. 添加依赖
首先,需要在 Spring Boot 项目中引入 WebSocket 和 Redis 相关依赖。这些依赖为项目提供了 WebSocket 支持、Redis 发布订阅能力和简化的 POJO 开发(通过 Lombok 注解)。
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.32</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
说明:
spring-boot-starter-websocket
提供了 WebSocket 的核心支持。spring-boot-starter-data-redis
用于实现 Redis 发布订阅功能。lombok
简化 Java Bean 的编写,自动生成 getter、setter 等常用方法。
# 2. WebSocket 常量定义
定义常量接口 WebSocketConstant
,用于存放与 WebSocket 相关的常量值。这样可以在项目中避免硬编码,提高代码的可读性和维护性。
public interface WebSocketConstant {
/**
* WebSocketSession 中的用户标识 key
*/
String LOGIN_USER_KEY = "loginUser";
/**
* WebSocket 消息订阅的频道
*/
String WEB_SOCKET_TOPIC = "work:websocket";
}
2
3
4
5
6
7
8
9
10
11
说明:
LOGIN_USER_KEY
用于在 WebSocketSession 中存储用户登录信息。WEB_SOCKET_TOPIC
是 Redis 发布订阅的频道名称,消息会通过该频道进行分发。
# 3. 配置类 WebSocketProperties
通过 WebSocketProperties
提供了灵活的配置项,允许通过 application.yml
配置 WebSocket 的路径和跨域设置。
@Data
@ConfigurationProperties("websocket")
public class WebSocketProperties {
/**
* 是否启用 WebSocket
*/
private Boolean enabled;
/**
* WebSocket 连接的路径
*/
private String path = "/websocket";
/**
* 设置允许的跨域源地址
*/
private String allowedOrigins = "*";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
说明:
enabled
决定 WebSocket 是否启用,通过配置文件进行管理。path
定义 WebSocket 端点的路径,默认为/websocket
。allowedOrigins
设置跨域支持,默认为允许所有来源,可以根据需求配置具体的跨域策略。
# 4. WebSocket 消息上下文 WebSocketMessageContext
封装了 WebSocket 消息的上下文信息,包含消息接收者列表和消息内容。
@Data
public class WebSocketMessageContext implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
/**
* 需要推送到的 WebSocket 会话标识列表
*/
private List<String> sessionKeys;
/**
* 需要发送的消息内容
*/
private String message;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
说明:
sessionKeys
是需要推送消息的 WebSocket 会话标识,支持群发消息。message
是消息的具体内容。
# 5. 会话管理器 WebSocketSessionManager
WebSocketSessionManager
负责管理 WebSocket 的会话信息,使用 ConcurrentHashMap
存储会话,保证线程安全。
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketSessionManager {
private static final Map<String, WebSocketSession> USER_SESSION_MAP = new ConcurrentHashMap<>();
/**
* 将 WebSocket 会话添加到用户会话 Map 中
*
* @param sessionKey 会话键,用于检索会话
* @param session 要添加的 WebSocket 会话
*/
public static void addSession(String sessionKey, WebSocketSession session) {
USER_SESSION_MAP.put(sessionKey, session);
}
/**
* 根据会话键从用户会话 Map 中获取 WebSocket 会话
*
* @param sessionKey 要获取的会话键
* @return 与给定会话键对应的 WebSocket 会话,如果不存在则返回 null
*/
public static WebSocketSession getSessions(String sessionKey) {
return USER_SESSION_MAP.get(sessionKey);
}
/**
* 获取所有 WebSocket 会话的会话键集合
*
* @return 所有 WebSocket 会话的会话键集合
*/
public static Set<String> getSessionsAll() {
return USER_SESSION_MAP.keySet();
}
/**
* 从用户会话 Map 中移除指定会话键对应的 WebSocket 会话
*
* @param sessionKey 要移除的会话键
*/
public static void removeSession(String sessionKey) {
USER_SESSION_MAP.remove(sessionKey);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
说明:
USER_SESSION_MAP
是存储所有在线 WebSocket 会话的集合,通过用户标识作为 key,WebSocket 会话作为 value。- 提供了添加、获取、移除会话的方法,支持通过用户标识查找对应的 WebSocket 连接。
# 6. 握手拦截器 WebSocketInterceptor
通过 WebSocketInterceptor
实现 WebSocket 握手前后的预处理,通常用于验证用户身份。
目的:在 WebSocket 建立连接之前,服务器需要对连接进行身份验证或者其他预处理操作。Spring 提供的 HandshakeInterceptor
接口允许开发者在 WebSocket 握手过程中进行自定义处理。该接口提供两个方法:
beforeHandshake
: 在 WebSocket 握手请求到达时执行。通常用于身份验证,获取客户端请求中的用户信息,验证其合法性。afterHandshake
: 在 WebSocket 握手完成后执行。可以记录日志或做其他后续操作
public class WebSocketInterceptor implements HandshakeInterceptor {
/**
* WebSocket 握手之前执行的前置处理方法
*
* @param request WebSocket握手请求
* @param response WebSocket握手响应
* @param wsHandler WebSocket处理程序
* @param attributes 与 WebSocket 会话关联的属性
* @return 是否允许握手继续进行
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {
// 从请求中获取用户信息
LoginUser loginUser = LoginHelper.getLoginUser();
// 将用户信息存储到 WebSocketSession 的属性中
attributes.put(WebSocketConstant.LOGIN_USER_KEY, loginUser);
return true;
}
/**
* WebSocket 握手成功后执行的后置处理方法
*
* @param request WebSocket握手请求
* @param response WebSocket握手响应
* @param wsHandler WebSocket处理程序
* @param exception 握手过程中出现的异常
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
// 握手后可以执行后续操作,比如记录日志
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
说明:
beforeHandshake
: 在握手前验证用户信息,并将用户信息存入 WebSocket 会话的属性中。afterHandshake
: 握手成功后执行一些必要的操作,比如日志记录。
# 7. WebSocket 处理器 WebSocketHandler
通过继承 AbstractWebSocketHandler
,实现 WebSocket 连接的建立、消息接收和连接关闭等功能。
目的:AbstractWebSocketHandler
是 Spring 提供的一个抽象类,允许开发者在 WebSocket 生命周期中的各个关键点上自定义行为,包括:
afterConnectionEstablished
: 当 WebSocket 连接成功时触发。通常用于初始化 WebSocket 会话,将其存储到服务器端的会话管理器中。handleTextMessage
: 当服务器接收到来自客户端的消息时触发。在这个方法中,我们可以处理接收到的消息,并决定是否向客户端发送响应。afterConnectionClosed
: 当 WebSocket 连接关闭时触发。通常用于清理资源,移除服务器端保存的 WebSocket 会话。
@Slf4j
public class WebSocketHandler extends AbstractWebSocketHandler {
/**
* 当 WebSocket 连接成功后触发
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) {
LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstant.LOGIN_USER_KEY);
WebSocketSessionManager.addSession(loginUser.getUserId(), session);
log.info("[connect] sessionId: {}, userId:{}, username:{}", session.getId(), loginUser.getUserId(), loginUser.getUsername());
}
/**
* 处理接收到的文本消息
*
* @param session WebSocket 会话
* @param message 接收到的文本消息
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstant.LOGIN_USER_KEY);
// 创建 WebSocket 消息上下文
WebSocketMessageContext webSocketMessageContext = new WebSocketMessageContext();
webSocketMessageContext.setSessionKeys(List.of(loginUser.getUserId()));
webSocketMessageContext.setMessage(message.getPayload());
// 发布消息
WebSocketHelper.publishMessage(webSocketMessageContext);
}
/**
* 在 WebSocket 连接关闭后执行
*
* @param session WebSocket 会话
* @param status 连接关闭状态
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
LoginUser loginUser = (LoginUser) session.getAttributes().get(WebSocketConstant.LOGIN_USER_KEY);
WebSocketSessionManager.removeSession(loginUser.getUserId());
log.info("[disconnect] sessionId: {},userId:{},username:{}", session.getId(), loginUser.getUserId(), loginUser.getUsername());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
说明:
afterConnectionEstablished
: 连接建立成功后,存储会话信息。handleTextMessage
: 处理客户端发送的文本消息,生成消息上下文并通过工具类推送消息。afterConnectionClosed
: 连接关闭后,移除会话信息。
# 8. WebSocket 工具类 WebSocketHelper
WebSocketHelper
提供了服务端主动向客户端推送消息的功能,支持单用户消息、群发消息和 Redis 发布订阅模式。
目的:WebSocketHelper
工具类用于简化 WebSocket 消息的发送。服务端可以通过该类主动向指定客户端或所有客户端推送消息。该工具类可以通过 WebSocketSessionManager
来查找客户端的 WebSocket 会话,并通过会话将消息发送给客户端。
实现:WebSocketHelper 提供了几种消息推送的方法:
- 向单个用户发送消息:通过用户标识在
WebSocketSessionManager
中查找对应的 WebSocket 会话,并发送消息。 - 向多个用户发送消息:遍历多个用户标识,依次查找 WebSocket 会话并发送消息。
- 向所有在线用户广播消息:通过遍历会话管理器中的所有会话,将消息推送给所有在线用户。
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketHelper {
/**
* 向指定的 WebSocket 会话发送消息
*
* @param sessionKey 要发送消息的用户标识
* @param message 要发送的消息内容
*/
public static void sendMessage(String sessionKey, String message) {
WebSocketSession session = WebSocketSessionManager.getSessions(sessionKey);
sendMessage(session, message);
}
/**
* 发布 WebSocket 消息到 Redis 频道
*
* @param webSocketMessage 要发布的 WebSocket 消息对象
*/
public static void publishMessage(WebSocketMessageContext webSocketMessage) {
// 遍历目标用户,逐个发送消息
webSocketMessage.getSessionKeys().forEach(sessionKey -> {
WebSocketSession session = WebSocketSessionManager.getSessions(sessionKey);
sendMessage(session, webSocketMessage.getMessage());
});
// 如果部分用户不在当前服务内,则通过 Redis 发布消息
RedisUtil.publish(WebSocketConstant.WEB_SOCKET_TOPIC, webSocketMessage);
}
/**
* 向所有的 WebSocket 会话广播消息
*
* @param message 要广播的消息内容
*/
public static void publishAll(String message) {
WebSocketMessageContext context = new WebSocketMessageContext();
context.setMessage(message);
RedisUtil.publish(WebSocketConstant.WEB_SOCKET_TOPIC, context);
}
/**
* 向指定的 WebSocket 会话发送消息
*
* @param session WebSocket 会话
* @param message 要发送的消息内容
*/
private static void sendMessage(WebSocketSession session, String message) {
if (session != null && session.isOpen()) {
try {
session.sendMessage(new TextMessage(message));
} catch (IOException e) {
log.error("发送消息失败", e);
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
说明:
- 提供单用户消息推送和群发消息推送功能。
- 通过 Redis 发布消息实现跨服务的消息同步。
# 9. Redis 消息监听器 WebSocketTopicListener
该监听器负责从 Redis 中接收消息,并将消息推送到指定的 WebSocket 会话。
@Slf4j
@RequiredArgsConstructor
public class WebSocketTopicListener implements MessageListener {
private final RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
String channel = new String(message.getChannel());
if (WebSocketConstant.WEB_SOCKET_TOPIC.equals(channel)) {
// 反序列化消息
WebSocketMessageContext context = (WebSocketMessageContext) redisTemplate.getValueSerializer().deserialize(message.getBody());
if (context != null) {
context.getSessionKeys().forEach(sessionKey -> {
WebSocketHelper.sendMessage(sessionKey, context.getMessage());
});
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
说明:
- 监听 Redis 频道
WEB_SOCKET_TOPIC
,当有消息发布时,将消息推送给指定的 WebSocket 会话。
# 10. WebSocket 容器装配 WebSocketConfiguration
通过自动配置类,将 WebSocket 和 Redis 相关的组件注入 Spring 容器。
@AutoConfiguration
@ConditionalOnProperty(value = "websocket.enabled", havingValue = "true")
@EnableConfigurationProperties(WebSocketProperties.class)
@EnableWebSocket
public class WebSocketConfiguration {
@Bean
public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor, WebSocketHandler webSocketHandler, WebSocketProperties webSocketProperties) {
return registry -> registry
.addHandler(webSocketHandler, webSocketProperties.getPath())
.addInterceptors(handshakeInterceptor)
.setAllowedOrigins(webSocketProperties.getAllowedOrigins());
}
@Bean
public HandshakeInterceptor handshakeInterceptor() {
return new WebSocketInterceptor();
}
@Bean
public WebSocketHandler webSocketHandler() {
return new WebSocketHandler();
}
@Bean
public WebSocketTopicListener webSocketTopicListener(RedisTemplate<String, Object> redisTemplate) {
return new WebSocketTopicListener(redisTemplate);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
说明:
- 自动装配 WebSocket 配置,并通过 Redis 实现跨服务的 WebSocket 消息分发。
总结
本实现基于 Spring Boot WebSocket 和 Redis 发布订阅机制,支持多服务之间的 WebSocket 消息同步和跨域通信。通过集成 WebSocket 处理器、握手拦截器、会话管理器等模块,提供了一个灵活、扩展性强的 WebSocket 解决方案。
- WebSocket 会话管理:通过
WebSocketSessionManager
管理所有的 WebSocket 会话,支持跨服务的会话管理。 - Redis 发布订阅:利用 Redis 的发布订阅机制,实现了分布式系统中的消息同步。
- 自动配置与灵活性:通过配置类,实现 WebSocket 路径、跨域策略等灵活配置,方便项目集成和扩展。
这种架构非常适合分布式系统中的 WebSocket 应用,比如在线聊天、实时推送等场景。