原理
websocket的订阅就是在前后端建立ws连接之后,前端通过发送一定格式的消息,后端解析出来去订阅或者取消订阅redis频道。
订阅频道消息格式:
{ "cmd": "subscribe", "topic": ["topic_name"] }
模糊订阅格式
{ "cmd": "psubscribe", "topic": ["topic_name"] }
取消订阅格式
{ "cmd": "unsubscribe", "topic": ["topic_name"] }
两个核心类,一个是redis的订阅监听类,一个是websocket的发布订阅类。
redis订阅监听类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
package com.curtain.core;

import com.curtain.config.GetBeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;

import java.util.Arrays;

/**
 * Redis listener that forwards every received message to {@link WebSocketServer#publish}.
 *
 * <p>{@link #subscribe} and {@link #psubscribe} block the calling thread for as long as the
 * subscription is alive, so callers are expected to run them on a dedicated thread. When the
 * listening connection fails, a fresh connection is borrowed from the pool and the subscription
 * is re-established after a fixed delay.
 *
 * @Author Curtain
 * @Date 2021/6/7 14:27
 */
@Component
@Slf4j
public class RedisPubSub extends JedisPubSub {

    /** Pause between a failed listen attempt and the next one. */
    private static final long RETRY_DELAY_MILLIS = 4000L;

    private final JedisPool jedisPool = GetBeanUtil.getBean(JedisPool.class);

    /**
     * Subscribes to the given channels and blocks until the subscription ends normally
     * (all channels unsubscribed) or the calling thread is interrupted.
     *
     * @param channels exact channel names to listen on
     */
    public void subscribe(String... channels) {
        listenUntilUnsubscribed(false, channels);
    }

    /**
     * Pattern-subscribes to the given patterns and blocks until the subscription ends normally
     * or the calling thread is interrupted.
     *
     * @param channels channel patterns to listen on
     */
    public void psubscribe(String... channels) {
        listenUntilUnsubscribed(true, channels);
    }

    /**
     * Requests that the given channels be unsubscribed.
     *
     * <p>The connection itself is returned to the pool by the listening thread once its blocking
     * subscribe call returns; closing it from this (foreign) thread would race with that thread.
     *
     * @param channels channels to unsubscribe from
     */
    public void unsubscribeAndClose(String... channels) {
        unsubscribe(channels);
    }

    /**
     * Requests that the given patterns be unsubscribed. See {@link #unsubscribeAndClose} for how
     * the connection is released.
     *
     * @param channels patterns to unsubscribe from
     */
    public void punsubscribeAndClose(String... channels) {
        punsubscribe(channels);
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        log.info("subscribe redis channel:" + channel + ", 线程id:" + Thread.currentThread().getId());
    }

    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        log.info("psubscribe redis channel:" + pattern + ", 线程id:" + Thread.currentThread().getId());
    }

    @Override
    public void onPMessage(String pattern, String channel, String message) {
        log.info("receive from redis channal: " + channel + ",pattern: " + pattern + ",message:" + message
                + ", 线程id:" + Thread.currentThread().getId());
        // Deliver both to clients registered under the pattern and to those registered under the concrete channel.
        WebSocketServer.publish(message, pattern);
        WebSocketServer.publish(message, channel);
    }

    @Override
    public void onMessage(String channel, String message) {
        log.info("receive from redis channal: " + channel + ",message:" + message
                + ", 线程id:" + Thread.currentThread().getId());
        WebSocketServer.publish(message, channel);
    }

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        log.info("unsubscribe redis channel:" + channel);
    }

    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        log.info("punsubscribe redis channel:" + pattern);
    }

    /**
     * Runs the blocking (p)subscribe call, retrying with a new pooled connection after each failure.
     *
     * @param pattern  {@code true} for PSUBSCRIBE, {@code false} for SUBSCRIBE
     * @param channels channels or patterns to listen on
     */
    private void listenUntilUnsubscribed(boolean pattern, String... channels) {
        while (true) {
            Jedis jedis = null;
            try {
                // Borrowing happens inside the try so that a failed reconnect is retried as well.
                jedis = jedisPool.getResource();
                if (pattern) {
                    jedis.psubscribe(this, channels);
                } else {
                    jedis.subscribe(this, channels);
                }
                return; // the blocking call returned normally: the subscription was ended on purpose
            } catch (ArithmeticException e) {
                if (pattern) {
                    // Kept from the original psubscribe, which treated this exception as a deliberate
                    // "stop listening" signal. NOTE(review): nothing visible here throws it - confirm it is still needed.
                    return;
                }
                log.error(e.getMessage(), e);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            } finally {
                closeQuietly(jedis);
            }

            log.info("监听遇到异常,四秒后重新订阅频道:");
            for (String channel : Arrays.asList(channels)) {
                log.info(channel);
            }
            try {
                Thread.sleep(RETRY_DELAY_MILLIS);
            } catch (InterruptedException interrupted) {
                // Keep the interrupt visible to the caller and stop retrying instead of spinning.
                Thread.currentThread().interrupt();
                log.warn("Interrupted while waiting to resubscribe; giving up");
                return;
            }
        }
    }

    /** Returns the connection to the pool, logging instead of propagating any failure. */
    private void closeQuietly(Jedis jedis) {
        if (jedis == null) {
            return;
        }
        try {
            jedis.close();
        } catch (RuntimeException e) {
            log.warn("Failed to release redis connection", e);
        }
    }
}
1.jedis监听redis频道的时候如果遇见异常会关闭连接导致后续没有监听该频道,所以这里在subscribe捕获到异常的时候会重新创建一个jedis连接订阅该redis频道。
webSocket订阅推送类
这个类会有两个ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>>类型类变量,分别存储订阅和模糊订阅的信息。
外面一层的String对应的值是topic_name,里面一层的String对应的值是sessionId。前端发送过来的消息里面对应的这三类操作(订阅、模糊订阅、取消订阅),其实就是对这两个map里面的信息进行增加或者删除;后端往前端推送数据时,也会根据不同的topic_name推送到不同的订阅者这边。
还有个ConcurrentHashMap<String, RedisPubSub>类型的变量,存储的是事件-RedisPubSub,便于取消订阅的时候找到监听该频道(事件)的RedisPubSub对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
|
package com.curtain.core;

import com.alibaba.fastjson.JSON;
import com.curtain.config.WebsocketProperties;
import com.curtain.service.Cancelable;
import com.curtain.service.impl.TaskExecuteService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * WebSocket endpoint that lets each client subscribe to, pattern-subscribe to and unsubscribe
 * from redis channels by sending {@code {"cmd": ..., "topic": [...]}} messages.
 *
 * <p>The first client interested in a topic causes a {@link RedisPubSub} listener to be started
 * on its own thread; when the last client leaves the topic that listener is asked to unsubscribe.
 *
 * @Author Curtain
 * @Date 2021/5/14 16:49
 */
@ServerEndpoint("/ws")
@Component
@Slf4j
public class WebSocketServer {

    /** topic name -> (sessionId -> endpoint) for exact subscriptions. */
    private static final ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> webSocketMap =
            new ConcurrentHashMap<>();

    /** pattern -> (sessionId -> endpoint) for pattern subscriptions. */
    private static final ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> pWebSocketMap =
            new ConcurrentHashMap<>();

    /** topic name -> redis listener serving that exact subscription. */
    private static final ConcurrentHashMap<String, RedisPubSub> redisPubSubMap = new ConcurrentHashMap<>();

    /**
     * pattern -> redis listener serving that pattern subscription. Kept apart from
     * {@link #redisPubSubMap} so that an exact topic and a pattern with the same text cannot
     * overwrite each other's listener.
     */
    private static final ConcurrentHashMap<String, RedisPubSub> pRedisPubSubMap = new ConcurrentHashMap<>();

    /** Session of the connected client; used to send data to it. */
    private Session session;
    private String sessionId = "";

    // Injected through the setters below; static because the container creates one endpoint instance per connection.
    private static TaskExecuteService executeService;
    private static WebsocketProperties properties;

    /** Handle of the periodic ping task; written in onOpen and read from the task itself. */
    private volatile Cancelable cancelable;

    @Autowired
    public void setTaskExecuteService(TaskExecuteService taskExecuteService) {
        WebSocketServer.executeService = taskExecuteService;
    }

    @Autowired
    public void setWebsocketProperties(WebsocketProperties properties) {
        WebSocketServer.properties = properties;
    }

    /**
     * Called when a connection is established: sends a "connect_status" notification and starts
     * the periodic ping.
     *
     * @param session the newly opened session
     */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        this.sessionId = session.getId();

        Map<String, Object> pubHeader = new HashMap<>();
        pubHeader.put("name", "connect_status");
        pubHeader.put("type", "create");
        pubHeader.put("from", "pubsub");
        pubHeader.put("time", System.currentTimeMillis() / 1000);
        Map<String, Object> pubPayload = new HashMap<>();
        pubPayload.put("status", "success");
        Map<String, Object> pubMap = new HashMap<>();
        pubMap.put("header", pubHeader);
        pubMap.put("payload", pubPayload);
        sendMessage(JSON.toJSONString(pubMap));

        cancelable = executeService.runPeriodly(() -> {
            try {
                if (!session.isOpen()) {
                    Cancelable pingTask = cancelable;
                    if (pingTask != null) {
                        log.info("断开连接,停止发送ping");
                        pingTask.cancel();
                    }
                    return; // never ping a closed session
                }
                ByteBuffer payload = ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8));
                session.getBasicRemote().sendPing(payload);
            } catch (IOException e) {
                log.error("Failed to send ping, sessionId:" + sessionId, e);
            }
        }, properties.getPeriod());
    }

    /**
     * Handles a text command from the client. Recognised commands are {@code subscribe},
     * {@code psubscribe} and {@code unsubscribe}; anything else, and anything that is not a JSON
     * object with a list-valued {@code topic}, is ignored.
     *
     * @param message raw text frame sent by the client
     */
    @OnMessage
    public void onMessage(String message) {
        synchronized (session) {
            Map<?, ?> msgMap = parseCommand(message);
            if (msgMap == null) {
                return;
            }
            Object cmd = msgMap.get("cmd");
            List<String> topics = extractTopics(msgMap.get("topic"));

            if ("subscribe".equals(cmd)) {
                for (String topic : topics) {
                    log.info("============================subscribe-start============================");
                    log.info("sessionId:" + this.sessionId + ",开始订阅:" + topic);
                    addSubscription(webSocketMap, redisPubSubMap, topic, false);
                    log.info("sessionId:" + this.sessionId + ",完成订阅:" + topic);
                    log();
                    log.info("============================subscribe-end============================");
                }
            } else if ("psubscribe".equals(cmd)) {
                for (String topic : topics) {
                    log.info("============================psubscribe-start============================");
                    log.info("sessionId:" + this.sessionId + ",开始模糊订阅:" + topic);
                    addSubscription(pWebSocketMap, pRedisPubSubMap, topic, true);
                    log.info("sessionId:" + this.sessionId + ",完成模糊订阅:" + topic);
                    log();
                    log.info("============================psubscribe-end============================");
                }
            } else if ("unsubscribe".equals(cmd)) {
                for (String topic : topics) {
                    log.info("============================unsubscribe-start============================");
                    log.info("sessionId:" + this.sessionId + ",开始删除订阅:" + topic);
                    // A single "unsubscribe" removes the topic from both the exact and the pattern registry.
                    removeSubscription(webSocketMap, redisPubSubMap, topic, false);
                    removeSubscription(pWebSocketMap, pRedisPubSubMap, topic, true);
                    log.info("sessionId:" + this.sessionId + ",完成删除订阅:" + topic);
                    log();
                    log.info("============================unsubscribe-end============================");
                }
            }
        }
    }

    /**
     * Logs the payload of a pong frame at debug level.
     *
     * @param pongMessage pong received from the client
     */
    @OnMessage
    public void onPong(PongMessage pongMessage) {
        // Decode a duplicate so the original buffer's position is untouched and array() is not required.
        String payload = StandardCharsets.UTF_8.decode(pongMessage.getApplicationData().duplicate()).toString();
        log.debug(payload + "接收到pong");
    }

    /** Called when the connection is closed: drops every subscription held by this session. */
    @OnClose
    public void onClose() {
        synchronized (session) {
            log.info("============================onclose-start============================");
            removeAllSubscriptions();
            log.info("sessionId:" + this.sessionId + ",断开连接:");
            log();
            log.info("============================onclose-end============================");
        }
    }

    /**
     * Called when the connection reports an error: logs it and drops every subscription held by
     * this session.
     *
     * @param session session on which the error occurred
     * @param error   the reported error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        synchronized (session) {
            log.info("============================onError-start============================");
            log.error("用户错误,sessionId:" + session.getId() + ",原因:" + error.getMessage(), error);
            log.info("关闭错误用户对应的连接");
            removeAllSubscriptions();
            log.info("完成错误用户对应的连接关闭");
            log();
            log.info("============================onError-end============================");
        }
    }

    /**
     * Pushes a text message to this client. Nothing is sent when the session is already closed.
     *
     * @param message text to send
     */
    public void sendMessage(String message) {
        synchronized (session) {
            if (!session.isOpen()) {
                log.debug("Skip sending to closed session, sessionId:" + sessionId);
                return;
            }
            try {
                this.session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                log.error("Failed to send message, sessionId:" + sessionId, e);
            }
        }
    }

    /**
     * Delivers a message to every client registered under {@code topic}, looking the name up in
     * both the exact and the pattern registry.
     *
     * @param msg   text to deliver
     * @param topic channel name or pattern the message belongs to
     */
    public static void publish(String msg, String topic) {
        deliver(webSocketMap.get(topic), msg);
        deliver(pWebSocketMap.get(topic), msg);
    }

    /** Sends to each endpoint, isolating failures so one broken client cannot abort the redis listener. */
    private static void deliver(Map<String, WebSocketServer> sessions, String msg) {
        if (sessions == null) {
            return;
        }
        for (WebSocketServer client : sessions.values()) {
            try {
                client.sendMessage(msg);
            } catch (RuntimeException e) {
                log.warn("Failed to deliver message to sessionId:" + client.sessionId, e);
            }
        }
    }

    /** Parses a client frame into a JSON object, or returns {@code null} (after logging) when it is not one. */
    private static Map<?, ?> parseCommand(String message) {
        try {
            // NOTE(review): this parses client-supplied text; confirm fastjson autoType is disabled for this service.
            Object parsed = JSON.parse(message);
            if (parsed instanceof Map) {
                return (Map<?, ?>) parsed;
            }
            log.warn("Ignoring message that is not a JSON object: " + message);
        } catch (RuntimeException e) {
            log.warn("Ignoring unparsable message: " + message, e);
        }
        return null;
    }

    /** Returns the string entries of the "topic" value, or an empty list when it is missing or not a list. */
    private static List<String> extractTopics(Object rawTopics) {
        if (!(rawTopics instanceof List)) {
            return Collections.emptyList();
        }
        List<String> topics = new ArrayList<>();
        for (Object item : (List<?>) rawTopics) {
            if (item instanceof String) {
                topics.add((String) item);
            }
        }
        return topics;
    }

    /**
     * Registers this session under {@code topic}; when it is the first session for that topic a
     * redis listener is created and started on a new thread.
     */
    private void addSubscription(ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> registry,
                                 ConcurrentHashMap<String, RedisPubSub> listeners,
                                 String topic, boolean pattern) {
        final RedisPubSub[] created = new RedisPubSub[1];
        // compute() makes "create the entry + register the listener + add the session" atomic per topic,
        // so it cannot interleave with a concurrent removal of the same topic.
        registry.compute(topic, (key, existing) -> {
            ConcurrentHashMap<String, WebSocketServer> sessions = existing;
            if (sessions == null) {
                sessions = new ConcurrentHashMap<>();
                RedisPubSub listener = new RedisPubSub();
                listeners.put(key, listener);
                created[0] = listener;
            }
            sessions.put(this.sessionId, this);
            return sessions;
        });

        final RedisPubSub listener = created[0];
        if (listener != null) {
            // The (p)subscribe call blocks for the lifetime of the subscription, hence the dedicated thread.
            new Thread(() -> {
                if (pattern) {
                    listener.psubscribe(topic);
                } else {
                    listener.subscribe(topic);
                }
            }).start();
        }
    }

    /**
     * Removes this session from {@code topic}; when no session is left the topic entry is dropped
     * and its redis listener is asked to unsubscribe.
     */
    private void removeSubscription(ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> registry,
                                    ConcurrentHashMap<String, RedisPubSub> listeners,
                                    String topic, boolean pattern) {
        final RedisPubSub[] orphaned = new RedisPubSub[1];
        registry.computeIfPresent(topic, (key, sessions) -> {
            sessions.remove(this.sessionId);
            if (!sessions.isEmpty()) {
                return sessions;
            }
            orphaned[0] = listeners.remove(key);
            return null; // returning null removes the topic entry
        });

        RedisPubSub listener = orphaned[0];
        if (listener == null) {
            return;
        }
        try {
            if (pattern) {
                listener.punsubscribeAndClose(topic);
            } else {
                listener.unsubscribeAndClose(topic);
            }
        } catch (RuntimeException e) {
            // Presumably thrown when the listener thread has not finished subscribing yet - TODO confirm against the Jedis version in use.
            log.warn("Failed to unsubscribe redis channel:" + topic, e);
        }
    }

    /** Drops this session from every exact and pattern topic. */
    private void removeAllSubscriptions() {
        for (String topic : webSocketMap.keySet()) {
            removeSubscription(webSocketMap, redisPubSubMap, topic, false);
        }
        for (String topic : pWebSocketMap.keySet()) {
            removeSubscription(pWebSocketMap, pRedisPubSubMap, topic, true);
        }
    }

    /** Logs the current exact-subscription table (topic -> session ids). */
    private void log() {
        log.info("<<<<<<<<<<<完成操作后,打印订阅信息开始>>>>>>>>>>");
        for (Map.Entry<String, ConcurrentHashMap<String, WebSocketServer>> entry : webSocketMap.entrySet()) {
            String topic = entry.getKey();
            log.info("topic:" + topic);
            for (String subscriberId : entry.getValue().keySet()) {
                log.info("订阅" + topic + "的sessionId:" + subscriberId);
            }
        }
        log.info("<<<<<<<<<<<完成操作后,打印订阅信息结束>>>>>>>>>>");
    }
}
项目地址
上面介绍了核心代码,下面是完整代码地址
https://github.com/Curtain-Wang/websocket-redis-subscribe.git
Update20220415
参考评论区老哥的建议,将redis订阅监听类里面的subscribe和psubscribe方法调整如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
//订阅 @Override public void subscribe(String... channels) { boolean done = true ; while (done){ Jedis jedis = jedisPool.getResource(); try { jedis.subscribe( this , channels); done = false ; } catch (Exception e) { log.error(e.getMessage()); if (jedis != null ) jedis.close(); //遇到异常后关闭连接重新订阅 log.info( "监听遇到异常,四秒后重新订阅频道:" ); Arrays.asList(channels).forEach(s -> {log.info(s);}); try { Thread.sleep( 4000 ); } catch (InterruptedException interruptedException) { interruptedException.printStackTrace(); } } } } //模糊订阅 @Override public void psubscribe(String... channels) { boolean done = true ; while (done){ Jedis jedis = jedisPool.getResource(); try { jedis.psubscribe( this , channels); done = false ; } catch (Exception e) { log.error(e.getMessage()); if (jedis != null ) jedis.close(); //遇到异常后关闭连接重新订阅 log.info( "监听遇到异常,四秒后重新订阅频道:" ); Arrays.asList(channels).forEach(s -> {log.info(s);}); try { Thread.sleep( 4000 ); } catch (InterruptedException interruptedException) { interruptedException.printStackTrace(); } } } } |
到此这篇关于websocket+redis动态订阅和动态取消订阅的实现示例的文章就介绍到这了,更多相关websocket redis动态订阅 内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://blog.csdn.net/qq_41953872/article/details/117361928