服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|数据库技术|

服务器之家 - 数据库 - Redis - websocket+redis动态订阅和动态取消订阅的实现示例

websocket+redis动态订阅和动态取消订阅的实现示例

2022-10-18 15:46柯腾_ Redis

本文主要介绍了websocket+redis动态订阅和动态取消订阅,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

原理

websocket的订阅就是在前后端建立ws连接之后,前端通过发送一定格式的消息,后端解析出来去订阅或者取消订阅redis频道。

订阅频道消息格式:

?
1
2
3
4
5
6
{
    "cmd":"subscribe",
    "topic":[
        "topic_name"
    ]
}

模糊订阅格式

?
1
2
3
4
5
6
{
    "cmd":"psubscribe",
    "topic":[
        "topic_name"
    ]
}

取消订阅格式

?
1
2
3
4
5
6
{
    "cmd":"unsubscribe",
    "topic":[
        "topic_name"
    ]
}

两个核心类,一个是redis的订阅监听类,一个是websocket的发布订阅类。

redis订阅监听类

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package com.curtain.core;
 
import com.curtain.config.GetBeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
 
import java.util.Arrays;
 
/**
 * @Author Curtain
 * @Date 2021/6/7 14:27
 * @Description
 */
@Component
@Slf4j
public class RedisPubSub extends JedisPubSub {
    private JedisPool jedisPool = GetBeanUtil.getBean(JedisPool.class);
    private Jedis jedis;
 
    //订阅
    public void subscribe(String... channels) {
        jedis = jedisPool.getResource();
        try {
            jedis.subscribe(this, channels);
        } catch (Exception e) {
            log.error(e.getMessage());
            if (jedis != null)
                jedis.close();
            //遇到异常后关闭连接重新订阅
            log.info("监听遇到异常,四秒后重新订阅频道:");
            Arrays.asList(channels).forEach(s -> {log.info(s);});
            try {
                Thread.sleep(4000);
            } catch (InterruptedException interruptedException) {
                interruptedException.printStackTrace();
            }
            subscribe(channels);
        }
    }
 
    //模糊订阅
    public void psubscribe(String... channels) {
        Jedis jedis = jedisPool.getResource();
        try {
            jedis.psubscribe(this, channels);
        } catch (ArithmeticException e) {//取消订阅故意造成的异常
            if (jedis != null)
                jedis.close();
        } catch (Exception e) {
            log.error(e.getMessage());
            if (jedis != null)
                jedis.close();
            //遇到异常后关闭连接重新订阅
            log.info("监听遇到异常,四秒后重新订阅频道:");
            Arrays.asList(channels).forEach(s -> {log.info(s);});
            try {
                Thread.sleep(4000);
            } catch (InterruptedException interruptedException) {
                interruptedException.printStackTrace();
            }
            psubscribe(channels);
        }
    }
 
    public void unsubscribeAndClose(String... channels){
        unsubscribe(channels);
        if (jedis != null && !isSubscribed())
            jedis.close();
    }
 
    public void punsubscribeAndClose(String... channels){
        punsubscribe(channels);
        if (jedis != null && !isSubscribed())
            jedis.close();
    }
 
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        log.info("subscribe redis channel:" + channel + ", 线程id:" + Thread.currentThread().getId());
    }
 
    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        log.info("psubscribe redis channel:" + pattern + ", 线程id:" + Thread.currentThread().getId());
    }
 
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        log.info("receive from redis channal: " + channel + ",pattern: " + pattern + ",message:" + message + ", 线程id:" + Thread.currentThread().getId());
        WebSocketServer.publish(message, pattern);
        WebSocketServer.publish(message, channel);
 
    }
 
    @Override
    public void onMessage(String channel, String message) {
        log.info("receive from redis channal: " + channel + ",message:" + message + ", 线程id:" + Thread.currentThread().getId());
        WebSocketServer.publish(message, channel);
    }
 
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        log.info("unsubscribe redis channel:" + channel);
    }
 
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        log.info("punsubscribe redis channel:" + pattern);
    }
}

1.jedis监听redis频道的时候如果遇见异常会关闭连接导致后续没有监听该频道,所以这里在subscribe捕获到异常的时候会重新创建一个jedis连接订阅该redis频道。

webSocket订阅推送类

这个类会有两个ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>>类型类变量,分别存储订阅和模糊订阅的信息。

外面一层的String对应的值是topic_name,里面一层的String对应的值是sessionId。前端发送过来的消息里面对应的这三类操作其实就是对这两个map里面的。

还有个ConcurrentHashMap<String, RedisPubSub>类型的变量,存储的是事件-RedisPubSub,便于取消订阅的时候找到监听该频道(事件)的RedisPubSub对象。

信息进行增加或者删除;后端往前端推送数据也会根据不同的topic_name推送到不同的订阅者这边。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
package com.curtain.core;
 
import com.alibaba.fastjson.JSON;
import com.curtain.config.WebsocketProperties;
import com.curtain.service.Cancelable;
import com.curtain.service.impl.TaskExecuteService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
 
 
/**
 * @Author Curtain
 * @Date 2021/5/14 16:49
 * @Description
 */
@ServerEndpoint("/ws")
@Component
@Slf4j
public class WebSocketServer {
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */
    private static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> webSocketMap = new ConcurrentHashMap<>();
    /**
     * 存放psub的事件
     **/
    private static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> pWebSocketMap = new ConcurrentHashMap<>();
    /**
     * 存放topic(pattern)-对应的RedisPubsub
     */
    private static volatile ConcurrentHashMap<String, RedisPubSub> redisPubSubMap = new ConcurrentHashMap<>();
    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;
    private String sessionId = "";
    //要注入的对象
    private static TaskExecuteService executeService;
    private static WebsocketProperties properties;
 
    private Cancelable cancelable;
 
    @Autowired
    public void setTaskExecuteService(TaskExecuteService taskExecuteService) {
        WebSocketServer.executeService = taskExecuteService;
    }
 
    @Autowired
    public void setWebsocketProperties(WebsocketProperties properties) {
        WebSocketServer.properties = properties;
    }
 
    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        this.sessionId = session.getId();
        //构造推送数据
        Map pubHeader = new HashMap();
        pubHeader.put("name", "connect_status");
        pubHeader.put("type", "create");
        pubHeader.put("from", "pubsub");
        pubHeader.put("time", new Date().getTime() / 1000);
        Map pubPayload = new HashMap();
        pubPayload.put("status", "success");
        Map pubMap = new HashMap();
        pubMap.put("header", pubHeader);
        pubMap.put("payload", pubPayload);
        sendMessage(JSON.toJSONString(pubMap));
        cancelable = executeService.runPeriodly(() -> {
            try {
                if (cancelable != null && !session.isOpen()) {
                    log.info("断开连接,停止发送ping");
                    cancelable.cancel();
                } else {
                    String data = "ping";
                    ByteBuffer payload = ByteBuffer.wrap(data.getBytes());
                    session.getBasicRemote().sendPing(payload);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, properties.getPeriod());
 
    }
 
    @OnMessage
    public void onMessage(String message) {
        synchronized (session) {
            Map msgMap = (Map) JSON.parse(message);
            String cmd = (String) msgMap.get("cmd");
            //订阅消息
            if ("subscribe".equals(cmd)) {
                List<String> topics = (List<String>) msgMap.get("topic");
                //本地记录订阅信息
                for (int i = 0; i < topics.size(); i++) {
                    String topic = topics.get(i);
                    log.info("============================subscribe-start============================");
                    log.info("sessionId:" + this.sessionId + ",开始订阅:" + topic);
                    if (webSocketMap.containsKey(topic)) {//有人订阅过了
                        webSocketMap.get(topic).put(this.sessionId, this);
                    } else {//之前还没人订阅过,所以需要订阅redis频道
                        ConcurrentHashMap<String, WebSocketServer> map = new ConcurrentHashMap<>();
                        map.put(this.sessionId, this);
                        webSocketMap.put(topic, map);
                        new Thread(() -> {
                            RedisPubSub redisPubSub = new RedisPubSub();
                            //存入map
                            redisPubSubMap.put(topic, redisPubSub);
                            redisPubSub.subscribe(topic);
                        }).start();
                    }
                    log.info("sessionId:" + this.sessionId + ",完成订阅:" + topic);
                    log();
                    log.info("============================subscribe-end============================");
                }
            }
            //psubscribe
            if ("psubscribe".equals(cmd)) {
                List<String> topics = (List<String>) msgMap.get("topic");
                //本地记录订阅信息
                for (int i = 0; i < topics.size(); i++) {
                    String topic = topics.get(i);
                    log.info("============================psubscribe-start============================");
                    log.info("sessionId:" + this.sessionId + ",开始模糊订阅:" + topic);
                    if (pWebSocketMap.containsKey(topic)) {//有人订阅过了
                        pWebSocketMap.get(topic).put(this.sessionId, this);
                    } else {//之前还没人订阅过,所以需要订阅redis频道
                        ConcurrentHashMap<String, WebSocketServer> map = new ConcurrentHashMap<>();
                        map.put(this.sessionId, this);
                        pWebSocketMap.put(topic, map);
                        new Thread(() -> {
                            RedisPubSub redisPubSub = new RedisPubSub();
                            //存入map
                            redisPubSubMap.put(topic, redisPubSub);
                            redisPubSub.psubscribe(topic);
                        }).start();
                    }
                    log.info("sessionId:" + this.sessionId + ",完成模糊订阅:" + topic);
                    log();
                    log.info("============================psubscribe-end============================");
                }
            }
            //取消订阅
            if ("unsubscribe".equals(cmd)) {
                List<String> topics = (List<String>) msgMap.get("topic");
                //删除本地对应的订阅信息
                for (String topic : topics) {
                    log.info("============================unsubscribe-start============================");
                    log.info("sessionId:" + this.sessionId + ",开始删除订阅:" + topic);
                    if (webSocketMap.containsKey(topic)) {
                        ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
                        map.remove(this.sessionId);
                        if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
                            webSocketMap.remove(topic);
                            redisPubSubMap.get(topic).unsubscribeAndClose(topic);
                            redisPubSubMap.remove(topic);
                        }
                    }
                    if (pWebSocketMap.containsKey(topic)) {
                        ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
                        map.remove(this.sessionId);
                        if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
                            pWebSocketMap.remove(topic);
                            redisPubSubMap.get(topic).punsubscribeAndClose(topic);
                            redisPubSubMap.remove(topic);
                        }
                    }
                    log.info("sessionId:" + this.sessionId + ",完成删除订阅:" + topic);
                    log();
                    log.info("============================unsubscribe-end============================");
                }
            }
        }
    }
 
    @OnMessage
    public void onPong(PongMessage pongMessage) {
        try {
            log.debug(new String(pongMessage.getApplicationData().array(), "utf-8") + "接收到pong");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        synchronized (session) {
            log.info("============================onclose-start============================");
            //删除订阅
            Iterator iterator = webSocketMap.keySet().iterator();
            while (iterator.hasNext()) {
                String topic = (String) iterator.next();
                ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
                map.remove(this.sessionId);
                if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
                    webSocketMap.remove(topic);
                    redisPubSubMap.get(topic).unsubscribeAndClose(topic);
                    redisPubSubMap.remove(topic);
                }
            }
            //删除模糊订阅
            Iterator iteratorP = pWebSocketMap.keySet().iterator();
            while (iteratorP.hasNext()) {
                String topic = (String) iteratorP.next();
                ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
                map.remove(this.sessionId);
                if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
                    pWebSocketMap.remove(topic);
                    redisPubSubMap.get(topic).punsubscribeAndClose(topic);
                    redisPubSubMap.remove(topic);
                }
            }
            log.info("sessionId:" + this.sessionId + ",断开连接:");
            //debug
            log();
            log.info("============================onclose-end============================");
        }
    }
 
 
    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        synchronized (session) {
            log.info("============================onError-start============================");
            log.error("用户错误,sessionId:" + session.getId() + ",原因:" + error.getMessage());
            error.printStackTrace();
            log.info("关闭错误用户对应的连接");
            //删除订阅
            Iterator iterator = webSocketMap.keySet().iterator();
            while (iterator.hasNext()) {
                String topic = (String) iterator.next();
                ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
                map.remove(this.sessionId);
                if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
                    webSocketMap.remove(topic);
                    redisPubSubMap.get(topic).unsubscribeAndClose(topic);
                    redisPubSubMap.remove(topic);
                }
            }
            //删除模糊订阅
            Iterator iteratorP = pWebSocketMap.keySet().iterator();
            while (iteratorP.hasNext()) {
                String topic = (String) iteratorP.next();
                ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
                map.remove(this.sessionId);
                if (map.size() == 0) {//如果这个频道没有用户订阅了,则取消订阅该redis频道
                    pWebSocketMap.remove(topic);
                    redisPubSubMap.get(topic).punsubscribeAndClose(topic);
                    redisPubSubMap.remove(topic);
                }
            }
            log.info("完成错误用户对应的连接关闭");
            //debug
            log();
            log.info("============================onError-end============================");
        }
    }
 
    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message) {
        synchronized (session) {
            try {
                this.session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
 
    public static void publish(String msg, String topic) {
        ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
        if (map != null && map.values() != null) {
            for (WebSocketServer webSocketServer : map.values())
                webSocketServer.sendMessage(msg);
        }
        map = pWebSocketMap.get(topic);
        if (map != null && map.values() != null) {
            for (WebSocketServer webSocketServer : map.values())
                webSocketServer.sendMessage(msg);
        }
    }
 
    private void log() {
        log.info("<<<<<<<<<<<完成操作后,打印订阅信息开始>>>>>>>>>>");
        Iterator iterator1 = webSocketMap.keySet().iterator();
        while (iterator1.hasNext()) {
            String topic = (String) iterator1.next();
            log.info("topic:" + topic);
            Iterator iterator2 = webSocketMap.get(topic).keySet().iterator();
            while (iterator2.hasNext()) {
                String session = (String) iterator2.next();
                log.info("订阅" + topic + "的sessionId:" + session);
            }
        }
        log.info("<<<<<<<<<<<完成操作后,打印订阅信息结束>>>>>>>>>>");
    }
}

项目地址

上面介绍了核心代码,下面是完整代码地址

https://github.com/Curtain-Wang/websocket-redis-subscribe.git

Update20220415

参考评论区老哥的建议,将redis订阅监听类里面的subscribe和psubscribe方法调整如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
    //订阅
    @Override
    public void subscribe(String... channels) {
        boolean done = true;
        while (done){
            Jedis jedis = jedisPool.getResource();
            try {
                jedis.subscribe(this, channels);
                done = false;
            } catch (Exception e) {
                log.error(e.getMessage());
                if (jedis != null)
                    jedis.close();
                //遇到异常后关闭连接重新订阅
                log.info("监听遇到异常,四秒后重新订阅频道:");
                Arrays.asList(channels).forEach(s -> {log.info(s);});
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException interruptedException) {
                    interruptedException.printStackTrace();
                }
            }
        }
    }
    //模糊订阅
    @Override
    public void psubscribe(String... channels) {
        boolean done = true;
        while (done){
            Jedis jedis = jedisPool.getResource();
            try {
                jedis.psubscribe(this, channels);
                done = false;
            } catch (Exception e) {
                log.error(e.getMessage());
                if (jedis != null)
                    jedis.close();
                //遇到异常后关闭连接重新订阅
                log.info("监听遇到异常,四秒后重新订阅频道:");
                Arrays.asList(channels).forEach(s -> {log.info(s);});
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException interruptedException) {
                    interruptedException.printStackTrace();
                }
            }
        }
    }

到此这篇关于websocket+redis动态订阅和动态取消订阅的实现示例的文章就介绍到这了,更多相关websocket redis动态订阅 内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/qq_41953872/article/details/117361928

延伸 · 阅读

精彩推荐
  • Redis聊聊数据存储系统Couchbase与Redis

    聊聊数据存储系统Couchbase与Redis

    Redis 和Couchbase都是基于内存的数据存储系统。其中,Couchbase是高性能,高伸缩性和高可用的分布式缓存系统;Redis是一个开源的内存数据结构存储系统。...

    班博编程7702022-03-07
  • RedisRedis自动化安装及集群实现搭建过程

    Redis自动化安装及集群实现搭建过程

    这篇文章主要介绍了Redis自动化安装以及集群实现搭建过程,本文给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下 ...

    mrr1952019-11-28
  • Redis基于Redis实现阻塞队列的方式

    基于Redis实现阻塞队列的方式

    本文主要讲解基于 Redis 的方式实现异步队列,基于 Redis 的 list 实现队列的方式也有多种,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友跟随小...

    WeJan's Blog9452022-01-25
  • Redis详解Centos7下配置Redis并开机自启动

    详解Centos7下配置Redis并开机自启动

    本篇文章主要介绍了Centos7下配置Redis并开机自启动,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。 ...

    LSGOZJ4572019-11-01
  • Redis在ssm项目中使用redis缓存查询数据的方法

    在ssm项目中使用redis缓存查询数据的方法

    本文主要简单的使用Java代码进行redis缓存,即在查询的时候先在service层从redis缓存中获取数据。如果大家对在ssm项目中使用redis缓存查询数据的相关知识感...

    caychen9052019-11-12
  • Redis嵌入式Redis服务器在Spring Boot测试中的使用教程

    嵌入式Redis服务器在Spring Boot测试中的使用教程

    这篇文章主要介绍了嵌入式Redis服务器在Spring Boot测试中的使用,本文通过实例代码场景分析给大家介绍的非常详细,需要的朋友参考下吧...

    码农熊猫8242021-08-18
  • RedisRedis中一个String类型引发的惨案

    Redis中一个String类型引发的惨案

    随着存储的数据量越来越大,Redis的内存的使用量也快速上升,结果遇到了大内存Redis实例因为生成RDB而响应变慢的问题。很显然String类型并不是一种好的选...

    公众号程序员学长7142021-08-19
  • Redisredis实现排行榜功能

    redis实现排行榜功能

    排行榜在很多地方都能使用到,redis的zset可以很方便地用来实现排行榜功能,本文就来简单的介绍一下如何使用,具有一定的参考价值,感兴趣的小伙伴们...

    乘月归4452021-08-05