diff --git a/yudao-module-iot/yudao-module-iot-server/pom.xml b/yudao-module-iot/yudao-module-iot-server/pom.xml index c49856b65..2a6db9ec7 100644 --- a/yudao-module-iot/yudao-module-iot-server/pom.xml +++ b/yudao-module-iot/yudao-module-iot-server/pom.xml @@ -19,6 +19,11 @@ + + + io.vertx + vertx-mqtt + cn.iocoder.cloud diff --git a/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/ProvincialPlatformMqttProperties.java b/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/ProvincialPlatformMqttProperties.java index e0513efa1..4a24bf2a3 100644 --- a/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/ProvincialPlatformMqttProperties.java +++ b/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/ProvincialPlatformMqttProperties.java @@ -67,7 +67,7 @@ public class ProvincialPlatformMqttProperties { /** * QoS 等级 (0, 1, 2) */ - private Integer qos = 1; + private Integer qos; /** * 重连延迟,单位:毫秒 @@ -77,7 +77,7 @@ public class ProvincialPlatformMqttProperties { /** * 告警推送主题 */ - private String alertTopic = "provincial/alert"; + private String alertTopic; /** * 告警推送间隔,单位:秒 (定时任务执行间隔) diff --git a/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/core/ProvincialPlatformMqttClient.java b/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/core/ProvincialPlatformMqttClient.java index 01993cc5e..d7e9610bf 100644 --- a/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/core/ProvincialPlatformMqttClient.java +++ b/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/core/ProvincialPlatformMqttClient.java @@ -4,6 +4,7 @@ import cn.hutool.core.lang.Assert; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.dal.dataobject.alert.IotAlertRecordDO; import cn.iocoder.yudao.module.iot.framework.mqtt.config.ProvincialPlatformMqttProperties; +import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.mqtt.MqttClient; @@ -33,7 +34,6 @@ public class ProvincialPlatformMqttClient { @Resource private ProvincialPlatformMqttProperties mqttProperties; - @Resource(name = "mqttVertx") private Vertx vertx; private MqttClient mqttClient; @@ -52,13 +52,16 @@ public class ProvincialPlatformMqttClient { } try { - // 1. 创建 MQTT 客户端 + // 1. 创建 Vertx 实例 + this.vertx = Vertx.vertx(); + + // 2. 创建 MQTT 客户端 createMqttClient(); - // 2. 同步连接 MQTT Broker + // 3. 同步连接 MQTT Broker connectMqttSync(); - // 3. 标记服务为运行状态 + // 4. 标记服务为运行状态 isRunning = true; log.info("[start][省平台 MQTT 客户端启动成功]"); } catch (Exception e) { @@ -74,6 +77,13 @@ public class ProvincialPlatformMqttClient { } stopMqttClient(); + + // 关闭 Vertx 实例 + if (vertx != null) { + vertx.close(); + vertx = null; + } + isRunning = false; log.info("[stop][省平台 MQTT 客户端已停止]"); } @@ -82,14 +92,14 @@ public class ProvincialPlatformMqttClient { * 创建 MQTT 客户端 */ private void createMqttClient() { - MqttClientOptions options = new MqttClientOptions() - .setClientId(mqttProperties.getClientId()) - .setUsername(mqttProperties.getUsername()) - .setPassword(mqttProperties.getPassword()) - .setSsl(mqttProperties.getSsl()) - .setCleanSession(mqttProperties.getCleanSession()) - .setKeepAliveInterval(mqttProperties.getKeepAliveIntervalSeconds()) - .setConnectTimeout(mqttProperties.getConnectTimeoutSeconds() * 1000); // Vert.x 需要毫秒 + MqttClientOptions options = new MqttClientOptions(); + options.setClientId(mqttProperties.getClientId()); + options.setUsername(mqttProperties.getUsername()); + options.setPassword(mqttProperties.getPassword()); + options.setSsl(mqttProperties.getSsl()); + options.setCleanSession(mqttProperties.getCleanSession()); + options.setKeepAliveInterval(mqttProperties.getKeepAliveIntervalSeconds()); + options.setConnectTimeout(mqttProperties.getConnectTimeoutSeconds() * 1000); // Vert.x 需要毫秒 this.mqttClient = MqttClient.create(vertx, options); } @@ -250,7 +260,7 @@ public class ProvincialPlatformMqttClient { mqttClient.publish( topic, Buffer.buffer(payload), - io.vertx.mqtt.MqttQoS.valueOf(mqttProperties.getQos()), + MqttQoS.valueOf(mqttProperties.getQos()), false, false, publishResult -> { diff --git a/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/job/alert/IotAlertRecordPushJob.java b/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/job/alert/IotAlertRecordPushJob.java index 0ca6df591..ec115407f 100644 --- a/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/job/alert/IotAlertRecordPushJob.java +++ b/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/job/alert/IotAlertRecordPushJob.java @@ -25,7 +25,7 @@ public class IotAlertRecordPushJob extends QuartzJobBean { @Resource private IotAlertRecordService alertRecordService; - @Resource(required = false) + @Resource private ProvincialPlatformMqttClient provincialPlatformMqttClient; @Override diff --git a/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/service/alert/IotAlertRecordServiceImpl.java b/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/service/alert/IotAlertRecordServiceImpl.java index 6b75b8dc0..2ff9747cb 100644 --- a/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/service/alert/IotAlertRecordServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/service/alert/IotAlertRecordServiceImpl.java @@ -42,7 +42,7 @@ public class IotAlertRecordServiceImpl implements IotAlertRecordService { @Resource(name = "iotSchedulerManager") private IotSchedulerManager schedulerManager; - @Resource(required = false) + @Resource private ProvincialPlatformMqttProperties mqttProperties; @Override