|
|
|
|
@ -1,6 +1,7 @@
|
|
|
|
|
package org.dromara.common.sse.core;
|
|
|
|
|
|
|
|
|
|
import cn.hutool.core.map.MapUtil;
|
|
|
|
|
import cn.hutool.core.util.ObjectUtil;
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
import org.dromara.common.redis.utils.RedisUtils;
|
|
|
|
|
import org.dromara.common.sse.dto.SseMessageDto;
|
|
|
|
|
@ -24,6 +25,9 @@ public class SseEmitterManager {
|
|
|
|
|
*/
|
|
|
|
|
private final static String SSE_TOPIC = "global:sse";
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 本地内存缓存,存活用户连接
|
|
|
|
|
*/
|
|
|
|
|
private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
@ -43,32 +47,18 @@ public class SseEmitterManager {
|
|
|
|
|
|
|
|
|
|
emitters.put(token, emitter);
|
|
|
|
|
|
|
|
|
|
// 当 emitter 完成、超时或发生错误时,从映射表中移除对应的 token
|
|
|
|
|
emitter.onCompletion(() -> {
|
|
|
|
|
SseEmitter remove = emitters.remove(token);
|
|
|
|
|
if (remove != null) {
|
|
|
|
|
remove.complete();
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
emitter.onTimeout(() -> {
|
|
|
|
|
SseEmitter remove = emitters.remove(token);
|
|
|
|
|
if (remove != null) {
|
|
|
|
|
remove.complete();
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
emitter.onError((e) -> {
|
|
|
|
|
SseEmitter remove = emitters.remove(token);
|
|
|
|
|
if (remove != null) {
|
|
|
|
|
remove.complete();
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
// 注册连接结束、超时、错误等统一清理逻辑
|
|
|
|
|
Runnable callback = () -> disconnect(userId, token);
|
|
|
|
|
emitter.onCompletion(callback);
|
|
|
|
|
emitter.onTimeout(callback);
|
|
|
|
|
emitter.onError(e -> callback.run());
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
// 向客户端发送一条连接成功的事件
|
|
|
|
|
emitter.send(SseEmitter.event().comment("connected"));
|
|
|
|
|
log.warn("SSE连接成功 userId={}, token={}", userId, token);
|
|
|
|
|
} catch (IOException e) {
|
|
|
|
|
// 如果发送消息失败,则从映射表中移除 emitter
|
|
|
|
|
emitters.remove(token);
|
|
|
|
|
log.warn("SSE连接初始化失败 userId={}, token={}, 错误: {}", userId, token, e.getMessage());
|
|
|
|
|
disconnect(userId, token);
|
|
|
|
|
}
|
|
|
|
|
return emitter;
|
|
|
|
|
}
|
|
|
|
|
@ -85,13 +75,20 @@ public class SseEmitterManager {
|
|
|
|
|
}
|
|
|
|
|
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
|
|
|
|
|
if (MapUtil.isNotEmpty(emitters)) {
|
|
|
|
|
try {
|
|
|
|
|
SseEmitter sseEmitter = emitters.get(token);
|
|
|
|
|
sseEmitter.send(SseEmitter.event().comment("disconnected"));
|
|
|
|
|
sseEmitter.complete();
|
|
|
|
|
} catch (Exception ignore) {
|
|
|
|
|
SseEmitter sseEmitter = emitters.remove(token);
|
|
|
|
|
if (ObjectUtil.isNotNull(sseEmitter)) {
|
|
|
|
|
try {
|
|
|
|
|
sseEmitter.send(SseEmitter.event().comment("disconnected"));
|
|
|
|
|
} catch (IOException e) {
|
|
|
|
|
log.warn("SSE断开通知失败 userId={}, token={}", userId, token);
|
|
|
|
|
} finally {
|
|
|
|
|
sseEmitter.complete();
|
|
|
|
|
log.warn("SSE连接断开 userId={}, token={}", userId, token);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (emitters.isEmpty()) {
|
|
|
|
|
USER_TOKEN_EMITTERS.remove(userId);
|
|
|
|
|
}
|
|
|
|
|
emitters.remove(token);
|
|
|
|
|
} else {
|
|
|
|
|
USER_TOKEN_EMITTERS.remove(userId);
|
|
|
|
|
}
|
|
|
|
|
@ -115,16 +112,12 @@ public class SseEmitterManager {
|
|
|
|
|
public void sendMessage(Long userId, String message) {
|
|
|
|
|
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
|
|
|
|
|
if (MapUtil.isNotEmpty(emitters)) {
|
|
|
|
|
for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
|
|
|
|
|
for (String token : emitters.keySet()) {
|
|
|
|
|
try {
|
|
|
|
|
entry.getValue().send(SseEmitter.event()
|
|
|
|
|
.name("message")
|
|
|
|
|
.data(message));
|
|
|
|
|
emitters.get(token).send(SseEmitter.event().name("message").data(message));
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
SseEmitter remove = emitters.remove(entry.getKey());
|
|
|
|
|
if (remove != null) {
|
|
|
|
|
remove.complete();
|
|
|
|
|
}
|
|
|
|
|
log.warn("SSE消息发送失败 userId={}, token={}, error={}", userId, token, e.getMessage());
|
|
|
|
|
disconnect(userId, token);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
|