From c36e9d87a57342a7d9820479536feff9d65983f0 Mon Sep 17 00:00:00 2001 From: wangrunpu <2095588299@qq.com> Date: Fri, 16 Jan 2026 10:46:41 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=91=8A=E8=AD=A6=E6=8E=A8=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ProvincialPlatformMqttProperties.java | 97 ++++++ .../core/ProvincialPlatformMqttClient.java | 279 ++++++++++++++++++ .../iot/job/alert/IotAlertRecordPushJob.java | 72 +++++ .../alert/IotAlertRecordServiceImpl.java | 2 +- 4 files changed, 449 insertions(+), 1 deletion(-) create mode 100644 yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/ProvincialPlatformMqttProperties.java create mode 100644 yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/core/ProvincialPlatformMqttClient.java create mode 100644 yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/job/alert/IotAlertRecordPushJob.java diff --git a/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/ProvincialPlatformMqttProperties.java b/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/ProvincialPlatformMqttProperties.java new file mode 100644 index 000000000..e0513efa1 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/config/ProvincialPlatformMqttProperties.java @@ -0,0 +1,97 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * 省平台 MQTT 配置属性 + * + * @author 芋道源码 + */ +@Data +@Component +@ConfigurationProperties(prefix = "yudao.iot.provincial-platform.mqtt") +public class ProvincialPlatformMqttProperties { + + /** + * 是否启用省平台MQTT推送 + */ + private Boolean enabled; + + /** + * MQTT Broker 地址 + */ + private String host; + + /** + * MQTT Broker 端口 + */ + private Integer port; + + /** + * MQTT 用户名 + */ + private String username; + + /** + * MQTT 密码 + */ + private String password; + + /** + * MQTT 客户端 ID + */ + private String clientId; + + /** + * 是否开启 SSL + */ + private Boolean ssl; + + /** + * 是否启用 Clean Session (默认: true) + */ + private Boolean cleanSession; + + /** + * 心跳间隔,单位秒 (默认: 60) + */ + private Integer keepAliveIntervalSeconds; + + /** + * 连接超时,单位:秒 + */ + private Integer connectTimeoutSeconds; + + /** + * QoS 等级 (0, 1, 2) + */ + private Integer qos = 1; + + /** + * 重连延迟,单位:毫秒 + */ + private Long reconnectDelayMs; + + /** + * 告警推送主题 + */ + private String alertTopic = "provincial/alert"; + + /** + * 告警推送间隔,单位:秒 (定时任务执行间隔) + */ + private Integer alertPushIntervalSeconds; + + /** + * 告警数据推送主题 + */ + private String alertPropertyTopic; + + /** + * 告警数据推送间隔 + */ + private Integer alertPropertyPushIntervalSeconds; + +} diff --git a/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/core/ProvincialPlatformMqttClient.java b/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/core/ProvincialPlatformMqttClient.java new file mode 100644 index 000000000..01993cc5e --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/framework/mqtt/core/ProvincialPlatformMqttClient.java @@ -0,0 +1,279 @@ +package cn.iocoder.yudao.module.iot.framework.mqtt.core; + +import cn.hutool.core.lang.Assert; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.dal.dataobject.alert.IotAlertRecordDO; +import cn.iocoder.yudao.module.iot.framework.mqtt.config.ProvincialPlatformMqttProperties; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.mqtt.MqttClient; +import io.vertx.mqtt.MqttClientOptions; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import jakarta.annotation.Resource; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * 省平台 MQTT 客户端 + * 用于向省平台推送告警记录 + * + * @author 芋道源码 + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "yudao.iot.provincial-platform.mqtt", name = "enabled", havingValue = "true") +public class ProvincialPlatformMqttClient { + + @Resource + private ProvincialPlatformMqttProperties mqttProperties; + + @Resource(name = "mqttVertx") + private Vertx vertx; + + private MqttClient mqttClient; + + private volatile boolean isRunning = false; + + @PostConstruct + public void start() { + if (!mqttProperties.getEnabled()) { + log.info("[start][省平台MQTT推送未启用,跳过启动]"); + return; + } + + if (isRunning) { + return; + } + + try { + // 1. 创建 MQTT 客户端 + createMqttClient(); + + // 2. 同步连接 MQTT Broker + connectMqttSync(); + + // 3. 标记服务为运行状态 + isRunning = true; + log.info("[start][省平台 MQTT 客户端启动成功]"); + } catch (Exception e) { + log.error("[start][省平台 MQTT 客户端启动失败]", e); + throw new RuntimeException("省平台 MQTT 客户端启动失败: " + e.getMessage(), e); + } + } + + @PreDestroy + public void stop() { + if (!isRunning) { + return; + } + + stopMqttClient(); + isRunning = false; + log.info("[stop][省平台 MQTT 客户端已停止]"); + } + + /** + * 创建 MQTT 客户端 + */ + private void createMqttClient() { + MqttClientOptions options = new MqttClientOptions() + .setClientId(mqttProperties.getClientId()) + .setUsername(mqttProperties.getUsername()) + .setPassword(mqttProperties.getPassword()) + .setSsl(mqttProperties.getSsl()) + .setCleanSession(mqttProperties.getCleanSession()) + .setKeepAliveInterval(mqttProperties.getKeepAliveIntervalSeconds()) + .setConnectTimeout(mqttProperties.getConnectTimeoutSeconds() * 1000); // Vert.x 需要毫秒 + + this.mqttClient = MqttClient.create(vertx, options); + } + + /** + * 同步连接 MQTT Broker + */ + private void connectMqttSync() { + String host = mqttProperties.getHost(); + int port = mqttProperties.getPort(); + + Assert.notBlank(host, "省平台MQTT服务器地址不能为空"); + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean success = new AtomicBoolean(false); + + mqttClient.connect(port, host, connectResult -> { + if (connectResult.succeeded()) { + log.info("[connectMqttSync][省平台 MQTT 客户端连接成功, host: {}, port: {}]", host, port); + setupMqttHandlers(); + success.set(true); + } else { + log.error("[connectMqttSync][连接省平台 MQTT Broker 失败, host: {}, port: {}]", + host, port, connectResult.cause()); + } + latch.countDown(); + }); + + try { + boolean awaitResult = latch.await(10, TimeUnit.SECONDS); + if (!awaitResult) { + log.error("[connectMqttSync][等待连接结果超时]"); + throw new RuntimeException("连接省平台 MQTT Broker 超时"); + } + if (!success.get()) { + throw new RuntimeException(String.format("首次连接省平台 MQTT Broker 失败,地址: %s, 端口: %d", host, port)); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("[connectMqttSync][等待连接结果被中断]", e); + throw new RuntimeException("连接省平台 MQTT Broker 被中断", e); + } + } + + /** + * 异步连接 MQTT Broker + */ + private void connectMqttAsync() { + String host = mqttProperties.getHost(); + int port = mqttProperties.getPort(); + + mqttClient.connect(port, host, connectResult -> { + if (connectResult.succeeded()) { + log.info("[connectMqttAsync][省平台 MQTT 客户端重连成功]"); + setupMqttHandlers(); + } else { + log.error("[connectMqttAsync][连接省平台 MQTT Broker 失败, host: {}, port: {}]", + host, port, connectResult.cause()); + log.warn("[connectMqttAsync][重连失败,将再次尝试]"); + reconnectWithDelay(); + } + }); + } + + /** + * 延迟重连 + */ + private void reconnectWithDelay() { + if (!isRunning) { + return; + } + if (mqttClient != null && mqttClient.isConnected()) { + return; + } + + long delay = mqttProperties.getReconnectDelayMs(); + log.info("[reconnectWithDelay][将在 {} 毫秒后尝试重连省平台 MQTT Broker]", delay); + vertx.setTimer(delay, timerId -> { + if (!isRunning) { + return; + } + if (mqttClient != null && mqttClient.isConnected()) { + return; + } + + log.info("[reconnectWithDelay][开始重连省平台 MQTT Broker]"); + try { + createMqttClient(); + connectMqttAsync(); + } catch (Exception e) { + log.error("[reconnectWithDelay][重连过程中发生异常]", e); + vertx.setTimer(delay, t -> reconnectWithDelay()); + } + }); + } + + /** + * 设置 MQTT 处理器 + */ + private void setupMqttHandlers() { + // 1. 设置断开重连监听器 + mqttClient.closeHandler(closeEvent -> { + if (!isRunning) { + return; + } + log.warn("[closeHandler][省平台 MQTT 连接已断开, 准备重连]"); + reconnectWithDelay(); + }); + + // 2. 设置异常处理器 + mqttClient.exceptionHandler(exception -> + log.error("[exceptionHandler][省平台 MQTT 客户端异常]", exception)); + } + + /** + * 停止 MQTT 客户端 + */ + private void stopMqttClient() { + if (mqttClient == null) { + return; + } + try { + if (mqttClient.isConnected()) { + try { + CountDownLatch disconnectLatch = new CountDownLatch(1); + mqttClient.disconnect(ar -> disconnectLatch.countDown()); + if (!disconnectLatch.await(5, TimeUnit.SECONDS)) { + log.warn("[stopMqttClient][断开省平台 MQTT 连接超时]"); + } + } catch (Exception e) { + log.warn("[stopMqttClient][关闭省平台 MQTT 客户端异常]", e); + } + } + } catch (Exception e) { + log.warn("[stopMqttClient][停止省平台 MQTT 客户端过程中发生异常]", e); + } finally { + mqttClient = null; + } + } + + /** + * 发布告警记录到省平台 + * + * @param alertRecord 告警记录 + */ + public void publishAlertRecord(IotAlertRecordDO alertRecord) { + if (!isRunning || mqttClient == null || !mqttClient.isConnected()) { + log.warn("[publishAlertRecord][省平台 MQTT 客户端未连接, 无法发布告警记录,告警ID: {}]", alertRecord.getId()); + return; + } + + try { + // 1. 构建消息内容 + String payload = JsonUtils.toJsonString(alertRecord); + + // 2. 发布到 MQTT 主题 + String topic = mqttProperties.getAlertTopic(); + mqttClient.publish( + topic, + Buffer.buffer(payload), + io.vertx.mqtt.MqttQoS.valueOf(mqttProperties.getQos()), + false, + false, + publishResult -> { + if (publishResult.succeeded()) { + log.info("[publishAlertRecord][告警记录推送成功,告警ID: {}, 主题: {}]", alertRecord.getId(), topic); + } else { + log.error("[publishAlertRecord][告警记录推送失败,告警ID: {}, 主题: {}]", + alertRecord.getId(), topic, publishResult.cause()); + } + } + ); + } catch (Exception e) { + log.error("[publishAlertRecord][发布告警记录异常,告警ID: {}]", alertRecord.getId(), e); + } + } + + /** + * 检查客户端是否已连接 + * + * @return 是否已连接 + */ + public boolean isConnected() { + return isRunning && mqttClient != null && mqttClient.isConnected(); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/job/alert/IotAlertRecordPushJob.java b/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/job/alert/IotAlertRecordPushJob.java new file mode 100644 index 000000000..0ca6df591 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/job/alert/IotAlertRecordPushJob.java @@ -0,0 +1,72 @@ +package cn.iocoder.yudao.module.iot.job.alert; + +import cn.iocoder.yudao.module.iot.dal.dataobject.alert.IotAlertRecordDO; +import cn.iocoder.yudao.module.iot.framework.mqtt.core.ProvincialPlatformMqttClient; +import cn.iocoder.yudao.module.iot.service.alert.IotAlertRecordService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.quartz.JobExecutionContext; +import org.springframework.scheduling.quartz.QuartzJobBean; + +/** + * IoT 告警记录推送 Job + * 用于定时推送告警记录到省平台 MQTT 服务器 + * + * @author 芋道源码 + */ +@Slf4j +public class IotAlertRecordPushJob extends QuartzJobBean { + + /** + * JobData Key - 告警记录编号 + */ + public static final String JOB_DATA_KEY_ALERT_RECORD_ID = "alertRecordId"; + + @Resource + private IotAlertRecordService alertRecordService; + + @Resource(required = false) + private ProvincialPlatformMqttClient provincialPlatformMqttClient; + + @Override + protected void executeInternal(JobExecutionContext context) { + // 1. 获得告警记录编号 + Long alertRecordId = context.getMergedJobDataMap().getLong(JOB_DATA_KEY_ALERT_RECORD_ID); + if (alertRecordId == null) { + log.error("[executeInternal][告警记录编号为空,跳过推送]"); + return; + } + + // 2. 查询告警记录 + IotAlertRecordDO alertRecord = alertRecordService.getAlertRecord(alertRecordId); + if (alertRecord == null) { + log.warn("[executeInternal][告警记录不存在,告警ID: {}]", alertRecordId); + return; + } + + // 3. 检查告警是否已处理 + if (Boolean.TRUE.equals(alertRecord.getProcessStatus())) { + log.info("[executeInternal][告警已处理,停止推送,告警ID: {}]", alertRecordId); + return; + } + + // 4. 推送告警记录到省平台 + if (provincialPlatformMqttClient == null) { + log.warn("[executeInternal][省平台MQTT客户端未配置,跳过推送,告警ID: {}]", alertRecordId); + return; + } + + if (!provincialPlatformMqttClient.isConnected()) { + log.warn("[executeInternal][省平台MQTT客户端未连接,跳过本次推送,告警ID: {}]", alertRecordId); + return; + } + + try { + provincialPlatformMqttClient.publishAlertRecord(alertRecord); + log.info("[executeInternal][成功推送告警记录到省平台,告警ID: {}]", alertRecordId); + } catch (Exception e) { + log.error("[executeInternal][推送告警记录到省平台异常,告警ID: {}]", alertRecordId, e); + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/service/alert/IotAlertRecordServiceImpl.java b/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/service/alert/IotAlertRecordServiceImpl.java index 7007765f5..6b75b8dc0 100644 --- a/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/service/alert/IotAlertRecordServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-server/src/main/java/cn/iocoder/yudao/module/iot/service/alert/IotAlertRecordServiceImpl.java @@ -124,7 +124,7 @@ public class IotAlertRecordServiceImpl implements IotAlertRecordService { Map jobDataMap = MapUtil.of(IotAlertRecordPushJob.JOB_DATA_KEY_ALERT_RECORD_ID, alertRecordId); // 构建CRON表达式:每N秒执行一次(从配置中读取) - Integer pushIntervalSeconds = mqttProperties.getPushIntervalSeconds(); + Integer pushIntervalSeconds = mqttProperties.getAlertPushIntervalSeconds(); String cronExpression = String.format("0/%d * * * * ?", pushIntervalSeconds); // 注册定时任务