feat: 增加mqtt模块

master
wangrunpu 1 month ago
parent c48c05bdad
commit ffa448bb3f

@ -49,6 +49,8 @@
<anyline.version>8.7.2-20250603</anyline.version>
<!-- 工作流配置 -->
<warm-flow.version>1.8.2</warm-flow.version>
<!-- mqtt客户端配置 -->
<client.mqtt.version>1.2.5</client.mqtt.version>
<!-- 插件版本 -->
<maven-jar-plugin.version>3.4.2</maven-jar-plugin.version>
@ -99,7 +101,11 @@
<!-- 依赖声明 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>${client.mqtt.version}</version>
</dependency>
<!-- SpringBoot的依赖配置-->
<dependency>
<groupId>org.springframework.boot</groupId>

@ -270,3 +270,12 @@ justauth:
client-id: 10**********6
client-secret: 1f7d08**********5b7**********29e
redirect-uri: ${justauth.address}/social-callback?source=gitea
mqtt:
server: http://localhost:18083
host: tcp://83l6076j43.goho.co:37690
client-id: stm32-iot-server # 推荐使用模块名+端口,确保唯一性
username:
password:
api-key: 890c854975af5456
secret-key: aHONcCJLm8H3fu301vOkRRe8aiXV9AGsRjuAjganacmH

@ -34,6 +34,7 @@
<module>ruoyi-common-tenant</module>
<module>ruoyi-common-websocket</module>
<module>ruoyi-common-sse</module>
<module>ruoyi-common-mqtt</module>
</modules>
<artifactId>ruoyi-common</artifactId>

@ -68,6 +68,13 @@
<version>${revision}</version>
</dependency>
<!-- 数据交互服务 -->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-mqtt</artifactId>
<version>${revision}</version>
</dependency>
<!-- 数据库服务 -->
<dependency>
<groupId>org.dromara</groupId>

@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>ruoyi-common-mqtt</artifactId>
<description>
ruoyi-common-mqtt 数据交互服务
</description>
<dependencies>
<!-- webflux -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- mqtt -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
<!-- 核心模块 -->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-core</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
</project>

@ -0,0 +1,32 @@
package org.dromara.common.mqtt.config;
import org.dromara.common.mqtt.server.MqttGateway;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.Base64;
/**
* MQTT
*/
@Configuration
@EnableConfigurationProperties(MqttProperties.class)
// 只有当配置中设置了 mqtt.host 时才启用此自动配置
@ConditionalOnProperty(prefix = "mqtt", name = "host")
public class MqttAutoConfiguration {
@Bean
public MqttGateway mqttGateway(MqttProperties mqttProperties) {
return new MqttGateway(mqttProperties);
}
@Bean
public WebClient webClient(MqttProperties mqttProperties) {
String basicAuth = Base64.getEncoder().encodeToString((mqttProperties.getApiKey() + ":" + mqttProperties.getSecretKey()).getBytes());
return WebClient.builder().baseUrl(mqttProperties.getServer()).defaultHeader("Authorization", "Basic " + basicAuth).build();
}
}

@ -0,0 +1,37 @@
package org.dromara.common.mqtt.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import lombok.Data;
@Data
@ConfigurationProperties(prefix = "mqtt") // 配置文件前缀
public class MqttProperties {
private String server;
/** Broker 地址 (例如: tcp://broker.emqx.io:1883) */
private String host;
/** 客户端 ID */
private String clientId;
/** 用户名 */
private String username;
/** 密码 */
private String password;
/** 连接超时时间 (秒) */
private int connectionTimeout = 30;
/** 心跳间隔 (秒) */
private int keepAliveInterval = 60;
/** 是否清除会话 */
private boolean cleanSession = true;
/** 断开重连是否启用 */
private boolean automaticReconnect = true;
private String apiKey;
private String secretKey;
}

@ -0,0 +1,139 @@
package org.dromara.common.mqtt.server;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.dromara.common.mqtt.config.MqttProperties;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@Service
public class MqttGateway {
private static final Logger log = LoggerFactory.getLogger(MqttGateway.class);
private final MqttProperties mqttProperties;
private MqttClient mqttClient;
private MqttConnectOptions connectOptions;
// 可以通过构造函数注入配置
public MqttGateway(MqttProperties mqttProperties) {
this.mqttProperties = mqttProperties;
this.initOptions(); // 初始化连接选项
}
// 初始化连接选项
private void initOptions() {
connectOptions = new MqttConnectOptions();
connectOptions.setUserName(mqttProperties.getUsername());
connectOptions.setPassword(mqttProperties.getPassword() != null ? mqttProperties.getPassword().toCharArray() : null);
connectOptions.setConnectionTimeout(mqttProperties.getConnectionTimeout());
connectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
connectOptions.setCleanSession(mqttProperties.isCleanSession());
connectOptions.setAutomaticReconnect(mqttProperties.isAutomaticReconnect());
// 可以设置遗嘱消息 (Last Will and Testament)
// options.setWill("topic/lwt", "I'm dead".getBytes(), 1, true);
}
/** 建立连接并在应用启动后自动执行 */
@PostConstruct
public void connect() {
try {
log.info("尝试连接 MQTT Broker: {}", mqttProperties.getHost());
// 1. 创建客户端实例
mqttClient = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientId());
// 2. 设置回调 (处理接收消息和连接状态变化)
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("MQTT 连接成功. (重连: {})", reconnect);
// TODO: 可以在这里添加重连成功后重新订阅主题的逻辑
}
@Override
public void connectionLost(Throwable cause) {
log.error("MQTT 连接丢失: {}", cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// TODO: 收到消息处理逻辑,可以分发给不同的处理器
log.info("接收到消息: Topic: {}, Qos: {}, Payload: {}",
topic, message.getQos(), new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 消息发布完成 (QoS > 0 时)
}
});
// 3. 执行连接
mqttClient.connect(connectOptions);
log.info("MQTT 客户端已启动, ID: {}", mqttProperties.getClientId());
} catch (MqttException e) {
log.error("MQTT 连接失败: {}", e.getMessage());
// 通常在 connectOptions.setAutomaticReconnect(true) 的情况下Paho 会自动尝试重连
}
}
/** 优雅断开连接 */
@PreDestroy
public void disconnect() {
try {
if (mqttClient != null && mqttClient.isConnected()) {
mqttClient.disconnect();
mqttClient.close();
log.info("MQTT 客户端已断开连接并关闭.");
}
} catch (MqttException e) {
log.error("MQTT 断开连接失败: {}", e.getMessage());
}
}
/**
*
* @param topic
* @param payload
* @param qos QoS (0, 1, 2)
* @param retained
*/
public void publish(String topic, String payload, int qos, boolean retained) {
if (mqttClient == null || !mqttClient.isConnected()) {
log.warn("MQTT 客户端未连接,无法发布消息到主题: {}", topic);
return;
}
try {
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(qos);
message.setRetained(retained);
mqttClient.publish(topic, message);
log.debug("MQTT 消息已发布: Topic: {}, Qos: {}, Payload: {}", topic, qos, payload);
} catch (MqttException e) {
log.error("MQTT 消息发布失败: Topic: {}", topic, e);
}
}
/**
*
* @param topic
* @param qos QoS
*/
public void subscribe(String topic, int qos) {
if (mqttClient == null || !mqttClient.isConnected()) {
log.warn("MQTT 客户端未连接,无法订阅主题: {}", topic);
return;
}
try {
mqttClient.subscribe(topic, qos);
log.info("成功订阅 MQTT 主题: Topic: {}, Qos: {}", topic, qos);
} catch (MqttException e) {
log.error("MQTT 订阅失败: Topic: {}", topic, e);
}
}
}

@ -16,6 +16,10 @@
</description>
<dependencies>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-mqtt</artifactId>
</dependency>
<!-- 通用工具-->
<dependency>

@ -1,313 +0,0 @@
package org.dromara.demo.server;
import cn.hutool.core.util.ObjectUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.dromara.demo.domain.ClientConnection;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import jakarta.annotation.PreDestroy;
/**
* TCP4G
* "hellohello" "byebye" JSON
* 使
*/
@Slf4j
@Component
public class IotTcpServer implements CommandLineRunner {
@Value("${tcp.server.port:8888}")
private int tcpPort;
private final ObjectMapper objectMapper = new ObjectMapper();
private final ExecutorService executorService = Executors.newCachedThreadPool();
private final ScheduledExecutorService heartbeatService = Executors.newScheduledThreadPool(2);
private final AtomicBoolean started = new AtomicBoolean(false);
private ServerSocket serverSocket;
// 用于存储客户端连接和起信息
@Getter
private final ConcurrentHashMap<String, ClientConnection> clientMaps = new ConcurrentHashMap<>();
@Override
public void run(String... args) throws Exception {
executorService.submit(this::startTcpServer);
// 启动心跳检测任务每10秒检查一次
heartbeatService.scheduleAtFixedRate(this::checkHeartbeats, 10, 30, TimeUnit.SECONDS);
}
/**
* TCP
*/
public void startTcpServer() {
if (!started.compareAndSet(false, true)) {
log.warn("TCP服务器已启动忽略重复启动");
return;
}
try {
serverSocket = new ServerSocket(tcpPort);
log.info("TCP服务器启动成功监听端口: {}", tcpPort);
// 持续监听客户端连接
while (!serverSocket.isClosed()) {
try {
Socket clientSocket = serverSocket.accept();
configureClientSocket(clientSocket);
// 为每个客户端连接创建独立的处理线程
executorService.submit(() -> handleClient(clientSocket));
} catch (IOException e) {
if (!serverSocket.isClosed()) {
log.error("接受客户端连接时发生错误", e);
}
}
}
} catch (IOException e) {
log.error("启动TCP服务器失败", e);
}
}
/**
*
* "hellohello" "byebye" JSON
* /
*/
private void handleClient(Socket clientSocket) {
//TODO 应该处理为 首个hellohello 和最后一个bygbye
//并且 如果开头和结尾不为这样的数据,则丢弃
final String START = "hellohello";
final String END = "byebye";
final String HEARTBEAT = "heartbeat";
final Charset charset = StandardCharsets.UTF_8;
String clientId = null;
try (InputStream in = clientSocket.getInputStream()) {
StringBuilder sb = new StringBuilder(4096);
byte[] buf = new byte[4096];
int n;
// 第一个完整的帧包含设备唯一标识符
boolean identifierReceived = false;
while ((n = in.read(buf)) != -1) {
if (n <= 0) continue;
String chunk = new String(buf, 0, n, charset);
sb.append(chunk);
while (true) {
int s = sb.indexOf(START);
if (s < 0) {
// 可选:避免缓冲区无限增长(很少触达)
if (sb.length() > 65536) {
sb.delete(0, sb.length() - 65536);
}
break;
}
int e = sb.indexOf(END, s + START.length());
if (e < 0) {
// 起始存在但尚无结束,保留从起始开始的内容,丢弃起始前的噪声
if (s > 0) sb.delete(0, s);
break;
}
// 帧内容为 [s+START.length, e)
int payloadStart = s + START.length();
String json = sb.substring(payloadStart, e).trim();
if (!identifierReceived) {
// 解析首个帧作为设备标识符
clientId = json;
ClientConnection clientConnection = clientMaps.get(clientId);
if (ObjectUtil.isNotNull(clientConnection) && ObjectUtil.isNotNull(clientConnection.getSocket())) {
log.info("客户端连接重复: {},断开旧连接。", clientId);
clientConnection.getSocket().close(); //有重复连接,断开旧连接
}else {
clientConnection = new ClientConnection();
}
clientConnection.setSocket(clientSocket);
clientConnection.setConnectTime(System.currentTimeMillis());
clientConnection.setLastActiveTime(System.currentTimeMillis());
clientMaps.put(clientId, clientConnection); // 关联标识符与Socket
log.info("新客户端连接: {}", clientId);
identifierReceived = true;
continue;
}
// 若尾部含其他字符,兜底裁到最后一个 '}'
int lastBrace = json.lastIndexOf('}');
if (lastBrace >= 0) {
json = json.substring(0, lastBrace + 1);
}
// 检查是否是心跳包
if (HEARTBEAT.equals(json.trim())) {
log.info("收到客户端 {} 的心跳包", clientId);
// 更新客户端最后活动时间
ClientConnection clientConnection = clientMaps.get(clientId);
clientConnection.setLastActiveTime(System.currentTimeMillis());
// 发送心跳响应
String finalClientId = clientId;
heartbeatService.schedule(() -> writeFrame(finalClientId, "heartbeat_ack", charset), 100, TimeUnit.MILLISECONDS);
sb.delete(0, e + END.length());
continue;
}else {
log.info("接收到帧: {}", json);
try {
// 解析成功后,封装并发送温湿度响应
try {
JsonNode root = objectMapper.readTree(json);
log.info("解析数据成功:{}",root);
} catch (Exception sendEx) {
log.warn("构造/发送温湿度响应失败: {}", sendEx.getMessage());
}
} catch (Exception ex) {
log.error("解析传感器数据失败: {}", json, ex);
}
}
// 删除到结束标记之后,继续解析后续帧
sb.delete(0, e + END.length());
}
}
// 连接关闭时的尾帧兜底
int s = sb.indexOf(START);
int e = (s >= 0) ? sb.indexOf(END, s + START.length()) : -1;
if (s >= 0 && e > s) {
String json = sb.substring(s + START.length(), e).trim();
int lastBrace = json.lastIndexOf('}');
if (lastBrace >= 0) json = json.substring(0, lastBrace + 1);
if (!json.isEmpty()) {
log.info("接收到尾帧: {}", json);
try {
} catch (Exception ex) {
log.error("解析传感器数据失败(尾帧): {}", json, ex);
}
}
}
} catch (IOException e) {
log.error("处理客户端连接时发生错误", e);
} finally {
if (clientId != null) {
clientMaps.remove(clientId);
}
try {
// 客户端断开连接时执行的代码
clientSocket.close();
log.info("客户端连接已关闭: {}", clientId);
} catch (IOException e) {
log.error("关闭客户端连接时发生错误", e);
}
}
}
/**
*
*/
private void checkHeartbeats() {
long currentTime = System.currentTimeMillis();
long timeout = 90 * 1000; // 90秒超时
clientMaps.entrySet().removeIf(entry -> {
String clientId = entry.getKey();
ClientConnection clientConnection = entry.getValue();
if (ObjectUtil.isNotNull(clientConnection) && ObjectUtil.isNotNull(clientConnection.getLastActiveTime())) {
// 检查是否超时
if (currentTime - clientConnection.getLastActiveTime() > timeout) {
log.warn("客户端 {} 心跳超时,断开连接", clientId);
try {
clientConnection.getSocket().close();
} catch (IOException e) {
log.error("关闭超时客户端连接时发生错误", e);
}
return true; // 移除该客户端
}
return false; // 保留该客户端
}else{
return true;
}
});
}
/**
* hellohello + json + byebye
*/
private void writeFrame(String clientId, String json, Charset charset) {
String frame = "hellohello" + json + "byebye";
try {
ClientConnection clientConnection = clientMaps.get(clientId);
if (ObjectUtil.isNotNull(clientConnection)) {
clientConnection.getSocket().getOutputStream().write(frame.getBytes(charset));
clientConnection.getSocket().getOutputStream().flush();
}
log.info("向客户端 {} 发送帧:{}", clientId, frame);
} catch (IOException e) {
log.warn("向客户端 {} 发送帧失败: {}", clientId, e.getMessage());
}
}
/**
*
* @param clientId
* @param message
* @throws IOException IO
*/
public void sendDataToClient(String clientId, String message) throws IOException {
ClientConnection clientConnection = clientMaps.get(clientId);
if (ObjectUtil.isNotNull(clientConnection) && clientConnection.getSocket() != null && !clientConnection.getSocket().isClosed()) {
writeFrame(clientId, message, StandardCharsets.UTF_8);
} else {
log.warn("无法向客户端 {} 发送数据因为找不到对应的Socket或已关闭", clientId);
}
}
/**
* Socket
*/
private void configureClientSocket(Socket socket) {
try {
socket.setKeepAlive(true);
socket.setTcpNoDelay(true);
// 设置读超时,避免永久阻塞
// socket.setSoTimeout(60000); // 60秒超时
} catch (Exception e) {
log.warn("配置客户端Socket参数失败: {}", e.getMessage());
}
}
/**
* TCP
*/
@PreDestroy
public void stopTcpServer() {
try {
if (serverSocket != null && !serverSocket.isClosed()) {
serverSocket.close();
log.info("TCP服务器已停止");
}
executorService.shutdown();
heartbeatService.shutdown();
started.set(false);
} catch (IOException e) {
log.error("停止TCP服务器时发生错误", e);
}
}
}

@ -0,0 +1,56 @@
package org.dromara.demo.service;
import jakarta.annotation.PostConstruct;
import org.dromara.common.mqtt.server.MqttGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.util.HashMap;
@Service
public class DeviceService {
@Autowired
private MqttGateway mqttGateway;
@Autowired
private WebClient webClient;
// 示例:在服务启动后自动订阅一个主题
@PostConstruct
public void initSubscription() {
// 订阅一个主题QoS=1
mqttGateway.subscribe("iot-hazard-server", 1);
}
/**
*
* @param deviceId ID
* @param command
*/
public void sendCommand(String deviceId, String command) {
String topic = "iot-hazard/" + deviceId;
// 发布消息QoS=1不保留
mqttGateway.publish(topic, command, 1, false);
}
/**
* 404
* @param deviceId
* @return
*/
public HashMap<String, Object> getDeviceLinkInfo(String deviceId) {
return webClient.get()
.uri("/api/v5/clients/" + deviceId)
.retrieve()
.onStatus(
status -> status.value() == 404,
response -> Mono.empty()
)
.bodyToMono(new ParameterizedTypeReference<HashMap<String, Object>>() {})
.block();
}
}

@ -16,6 +16,11 @@
</description>
<dependencies>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-mqtt</artifactId>
</dependency>
<!-- 通用工具-->
<dependency>
<groupId>org.dromara</groupId>

Loading…
Cancel
Save