Spring integration交互逻辑
对于发布者:
1.消息通过消息网关发送出去,由 MessageChannel
的实例 DirectChannel
处理发送的细节。
2.DirectChannel
收到消息后,内部通过 MessageHandler
的实例 MqttPahoMessageHandler
发送到指定的 Topic。
对于订阅者:
1.通过注入 MessageProducerSupport
的实例 MqttPahoMessageDrivenChannelAdapter
,实现订阅 Topic 和绑定消息消费的 MessageChannel
。
2.同样由 MessageChannel
的实例 DirectChannel
处理消费细节。
Channel 消息后会发送给我们自定义的 MqttInboundMessageHandler
实例进行消费。
可以看到整个处理的流程和前面将的基本一致。Spring Integration 就是抽象出了这么一套消息通信的机制,具体的通信细节由它集成的中间件来决定
1、maven依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-integration --> < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-integration</ artifactId > < version >2.5.1</ version > </ dependency > <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-stream --> < dependency > < groupId >org.springframework.integration</ groupId > < artifactId >spring-integration-stream</ artifactId > < version >5.5.5</ version > </ dependency > <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt --> < dependency > < groupId >org.springframework.integration</ groupId > < artifactId >spring-integration-mqtt</ artifactId > < version >5.5.5</ version > </ dependency > |
2、yaml配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
#mqtt配置 mqtt: username: 123 password: 123 #MQTT-服务器连接地址,如果有多个,用逗号隔开 url: tcp : //127 .0.0.1: 1883 #MQTT-连接服务器默认客户端ID client: id: $ { random.value } default: #MQTT-默认的消息推送主题,实际可在调用接口时指定 topic: topic , mqtt/test/ # #连接超时 completionTimeout: 3000 |
3、mqtt生产者消费者配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
|
import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import java.util.Arrays; import java.util.List; /** * mqtt 推送and接收 消息类 **/ @Configuration @IntegrationComponentScan @Slf4j public class MqttSenderAndReceiveConfig { private static final byte [] WILL_DATA; static { WILL_DATA = "offline" .getBytes(); } @Autowired private MqttReceiveHandle mqttReceiveHandle; @Value ( "${mqtt.username}" ) private String username; @Value ( "${mqtt.password}" ) private String password; @Value ( "${mqtt.url}" ) private String hostUrl; @Value ( "${mqtt.client.id}" ) private String clientId; @Value ( "${mqtt.default.topic}" ) private String defaultTopic; @Value ( "${mqtt.completionTimeout}" ) private int completionTimeout; //连接超时 /** * MQTT连接器选项 **/ @Bean (value = "getMqttConnectOptions" ) public MqttConnectOptions getMqttConnectOptions1() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 mqttConnectOptions.setCleanSession( true ); // 设置超时时间 单位为秒 mqttConnectOptions.setConnectionTimeout( 10 ); mqttConnectOptions.setAutomaticReconnect( true ); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs( new String[]{hostUrl}); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制 mqttConnectOptions.setKeepAliveInterval( 10 ); // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。 //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false); return mqttConnectOptions; } /** * MQTT工厂 **/ @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions1()); return factory; } /** * MQTT信息通道(生产者) **/ @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /** * MQTT消息处理器(生产者) **/ @Bean @ServiceActivator (inputChannel = "mqttOutboundChannel" ) public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer" , mqttClientFactory()); messageHandler.setAsync( true ); messageHandler.setDefaultTopic(defaultTopic); messageHandler.setAsyncEvents( true ); // 消息发送和传输完成会有异步的通知回调 //设置转换器 发送bytes数据 DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); converter.setPayloadAsBytes( true ); return messageHandler; } /** * 配置client,监听的topic * MQTT消息订阅绑定(消费者) **/ @Bean public MessageProducer inbound() { List<String> topicList = Arrays.asList(defaultTopic.trim().split( "," )); String[] topics = new String[topicList.size()]; topicList.toArray(topics); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer" , mqttClientFactory(), topics); adapter.setCompletionTimeout(completionTimeout); DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); converter.setPayloadAsBytes( true ); adapter.setConverter(converter); adapter.setQos( 2 ); adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * MQTT信息通道(消费者) **/ @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * MQTT消息处理器(消费者) **/ @Bean @ServiceActivator (inputChannel = "mqttInputChannel" ) public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { //处理接收消息 mqttReceiveHandle.handle(message); } }; } } |
4、消息处理类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
/** * mqtt客户端消息处理类 **/ @Slf4j @Component public class MqttReceiveHandle { public void handle(Message<?> message) { log.info( "收到订阅消息: {}" , message); String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); log.info( "消息主题:{}" , topic); Object payLoad = message.getPayload(); byte [] data = ( byte []) payLoad; Packet packet = Packet.parse(data); log.info( "发送的Packet数据{}" , JSON.toJSONString(packet)); } } |
5、mqtt发送接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; /** * mqtt发送消息 * (defaultRequestChannel = "mqttOutboundChannel" 对应config配置) * **/ @MessagingGateway (defaultRequestChannel = "mqttOutboundChannel" ) public interface MqttGateway { /** * 发送信息到MQTT服务器 * * @param */ void sendToMqttObject( @Header (MqttHeaders.TOPIC) String topic, byte [] payload); /** * 发送信息到MQTT服务器 * * @param topic 主题 * @param payload 消息主体 */ void sendToMqtt( @Header (MqttHeaders.TOPIC) String topic, String payload); /** * 发送信息到MQTT服务器 * * @param topic 主题 * @param qos 对消息处理的几种机制。 * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。 * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。 * 2 多了一次去重的动作,确保订阅者收到的消息有一次。 * @param payload 消息主体 */ void sendToMqtt( @Header (MqttHeaders.TOPIC) String topic, @Header (MqttHeaders.QOS) int qos, String payload); /** * 发送信息到MQTT服务器 * * @param topic 主题 * @param payload 消息主体 */ void sendToMqtt( @Header (MqttHeaders.TOPIC) String topic, Object payload); /** * 发送信息到MQTT服务器 * * @param topic 主题 * @param payload 消息主体 */ void sendToMqtt( @Header (MqttHeaders.TOPIC) String topic, byte [] payload); } |
6、mqtt事件监听类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent; import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent; import org.springframework.integration.mqtt.event.MqttMessageSentEvent; import org.springframework.integration.mqtt.event.MqttSubscribedEvent; import org.springframework.stereotype.Component; @Slf4j @Component public class MqttListener { /** * 连接失败的事件通知 * @param mqttConnectionFailedEvent */ @EventListener (classes = MqttConnectionFailedEvent. class ) public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) { log.info( "连接失败的事件通知" ); } /** * 已发送的事件通知 * @param mqttMessageSentEvent */ @EventListener (classes = MqttMessageSentEvent. class ) public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) { log.info( "已发送的事件通知" ); } /** * 已传输完成的事件通知 * 1.QOS == 0,发送消息后会即可进行此事件回调,因为不需要等待回执 * 2.QOS == 1,发送消息后会等待ACK回执,ACK回执后会进行此事件通知 * 3.QOS == 2,发送消息后会等待PubRECV回执,知道收到PubCOMP后会进行此事件通知 * @param mqttMessageDeliveredEvent */ @EventListener (classes = MqttMessageDeliveredEvent. class ) public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) { log.info( "已传输完成的事件通知" ); } /** * 消息订阅的事件通知 * @param mqttSubscribedEvent */ @EventListener (classes = MqttSubscribedEvent. class ) public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) { log.info( "消息订阅的事件通知" ); } } |
7、接口测试
1
2
3
4
5
6
7
8
9
10
11
12
13
|
@Resource private MqttGateway mqttGateway; /** * sendData 消息 * topic 订阅主题 **/ @RequestMapping (value = "/sendMqtt" ,method = RequestMethod.POST) public String sendMqtt(String sendData, String topic) { MqttMessage mqttMessage = new MqttMessage(); mqttGateway.sendToMqtt(topic, sendData); //mqttGateway.sendToMqttObject(topic, sendData.getBytes()); return "OK" ; } |
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/xrq1995/article/details/126465027