首先,在 pom.xml 文件中引入 MQTT 客户端(Eclipse Paho)的依赖:
1
2
3
4
5
6
|
<!--mqtt-->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.4</version>
</dependency>
其次,在 Spring Boot 的 yml 配置文件中添加 MQTT 服务的连接配置:
1
2
3
4
5
6
7
|
spring:
  mqtt:
    url: tcp://127.0.0.1:1883
    client-id: niubility-tiger
    username:
    password:
    topic: [/unify/test]
创建 MqttProperties配置参数类
1
2
3
4
5
6
7
8
9
10
11
12
|
import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; @Data @ConfigurationProperties ( "spring.mqtt" ) public class MqttProperties { private String url; private String clientId; private String username; private String password; private String[] topic; } |
创建 MqttConfiguration 配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
import org.eclipse.paho.client.mqttv3.IMqttClient; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springblade.core.tool.utils.Func; import org.springblade.ubw.listener.MqttSubscribeListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @EnableConfigurationProperties ({MqttProperties. class }) public class MqttConfiguration { private static final Logger log = LoggerFactory.getLogger(MqttConfiguration. class ); @Autowired private MqttProperties mqttProperties; public MqttConfiguration() { } @Bean public MqttConnectOptions mqttConnectOptions() { MqttConnectOptions connectOptions = new MqttConnectOptions(); connectOptions.setServerURIs( new String[]{ this .mqttProperties.getUrl()}); if (Func.isNotBlank( this .mqttProperties.getUrl())) { connectOptions.setUserName( this .mqttProperties.getUsername()); } if (Func.isNotBlank( this .mqttProperties.getPassword())) { connectOptions.setPassword( this .mqttProperties.getPassword().toCharArray()); } connectOptions.setKeepAliveInterval( 60 ); return connectOptions; } @Bean public IMqttClient mqttClient(MqttConnectOptions options) throws MqttException { IMqttClient mqttClient = new MqttClient( this .mqttProperties.getUrl(), this .mqttProperties.getClientId()); mqttClient.connect(options); for ( int i = 0 ; i< this .mqttProperties.getTopic().length; ++i) { mqttClient.subscribe( this .mqttProperties.getTopic()[i], new MqttSubscribeListener()); } return mqttClient; } } |
创建订阅事件类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import org.springframework.context.ApplicationEvent; public class UWBMqttSubscribeEvent extends ApplicationEvent { private String topic; public UWBMqttSubscribeEvent(String topic, Object source) { super (source); this .topic = topic; } public String getTopic() { return this .topic; } } |
创建订阅事件监听器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springblade.core.tool.utils.SpringUtil; import org.springblade.ubw.event.UWBMqttSubscribeEvent; public class MqttSubscribeListener implements IMqttMessageListener { @Override public void messageArrived(String s, MqttMessage mqttMessage) { String content = new String(mqttMessage.getPayload()); UWBMqttSubscribeEvent event = new UWBMqttSubscribeEvent(s, content); SpringUtil.publishEvent(event); } } |
创建mqtt消息事件异步处理监听器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
import com.baomidou.mybatisplus.core.toolkit.StringPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springblade.core.tool.utils.Func; import org.springblade.ubw.config.MqttProperties; import org.springblade.ubw.event.UWBMqttSubscribeEvent; import org.springblade.ubw.service.MqttService; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import javax.annotation.Resource; import java.util.Arrays; import java.util.List; @Configuration public class MqttEventListener { private static final Logger log = LoggerFactory.getLogger(MqttEventListener. class ); @Resource private MqttProperties mqttProperties; @Resource private MqttService mqttService; private String processTopic (String topic) { List<String> topics = Arrays.asList(mqttProperties.getTopic()); for (String wild : topics) { wild = wild.replace(StringPool.HASH, StringPool.EMPTY); if (topic.startsWith(wild)) { return topic.replace(wild, StringPool.EMPTY); } } return StringPool.EMPTY; } @Async @EventListener (UWBMqttSubscribeEvent. class ) public void listen (UWBMqttSubscribeEvent event) { String topic = processTopic(event.getTopic()); Object source = event.getSource(); if (Func.isEmpty(source)) { return ; } mqttService.issue(topic,source); // log.info("mqtt接收到 通道 {} 的信息为:{}",topic,source); } } |
创建MqttService 数据处理服务类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
|
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springblade.core.tool.utils.Func; import org.springblade.ubw.area.entity.WorkArea; import org.springblade.ubw.area.entity.WorkSite; import org.springblade.ubw.area.entity.WorkSiteNeighbourInfo; import org.springblade.ubw.area.entity.WorkSitePassInfo; import org.springblade.ubw.area.service.WorkAreaService; import org.springblade.ubw.area.service.WorkSiteNeighbourInfoService; import org.springblade.ubw.area.service.WorkSitePassInfoService; import org.springblade.ubw.area.service.WorkSiteService; import org.springblade.ubw.constant.UbwConstant; import org.springblade.ubw.history.entity.HistoryLocusInfo; import org.springblade.ubw.history.entity.HistoryOverTimeSosAlarmInfo; import org.springblade.ubw.history.service.HistoryLocusInfoService; import org.springblade.ubw.history.service.HistoryOverTimeSosAlarmInfoService; import org.springblade.ubw.loc.entity.LocStatusInfo; import org.springblade.ubw.loc.entity.LocStatusInfoHistory; import org.springblade.ubw.loc.service.LocStatusInfoHistoryService; import org.springblade.ubw.loc.service.LocStatusInfoService; import org.springblade.ubw.msg.entity.*; import org.springblade.ubw.msg.service.*; import org.springblade.ubw.system.entity.*; import org.springblade.ubw.system.service.*; import org.springblade.ubw.system.wrapper.MqttWrapper; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.List; import java.util.stream.Collectors; @Service public class MqttService { private static final Logger log = LoggerFactory.getLogger(MqttService. 
class ); @Resource private EmployeeAndDepartmentService employeeAndDepartmentService; @Resource private VehicleInfoService vehicleInfoService; @Resource private WorkSiteService workSiteService; @Resource private LocStatusInfoService locStatusInfoService; @Resource private LocStatusInfoHistoryService locStatusInfoHistoryService; @Resource private LocOverTimeSosAlarminfoService locOverTimeSosAlarminfoService; @Resource private LocAreaOverSosAlarminfoService locAreaOverSosAlarmInfoService; @Resource private LocSosAlarminfoService locSosAlarminfoService; @Resource private AttendanceInfoService attendanceInfoService; @Resource private HistoryLocusInfoService historyLocusInfoService; @Resource private WorkSitePassInfoService workSitePassInfoService; @Resource private EnvironmentalMonitorInfoService environmentalMonitorInfoService; @Resource private TrAlertService trAlertService; @Resource private AddEvacuateInfoService addEvacuateInfoService; @Resource private CancelEvacuateInfoService cancelEvacuateInfoService; @Resource private WorkSiteNeighbourInfoService workSiteNeighbourInfoService; @Resource private LinkMsgAlarmInfoService linkMsgAlarmInfoService; @Resource private LeaderEmployeeInfoService leaderEmployeeInfoService; @Resource private ElectricMsgInfoService electricMsgInfoService; @Resource private WorkAreaService workAreaService; @Resource private HistoryOverTimeSosAlarmInfoService historyOverTimeSosAlarmInfoService; @Resource private SpecialWorksService specialWorksService; @Resource private AttendanceLocusInfoService attendanceLocusInfoService; @Resource private WorkTypeService workTypeService; @Resource private OfficePositionService officePositionService; @Resource private ClassTeamService classTeamService; /** * 方法描述: 消息分发 * * @param topic * @param source * @author liwenbin * @date 2021年12月14日 14:14:09 */ public void issue(String topic,Object source){ switch (topic){ case UbwConstant.TOPIC_EMP : //人员和部门信息 employeeAndDepartmentService.saveBatch(source); break ; 
case UbwConstant.TOPIC_VEHICLE : //车辆信息 List<VehicleInfo> vehicleInfos = MqttWrapper.build().toEntityList(source, new VehicleInfo()); vehicleInfoService.deleteAll(); vehicleInfoService.saveBatch(vehicleInfos); break ; case UbwConstant.TOPIC_WORK_SITE : //基站信息 List<WorkSite> workSites = MqttWrapper.build().toEntityList(source, new WorkSite()); workSiteService.deleteAll(); workSiteService.saveBatch(workSites); break ; case UbwConstant.TOPIC_LOC_STATUS: //井下车辆人员实时 List<LocStatusInfo> locStatusInfos = MqttWrapper.build().toEntityList(source, new LocStatusInfo()); if (Func.isEmpty(locStatusInfos)){ break ; } locStatusInfoService.deleteAll(); //筛选入井人员列表 List<LocStatusInfo> inWellList = locStatusInfos.stream().filter(s -> s.getIsInWell() == 1 ).collect(Collectors.toList()); locStatusInfoService.saveBatch(inWellList); //人员历史数据入库 List<LocStatusInfoHistory> locStatusInfoHistorys = MqttWrapper.build().toEntityList(source, new LocStatusInfoHistory()); locStatusInfoHistoryService.saveBatch(locStatusInfoHistorys); break ; case UbwConstant.TOPIC_LOC_OVER_TIME: //超时报警信息 List<LocOverTimeSosAlarminfo> locOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source, new LocOverTimeSosAlarminfo()); locOverTimeSosAlarminfoService.saveBatch(locOverTimeSosAlarmInfos); break ; case UbwConstant.TOPIC_LOC_OVER_AREA: //超员报警信息 List<LocAreaOverSosAlarminfo> locAreaOverSosAlarmInfos = MqttWrapper.build().toEntityList(source, new LocAreaOverSosAlarminfo()); locAreaOverSosAlarmInfoService.saveBatch(locAreaOverSosAlarmInfos); break ; case UbwConstant.TOPIC_LOC_SOS: //求救报警信息 List<LocSosAlarminfo> locSosAlarmInfos = MqttWrapper.build().toEntityList(source, new LocSosAlarminfo()); locSosAlarminfoService.saveBatch(locSosAlarmInfos); break ; case UbwConstant.TOPIC_ATTEND: //考勤信息 List<AttendanceInfo> attendanceInfos = MqttWrapper.build().toEntityList(source, new AttendanceInfo()); attendanceInfoService.saveBatch(attendanceInfos); break ; case UbwConstant.TOPIC_HISTORY_LOCUS: //精确轨迹信息 
List<HistoryLocusInfo> historyLocusInfos = MqttWrapper.build().toEntityList(source, new HistoryLocusInfo()); historyLocusInfoService.saveBatch(historyLocusInfos); break ; case UbwConstant.TOPIC_WORK_SITE_PASS: //基站经过信息 List<WorkSitePassInfo> workSitePassInfos = MqttWrapper.build().toEntityList(source, new WorkSitePassInfo()); workSitePassInfoService.saveBatch(workSitePassInfos); break ; case UbwConstant.TOPIC_ENV_MON: //环境监测信息 List<EnvironmentalMonitorInfo> environmentalMonitorInfos = MqttWrapper.build().toEntityList(source, new EnvironmentalMonitorInfo()); environmentalMonitorInfoService.saveBatch(environmentalMonitorInfos); break ; case UbwConstant.TOPIC_TR_ALERT: //环境监测报警信息 List<TrAlert> trAlerts = MqttWrapper.build().toEntityList(source, new TrAlert()); trAlertService.saveBatch(trAlerts); break ; case UbwConstant.TOPIC_ADD_EVA: //下发撤离信息 List<AddEvacuateInfo> addEvacuateInfos = MqttWrapper.build().toEntityList(source, new AddEvacuateInfo()); addEvacuateInfoService.saveBatch(addEvacuateInfos); break ; case UbwConstant.TOPIC_CANCEL_EVA: //取消撤离信息 List<CancelEvacuateInfo> cancelEvacuateInfos = MqttWrapper.build().toEntityList(source, new CancelEvacuateInfo()); cancelEvacuateInfoService.saveBatch(cancelEvacuateInfos); break ; case UbwConstant.TOPIC_WORK_SITE_NEI: //相邻基站关系信息 workSiteNeighbourInfoService.deleteAll(); List<WorkSiteNeighbourInfo> workSiteNeighbourInfos = MqttWrapper.build().toEntityList(source, new WorkSiteNeighbourInfo()); workSiteNeighbourInfoService.saveBatch(workSiteNeighbourInfos); break ; case UbwConstant.TOPIC_LINK_MSG: //基站链路信息 linkMsgAlarmInfoService.deleteAll(); List<LinkMsgAlarmInfo> linkMsgAlarmInfos = MqttWrapper.build().toEntityList(source, new LinkMsgAlarmInfo()); linkMsgAlarmInfoService.saveBatch(linkMsgAlarmInfos); break ; case UbwConstant.TOPIC_LEADER_EMP: //带班领导信息 leaderEmployeeInfoService.deleteAll(); List<LeaderEmployeeInfo> leaderEmployeeInfos = MqttWrapper.build().toEntityList(source, new LeaderEmployeeInfo()); 
leaderEmployeeInfoService.saveBatch(leaderEmployeeInfos); break ; case UbwConstant.TOPIC_ELE_MSG: //低电报警信息 List<ElectricMsgInfo> electricMsgInfos = MqttWrapper.build().toEntityList(source, new ElectricMsgInfo()); electricMsgInfoService.saveBatch(electricMsgInfos); break ; case UbwConstant.TOPIC_WORK_AREA: //区域信息 workAreaService.deleteAll(); List<WorkArea> workAreas = MqttWrapper.build().toEntityList(source, new WorkArea()); workAreaService.saveBatch(workAreas); break ; case UbwConstant.TOPIC_HIS_OVER_TIME_SOS: //历史超时报警信息 List<HistoryOverTimeSosAlarmInfo> historyOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source, new HistoryOverTimeSosAlarmInfo()); historyOverTimeSosAlarmInfoService.saveBatch(historyOverTimeSosAlarmInfos); break ; case UbwConstant.TOPIC_SPECIAL_WORK: //特种人员预设线路信息 specialWorksService.deleteAll(); List<SpecialWorks> specialWorks = MqttWrapper.build().toEntityList(source, new SpecialWorks()); specialWorksService.saveBatch(specialWorks); break ; case UbwConstant.TOPIC_ATTEND_LOC: //历史考勤轨迹信息 List<AttendanceLocusInfo> attendanceLocusInfos = MqttWrapper.build().toEntityList(source, new AttendanceLocusInfo()); attendanceLocusInfoService.saveBatch(attendanceLocusInfos); break ; case UbwConstant.TOPIC_WORK_TYPE: //工种信息 workTypeService.deleteAll(); List<WorkType> workTypes = MqttWrapper.build().toEntityList(source, new WorkType()); workTypeService.saveBatch(workTypes); break ; case UbwConstant.TOPIC_OFFICE_POS: //职务信息 officePositionService.deleteAll(); List<OfficePosition> officePositions = MqttWrapper.build().toEntityList(source, new OfficePosition()); officePositionService.saveBatch(officePositions); break ; case UbwConstant.TOPIC_CLASS_TEAM: //班组信息 classTeamService.deleteAll(); List<ClassTeam> classTeams = MqttWrapper.build().toEntityList(source, new ClassTeam()); classTeamService.saveBatch(classTeams); break ; default : //可选 break ; } } } |
完结。小伙伴们可以根据这个 demo 改造自己的 MQTT 消息处理服务!
以上就是Springboot整合mqtt服务的示例代码的详细内容,更多关于Springboot整合mqtt的资料请关注服务器之家其它相关文章!
原文链接:https://blog.csdn.net/weixin_40986713/article/details/123572101