服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - Springboot整合mqtt服务的示例代码

Springboot整合mqtt服务的示例代码

2022-10-10 14:47李泰山 Java教程

MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。本文为大家分享了Springboot整合mqtt服务的示例代码,需要的可以参考一下

首先,在 pom.xml 文件中引入 MQTT 客户端的依赖:

?
1
2
3
4
5
6
<!-- Eclipse Paho MQTT v3 client library, used for connecting and subscribing to the broker -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.4</version>
</dependency>

其次,在 Spring Boot 的 yml 配置文件中添加 MQTT 服务的相关配置:

?
1
2
3
4
5
6
7
# MQTT client settings, bound to MqttProperties via the "spring.mqtt" prefix.
spring: 
  mqtt:
    # Broker URI passed to the Paho client
    url: tcp://127.0.0.1:1883
    # Client identifier presented to the broker
    client-id: niubility-tiger
    # Optional credentials; left empty in this example
    username:
    password:
    # Topics subscribed to at startup.
    # NOTE(review): the event listener strips '#' from each entry and treats the rest as a
    # prefix of the incoming topic; confirm whether a trailing wildcard is intended here.
    topic: [/unify/test]

创建 MqttProperties配置参数类

?
1
2
3
4
5
6
7
8
9
10
11
12
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
 
/**
 * Connection settings for the MQTT client, bound from the {@code spring.mqtt}
 * configuration prefix.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@ConfigurationProperties("spring.mqtt")
public class MqttProperties {
    /** Broker URI, e.g. {@code tcp://127.0.0.1:1883}. */
    private String url;
    /** Client identifier presented to the broker. */
    private String clientId;
    /** User name for broker authentication; may be left blank. */
    private String username;
    /** Password for broker authentication; may be left blank. */
    private String password;
    /** Topics to subscribe to. */
    private String[] topic;
}

创建 MqttConfiguration 配置类

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.core.tool.utils.Func;
import org.springblade.ubw.listener.MqttSubscribeListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
@EnableConfigurationProperties({MqttProperties.class})
public class MqttConfiguration {
    private static final Logger log = LoggerFactory.getLogger(MqttConfiguration.class);
    @Autowired
    private MqttProperties mqttProperties;
 
    public MqttConfiguration() {
    }
 
    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setServerURIs(new String[]{this.mqttProperties.getUrl()});
        if (Func.isNotBlank(this.mqttProperties.getUrl())) {
            connectOptions.setUserName(this.mqttProperties.getUsername());
        }
 
        if (Func.isNotBlank(this.mqttProperties.getPassword())) {
            connectOptions.setPassword(this.mqttProperties.getPassword().toCharArray());
        }
 
        connectOptions.setKeepAliveInterval(60);
        return connectOptions;
    }
 
    @Bean
    public IMqttClient mqttClient(MqttConnectOptions options) throws MqttException {
        IMqttClient mqttClient = new MqttClient(this.mqttProperties.getUrl(), this.mqttProperties.getClientId());
        mqttClient.connect(options);
        for(int i = 0; i< this.mqttProperties.getTopic().length; ++i) {
            mqttClient.subscribe(this.mqttProperties.getTopic()[i], new MqttSubscribeListener());
        }
        return mqttClient;
    }
}

创建订阅事件类

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.springframework.context.ApplicationEvent;
 
 
public class UWBMqttSubscribeEvent extends ApplicationEvent {
    private String topic;
 
    public UWBMqttSubscribeEvent(String topic, Object source) {
        super(source);
        this.topic = topic;
    }
 
    public String getTopic() {
        return this.topic;
    }
}

创建订阅事件监听器

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springblade.core.tool.utils.SpringUtil;
import org.springblade.ubw.event.UWBMqttSubscribeEvent;
 
 
public class MqttSubscribeListener implements IMqttMessageListener {
 
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) {
        String content = new String(mqttMessage.getPayload());
        UWBMqttSubscribeEvent event = new UWBMqttSubscribeEvent(s, content);
        SpringUtil.publishEvent(event);
    }
}

创建mqtt消息事件异步处理监听器

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.core.tool.utils.Func;
import org.springblade.ubw.config.MqttProperties;
import org.springblade.ubw.event.UWBMqttSubscribeEvent;
import org.springblade.ubw.service.MqttService;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
 
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;
 
 
@Configuration
public class MqttEventListener {
 
 
    private static final Logger log = LoggerFactory.getLogger(MqttEventListener.class);
 
    @Resource
    private MqttProperties mqttProperties;
 
    @Resource
    private MqttService mqttService;
 
    private String processTopic (String topic) {
        List<String> topics = Arrays.asList(mqttProperties.getTopic());
        for (String wild : topics) {
            wild = wild.replace(StringPool.HASH, StringPool.EMPTY);
            if (topic.startsWith(wild)) {
                return topic.replace(wild, StringPool.EMPTY);
            }
        }
        return StringPool.EMPTY;
    }
 
 
    @Async
    @EventListener(UWBMqttSubscribeEvent.class)
    public void listen (UWBMqttSubscribeEvent event) {
        String topic = processTopic(event.getTopic());
        Object source = event.getSource();
        if (Func.isEmpty(source)) {
            return;
        }
        mqttService.issue(topic,source);
//        log.info("mqtt接收到 通道 {} 的信息为:{}",topic,source);
    }
}

创建MqttService 数据处理服务类

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.core.tool.utils.Func;
import org.springblade.ubw.area.entity.WorkArea;
import org.springblade.ubw.area.entity.WorkSite;
import org.springblade.ubw.area.entity.WorkSiteNeighbourInfo;
import org.springblade.ubw.area.entity.WorkSitePassInfo;
import org.springblade.ubw.area.service.WorkAreaService;
import org.springblade.ubw.area.service.WorkSiteNeighbourInfoService;
import org.springblade.ubw.area.service.WorkSitePassInfoService;
import org.springblade.ubw.area.service.WorkSiteService;
import org.springblade.ubw.constant.UbwConstant;
import org.springblade.ubw.history.entity.HistoryLocusInfo;
import org.springblade.ubw.history.entity.HistoryOverTimeSosAlarmInfo;
import org.springblade.ubw.history.service.HistoryLocusInfoService;
import org.springblade.ubw.history.service.HistoryOverTimeSosAlarmInfoService;
import org.springblade.ubw.loc.entity.LocStatusInfo;
import org.springblade.ubw.loc.entity.LocStatusInfoHistory;
import org.springblade.ubw.loc.service.LocStatusInfoHistoryService;
import org.springblade.ubw.loc.service.LocStatusInfoService;
import org.springblade.ubw.msg.entity.*;
import org.springblade.ubw.msg.service.*;
import org.springblade.ubw.system.entity.*;
import org.springblade.ubw.system.service.*;
import org.springblade.ubw.system.wrapper.MqttWrapper;
import org.springframework.stereotype.Service;
 
import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;
 
 
@Service
public class MqttService {
 
    private static final Logger log = LoggerFactory.getLogger(MqttService.class);
 
    @Resource
    private EmployeeAndDepartmentService employeeAndDepartmentService;
 
    @Resource
    private VehicleInfoService vehicleInfoService;
 
    @Resource
    private WorkSiteService workSiteService;
 
    @Resource
    private LocStatusInfoService locStatusInfoService;
 
    @Resource
    private LocStatusInfoHistoryService locStatusInfoHistoryService;
 
    @Resource
    private LocOverTimeSosAlarminfoService locOverTimeSosAlarminfoService;
 
    @Resource
    private LocAreaOverSosAlarminfoService locAreaOverSosAlarmInfoService;
 
    @Resource
    private LocSosAlarminfoService locSosAlarminfoService;
 
    @Resource
    private AttendanceInfoService attendanceInfoService;
 
    @Resource
    private HistoryLocusInfoService historyLocusInfoService;
 
    @Resource
    private WorkSitePassInfoService workSitePassInfoService;
 
    @Resource
    private EnvironmentalMonitorInfoService environmentalMonitorInfoService;
 
    @Resource
    private TrAlertService trAlertService;
 
    @Resource
    private AddEvacuateInfoService addEvacuateInfoService;
 
    @Resource
    private CancelEvacuateInfoService cancelEvacuateInfoService;
 
    @Resource
    private WorkSiteNeighbourInfoService workSiteNeighbourInfoService;
 
    @Resource
    private LinkMsgAlarmInfoService linkMsgAlarmInfoService;
 
    @Resource
    private LeaderEmployeeInfoService leaderEmployeeInfoService;
 
    @Resource
    private ElectricMsgInfoService electricMsgInfoService;
 
    @Resource
    private WorkAreaService workAreaService;
 
    @Resource
    private HistoryOverTimeSosAlarmInfoService historyOverTimeSosAlarmInfoService;
 
    @Resource
    private SpecialWorksService specialWorksService;
 
    @Resource
    private AttendanceLocusInfoService attendanceLocusInfoService;
 
    @Resource
    private WorkTypeService workTypeService;
 
    @Resource
    private OfficePositionService officePositionService;
 
    @Resource
    private ClassTeamService classTeamService;
 
    /**
     * 方法描述: 消息分发
     *
     * @param topic
     * @param source
     * @author liwenbin
     * @date 2021年12月14日 14:14:09
     */
    public void issue(String topic,Object source){
        switch(topic){
            case UbwConstant.TOPIC_EMP :
                //人员和部门信息
                employeeAndDepartmentService.saveBatch(source);
                break;
            case UbwConstant.TOPIC_VEHICLE :
                //车辆信息
                List<VehicleInfo> vehicleInfos = MqttWrapper.build().toEntityList(source,new VehicleInfo());
                vehicleInfoService.deleteAll();
                vehicleInfoService.saveBatch(vehicleInfos);
                break;
            case UbwConstant.TOPIC_WORK_SITE :
                //基站信息
                List<WorkSite> workSites = MqttWrapper.build().toEntityList(source,new WorkSite());
                workSiteService.deleteAll();
                workSiteService.saveBatch(workSites);
                break;
            case UbwConstant.TOPIC_LOC_STATUS:
                //井下车辆人员实时
                List<LocStatusInfo> locStatusInfos = MqttWrapper.build().toEntityList(source,new LocStatusInfo());
                if (Func.isEmpty(locStatusInfos)){
                    break;
                }
                locStatusInfoService.deleteAll();
                //筛选入井人员列表
                List<LocStatusInfo> inWellList = locStatusInfos.stream().filter(s -> s.getIsInWell() == 1).collect(Collectors.toList());
                locStatusInfoService.saveBatch(inWellList);
                //人员历史数据入库
                List<LocStatusInfoHistory> locStatusInfoHistorys = MqttWrapper.build().toEntityList(source,new LocStatusInfoHistory());
                locStatusInfoHistoryService.saveBatch(locStatusInfoHistorys);
                break;
            case UbwConstant.TOPIC_LOC_OVER_TIME:
                //超时报警信息
                List<LocOverTimeSosAlarminfo> locOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocOverTimeSosAlarminfo());
                locOverTimeSosAlarminfoService.saveBatch(locOverTimeSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_LOC_OVER_AREA:
                //超员报警信息
                List<LocAreaOverSosAlarminfo> locAreaOverSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocAreaOverSosAlarminfo());
                locAreaOverSosAlarmInfoService.saveBatch(locAreaOverSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_LOC_SOS:
                //求救报警信息
                List<LocSosAlarminfo> locSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocSosAlarminfo());
                locSosAlarminfoService.saveBatch(locSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_ATTEND:
                //考勤信息
                List<AttendanceInfo> attendanceInfos = MqttWrapper.build().toEntityList(source,new AttendanceInfo());
                attendanceInfoService.saveBatch(attendanceInfos);
                break;
            case UbwConstant.TOPIC_HISTORY_LOCUS:
                //精确轨迹信息
                List<HistoryLocusInfo> historyLocusInfos = MqttWrapper.build().toEntityList(source,new HistoryLocusInfo());
                historyLocusInfoService.saveBatch(historyLocusInfos);
                break;
            case UbwConstant.TOPIC_WORK_SITE_PASS:
                //基站经过信息
                List<WorkSitePassInfo> workSitePassInfos = MqttWrapper.build().toEntityList(source,new WorkSitePassInfo());
                workSitePassInfoService.saveBatch(workSitePassInfos);
                break;
            case UbwConstant.TOPIC_ENV_MON:
                //环境监测信息
                List<EnvironmentalMonitorInfo> environmentalMonitorInfos = MqttWrapper.build().toEntityList(source,new EnvironmentalMonitorInfo());
                environmentalMonitorInfoService.saveBatch(environmentalMonitorInfos);
                break;
            case UbwConstant.TOPIC_TR_ALERT:
                //环境监测报警信息
                List<TrAlert> trAlerts = MqttWrapper.build().toEntityList(source,new TrAlert());
                trAlertService.saveBatch(trAlerts);
                break;
            case UbwConstant.TOPIC_ADD_EVA:
                //下发撤离信息
                List<AddEvacuateInfo> addEvacuateInfos = MqttWrapper.build().toEntityList(source,new AddEvacuateInfo());
                addEvacuateInfoService.saveBatch(addEvacuateInfos);
                break;
            case UbwConstant.TOPIC_CANCEL_EVA:
                //取消撤离信息
                List<CancelEvacuateInfo> cancelEvacuateInfos = MqttWrapper.build().toEntityList(source,new CancelEvacuateInfo());
                cancelEvacuateInfoService.saveBatch(cancelEvacuateInfos);
                break;
            case UbwConstant.TOPIC_WORK_SITE_NEI:
                //相邻基站关系信息
                workSiteNeighbourInfoService.deleteAll();
                List<WorkSiteNeighbourInfo> workSiteNeighbourInfos = MqttWrapper.build().toEntityList(source,new WorkSiteNeighbourInfo());
                workSiteNeighbourInfoService.saveBatch(workSiteNeighbourInfos);
                break;
            case UbwConstant.TOPIC_LINK_MSG:
                //基站链路信息
                linkMsgAlarmInfoService.deleteAll();
                List<LinkMsgAlarmInfo> linkMsgAlarmInfos = MqttWrapper.build().toEntityList(source,new LinkMsgAlarmInfo());
                linkMsgAlarmInfoService.saveBatch(linkMsgAlarmInfos);
                break;
            case UbwConstant.TOPIC_LEADER_EMP:
                //带班领导信息
                leaderEmployeeInfoService.deleteAll();
                List<LeaderEmployeeInfo> leaderEmployeeInfos = MqttWrapper.build().toEntityList(source,new LeaderEmployeeInfo());
                leaderEmployeeInfoService.saveBatch(leaderEmployeeInfos);
                break;
            case UbwConstant.TOPIC_ELE_MSG:
                //低电报警信息
                List<ElectricMsgInfo> electricMsgInfos = MqttWrapper.build().toEntityList(source,new ElectricMsgInfo());
                electricMsgInfoService.saveBatch(electricMsgInfos);
                break;
            case UbwConstant.TOPIC_WORK_AREA:
                //区域信息
                workAreaService.deleteAll();
                List<WorkArea> workAreas = MqttWrapper.build().toEntityList(source,new WorkArea());
                workAreaService.saveBatch(workAreas);
                break;
            case UbwConstant.TOPIC_HIS_OVER_TIME_SOS:
                //历史超时报警信息
                List<HistoryOverTimeSosAlarmInfo> historyOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source,new HistoryOverTimeSosAlarmInfo());
                historyOverTimeSosAlarmInfoService.saveBatch(historyOverTimeSosAlarmInfos);
                break;
            case UbwConstant.TOPIC_SPECIAL_WORK:
                //特种人员预设线路信息
                specialWorksService.deleteAll();
                List<SpecialWorks> specialWorks = MqttWrapper.build().toEntityList(source,new SpecialWorks());
                specialWorksService.saveBatch(specialWorks);
                break;
            case UbwConstant.TOPIC_ATTEND_LOC:
                //历史考勤轨迹信息
                List<AttendanceLocusInfo> attendanceLocusInfos = MqttWrapper.build().toEntityList(source,new AttendanceLocusInfo());
                attendanceLocusInfoService.saveBatch(attendanceLocusInfos);
                break;
            case UbwConstant.TOPIC_WORK_TYPE:
                //工种信息
                workTypeService.deleteAll();
                List<WorkType> workTypes = MqttWrapper.build().toEntityList(source,new WorkType());
                workTypeService.saveBatch(workTypes);
                break;
            case UbwConstant.TOPIC_OFFICE_POS:
                //职务信息
                officePositionService.deleteAll();
                List<OfficePosition> officePositions = MqttWrapper.build().toEntityList(source,new OfficePosition());
                officePositionService.saveBatch(officePositions);
                break;
            case UbwConstant.TOPIC_CLASS_TEAM:
                //班组信息
                classTeamService.deleteAll();
                List<ClassTeam> classTeams = MqttWrapper.build().toEntityList(source,new ClassTeam());
                classTeamService.saveBatch(classTeams);
                break;
            default : //可选
                break;
        }
    }
}

至此完结。小伙伴们可以根据这个 demo 改造自己的 MQTT 服务处理逻辑!

以上就是Springboot整合mqtt服务的示例代码的详细内容,更多关于Springboot整合mqtt的资料请关注服务器之家其它相关文章!

原文链接:https://blog.csdn.net/weixin_40986713/article/details/123572101

延伸 · 阅读

精彩推荐