diff --git a/pom.xml b/pom.xml
index 9a9114029..04e5f86db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,6 +49,8 @@
8.7.2-20250603
1.8.2
+
+ 1.2.5
3.4.2
@@ -99,7 +101,11 @@
-
+
+ org.eclipse.paho
+ org.eclipse.paho.client.mqttv3
+ ${client.mqtt.version}
+
org.springframework.boot
diff --git a/ruoyi-admin/src/main/resources/application-dev.yml b/ruoyi-admin/src/main/resources/application-dev.yml
index b2002fc3e..0e9e259a6 100644
--- a/ruoyi-admin/src/main/resources/application-dev.yml
+++ b/ruoyi-admin/src/main/resources/application-dev.yml
@@ -270,3 +270,12 @@ justauth:
client-id: 10**********6
client-secret: 1f7d08**********5b7**********29e
redirect-uri: ${justauth.address}/social-callback?source=gitea
+
+mqtt:
+ server: http://localhost:18083
+ host: tcp://83l6076j43.goho.co:37690
+ client-id: stm32-iot-server # 推荐使用模块名+端口,确保唯一性
+ username:
+ password:
+ api-key: 890c854975af5456
+ secret-key: aHONcCJLm8H3fu301vOkRRe8aiXV9AGsRjuAjganacmH
diff --git a/ruoyi-common/pom.xml b/ruoyi-common/pom.xml
index 2930fd0b0..037c837a9 100644
--- a/ruoyi-common/pom.xml
+++ b/ruoyi-common/pom.xml
@@ -34,6 +34,7 @@
ruoyi-common-tenant
ruoyi-common-websocket
ruoyi-common-sse
+ ruoyi-common-mqtt
ruoyi-common
diff --git a/ruoyi-common/ruoyi-common-bom/pom.xml b/ruoyi-common/ruoyi-common-bom/pom.xml
index f1407814b..eec03c708 100644
--- a/ruoyi-common/ruoyi-common-bom/pom.xml
+++ b/ruoyi-common/ruoyi-common-bom/pom.xml
@@ -68,6 +68,13 @@
${revision}
+
+
+ org.dromara
+ ruoyi-common-mqtt
+ ${revision}
+
+
org.dromara
diff --git a/ruoyi-common/ruoyi-common-mqtt/pom.xml b/ruoyi-common/ruoyi-common-mqtt/pom.xml
new file mode 100644
index 000000000..b2cf6c401
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-mqtt/pom.xml
@@ -0,0 +1,38 @@
+
+
+
+ org.dromara
+ ruoyi-common
+ ${revision}
+
+ 4.0.0
+
+ ruoyi-common-mqtt
+
+
+ ruoyi-common-mqtt 数据交互服务
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-webflux
+
+
+
+ org.eclipse.paho
+ org.eclipse.paho.client.mqttv3
+
+
+
+
+ org.dromara
+ ruoyi-common-core
+ ${revision}
+
+
+
+
diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/config/MqttAutoConfiguration.java b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/config/MqttAutoConfiguration.java
new file mode 100644
index 000000000..055bd4601
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/config/MqttAutoConfiguration.java
@@ -0,0 +1,32 @@
+package org.dromara.common.mqtt.config;
+
+import org.dromara.common.mqtt.server.MqttGateway;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.reactive.function.client.WebClient;
+
+import java.util.Base64;
+
+/**
+ * MQTT 自动配置类
+ */
+@Configuration
+@EnableConfigurationProperties(MqttProperties.class)
+// 只有当配置中设置了 mqtt.host 时才启用此自动配置
+@ConditionalOnProperty(prefix = "mqtt", name = "host")
+public class MqttAutoConfiguration {
+
+ @Bean
+ public MqttGateway mqttGateway(MqttProperties mqttProperties) {
+ return new MqttGateway(mqttProperties);
+ }
+
+ @Bean
+ public WebClient webClient(MqttProperties mqttProperties) {
+ String basicAuth = Base64.getEncoder().encodeToString((mqttProperties.getApiKey() + ":" + mqttProperties.getSecretKey()).getBytes());
+ return WebClient.builder().baseUrl(mqttProperties.getServer()).defaultHeader("Authorization", "Basic " + basicAuth).build();
+ }
+}
diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/config/MqttProperties.java b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/config/MqttProperties.java
new file mode 100644
index 000000000..658ac22dd
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/config/MqttProperties.java
@@ -0,0 +1,37 @@
+package org.dromara.common.mqtt.config;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import lombok.Data;
+
+@Data
+@ConfigurationProperties(prefix = "mqtt") // 配置文件前缀
+public class MqttProperties {
+ private String server;
+
+ /** Broker 地址 (例如: tcp://broker.emqx.io:1883) */
+ private String host;
+
+ /** 客户端 ID */
+ private String clientId;
+
+ /** 用户名 */
+ private String username;
+
+ /** 密码 */
+ private String password;
+
+ /** 连接超时时间 (秒) */
+ private int connectionTimeout = 30;
+
+ /** 心跳间隔 (秒) */
+ private int keepAliveInterval = 60;
+
+ /** 是否清除会话 */
+ private boolean cleanSession = true;
+
+ /** 断开重连是否启用 */
+ private boolean automaticReconnect = true;
+
+ private String apiKey;
+ private String secretKey;
+}
diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/server/MqttGateway.java b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/server/MqttGateway.java
new file mode 100644
index 000000000..8849a995a
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/server/MqttGateway.java
@@ -0,0 +1,139 @@
+package org.dromara.common.mqtt.server;
+
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import org.dromara.common.mqtt.config.MqttProperties;
+import org.eclipse.paho.client.mqttv3.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+@Service
+public class MqttGateway {
+
+ private static final Logger log = LoggerFactory.getLogger(MqttGateway.class);
+
+ private final MqttProperties mqttProperties;
+ private MqttClient mqttClient;
+ private MqttConnectOptions connectOptions;
+
+ // 可以通过构造函数注入配置
+ public MqttGateway(MqttProperties mqttProperties) {
+ this.mqttProperties = mqttProperties;
+ this.initOptions(); // 初始化连接选项
+ }
+
+ // 初始化连接选项
+ private void initOptions() {
+ connectOptions = new MqttConnectOptions();
+ connectOptions.setUserName(mqttProperties.getUsername());
+ connectOptions.setPassword(mqttProperties.getPassword() != null ? mqttProperties.getPassword().toCharArray() : null);
+ connectOptions.setConnectionTimeout(mqttProperties.getConnectionTimeout());
+ connectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
+ connectOptions.setCleanSession(mqttProperties.isCleanSession());
+ connectOptions.setAutomaticReconnect(mqttProperties.isAutomaticReconnect());
+ // 可以设置遗嘱消息 (Last Will and Testament)
+ // options.setWill("topic/lwt", "I'm dead".getBytes(), 1, true);
+ }
+
+ /** 建立连接并在应用启动后自动执行 */
+ @PostConstruct
+ public void connect() {
+ try {
+ log.info("尝试连接 MQTT Broker: {}", mqttProperties.getHost());
+
+ // 1. 创建客户端实例
+ mqttClient = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientId());
+
+ // 2. 设置回调 (处理接收消息和连接状态变化)
+ mqttClient.setCallback(new MqttCallbackExtended() {
+ @Override
+ public void connectComplete(boolean reconnect, String serverURI) {
+ log.info("MQTT 连接成功. (重连: {})", reconnect);
+ // TODO: 可以在这里添加重连成功后重新订阅主题的逻辑
+ }
+
+ @Override
+ public void connectionLost(Throwable cause) {
+ log.error("MQTT 连接丢失: {}", cause.getMessage());
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
+ // TODO: 收到消息处理逻辑,可以分发给不同的处理器
+ log.info("接收到消息: Topic: {}, Qos: {}, Payload: {}",
+ topic, message.getQos(), new String(message.getPayload()));
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ // 消息发布完成 (QoS > 0 时)
+ }
+ });
+
+ // 3. 执行连接
+ mqttClient.connect(connectOptions);
+
+ log.info("MQTT 客户端已启动, ID: {}", mqttProperties.getClientId());
+
+ } catch (MqttException e) {
+ log.error("MQTT 连接失败: {}", e.getMessage());
+ // 通常在 connectOptions.setAutomaticReconnect(true) 的情况下,Paho 会自动尝试重连
+ }
+ }
+
+ /** 优雅断开连接 */
+ @PreDestroy
+ public void disconnect() {
+ try {
+ if (mqttClient != null && mqttClient.isConnected()) {
+ mqttClient.disconnect();
+ mqttClient.close();
+ log.info("MQTT 客户端已断开连接并关闭.");
+ }
+ } catch (MqttException e) {
+ log.error("MQTT 断开连接失败: {}", e.getMessage());
+ }
+ }
+
+ /**
+ * 发布消息
+ * @param topic 主题
+ * @param payload 消息内容
+ * @param qos QoS等级 (0, 1, 2)
+ * @param retained 是否保留消息
+ */
+ public void publish(String topic, String payload, int qos, boolean retained) {
+ if (mqttClient == null || !mqttClient.isConnected()) {
+ log.warn("MQTT 客户端未连接,无法发布消息到主题: {}", topic);
+ return;
+ }
+ try {
+ MqttMessage message = new MqttMessage(payload.getBytes());
+ message.setQos(qos);
+ message.setRetained(retained);
+ mqttClient.publish(topic, message);
+ log.debug("MQTT 消息已发布: Topic: {}, Qos: {}, Payload: {}", topic, qos, payload);
+ } catch (MqttException e) {
+ log.error("MQTT 消息发布失败: Topic: {}", topic, e);
+ }
+ }
+
+ /**
+ * 订阅主题
+ * @param topic 订阅主题
+ * @param qos QoS等级
+ */
+ public void subscribe(String topic, int qos) {
+ if (mqttClient == null || !mqttClient.isConnected()) {
+ log.warn("MQTT 客户端未连接,无法订阅主题: {}", topic);
+ return;
+ }
+ try {
+ mqttClient.subscribe(topic, qos);
+ log.info("成功订阅 MQTT 主题: Topic: {}, Qos: {}", topic, qos);
+ } catch (MqttException e) {
+ log.error("MQTT 订阅失败: Topic: {}", topic, e);
+ }
+ }
+}
diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/resources/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ruoyi-common/ruoyi-common-mqtt/src/main/resources/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 000000000..265d0ddcc
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-mqtt/src/main/resources/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.common.mqtt.config.MqttAutoConfiguration
diff --git a/ruoyi-modules/ruoyi-demo/pom.xml b/ruoyi-modules/ruoyi-demo/pom.xml
index 119fe61b7..2967b6491 100644
--- a/ruoyi-modules/ruoyi-demo/pom.xml
+++ b/ruoyi-modules/ruoyi-demo/pom.xml
@@ -16,6 +16,10 @@
+
+ org.dromara
+ ruoyi-common-mqtt
+
diff --git a/ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/server/IotTcpServer.java b/ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/server/IotTcpServer.java
deleted file mode 100644
index 776494693..000000000
--- a/ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/server/IotTcpServer.java
+++ /dev/null
@@ -1,313 +0,0 @@
-package org.dromara.demo.server;
-
-import cn.hutool.core.util.ObjectUtil;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.dromara.demo.domain.ClientConnection;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.stereotype.Component;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import jakarta.annotation.PreDestroy;
-
-/**
- * TCP服务器,用于接收单片机4G模块发送的传感器数据
- * 协议:每帧以 "hellohello" 开始,以 "byebye" 结束,帧内为 JSON 文本
- * 说明:使用起止分隔符解析,解决无换行、粘包、半包问题
- */
-@Slf4j
-@Component
-public class IotTcpServer implements CommandLineRunner {
-
- @Value("${tcp.server.port:8888}")
- private int tcpPort;
-
- private final ObjectMapper objectMapper = new ObjectMapper();
- private final ExecutorService executorService = Executors.newCachedThreadPool();
- private final ScheduledExecutorService heartbeatService = Executors.newScheduledThreadPool(2);
- private final AtomicBoolean started = new AtomicBoolean(false);
- private ServerSocket serverSocket;
-
- // 用于存储客户端连接和起信息
- @Getter
- private final ConcurrentHashMap clientMaps = new ConcurrentHashMap<>();
-
- @Override
- public void run(String... args) throws Exception {
- executorService.submit(this::startTcpServer);
- // 启动心跳检测任务,每10秒检查一次
- heartbeatService.scheduleAtFixedRate(this::checkHeartbeats, 10, 30, TimeUnit.SECONDS);
- }
-
- /**
- * 启动TCP服务器
- */
- public void startTcpServer() {
- if (!started.compareAndSet(false, true)) {
- log.warn("TCP服务器已启动,忽略重复启动");
- return;
- }
- try {
- serverSocket = new ServerSocket(tcpPort);
- log.info("TCP服务器启动成功,监听端口: {}", tcpPort);
-
- // 持续监听客户端连接
- while (!serverSocket.isClosed()) {
- try {
- Socket clientSocket = serverSocket.accept();
- configureClientSocket(clientSocket);
- // 为每个客户端连接创建独立的处理线程
- executorService.submit(() -> handleClient(clientSocket));
- } catch (IOException e) {
- if (!serverSocket.isClosed()) {
- log.error("接受客户端连接时发生错误", e);
- }
- }
- }
- } catch (IOException e) {
- log.error("启动TCP服务器失败", e);
- }
- }
-
- /**
- * 处理客户端连接
- * 协议:以 "hellohello" 开始,以 "byebye" 结束;帧内为 JSON
- * 处理粘包/半包:累积缓冲区,查找起止分隔符,可一次解析多帧。
- */
- private void handleClient(Socket clientSocket) {
- //TODO 应该处理为 首个hellohello 和最后一个bygbye
- //并且 如果开头和结尾不为这样的数据,则丢弃
- final String START = "hellohello";
- final String END = "byebye";
- final String HEARTBEAT = "heartbeat";
- final Charset charset = StandardCharsets.UTF_8;
- String clientId = null;
- try (InputStream in = clientSocket.getInputStream()) {
- StringBuilder sb = new StringBuilder(4096);
- byte[] buf = new byte[4096];
- int n;
-
- // 第一个完整的帧包含设备唯一标识符
- boolean identifierReceived = false;
-
-
- while ((n = in.read(buf)) != -1) {
- if (n <= 0) continue;
- String chunk = new String(buf, 0, n, charset);
- sb.append(chunk);
-
- while (true) {
- int s = sb.indexOf(START);
- if (s < 0) {
- // 可选:避免缓冲区无限增长(很少触达)
- if (sb.length() > 65536) {
- sb.delete(0, sb.length() - 65536);
- }
- break;
- }
- int e = sb.indexOf(END, s + START.length());
- if (e < 0) {
- // 起始存在但尚无结束,保留从起始开始的内容,丢弃起始前的噪声
- if (s > 0) sb.delete(0, s);
- break;
- }
- // 帧内容为 [s+START.length, e)
- int payloadStart = s + START.length();
- String json = sb.substring(payloadStart, e).trim();
-
- if (!identifierReceived) {
- // 解析首个帧作为设备标识符
- clientId = json;
- ClientConnection clientConnection = clientMaps.get(clientId);
- if (ObjectUtil.isNotNull(clientConnection) && ObjectUtil.isNotNull(clientConnection.getSocket())) {
- log.info("客户端连接重复: {},断开旧连接。", clientId);
- clientConnection.getSocket().close(); //有重复连接,断开旧连接
-
- }else {
- clientConnection = new ClientConnection();
- }
- clientConnection.setSocket(clientSocket);
- clientConnection.setConnectTime(System.currentTimeMillis());
- clientConnection.setLastActiveTime(System.currentTimeMillis());
- clientMaps.put(clientId, clientConnection); // 关联标识符与Socket
- log.info("新客户端连接: {}", clientId);
- identifierReceived = true;
- continue;
- }
-
- // 若尾部含其他字符,兜底裁到最后一个 '}'
- int lastBrace = json.lastIndexOf('}');
- if (lastBrace >= 0) {
- json = json.substring(0, lastBrace + 1);
- }
-
- // 检查是否是心跳包
- if (HEARTBEAT.equals(json.trim())) {
- log.info("收到客户端 {} 的心跳包", clientId);
- // 更新客户端最后活动时间
- ClientConnection clientConnection = clientMaps.get(clientId);
- clientConnection.setLastActiveTime(System.currentTimeMillis());
- // 发送心跳响应
- String finalClientId = clientId;
- heartbeatService.schedule(() -> writeFrame(finalClientId, "heartbeat_ack", charset), 100, TimeUnit.MILLISECONDS);
- sb.delete(0, e + END.length());
- continue;
- }else {
- log.info("接收到帧: {}", json);
- try {
- // 解析成功后,封装并发送温湿度响应
- try {
- JsonNode root = objectMapper.readTree(json);
- log.info("解析数据成功:{}",root);
- } catch (Exception sendEx) {
- log.warn("构造/发送温湿度响应失败: {}", sendEx.getMessage());
- }
- } catch (Exception ex) {
- log.error("解析传感器数据失败: {}", json, ex);
- }
-
- }
-
- // 删除到结束标记之后,继续解析后续帧
- sb.delete(0, e + END.length());
- }
- }
-
- // 连接关闭时的尾帧兜底
- int s = sb.indexOf(START);
- int e = (s >= 0) ? sb.indexOf(END, s + START.length()) : -1;
- if (s >= 0 && e > s) {
- String json = sb.substring(s + START.length(), e).trim();
- int lastBrace = json.lastIndexOf('}');
- if (lastBrace >= 0) json = json.substring(0, lastBrace + 1);
- if (!json.isEmpty()) {
- log.info("接收到尾帧: {}", json);
- try {
- } catch (Exception ex) {
- log.error("解析传感器数据失败(尾帧): {}", json, ex);
- }
- }
- }
- } catch (IOException e) {
- log.error("处理客户端连接时发生错误", e);
- } finally {
- if (clientId != null) {
- clientMaps.remove(clientId);
- }
- try {
- // 客户端断开连接时执行的代码
- clientSocket.close();
- log.info("客户端连接已关闭: {}", clientId);
- } catch (IOException e) {
- log.error("关闭客户端连接时发生错误", e);
- }
- }
- }
-
- /**
- * 检查客户端心跳,断开超时的连接
- */
- private void checkHeartbeats() {
- long currentTime = System.currentTimeMillis();
- long timeout = 90 * 1000; // 90秒超时
-
- clientMaps.entrySet().removeIf(entry -> {
- String clientId = entry.getKey();
- ClientConnection clientConnection = entry.getValue();
- if (ObjectUtil.isNotNull(clientConnection) && ObjectUtil.isNotNull(clientConnection.getLastActiveTime())) {
- // 检查是否超时
- if (currentTime - clientConnection.getLastActiveTime() > timeout) {
- log.warn("客户端 {} 心跳超时,断开连接", clientId);
- try {
- clientConnection.getSocket().close();
- } catch (IOException e) {
- log.error("关闭超时客户端连接时发生错误", e);
- }
- return true; // 移除该客户端
- }
- return false; // 保留该客户端
- }else{
- return true;
- }
- });
- }
-
- /**
- * 封装:按协议发送一帧数据(hellohello + json + byebye)
- */
- private void writeFrame(String clientId, String json, Charset charset) {
- String frame = "hellohello" + json + "byebye";
- try {
- ClientConnection clientConnection = clientMaps.get(clientId);
- if (ObjectUtil.isNotNull(clientConnection)) {
- clientConnection.getSocket().getOutputStream().write(frame.getBytes(charset));
- clientConnection.getSocket().getOutputStream().flush();
- }
- log.info("向客户端 {} 发送帧:{}", clientId, frame);
- } catch (IOException e) {
- log.warn("向客户端 {} 发送帧失败: {}", clientId, e.getMessage());
- }
- }
-
- /**
- * 向特定单片机发送数据
- * @param clientId 单片机的唯一标识符
- * @param message 要发送的消息
- * @throws IOException 如果发送过程中遇到IO错误
- */
- public void sendDataToClient(String clientId, String message) throws IOException {
- ClientConnection clientConnection = clientMaps.get(clientId);
- if (ObjectUtil.isNotNull(clientConnection) && clientConnection.getSocket() != null && !clientConnection.getSocket().isClosed()) {
- writeFrame(clientId, message, StandardCharsets.UTF_8);
- } else {
- log.warn("无法向客户端 {} 发送数据,因为找不到对应的Socket或已关闭", clientId);
- }
- }
-
- /**
- * 配置客户端Socket参数,增强长连接稳定性与实时性
- */
- private void configureClientSocket(Socket socket) {
- try {
- socket.setKeepAlive(true);
- socket.setTcpNoDelay(true);
- // 设置读超时,避免永久阻塞
- // socket.setSoTimeout(60000); // 60秒超时
- } catch (Exception e) {
- log.warn("配置客户端Socket参数失败: {}", e.getMessage());
- }
- }
-
- /**
- * 停止TCP服务器
- */
- @PreDestroy
- public void stopTcpServer() {
- try {
- if (serverSocket != null && !serverSocket.isClosed()) {
- serverSocket.close();
- log.info("TCP服务器已停止");
- }
- executorService.shutdown();
- heartbeatService.shutdown();
- started.set(false);
- } catch (IOException e) {
- log.error("停止TCP服务器时发生错误", e);
- }
- }
-
-}
diff --git a/ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/service/DeviceService.java b/ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/service/DeviceService.java
new file mode 100644
index 000000000..d2ad7f5a4
--- /dev/null
+++ b/ruoyi-modules/ruoyi-demo/src/main/java/org/dromara/demo/service/DeviceService.java
@@ -0,0 +1,56 @@
+package org.dromara.demo.service;
+
+import jakarta.annotation.PostConstruct;
+import org.dromara.common.mqtt.server.MqttGateway;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.stereotype.Service;
+
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
+
+import java.util.HashMap;
+
+@Service
+public class DeviceService {
+
+ @Autowired
+ private MqttGateway mqttGateway;
+ @Autowired
+ private WebClient webClient;
+
+ // 示例:在服务启动后自动订阅一个主题
+ @PostConstruct
+ public void initSubscription() {
+ // 订阅一个主题,QoS=1
+ mqttGateway.subscribe("iot-hazard-server", 1);
+ }
+
+ /**
+ * 向设备发送控制命令
+ * @param deviceId 设备ID
+ * @param command 命令内容
+ */
+ public void sendCommand(String deviceId, String command) {
+ String topic = "iot-hazard/" + deviceId;
+ // 发布消息,QoS=1,不保留
+ mqttGateway.publish(topic, command, 1, false);
+ }
+
+ /**
+ * 查看设备连接信息,404表示未连接
+ * @param deviceId
+ * @return
+ */
+ public HashMap getDeviceLinkInfo(String deviceId) {
+ return webClient.get()
+ .uri("/api/v5/clients/" + deviceId)
+ .retrieve()
+ .onStatus(
+ status -> status.value() == 404,
+ response -> Mono.empty()
+ )
+ .bodyToMono(new ParameterizedTypeReference>() {})
+ .block();
+ }
+}
diff --git a/ruoyi-modules/ruoyi-hazard/pom.xml b/ruoyi-modules/ruoyi-hazard/pom.xml
index 73cbdbe78..6d7ab1db0 100644
--- a/ruoyi-modules/ruoyi-hazard/pom.xml
+++ b/ruoyi-modules/ruoyi-hazard/pom.xml
@@ -16,6 +16,11 @@
+
+ org.dromara
+ ruoyi-common-mqtt
+
+
org.dromara