|
|
|
|
@ -1,17 +1,30 @@
|
|
|
|
|
package org.dromara.hazard.handler;
|
|
|
|
|
|
|
|
|
|
import cn.hutool.json.JSONUtil;
|
|
|
|
|
import org.dromara.common.core.utils.StringUtils;
|
|
|
|
|
import org.dromara.common.json.utils.JsonUtils;
|
|
|
|
|
import org.dromara.common.mqtt.handler.MqttMessageHandler;
|
|
|
|
|
import org.dromara.common.mqtt.server.MqttGateway;
|
|
|
|
|
import org.dromara.common.sse.dto.SseMessageDto;
|
|
|
|
|
import org.dromara.common.sse.utils.SseMessageUtils;
|
|
|
|
|
import org.dromara.hazard.domain.bo.IotSensorDataBo;
|
|
|
|
|
import org.dromara.hazard.domain.dto.Payload;
|
|
|
|
|
import org.dromara.hazard.domain.dto.SensorData;
|
|
|
|
|
import org.dromara.hazard.domain.vo.IotDeviceVo;
|
|
|
|
|
import org.dromara.hazard.domain.vo.IotSensorVo;
|
|
|
|
|
import org.dromara.hazard.service.IIotDeviceService;
|
|
|
|
|
import org.dromara.hazard.service.IIotSensorDataService;
|
|
|
|
|
import org.dromara.hazard.service.IIotSensorService;
|
|
|
|
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
|
|
import java.util.Date;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 危险源模块的MQTT消息处理器
|
|
|
|
|
@ -24,6 +37,14 @@ public class HazardMqttMessageHandler implements MqttMessageHandler {
|
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(HazardMqttMessageHandler.class);
|
|
|
|
|
@Autowired
|
|
|
|
|
private MqttGateway mqttGateway;
|
|
|
|
|
@Autowired
|
|
|
|
|
private IIotDeviceService iotDeviceService;
|
|
|
|
|
@Autowired
|
|
|
|
|
private IIotSensorService iotSensorService;
|
|
|
|
|
@Autowired
|
|
|
|
|
private IIotSensorDataService iotSensorDataService;
|
|
|
|
|
@Autowired
|
|
|
|
|
private ScheduledExecutorService scheduledExecutorService;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 处理接收到的MQTT消息
|
|
|
|
|
@ -94,6 +115,43 @@ public class HazardMqttMessageHandler implements MqttMessageHandler {
|
|
|
|
|
// 例如: 解析JSON、保存数据库、触发业务流程等
|
|
|
|
|
Payload payload1 = JsonUtils.parseObject(payload, Payload.class);
|
|
|
|
|
System.out.println(payload1);
|
|
|
|
|
String deviceCode = payload1.getDeviceCode();
|
|
|
|
|
List<SensorData> data = payload1.getData();
|
|
|
|
|
if (deviceCode!= null && data != null && data.size() > 0){
|
|
|
|
|
for (SensorData dataBo : data) {
|
|
|
|
|
String sensorCode = dataBo.getSensorCode();
|
|
|
|
|
if (sensorCode != null){
|
|
|
|
|
IotSensorVo iotSensorVo = iotSensorService.queryByCode(sensorCode);
|
|
|
|
|
IotDeviceVo deviceVo = iotDeviceService.queryByDeviceCode(deviceCode);
|
|
|
|
|
Long sensorId = iotSensorVo.getId();//获取传感器id
|
|
|
|
|
Long deviceId = deviceVo.getId();//获取设备id
|
|
|
|
|
if (sensorId!=null && deviceId != null){//只有当传感器id和设备id不为空时才处理数据,即传感器链接到设备时才会处理数据
|
|
|
|
|
IotSensorDataBo iotSensorDataBo = new IotSensorDataBo();
|
|
|
|
|
iotSensorDataBo.setTime(new Date());
|
|
|
|
|
iotSensorDataBo.setDeviceId(deviceId);
|
|
|
|
|
iotSensorDataBo.setSensorCode(sensorId);
|
|
|
|
|
iotSensorDataBo.setValue(dataBo.getValue());
|
|
|
|
|
iotSensorDataBo.setRawJson(payload);//保存原始数据
|
|
|
|
|
iotSensorDataService.insertByBo(iotSensorDataBo);
|
|
|
|
|
|
|
|
|
|
//sse发送消息到前端
|
|
|
|
|
scheduledExecutorService.schedule(() -> {
|
|
|
|
|
SseMessageDto dto = new SseMessageDto();
|
|
|
|
|
DataDto messageData = new DataDto();
|
|
|
|
|
messageData.setDeviceId(String.valueOf(deviceId));
|
|
|
|
|
messageData.setSensorId(String.valueOf(sensorId));
|
|
|
|
|
messageData.setValue(dataBo.getValue());
|
|
|
|
|
messageData.setType(iotSensorVo.getType());
|
|
|
|
|
String jsonStr = JSONUtil.toJsonStr(messageData);
|
|
|
|
|
dto.setMessage(topic+jsonStr);
|
|
|
|
|
// 如果需要指定特定用户接收消息,可以设置userIds
|
|
|
|
|
// dto.setUserIds(List.of(userId));
|
|
|
|
|
SseMessageUtils.publishMessage(dto);
|
|
|
|
|
}, 0, TimeUnit.SECONDS);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|