From a0a867dbfa2d8b7255bad283a2d61f2151c078a8 Mon Sep 17 00:00:00 2001 From: wangrunpu <2095588299@qq.com> Date: Tue, 9 Dec 2025 18:09:01 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=8F=90=E5=8F=96=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91=EF=BC=8C=E5=AF=B9=E5=BA=94?= =?UTF-8?q?=E6=A8=A1=E5=9D=97=E5=8F=AF=E5=B0=86=E5=85=B6=E9=87=8D=E5=86=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/config/MqttAutoConfiguration.java | 1 - .../mqtt/handler/MqttMessageHandler.java | 23 ++++++ .../common/mqtt/server/MqttGateway.java | 65 +++++++++++++++- .../dromara/demo/service/DeviceService.java | 56 -------------- .../hazard/config/HazardMqttConfig.java | 56 ++++++++++++++ .../handler/HazardMqttMessageHandler.java | 74 +++++++++++++++++++ 6 files changed, 214 insertions(+), 61 deletions(-) create mode 100644 ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/handler/MqttMessageHandler.java delete mode 100644 ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/service/DeviceService.java create mode 100644 ruoyi-modules/ruoyi-hazard/src/main/java/org/dromara/hazard/config/HazardMqttConfig.java create mode 100644 ruoyi-modules/ruoyi-hazard/src/main/java/org/dromara/hazard/handler/HazardMqttMessageHandler.java diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/config/MqttAutoConfiguration.java b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/config/MqttAutoConfiguration.java index 055bd4601..abb16cd7a 100644 --- a/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/config/MqttAutoConfiguration.java +++ b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/config/MqttAutoConfiguration.java @@ -1,7 +1,6 @@ package org.dromara.common.mqtt.config; import org.dromara.common.mqtt.server.MqttGateway; -import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/handler/MqttMessageHandler.java b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/handler/MqttMessageHandler.java new file mode 100644 index 000000000..5d85276c3 --- /dev/null +++ b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/handler/MqttMessageHandler.java @@ -0,0 +1,23 @@ +package org.dromara.common.mqtt.handler; + +import org.eclipse.paho.client.mqttv3.MqttMessage; + + +public interface MqttMessageHandler { + + + void onMessageArrived(String topic, MqttMessage message); + + + default void onConnectionLost(Throwable cause) { + + } + + default void onConnectComplete(boolean reconnect, String serverURI) { + + } + + default void onDeliveryComplete(String topic) { + + } +} diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/server/MqttGateway.java b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/server/MqttGateway.java index 8849a995a..8bb1112fe 100644 --- a/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/server/MqttGateway.java +++ b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/server/MqttGateway.java @@ -3,10 +3,16 @@ package org.dromara.common.mqtt.server; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import org.dromara.common.mqtt.config.MqttProperties; +import org.dromara.common.mqtt.handler.MqttMessageHandler; import org.eclipse.paho.client.mqttv3.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.stereotype.Service; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + +import java.util.HashMap; @Service public class MqttGateway { @@ -16,6 +22,8 @@ public class MqttGateway { private final MqttProperties mqttProperties; private MqttClient mqttClient; private MqttConnectOptions connectOptions; + private MqttMessageHandler messageHandler; + private WebClient webClient; // 可以通过构造函数注入配置 public MqttGateway(MqttProperties mqttProperties) { @@ -50,24 +58,36 @@ public class MqttGateway { @Override public void connectComplete(boolean reconnect, String serverURI) { log.info("MQTT 连接成功. (重连: {})", reconnect); - // TODO: 可以在这里添加重连成功后重新订阅主题的逻辑 + if (messageHandler != null) { + messageHandler.onConnectComplete(reconnect, serverURI); + } } @Override public void connectionLost(Throwable cause) { log.error("MQTT 连接丢失: {}", cause.getMessage()); + if (messageHandler != null) { + messageHandler.onConnectionLost(cause); + } } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { - // TODO: 收到消息处理逻辑,可以分发给不同的处理器 - log.info("接收到消息: Topic: {}, Qos: {}, Payload: {}", - topic, message.getQos(), new String(message.getPayload())); + if (messageHandler != null) { + messageHandler.onMessageArrived(topic, message); + } else { + // 默认处理逻辑 + log.info("接收到消息: Topic: {}, Qos: {}, Payload: {}", + topic, message.getQos(), new String(message.getPayload())); + } } @Override public void deliveryComplete(IMqttDeliveryToken token) { // 消息发布完成 (QoS > 0 时) + if (messageHandler != null && token.getTopics() != null && token.getTopics().length > 0) { + messageHandler.onDeliveryComplete(token.getTopics()[0]); + } } }); @@ -136,4 +156,41 @@ public class MqttGateway { log.error("MQTT 订阅失败: Topic: {}", topic, e); } } + + /** + * 向设备发送控制命令 + * @param deviceId 设备ID + * @param command 命令内容 + */ + public void sendCommand(String deviceId, String command) { + String topic = "iot-hazard/" + deviceId; + // 发布消息,QoS=1,不保留 + publish(topic, command, 1, false); + } + + /** + * 查看设备连接信息,404表示未连接 + * @param deviceId + * @return + */ + public HashMap getDeviceLinkInfo(String deviceId) { + return webClient.get() + .uri("/api/v5/clients/" + deviceId) + .retrieve() + .onStatus( + status -> status.value() == 404, + response -> Mono.empty() + ) + .bodyToMono(new ParameterizedTypeReference>() {}) + .block(); + } + + /** + * 设置自定义消息处理器 + * @param handler 消息处理器实现 + */ + public void setMessageHandler(MqttMessageHandler handler) { + this.messageHandler = handler; + log.info("已设置自定义 MQTT 消息处理器: {}", handler.getClass().getSimpleName()); + } } diff --git a/ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/service/DeviceService.java b/ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/service/DeviceService.java deleted file mode 100644 index d2ad7f5a4..000000000 --- a/ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/service/DeviceService.java +++ /dev/null @@ -1,56 +0,0 @@ -package org.dromara.demo.service; - -import jakarta.annotation.PostConstruct; -import org.dromara.common.mqtt.server.MqttGateway; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.core.ParameterizedTypeReference; -import org.springframework.stereotype.Service; - -import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Mono; - -import java.util.HashMap; - -@Service -public class DeviceService { - - @Autowired - private MqttGateway mqttGateway; - @Autowired - private WebClient webClient; - - // 示例:在服务启动后自动订阅一个主题 - @PostConstruct - public void initSubscription() { - // 订阅一个主题,QoS=1 - mqttGateway.subscribe("iot-hazard-server", 1); - } - - /** - * 向设备发送控制命令 - * @param deviceId 设备ID - * @param command 命令内容 - */ - public void sendCommand(String deviceId, String command) { - String topic = "iot-hazard/" + deviceId; - // 发布消息,QoS=1,不保留 - mqttGateway.publish(topic, command, 1, false); - } - - /** - * 查看设备连接信息,404表示未连接 - * @param deviceId - * @return - */ - public HashMap getDeviceLinkInfo(String deviceId) { - return webClient.get() - .uri("/api/v5/clients/" + deviceId) - .retrieve() - .onStatus( - status -> status.value() == 404, - response -> Mono.empty() - ) - .bodyToMono(new ParameterizedTypeReference>() {}) - .block(); - } -} diff --git a/ruoyi-modules/ruoyi-hazard/src/main/java/org/dromara/hazard/config/HazardMqttConfig.java b/ruoyi-modules/ruoyi-hazard/src/main/java/org/dromara/hazard/config/HazardMqttConfig.java new file mode 100644 index 000000000..38e626206 --- /dev/null +++ b/ruoyi-modules/ruoyi-hazard/src/main/java/org/dromara/hazard/config/HazardMqttConfig.java @@ -0,0 +1,56 @@ +package org.dromara.hazard.config; + +import jakarta.annotation.PostConstruct; +import org.dromara.common.mqtt.server.MqttGateway; +import org.dromara.hazard.handler.HazardMqttMessageHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; + +/** + * 危险源模块MQTT配置 + * + * @author ruoyi + */ +@Configuration +public class HazardMqttConfig { + + private static final Logger log = LoggerFactory.getLogger(HazardMqttConfig.class); + + @Autowired(required = false) + private MqttGateway mqttGateway; + + @Autowired + private HazardMqttMessageHandler hazardMqttMessageHandler; + + /** + * 应用启动后设置自定义消息处理器 + */ + @PostConstruct + public void init() { + if (mqttGateway != null) { + mqttGateway.setMessageHandler(hazardMqttMessageHandler); + log.info("危险源模块MQTT消息处理器已注册"); + + // 订阅相关主题 + subscribeTopics(); + } else { + log.warn("MqttGateway未启用,跳过MQTT消息处理器注册"); + } + } + + /** + * 订阅危险源模块关心的主题 + */ + private void subscribeTopics() { + try { + // 订阅一个主题,QoS=1 + mqttGateway.subscribe("iot-hazard-server", 1); + + log.info("危险源模块已订阅MQTT主题: iot-hazard-server"); + } catch (Exception e) { + log.error("订阅MQTT主题失败", e); + } + } +} diff --git a/ruoyi-modules/ruoyi-hazard/src/main/java/org/dromara/hazard/handler/HazardMqttMessageHandler.java b/ruoyi-modules/ruoyi-hazard/src/main/java/org/dromara/hazard/handler/HazardMqttMessageHandler.java new file mode 100644 index 000000000..1f265c6ac --- /dev/null +++ b/ruoyi-modules/ruoyi-hazard/src/main/java/org/dromara/hazard/handler/HazardMqttMessageHandler.java @@ -0,0 +1,74 @@ +package org.dromara.hazard.handler; + +import org.dromara.common.mqtt.handler.MqttMessageHandler; +import org.dromara.common.mqtt.server.MqttGateway; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * 危险源模块的MQTT消息处理器 + * + * @author ruoyi + */ +@Component +public class HazardMqttMessageHandler implements MqttMessageHandler { + + private static final Logger log = LoggerFactory.getLogger(HazardMqttMessageHandler.class); + @Autowired + private MqttGateway mqttGateway; + + /** + * 处理接收到的MQTT消息 + */ + @Override + public void onMessageArrived(String topic, MqttMessage message) { + String payload = new String(message.getPayload()); + log.info("【危险源模块】接收到MQTT消息 - Topic: {}, QoS: {}, Retained: {}, Payload: {}", + topic, message.getQos(), message.isRetained(), payload); + + // TODO: 根据不同的主题分发到不同的业务处理方法 + if (topic.equals("iot-hazard-server")) { + handleHazardMessage(topic, payload); + } else { + log.warn("未知的消息主题: {}", topic); + } + } + + /** + * 连接丢失时的处理 + */ + @Override + public void onConnectionLost(Throwable cause) { + log.error("【危险源模块】MQTT连接丢失", cause); + // TODO: 可以在这里添加告警通知、重连策略等 + } + + /** + * 连接成功时的处理 + */ + @Override + public void onConnectComplete(boolean reconnect, String serverURI) { + log.info("【危险源模块】MQTT{}连接成功: {}", reconnect ? "重" : "", serverURI); + // TODO: 可以在这里重新订阅主题 + } + + /** + * 消息发送完成时的处理 + */ + @Override + public void onDeliveryComplete(String topic) { + log.debug("【危险源模块】消息发送完成: {}", topic); + } + + /** + * 处理危险源相关消息 + */ + private void handleHazardMessage(String topic, String payload) { + log.info("处理危险源消息 - Topic: {}, Payload: {}", topic, payload); + // TODO: 实现你的业务逻辑 + // 例如: 解析JSON、保存数据库、触发业务流程等 + } +}