服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - springboot整合mqtt的详细图文教程

springboot整合mqtt的详细图文教程

2023-03-09 13:33i小喇叭 Java教程

MQTT是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布,下面这篇文章主要给大家介绍了关于springboot整合mqtt的详细图文教程,需要的朋友可以参考下

springboot 整合 mqtt

最近由于iot越来越火, 物联网的需求越来越多, 那么理所当然的使用mqtt的场景也就越来越多,

接下来是我使用springboot整合mqtt的过程, 以及踩过的一些坑.

mqtt服务器使用的是 EMQX, 官网 : 这里

搭建的时候如果你使用的是集群 记得开放以下端口:

springboot整合mqtt的详细图文教程

好了, 搭建成功下一步就是我们的java程序要与mqtt连接, 这里有两种方式(其实不止两种)进行连接.

一是 直接使用 MQTT Java 客户端库,详情可以查看官方的例子: MQTT Java 客户端 我就跳过了

二是使用 spring integration mqtt也是比较推荐的一种,也是我们主讲这种.

第一步 添加 maven dependency

?
1
2
3
4
5
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.14</version>
</dependency>

第二步 添加配置

1 先写好一些基本配置

?
1
2
3
4
5
6
7
8
9
10
11
mqtt:
 username: test                        # 账号
 password: 123456                      # 密码
 host-url: tcp://127.0.0.1:1883        # mqtt连接tcp地址
 in-client-id: ${random.value}         # 随机值,使出入站 client ID 不同
 out-client-id: ${random.value}
 client-id: ${random.int}                   # 客户端Id,不能相同,采用随机数 ${random.value}
 default-topic: test/#,topic/+/+/up         # 默认主题
 timeout: 60                                # 超时时间
 keepalive: 60                              # 保持连接
 clearSession: true                         # 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)

2.然后写一个对应的类MqttProperties

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
 
/**
 * MqttProperties
 *
 * @author hengzi
 * @date 2022/8/23
 */
@Component
public class MqttProperties {
 
    /**
     * 用户名
     */
    @Value("${mqtt.username}")
    private String username;
 
    /**
     * 密码
     */
    @Value("${mqtt.password}")
    private String password;
 
    /**
     * 连接地址
     */
    @Value("${mqtt.host-url}")
    private String hostUrl;
 
    /**
     * 进-客户Id
     */
    @Value("${mqtt.in-client-id}")
    private String inClientId;
 
    /**
     * 出-客户Id
     */
    @Value("${mqtt.out-client-id}")
    private String outClientId;
 
    /**
     * 客户Id
     */
    @Value("${mqtt.client-id}")
    private String clientId;
 
    /**
     * 默认连接话题
     */
    @Value("${mqtt.default-topic}")
    private String defaultTopic;
 
    /**
     * 超时时间
     */
    @Value("${mqtt.timeout}")
    private int timeout;
 
    /**
     * 保持连接数
     */
    @Value("${mqtt.keepalive}")
    private int keepalive;
 
    /**是否清除session*/
    @Value("${mqtt.clearSession}")
    private boolean clearSession;
 
    // ...getter and setter
 
}

接下来就是配置一些乱七八糟的东西, 这里有很多概念性的东西 比如 管道channel, 适配器 adapter, 入站Inbound, 出站Outbound,等等等等, 看起来是非常头痛的

好吧,那就一个一个来,

首先连接mqtt需要一个客户端, 那么我们就开一个客户端工厂, 这里可以产生很多很多的客户端

?
1
2
3
4
5
6
7
8
9
10
@Bean
public MqttPahoClientFactory mqttPahoClientFactory(){
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(mqttProperties.getHostUrl().split(","));
    options.setUserName(mqttProperties.getUsername());
    options.setPassword(mqttProperties.getPassword().toCharArray());
    factory.setConnectionOptions(options);
    return factory;
}

然后再搞两根管子(channel),一个出站,一个入站

?
1
2
3
4
5
6
7
8
9
10
11
//出站消息管道,
@Bean
public MessageChannel mqttOutboundChannel(){
    return new DirectChannel();
}
 
// 入站消息管道
@Bean
public MessageChannel mqttInboundChannel(){
    return new DirectChannel();
}

为了使这些管子能流通 就需要一个适配器(adapter)

?
1
2
3
4
5
// Mqtt 管道适配器
@Bean
public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
    return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
}

然后定义消息生产者

?
1
2
3
4
5
6
7
8
9
10
// 消息生产者
@Bean
public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    //入站投递的通道
    adapter.setOutputChannel(mqttInboundChannel());
    adapter.setQos(1);
    return adapter;
}

那我们收到消息去哪里处理呢,答案是这里:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
    @Bean
    //使用ServiceActivator 指定接收消息的管道为 mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler handleMessage() {
        // 这个 mqttMessageHandle 其实就是一个 MessageHandler 的实现类(这个类我放下面)
        return mqttMessageHandle;
        // 你也可以这样写
//        return new MessageHandler() {
//            @Override
//            public void handleMessage(Message<?> message) throws MessagingException {
//                // do something
//            }
//        };

到这里我们其实已经可以接受到来自mqtt的消息了

接下来配置向mqtt发送消息

配置 出站处理器

?
1
2
3
4
5
6
7
8
9
10
// 出站处理器
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
    MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
    handler.setAsync(true);
    handler.setConverter(new DefaultPahoMessageConverter());
    handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
    return handler;
}

这个 出站处理器 在我看来就是让别人 (MqttPahoMessageHandler)处理了, 我就不处理了,我只管我要发送什么,至于怎么发送,由MqttPahoMessageHandler来完成

接下来我们定义一个接口即可

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
 
/**
 * MqttGateway
 *
 * @author hengzi
 * @date 2022/8/23
 */
 
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
 
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);
 
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer Qos, String data);
}

我们直接调用这个接口就可以向mqtt 发送数据

到目前为止,整个配置文件长这样:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
 
/**
 * MqttConfig
 *
 * @author hengzi
 * @date 2022/8/23
 */
@Configuration
public class MqttConfig {
 
 
    /**
     *  以下属性将在配置文件中读取
     **/
    @Autowired
    private MqttProperties mqttProperties;
 
 
    //Mqtt 客户端工厂
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }
 
    // Mqtt 管道适配器
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
    }
 
 
    // 消息生产者
    @Bean
    public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        //入站投递的通道
        adapter.setOutputChannel(mqttInboundChannel());
        adapter.setQos(1);
        return adapter;
    }
 
 
    // 出站处理器
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return handler;
    }
 
    @Bean
    //使用ServiceActivator 指定接收消息的管道为 mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler handleMessage() {
        return mqttMessageHandle;
    }
 
    //出站消息管道,
    @Bean
    public MessageChannel mqttOutboundChannel(){
        return new DirectChannel();
    }
 
 
    // 入站消息管道
    @Bean
    public MessageChannel mqttInboundChannel(){
        return new DirectChannel();
    }
}

处理消息的 MqttMessageHandle

?
1
2
3
4
5
6
7
@Component
public class MqttMessageHandle implements MessageHandler {
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
     
    }
}

在进一步了解之后,发现可以优化的地方,比如channel 的类型是有很多种的, 这里使用的DirectChannel,是Spring Integration默认的消息通道,它将消息发送给为一个订阅者,然后阻碍发送直到消息被接收,传输方式都是同步的方式,都是由一个线程来运行的.

这里我们可以将入站channel改成 ExecutorChannel一个可以使用多线程的channel

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Bean
public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor()
{
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    // 最大可创建的线程数
    int maxPoolSize = 200;
    executor.setMaxPoolSize(maxPoolSize);
    // 核心线程池大小
    int corePoolSize = 50;
    executor.setCorePoolSize(corePoolSize);
    // 队列最大长度
    int queueCapacity = 1000;
    executor.setQueueCapacity(queueCapacity);
    // 线程池维护线程所允许的空闲时间
    int keepAliveSeconds = 300;
    executor.setKeepAliveSeconds(keepAliveSeconds);
    // 线程池对拒绝任务(无线程可用)的处理策略
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    return executor;
}
 
// 入站消息管道
@Bean
public MessageChannel mqttInboundChannel(){
    // 用线程池
    return new ExecutorChannel(mqttThreadPoolTaskExecutor());
}

到这里其实可以运行了.

但是这样配置其实还是有点多, 有点乱, 于是我查找官网, f发现一种更简单的配置方法 叫 Java DSL, 官网连接: Configuring with the Java DSL

我们参考官网,稍微改一下,使用 DSL的方式进行配置:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
import java.util.concurrent.ThreadPoolExecutor;
 
/**
 * MqttConfigV2
 *
 * @author hengzi
 * @date 2022/8/24
 */
@Configuration
public class MqttConfigV2 {
 
    @Autowired
    private MqttProperties mqttProperties;
 
    @Autowired
    private MqttMessageHandle mqttMessageHandle;
 
 
    //Mqtt 客户端工厂 所有客户端从这里产生
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }
 
    // Mqtt 管道适配器
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
    }
 
    // 消息生产者 (接收,处理来自mqtt的消息)
    @Bean
    public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) {
        adapter.setCompletionTimeout(5000);
        adapter.setQos(1);
        return IntegrationFlows.from( adapter)
                .channel(new ExecutorChannel(mqttThreadPoolTaskExecutor()))
                .handle(mqttMessageHandle)
                .get();
    }
 
    @Bean
    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 最大可创建的线程数
        int maxPoolSize = 200;
        executor.setMaxPoolSize(maxPoolSize);
        // 核心线程池大小
        int corePoolSize = 50;
        executor.setCorePoolSize(corePoolSize);
        // 队列最大长度
        int queueCapacity = 1000;
        executor.setQueueCapacity(queueCapacity);
        // 线程池维护线程所允许的空闲时间
        int keepAliveSeconds = 300;
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 线程池对拒绝任务(无线程可用)的处理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
 
    // 出站处理器 (向 mqtt 发送消息)
    @Bean
    public IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) {
 
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return IntegrationFlows.from( "mqttOutboundChannel").handle(handler).get();
    }
 
}

这样看起来真的简单多了, 头也没那么大了, 我要是早知道多好.

好了以上就是配置相关的, 到这里其实是已经完成springboot 与 mqtt 的整合了.

但其实我一直有个想法, 就是我们接收的消息 都是在 handleMessage这个方法里面执行的,

?
1
2
3
4
@Override
public void handleMessage(Message<?> message) throws MessagingException {
            
}

所以我就有了一个想法, 能不能根据 我订阅的主题,在不同的方法执行, 对于这个问题,其实你用if ... else ...也能实现, 但很明显,如果我订阅的主题很多的话, 那写起来就很头痛了.

对于这个问题,有两种思路, 一个是添加Spring Integration的路由 router,根据不同topic路由到不同的channel, 这个我也知道能不能实现, 我这里就不讨论了.

第二种是, 我也不知道名字改如何叫, 我是参考了 spring@Controller的设计, 暂且叫他注解模式.

众所周知,我们的接口都是在类上加 @Controller这个注解, 就代表这个类是 http 接口, 再在方法加上 @RequestMapping就能实现不同的 url 调用不同的方法.

参数这个设计 我们在类上面加 @MqttService就代表这个类是专门处理mqtt消息的服务类
同时 在这个类的方法上 加上 @MqttTopic就代表 这个主题由这个方法处理.

OK, 理论有了,接下来就是 实践.

先定义 两个注解

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;
 
import java.lang.annotation.*;
 
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqttService {
 
    @AliasFor(
            annotation = Component.class
    )
    String value() default "";
}

加上 @Component注解 spring就会扫描, 并注册到IOC容器里

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
 
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MqttTopic {
 
    /**
     * 主题名字
     */
    String value() default "";
 
}

参考 @RequestMapping我们使用起来应该是这样的:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
 
/**
 * MqttTopicHandle
 *
 * @author hengzi
 * @date 2022/8/24
 */
@MqttService
public class MqttTopicHandle {
 
    public static final Logger log = LoggerFactory.getLogger(MqttTopicHandle.class);
 
    // 这里的 # 号是通配符
    @MqttTopic("test/#")
    public void test(Message<?> message){
        log.info("test="+message.getPayload());
    }
    
    // 这里的 + 号是通配符
    @MqttTopic("topic/+/+/up")
    public void up(Message<?> message){
        log.info("up="+message.getPayload());
    }
 
    // 注意 你必须先订阅
    @MqttTopic("topic/1/2/down")
    public void down(Message<?> message){
        log.info("down="+message.getPayload());
    }
}

OK 接下来就是实现这样的使用

分析 :

当我们收到消息时, 我们从IOC容器中 找到所有 带 @MqttService注解的类

然后 遍历这些类, 找到带有 @MqttTopic的方法

接着 把 @MqttTopic注解的的值 与 接受到的topic 进行对比

如果一致则执行这个方法

废话少说, 上代码

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
 
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
 
/**
 * MessageHandleService
 *
 * @author hengzi
 * @date 2022/8/24
 */
@Component
public class MqttMessageHandle implements MessageHandler {
 
    public static final Logger log = LoggerFactory.getLogger(MqttMessageHandle.class);
 
    // 包含 @MqttService注解 的类(Component)
    public static Map<String, Object> mqttServices;
 
 
    /**
     * 所有mqtt到达的消息都会在这里处理
     * 要注意这个方法是在线程池里面运行的
     * @param message message
     */
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        getMqttTopicService(message);
    }
 
    public Map<String, Object> getMqttServices(){
        if(mqttServices==null){
            mqttServices = SpringUtils.getBeansByAnnotation(MqttService.class);
        }
        return mqttServices;
    }
 
    public void getMqttTopicService(Message<?> message){
        // 在这里 我们根据不同的 主题 分发不同的消息
        String receivedTopic = message.getHeaders().get("mqtt_receivedTopic",String.class);
        if(receivedTopic==null || "".equals(receivedTopic)){
            return;
        }
        for(Map.Entry<String, Object> entry : getMqttServices().entrySet()){
            // 把所有带有 @MqttService 的类遍历
            Class<?> clazz = entry.getValue().getClass();
            // 获取他所有方法
            Method[] methods = clazz.getDeclaredMethods();
            for ( Method method: methods ){
                if (method.isAnnotationPresent(MqttTopic.class)){
                    // 如果这个方法有 这个注解
                    MqttTopic handleTopic = method.getAnnotation(MqttTopic.class);
                    if(isMatch(receivedTopic,handleTopic.value())){
                        // 并且 这个 topic 匹配成功
                        try {
                            method.invoke(SpringUtils.getBean(clazz),message);
                            return;
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                            log.error("代理炸了");
                        } catch (InvocationTargetException e) {
                            log.error("执行 {} 方法出现错误",handleTopic.value(),e);
                        }
                    }
                }
            }
        }
    }
 
 
    /**
     * mqtt 订阅的主题与我实际的主题是否匹配
     * @param topic 是实际的主题
     * @param pattern 是我订阅的主题 可以是通配符模式
     * @return 是否匹配
     */
    public static boolean isMatch(String topic, String pattern){
 
        if((topic==null) || (pattern==null) ){
            return false;
        }
 
        if(topic.equals(pattern)){
            // 完全相等是肯定匹配的
            return true;
        }
 
        if("#".equals(pattern)){
            // # 号代表所有主题  肯定匹配的
            return true;
        }
        String[] splitTopic = topic.split("/");
        String[] splitPattern = pattern.split("/");
 
        boolean match = true;
 
        // 如果包含 # 则只需要判断 # 前面的
        for (int i = 0; i < splitPattern.length; i++) {
            if(!"#".equals(splitPattern[i])){
                // 不是# 号 正常判断
                if(i>=splitTopic.length){
                    // 此时长度不相等 不匹配
                    match = false;
                    break;
                }
                if(!splitTopic[i].equals(splitPattern[i]) && !"+".equals(splitPattern[i])){
                    // 不相等 且不等于 +
                    match = false;
                    break;
                }
            }
            else {
                // 是# 号  肯定匹配的
                break;
            }
        }
 
        return match;
    }
 
}

工具类 SpringUtils

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
 
import java.util.Map;
 
/**
 * spring工具类 方便在非spring管理环境中获取bean
 *
 */
@Component
public final class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware
{
    /** Spring应用上下文环境 */
    private static ConfigurableListableBeanFactory beanFactory;
 
    private static ApplicationContext applicationContext;
 
 
    public static Map<String, Object> getBeansByAnnotation(Class clsName) throws BeansException{
 
        return beanFactory.getBeansWithAnnotation(clsName);
    }
 
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException
    {
        SpringUtils.beanFactory = beanFactory;
    }
 
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
    {
        SpringUtils.applicationContext = applicationContext;
    }
 
    /**
     * 获取对象
     *
     * @param name
     * @return Object 一个以所给名字注册的bean的实例
     * @throws org.springframework.beans.BeansException
     *
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException
    {
        return (T) beanFactory.getBean(name);
    }
 
    /**
     * 获取类型为requiredType的对象
     *
     * @param clz
     * @return
     * @throws org.springframework.beans.BeansException
     *
     */
    public static <T> T getBean(Class<T> clz) throws BeansException
    {
        T result = (T) beanFactory.getBean(clz);
        return result;
    }
 
    /**
     * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
     *
     * @param name
     * @return boolean
     */
    public static boolean containsBean(String name)
    {
        return beanFactory.containsBean(name);
    }
 
    /**
     * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
     *
     * @param name
     * @return boolean
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.isSingleton(name);
    }
 
    /**
     * @param name
     * @return Class 注册对象的类型
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.getType(name);
    }
 
    /**
     * 如果给定的bean名字在bean定义中有别名,则返回这些别名
     *
     * @param name
     * @return
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.getAliases(name);
    }
 
    /**
     * 获取aop代理对象
     *
     * @param invoker
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> T getAopProxy(T invoker)
    {
        return (T) AopContext.currentProxy();
    }
 
    /**
     * 获取当前的环境配置,无配置返回null
     *
     * @return 当前的环境配置
     */
    public static String[] getActiveProfiles()
    {
        return applicationContext.getEnvironment().getActiveProfiles();
    }
 
}

OK, 大功告成. 终于舒服了, 终于不用写if...else...了, 个人感觉这样处理起来会更加优雅. 写代码最重要是什么, 是优雅~

以上!

参考文章:

  • 使用 Spring integration 在Springboot中集成Mqtt
  • Spring Integration(一)概述

附:

动态添加主题方式:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.stereotype.Service;
 
import java.util.Arrays;
 
/**
 * MqttService
 *
 * @author hengzi
 * @date 2022/8/25
 */
@Service
public class MqttService {
 
    @Autowired
    private MqttPahoMessageDrivenChannelAdapter adapter;
 
 
    public void addTopic(String topic) {
        addTopic(topic, 1);
    }
 
    public void addTopic(String topic,int qos) {
        String[] topics = adapter.getTopic();
        if(!Arrays.asList(topics).contains(topic)){
            adapter.addTopic(topic,qos);
        }
    }
 
    public void removeTopic(String topic) {
        adapter.removeTopic(topic);
    }
 
}

直接调用就行

总结

到此这篇关于springboot整合mqtt的文章就介绍到这了,更多相关springboot整合mqtt内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/weixin_42230797/article/details/126507310

延伸 · 阅读

精彩推荐
  • Java教程识别率很高的java文字识别技术

    识别率很高的java文字识别技术

    这篇文章主要为大家详细介绍了识别率很高的java文字识别技术,亲测,希望对大家有帮助,感兴趣的小伙伴们可以参考一下 ...

    ycb16898232020-06-08
  • Java教程一起来学习Java IO的转化流

    一起来学习Java IO的转化流

    这篇文章主要为大家详细介绍了Java IO的转化流,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来...

    WYSCODER3902022-10-14
  • Java教程Java使用组合模式实现表示公司组织结构功能示例

    Java使用组合模式实现表示公司组织结构功能示例

    这篇文章主要介绍了Java使用组合模式实现表示公司组织结构功能,简单描述了组合模式的概念、功能并结合实例形式分析了Java使用组合模式实现公司组织结...

    chengqiuming7192021-05-03
  • Java教程SpringBoot整合OpenApi的实践

    SpringBoot整合OpenApi的实践

    本文主要介绍了SpringBoot整合OpenApi,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    范闲4372021-12-06
  • Java教程Spring AOP实现多数据源动态切换

    Spring AOP实现多数据源动态切换

    本文主要介绍了Spring AOP实现多数据源动态切换,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    Carson-Zhao10422022-09-19
  • Java教程idea配置git及使用的方法详解

    idea配置git及使用的方法详解

    这篇文章主要介绍了idea配置git及使用方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...

    一飞冲天N7412020-09-02
  • Java教程Java微信退款开发

    Java微信退款开发

    这篇文章主要为大家详细介绍了Java微信退款开发的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    thinkhui11442021-06-02
  • Java教程Spring MVC学习笔记之Controller查找(基于Spring4.0.3)

    Spring MVC学习笔记之Controller查找(基于Spring4.0.3)

    这篇文章主要给大家介绍了关于Spring MVC学习笔记之Controller查找(基于Spring4.0.3)的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有...

    芥末无疆sss11042021-04-12