feat: 定时推送初步实现

master
wangrunpu 3 weeks ago
parent c36e9d87a5
commit 78d8bb0bd5

@ -19,6 +19,11 @@
</description>
<dependencies>
<!-- MQTT 相关 -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mqtt</artifactId>
</dependency>
<!-- Spring Cloud 基础 -->
<dependency>
<groupId>cn.iocoder.cloud</groupId>

@ -67,7 +67,7 @@ public class ProvincialPlatformMqttProperties {
/**
* QoS (0, 1, 2)
*/
private Integer qos = 1;
private Integer qos;
/**
*
@ -77,7 +77,7 @@ public class ProvincialPlatformMqttProperties {
/**
*
*/
private String alertTopic = "provincial/alert";
private String alertTopic;
/**
* ()

@ -4,6 +4,7 @@ import cn.hutool.core.lang.Assert;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.dal.dataobject.alert.IotAlertRecordDO;
import cn.iocoder.yudao.module.iot.framework.mqtt.config.ProvincialPlatformMqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttClient;
@ -33,7 +34,6 @@ public class ProvincialPlatformMqttClient {
@Resource
private ProvincialPlatformMqttProperties mqttProperties;
@Resource(name = "mqttVertx")
private Vertx vertx;
private MqttClient mqttClient;
@ -52,13 +52,16 @@ public class ProvincialPlatformMqttClient {
}
try {
// 1. 创建 MQTT 客户端
// 1. 创建 Vertx 实例
this.vertx = Vertx.vertx();
// 2. 创建 MQTT 客户端
createMqttClient();
// 2. 同步连接 MQTT Broker
// 3. 同步连接 MQTT Broker
connectMqttSync();
// 3. 标记服务为运行状态
// 4. 标记服务为运行状态
isRunning = true;
log.info("[start][省平台 MQTT 客户端启动成功]");
} catch (Exception e) {
@ -74,6 +77,13 @@ public class ProvincialPlatformMqttClient {
}
stopMqttClient();
// 关闭 Vertx 实例
if (vertx != null) {
vertx.close();
vertx = null;
}
isRunning = false;
log.info("[stop][省平台 MQTT 客户端已停止]");
}
@ -82,14 +92,14 @@ public class ProvincialPlatformMqttClient {
* MQTT
*/
private void createMqttClient() {
MqttClientOptions options = new MqttClientOptions()
.setClientId(mqttProperties.getClientId())
.setUsername(mqttProperties.getUsername())
.setPassword(mqttProperties.getPassword())
.setSsl(mqttProperties.getSsl())
.setCleanSession(mqttProperties.getCleanSession())
.setKeepAliveInterval(mqttProperties.getKeepAliveIntervalSeconds())
.setConnectTimeout(mqttProperties.getConnectTimeoutSeconds() * 1000); // Vert.x 需要毫秒
MqttClientOptions options = new MqttClientOptions();
options.setClientId(mqttProperties.getClientId());
options.setUsername(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword());
options.setSsl(mqttProperties.getSsl());
options.setCleanSession(mqttProperties.getCleanSession());
options.setKeepAliveInterval(mqttProperties.getKeepAliveIntervalSeconds());
options.setConnectTimeout(mqttProperties.getConnectTimeoutSeconds() * 1000); // Vert.x 需要毫秒
this.mqttClient = MqttClient.create(vertx, options);
}
@ -250,7 +260,7 @@ public class ProvincialPlatformMqttClient {
mqttClient.publish(
topic,
Buffer.buffer(payload),
io.vertx.mqtt.MqttQoS.valueOf(mqttProperties.getQos()),
MqttQoS.valueOf(mqttProperties.getQos()),
false,
false,
publishResult -> {

@ -25,7 +25,7 @@ public class IotAlertRecordPushJob extends QuartzJobBean {
@Resource
private IotAlertRecordService alertRecordService;
@Resource(required = false)
@Resource
private ProvincialPlatformMqttClient provincialPlatformMqttClient;
@Override

@ -42,7 +42,7 @@ public class IotAlertRecordServiceImpl implements IotAlertRecordService {
@Resource(name = "iotSchedulerManager")
private IotSchedulerManager schedulerManager;
@Resource(required = false)
@Resource
private ProvincialPlatformMqttProperties mqttProperties;
@Override

Loading…
Cancel
Save