fix: 告警推送

master
wangrunpu 3 weeks ago
parent f2676dec9d
commit c36e9d87a5

@ -0,0 +1,97 @@
package cn.iocoder.yudao.module.iot.framework.mqtt.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* MQTT
*
* @author
*/
@Data
@Component
@ConfigurationProperties(prefix = "yudao.iot.provincial-platform.mqtt")
public class ProvincialPlatformMqttProperties {
/**
* MQTT
*/
private Boolean enabled;
/**
* MQTT Broker
*/
private String host;
/**
* MQTT Broker
*/
private Integer port;
/**
* MQTT
*/
private String username;
/**
* MQTT
*/
private String password;
/**
* MQTT ID
*/
private String clientId;
/**
* SSL
*/
private Boolean ssl;
/**
* Clean Session (: true)
*/
private Boolean cleanSession;
/**
* (: 60)
*/
private Integer keepAliveIntervalSeconds;
/**
*
*/
private Integer connectTimeoutSeconds;
/**
* QoS (0, 1, 2)
*/
private Integer qos = 1;
/**
*
*/
private Long reconnectDelayMs;
/**
*
*/
private String alertTopic = "provincial/alert";
/**
* ()
*/
private Integer alertPushIntervalSeconds;
/**
*
*/
private String alertPropertyTopic;
/**
*
*/
private Integer alertPropertyPushIntervalSeconds;
}

@ -0,0 +1,279 @@
package cn.iocoder.yudao.module.iot.framework.mqtt.core;
import cn.hutool.core.lang.Assert;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.dal.dataobject.alert.IotAlertRecordDO;
import cn.iocoder.yudao.module.iot.framework.mqtt.config.ProvincialPlatformMqttProperties;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import jakarta.annotation.Resource;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* MQTT
*
*
* @author
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "yudao.iot.provincial-platform.mqtt", name = "enabled", havingValue = "true")
public class ProvincialPlatformMqttClient {
@Resource
private ProvincialPlatformMqttProperties mqttProperties;
@Resource(name = "mqttVertx")
private Vertx vertx;
private MqttClient mqttClient;
private volatile boolean isRunning = false;
@PostConstruct
public void start() {
if (!mqttProperties.getEnabled()) {
log.info("[start][省平台MQTT推送未启用跳过启动]");
return;
}
if (isRunning) {
return;
}
try {
// 1. 创建 MQTT 客户端
createMqttClient();
// 2. 同步连接 MQTT Broker
connectMqttSync();
// 3. 标记服务为运行状态
isRunning = true;
log.info("[start][省平台 MQTT 客户端启动成功]");
} catch (Exception e) {
log.error("[start][省平台 MQTT 客户端启动失败]", e);
throw new RuntimeException("省平台 MQTT 客户端启动失败: " + e.getMessage(), e);
}
}
@PreDestroy
public void stop() {
if (!isRunning) {
return;
}
stopMqttClient();
isRunning = false;
log.info("[stop][省平台 MQTT 客户端已停止]");
}
/**
* MQTT
*/
private void createMqttClient() {
MqttClientOptions options = new MqttClientOptions()
.setClientId(mqttProperties.getClientId())
.setUsername(mqttProperties.getUsername())
.setPassword(mqttProperties.getPassword())
.setSsl(mqttProperties.getSsl())
.setCleanSession(mqttProperties.getCleanSession())
.setKeepAliveInterval(mqttProperties.getKeepAliveIntervalSeconds())
.setConnectTimeout(mqttProperties.getConnectTimeoutSeconds() * 1000); // Vert.x 需要毫秒
this.mqttClient = MqttClient.create(vertx, options);
}
/**
* MQTT Broker
*/
private void connectMqttSync() {
String host = mqttProperties.getHost();
int port = mqttProperties.getPort();
Assert.notBlank(host, "省平台MQTT服务器地址不能为空");
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean success = new AtomicBoolean(false);
mqttClient.connect(port, host, connectResult -> {
if (connectResult.succeeded()) {
log.info("[connectMqttSync][省平台 MQTT 客户端连接成功, host: {}, port: {}]", host, port);
setupMqttHandlers();
success.set(true);
} else {
log.error("[connectMqttSync][连接省平台 MQTT Broker 失败, host: {}, port: {}]",
host, port, connectResult.cause());
}
latch.countDown();
});
try {
boolean awaitResult = latch.await(10, TimeUnit.SECONDS);
if (!awaitResult) {
log.error("[connectMqttSync][等待连接结果超时]");
throw new RuntimeException("连接省平台 MQTT Broker 超时");
}
if (!success.get()) {
throw new RuntimeException(String.format("首次连接省平台 MQTT Broker 失败,地址: %s, 端口: %d", host, port));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("[connectMqttSync][等待连接结果被中断]", e);
throw new RuntimeException("连接省平台 MQTT Broker 被中断", e);
}
}
/**
* MQTT Broker
*/
private void connectMqttAsync() {
String host = mqttProperties.getHost();
int port = mqttProperties.getPort();
mqttClient.connect(port, host, connectResult -> {
if (connectResult.succeeded()) {
log.info("[connectMqttAsync][省平台 MQTT 客户端重连成功]");
setupMqttHandlers();
} else {
log.error("[connectMqttAsync][连接省平台 MQTT Broker 失败, host: {}, port: {}]",
host, port, connectResult.cause());
log.warn("[connectMqttAsync][重连失败,将再次尝试]");
reconnectWithDelay();
}
});
}
/**
*
*/
private void reconnectWithDelay() {
if (!isRunning) {
return;
}
if (mqttClient != null && mqttClient.isConnected()) {
return;
}
long delay = mqttProperties.getReconnectDelayMs();
log.info("[reconnectWithDelay][将在 {} 毫秒后尝试重连省平台 MQTT Broker]", delay);
vertx.setTimer(delay, timerId -> {
if (!isRunning) {
return;
}
if (mqttClient != null && mqttClient.isConnected()) {
return;
}
log.info("[reconnectWithDelay][开始重连省平台 MQTT Broker]");
try {
createMqttClient();
connectMqttAsync();
} catch (Exception e) {
log.error("[reconnectWithDelay][重连过程中发生异常]", e);
vertx.setTimer(delay, t -> reconnectWithDelay());
}
});
}
/**
* MQTT
*/
private void setupMqttHandlers() {
// 1. 设置断开重连监听器
mqttClient.closeHandler(closeEvent -> {
if (!isRunning) {
return;
}
log.warn("[closeHandler][省平台 MQTT 连接已断开, 准备重连]");
reconnectWithDelay();
});
// 2. 设置异常处理器
mqttClient.exceptionHandler(exception ->
log.error("[exceptionHandler][省平台 MQTT 客户端异常]", exception));
}
/**
* MQTT
*/
private void stopMqttClient() {
if (mqttClient == null) {
return;
}
try {
if (mqttClient.isConnected()) {
try {
CountDownLatch disconnectLatch = new CountDownLatch(1);
mqttClient.disconnect(ar -> disconnectLatch.countDown());
if (!disconnectLatch.await(5, TimeUnit.SECONDS)) {
log.warn("[stopMqttClient][断开省平台 MQTT 连接超时]");
}
} catch (Exception e) {
log.warn("[stopMqttClient][关闭省平台 MQTT 客户端异常]", e);
}
}
} catch (Exception e) {
log.warn("[stopMqttClient][停止省平台 MQTT 客户端过程中发生异常]", e);
} finally {
mqttClient = null;
}
}
/**
*
*
* @param alertRecord
*/
public void publishAlertRecord(IotAlertRecordDO alertRecord) {
if (!isRunning || mqttClient == null || !mqttClient.isConnected()) {
log.warn("[publishAlertRecord][省平台 MQTT 客户端未连接, 无法发布告警记录告警ID: {}]", alertRecord.getId());
return;
}
try {
// 1. 构建消息内容
String payload = JsonUtils.toJsonString(alertRecord);
// 2. 发布到 MQTT 主题
String topic = mqttProperties.getAlertTopic();
mqttClient.publish(
topic,
Buffer.buffer(payload),
io.vertx.mqtt.MqttQoS.valueOf(mqttProperties.getQos()),
false,
false,
publishResult -> {
if (publishResult.succeeded()) {
log.info("[publishAlertRecord][告警记录推送成功告警ID: {}, 主题: {}]", alertRecord.getId(), topic);
} else {
log.error("[publishAlertRecord][告警记录推送失败告警ID: {}, 主题: {}]",
alertRecord.getId(), topic, publishResult.cause());
}
}
);
} catch (Exception e) {
log.error("[publishAlertRecord][发布告警记录异常告警ID: {}]", alertRecord.getId(), e);
}
}
/**
*
*
* @return
*/
public boolean isConnected() {
return isRunning && mqttClient != null && mqttClient.isConnected();
}
}

@ -0,0 +1,72 @@
package cn.iocoder.yudao.module.iot.job.alert;
import cn.iocoder.yudao.module.iot.dal.dataobject.alert.IotAlertRecordDO;
import cn.iocoder.yudao.module.iot.framework.mqtt.core.ProvincialPlatformMqttClient;
import cn.iocoder.yudao.module.iot.service.alert.IotAlertRecordService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobExecutionContext;
import org.springframework.scheduling.quartz.QuartzJobBean;
/**
* IoT Job
* MQTT
*
* @author
*/
@Slf4j
public class IotAlertRecordPushJob extends QuartzJobBean {
/**
* JobData Key -
*/
public static final String JOB_DATA_KEY_ALERT_RECORD_ID = "alertRecordId";
@Resource
private IotAlertRecordService alertRecordService;
@Resource(required = false)
private ProvincialPlatformMqttClient provincialPlatformMqttClient;
@Override
protected void executeInternal(JobExecutionContext context) {
// 1. 获得告警记录编号
Long alertRecordId = context.getMergedJobDataMap().getLong(JOB_DATA_KEY_ALERT_RECORD_ID);
if (alertRecordId == null) {
log.error("[executeInternal][告警记录编号为空,跳过推送]");
return;
}
// 2. 查询告警记录
IotAlertRecordDO alertRecord = alertRecordService.getAlertRecord(alertRecordId);
if (alertRecord == null) {
log.warn("[executeInternal][告警记录不存在告警ID: {}]", alertRecordId);
return;
}
// 3. 检查告警是否已处理
if (Boolean.TRUE.equals(alertRecord.getProcessStatus())) {
log.info("[executeInternal][告警已处理停止推送告警ID: {}]", alertRecordId);
return;
}
// 4. 推送告警记录到省平台
if (provincialPlatformMqttClient == null) {
log.warn("[executeInternal][省平台MQTT客户端未配置跳过推送告警ID: {}]", alertRecordId);
return;
}
if (!provincialPlatformMqttClient.isConnected()) {
log.warn("[executeInternal][省平台MQTT客户端未连接跳过本次推送告警ID: {}]", alertRecordId);
return;
}
try {
provincialPlatformMqttClient.publishAlertRecord(alertRecord);
log.info("[executeInternal][成功推送告警记录到省平台告警ID: {}]", alertRecordId);
} catch (Exception e) {
log.error("[executeInternal][推送告警记录到省平台异常告警ID: {}]", alertRecordId, e);
}
}
}

@ -124,7 +124,7 @@ public class IotAlertRecordServiceImpl implements IotAlertRecordService {
Map<String, Object> jobDataMap = MapUtil.of(IotAlertRecordPushJob.JOB_DATA_KEY_ALERT_RECORD_ID, alertRecordId); Map<String, Object> jobDataMap = MapUtil.of(IotAlertRecordPushJob.JOB_DATA_KEY_ALERT_RECORD_ID, alertRecordId);
// 构建CRON表达式每N秒执行一次从配置中读取 // 构建CRON表达式每N秒执行一次从配置中读取
Integer pushIntervalSeconds = mqttProperties.getPushIntervalSeconds(); Integer pushIntervalSeconds = mqttProperties.getAlertPushIntervalSeconds();
String cronExpression = String.format("0/%d * * * * ?", pushIntervalSeconds); String cronExpression = String.format("0/%d * * * * ?", pushIntervalSeconds);
// 注册定时任务 // 注册定时任务

Loading…
Cancel
Save