|
|
|
|
@ -4,8 +4,10 @@ import cn.hutool.core.collection.CollUtil;
|
|
|
|
|
import cn.hutool.core.map.MapUtil;
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
import org.dromara.common.core.utils.SpringUtils;
|
|
|
|
|
import org.dromara.common.core.utils.StringUtils;
|
|
|
|
|
import org.dromara.common.redis.utils.RedisUtils;
|
|
|
|
|
import org.dromara.common.sse.dto.SseMessageDto;
|
|
|
|
|
import org.dromara.common.sse.enums.SseEnums;
|
|
|
|
|
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
|
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
@ -169,13 +171,13 @@ public class SseEmitterManager {
|
|
|
|
|
* @param userId 要发送消息的用户id
|
|
|
|
|
* @param message 要发送的消息内容
|
|
|
|
|
*/
|
|
|
|
|
public void sendMessage(Long userId, String message) {
|
|
|
|
|
public void sendMessage(Long userId, String event, String message) {
|
|
|
|
|
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
|
|
|
|
|
if (MapUtil.isNotEmpty(emitters)) {
|
|
|
|
|
for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
|
|
|
|
|
try {
|
|
|
|
|
entry.getValue().send(SseEmitter.event()
|
|
|
|
|
.name("message")
|
|
|
|
|
.name(StringUtils.isEmpty(event)? SseEnums.Event.MESSAGE.getEvent() :event)
|
|
|
|
|
.data(message));
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
SseEmitter remove = emitters.remove(entry.getKey());
|
|
|
|
|
@ -194,9 +196,9 @@ public class SseEmitterManager {
|
|
|
|
|
*
|
|
|
|
|
* @param message 要发送的消息内容
|
|
|
|
|
*/
|
|
|
|
|
public void sendMessage(String message) {
|
|
|
|
|
public void sendMessage(String event, String message) {
|
|
|
|
|
for (Long userId : USER_TOKEN_EMITTERS.keySet()) {
|
|
|
|
|
sendMessage(userId, message);
|
|
|
|
|
sendMessage(userId, event, message);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -209,9 +211,10 @@ public class SseEmitterManager {
|
|
|
|
|
SseMessageDto broadcastMessage = new SseMessageDto();
|
|
|
|
|
broadcastMessage.setMessage(sseMessageDto.getMessage());
|
|
|
|
|
broadcastMessage.setUserIds(sseMessageDto.getUserIds());
|
|
|
|
|
broadcastMessage.setEvent(sseMessageDto.getEvent());
|
|
|
|
|
RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
|
|
|
|
|
log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}",
|
|
|
|
|
SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage());
|
|
|
|
|
log.info("SSE发送主题订阅消息topic:{} session keys:{} event:{} message:{}",
|
|
|
|
|
SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getEvent(), sseMessageDto.getMessage());
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -220,11 +223,12 @@ public class SseEmitterManager {
|
|
|
|
|
*
|
|
|
|
|
* @param message 要发布的消息内容
|
|
|
|
|
*/
|
|
|
|
|
public void publishAll(String message) {
|
|
|
|
|
public void publishAll(String event, String message) {
|
|
|
|
|
SseMessageDto broadcastMessage = new SseMessageDto();
|
|
|
|
|
broadcastMessage.setEvent(event);
|
|
|
|
|
broadcastMessage.setMessage(message);
|
|
|
|
|
RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
|
|
|
|
|
log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message);
|
|
|
|
|
log.info("SSE发送主题订阅消息topic:{} event:{} message:{}", SSE_TOPIC, event, message);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|