fix:处理MQTT重连没有订阅的问题;feat:新增接收数据实体类

master
wangrunpu 1 month ago
parent dc69c03e7a
commit 60a594c441

@ -3,6 +3,7 @@ package org.dromara.common.json.utils;
import cn.hutool.core.lang.Dict;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.ObjectUtil;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
@ -31,6 +32,12 @@ public class JsonUtils {
return OBJECT_MAPPER;
}
static {
// **关键配置:允许使用单引号。**
// 您的输入字符串使用了单引号标准JSON使用双引号。
OBJECT_MAPPER.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
}
/**
* JSON
*
@ -222,4 +229,6 @@ public class JsonUtils {
}
}
}

@ -159,23 +159,23 @@ public class MqttGateway {
/**
*
* @param deviceId ID
* @param deviceCode ID
* @param command
*/
public void sendCommand(String deviceId, String command) {
String topic = "iot-hazard/" + deviceId;
public void sendCommand(String deviceCode, String command) {
String topic = "iot-hazard/" + deviceCode;
// 发布消息QoS=1不保留
publish(topic, command, 1, false);
}
/**
* 404
* @param deviceId
* @param deviceCode
* @return
*/
public HashMap<String, Object> getDeviceLinkInfo(String deviceId) {
public HashMap<String, Object> getDeviceLinkInfo(String deviceCode) {
return webClient.get()
.uri("/api/v5/clients/" + deviceId)
.uri("/api/v5/clients/" + deviceCode)
.retrieve()
.onStatus(
status -> status.value() == 404,

@ -33,24 +33,11 @@ public class HazardMqttConfig {
mqttGateway.setMessageHandler(hazardMqttMessageHandler);
log.info("危险源模块MQTT消息处理器已注册");
// 订阅相关主题
subscribeTopics();
// 订阅主题
hazardMqttMessageHandler.subscribeTopics();
} else {
log.warn("MqttGateway未启用跳过MQTT消息处理器注册");
}
}
/**
*
*/
private void subscribeTopics() {
try {
// 订阅一个主题QoS=1
mqttGateway.subscribe("iot-hazard-server", 1);
mqttGateway.subscribe("status/online", 1);
log.info("危险源模块已订阅MQTT主题: iot-hazard-server");
} catch (Exception e) {
log.error("订阅MQTT主题失败", e);
}
}
}

@ -0,0 +1,20 @@
package org.dromara.hazard.domain.dto;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public class Payload {
@JsonProperty("device_code")
private String deviceCode;
@JsonProperty("data")
private List<SensorData> data;
public Payload() {}
public Payload(String deviceCode, List<SensorData> data) {
this.deviceCode = deviceCode;
this.data = data;
}
}

@ -0,0 +1,20 @@
package org.dromara.hazard.domain.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public class SensorData {
@JsonProperty("sensor_code")
private String sensorCode;
@JsonProperty("value")
private String value;
public SensorData(){}
public SensorData(String sensorCode, String value) {
this.sensorCode = sensorCode;
this.value = value;
}
}

@ -1,8 +1,10 @@
package org.dromara.hazard.handler;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.json.utils.JsonUtils;
import org.dromara.common.mqtt.handler.MqttMessageHandler;
import org.dromara.common.mqtt.server.MqttGateway;
import org.dromara.hazard.domain.dto.Payload;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -57,7 +59,8 @@ public class HazardMqttMessageHandler implements MqttMessageHandler {
@Override
public void onConnectComplete(boolean reconnect, String serverURI) {
log.info("【危险源模块】MQTT{}连接成功: {}", reconnect ? "重" : "", serverURI);
// TODO: 可以在这里重新订阅主题
// 订阅主题
subscribeTopics();
}
/**
@ -68,6 +71,20 @@ public class HazardMqttMessageHandler implements MqttMessageHandler {
log.debug("【危险源模块】消息发送完成: {}", topic);
}
/**
*
*/
public void subscribeTopics() {
try {
// 订阅一个主题QoS=1
mqttGateway.subscribe("iot-hazard-server", 1);
mqttGateway.subscribe("status/online", 1);
log.info("危险源模块已订阅MQTT主题: iot-hazard-server");
} catch (Exception e) {
log.error("订阅MQTT主题失败", e);
}
}
/**
*
*/
@ -75,6 +92,9 @@ public class HazardMqttMessageHandler implements MqttMessageHandler {
log.info("处理危险源消息 - Topic: {}, Payload: {}", topic, payload);
// TODO: 实现你的业务逻辑
// 例如: 解析JSON、保存数据库、触发业务流程等
Payload payload1 = JsonUtils.parseObject(payload, Payload.class);
System.out.println(payload1);
}
/**

Loading…
Cancel
Save