fix: 提取消息处理逻辑,对应模块可将其重写

master
wangrunpu 1 month ago
parent ffa448bb3f
commit a0a867dbfa

@ -1,7 +1,6 @@
package org.dromara.common.mqtt.config;
import org.dromara.common.mqtt.server.MqttGateway;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;

@ -0,0 +1,23 @@
package org.dromara.common.mqtt.handler;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public interface MqttMessageHandler {
void onMessageArrived(String topic, MqttMessage message);
default void onConnectionLost(Throwable cause) {
}
default void onConnectComplete(boolean reconnect, String serverURI) {
}
default void onDeliveryComplete(String topic) {
}
}

@ -3,10 +3,16 @@ package org.dromara.common.mqtt.server;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.dromara.common.mqtt.config.MqttProperties;
import org.dromara.common.mqtt.handler.MqttMessageHandler;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.util.HashMap;
@Service
public class MqttGateway {
@ -16,6 +22,8 @@ public class MqttGateway {
private final MqttProperties mqttProperties;
private MqttClient mqttClient;
private MqttConnectOptions connectOptions;
private MqttMessageHandler messageHandler;
private WebClient webClient;
// 可以通过构造函数注入配置
public MqttGateway(MqttProperties mqttProperties) {
@ -50,24 +58,36 @@ public class MqttGateway {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("MQTT 连接成功. (重连: {})", reconnect);
// TODO: 可以在这里添加重连成功后重新订阅主题的逻辑
if (messageHandler != null) {
messageHandler.onConnectComplete(reconnect, serverURI);
}
}
@Override
public void connectionLost(Throwable cause) {
log.error("MQTT 连接丢失: {}", cause.getMessage());
if (messageHandler != null) {
messageHandler.onConnectionLost(cause);
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// TODO: 收到消息处理逻辑,可以分发给不同的处理器
log.info("接收到消息: Topic: {}, Qos: {}, Payload: {}",
topic, message.getQos(), new String(message.getPayload()));
if (messageHandler != null) {
messageHandler.onMessageArrived(topic, message);
} else {
// 默认处理逻辑
log.info("接收到消息: Topic: {}, Qos: {}, Payload: {}",
topic, message.getQos(), new String(message.getPayload()));
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 消息发布完成 (QoS > 0 时)
if (messageHandler != null && token.getTopics() != null && token.getTopics().length > 0) {
messageHandler.onDeliveryComplete(token.getTopics()[0]);
}
}
});
@ -136,4 +156,41 @@ public class MqttGateway {
log.error("MQTT 订阅失败: Topic: {}", topic, e);
}
}
/**
*
* @param deviceId ID
* @param command
*/
public void sendCommand(String deviceId, String command) {
String topic = "iot-hazard/" + deviceId;
// 发布消息QoS=1不保留
publish(topic, command, 1, false);
}
/**
* 404
* @param deviceId
* @return
*/
public HashMap<String, Object> getDeviceLinkInfo(String deviceId) {
return webClient.get()
.uri("/api/v5/clients/" + deviceId)
.retrieve()
.onStatus(
status -> status.value() == 404,
response -> Mono.empty()
)
.bodyToMono(new ParameterizedTypeReference<HashMap<String, Object>>() {})
.block();
}
/**
*
* @param handler
*/
public void setMessageHandler(MqttMessageHandler handler) {
this.messageHandler = handler;
log.info("已设置自定义 MQTT 消息处理器: {}", handler.getClass().getSimpleName());
}
}

@ -1,56 +0,0 @@
package org.dromara.demo.service;
import jakarta.annotation.PostConstruct;
import org.dromara.common.mqtt.server.MqttGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.util.HashMap;
@Service
public class DeviceService {
@Autowired
private MqttGateway mqttGateway;
@Autowired
private WebClient webClient;
// 示例:在服务启动后自动订阅一个主题
@PostConstruct
public void initSubscription() {
// 订阅一个主题QoS=1
mqttGateway.subscribe("iot-hazard-server", 1);
}
/**
*
* @param deviceId ID
* @param command
*/
public void sendCommand(String deviceId, String command) {
String topic = "iot-hazard/" + deviceId;
// 发布消息QoS=1不保留
mqttGateway.publish(topic, command, 1, false);
}
/**
* 404
* @param deviceId
* @return
*/
public HashMap<String, Object> getDeviceLinkInfo(String deviceId) {
return webClient.get()
.uri("/api/v5/clients/" + deviceId)
.retrieve()
.onStatus(
status -> status.value() == 404,
response -> Mono.empty()
)
.bodyToMono(new ParameterizedTypeReference<HashMap<String, Object>>() {})
.block();
}
}

@ -0,0 +1,56 @@
package org.dromara.hazard.config;
import jakarta.annotation.PostConstruct;
import org.dromara.common.mqtt.server.MqttGateway;
import org.dromara.hazard.handler.HazardMqttMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
/**
* MQTT
*
* @author ruoyi
*/
@Configuration
public class HazardMqttConfig {
private static final Logger log = LoggerFactory.getLogger(HazardMqttConfig.class);
@Autowired(required = false)
private MqttGateway mqttGateway;
@Autowired
private HazardMqttMessageHandler hazardMqttMessageHandler;
/**
*
*/
@PostConstruct
public void init() {
if (mqttGateway != null) {
mqttGateway.setMessageHandler(hazardMqttMessageHandler);
log.info("危险源模块MQTT消息处理器已注册");
// 订阅相关主题
subscribeTopics();
} else {
log.warn("MqttGateway未启用跳过MQTT消息处理器注册");
}
}
/**
*
*/
private void subscribeTopics() {
try {
// 订阅一个主题QoS=1
mqttGateway.subscribe("iot-hazard-server", 1);
log.info("危险源模块已订阅MQTT主题: iot-hazard-server");
} catch (Exception e) {
log.error("订阅MQTT主题失败", e);
}
}
}

@ -0,0 +1,74 @@
package org.dromara.hazard.handler;
import org.dromara.common.mqtt.handler.MqttMessageHandler;
import org.dromara.common.mqtt.server.MqttGateway;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* MQTT
*
* @author ruoyi
*/
@Component
public class HazardMqttMessageHandler implements MqttMessageHandler {
private static final Logger log = LoggerFactory.getLogger(HazardMqttMessageHandler.class);
@Autowired
private MqttGateway mqttGateway;
/**
* MQTT
*/
@Override
public void onMessageArrived(String topic, MqttMessage message) {
String payload = new String(message.getPayload());
log.info("【危险源模块】接收到MQTT消息 - Topic: {}, QoS: {}, Retained: {}, Payload: {}",
topic, message.getQos(), message.isRetained(), payload);
// TODO: 根据不同的主题分发到不同的业务处理方法
if (topic.equals("iot-hazard-server")) {
handleHazardMessage(topic, payload);
} else {
log.warn("未知的消息主题: {}", topic);
}
}
/**
*
*/
@Override
public void onConnectionLost(Throwable cause) {
log.error("【危险源模块】MQTT连接丢失", cause);
// TODO: 可以在这里添加告警通知、重连策略等
}
/**
*
*/
@Override
public void onConnectComplete(boolean reconnect, String serverURI) {
log.info("【危险源模块】MQTT{}连接成功: {}", reconnect ? "重" : "", serverURI);
// TODO: 可以在这里重新订阅主题
}
/**
*
*/
@Override
public void onDeliveryComplete(String topic) {
log.debug("【危险源模块】消息发送完成: {}", topic);
}
/**
*
*/
private void handleHazardMessage(String topic, String payload) {
log.info("处理危险源消息 - Topic: {}, Payload: {}", topic, payload);
// TODO: 实现你的业务逻辑
// 例如: 解析JSON、保存数据库、触发业务流程等
}
}
Loading…
Cancel
Save