Compare commits

...

2 Commits

@ -4,8 +4,10 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.common.sse.dto.SseMessageDto;
import org.dromara.common.sse.enums.SseEnums;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
@ -169,13 +171,13 @@ public class SseEmitterManager {
* @param userId id
* @param message
*/
public void sendMessage(Long userId, String message) {
public void sendMessage(Long userId, String event, String message) {
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
if (MapUtil.isNotEmpty(emitters)) {
for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
try {
entry.getValue().send(SseEmitter.event()
.name("message")
.name(StringUtils.isEmpty(event)? SseEnums.Event.MESSAGE.getEvent() :event)
.data(message));
} catch (Exception e) {
SseEmitter remove = emitters.remove(entry.getKey());
@ -194,9 +196,9 @@ public class SseEmitterManager {
*
* @param message
*/
public void sendMessage(String message) {
public void sendMessage(String event, String message) {
for (Long userId : USER_TOKEN_EMITTERS.keySet()) {
sendMessage(userId, message);
sendMessage(userId, event, message);
}
}
@ -209,9 +211,10 @@ public class SseEmitterManager {
SseMessageDto broadcastMessage = new SseMessageDto();
broadcastMessage.setMessage(sseMessageDto.getMessage());
broadcastMessage.setUserIds(sseMessageDto.getUserIds());
broadcastMessage.setEvent(sseMessageDto.getEvent());
RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}",
SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage());
log.info("SSE发送主题订阅消息topic:{} session keys:{} event:{} message:{}",
SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getEvent(), sseMessageDto.getMessage());
});
}
@ -220,11 +223,12 @@ public class SseEmitterManager {
*
* @param message
*/
public void publishAll(String message) {
public void publishAll(String event, String message) {
SseMessageDto broadcastMessage = new SseMessageDto();
broadcastMessage.setEvent(event);
broadcastMessage.setMessage(message);
RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message);
log.info("SSE发送主题订阅消息topic:{} event:{} message:{}", SSE_TOPIC, event, message);
});
}
}

@ -22,6 +22,11 @@ public class SseMessageDto implements Serializable {
*/
private List<Long> userIds;
/**
*
*/
private String event;
/**
*
*/

@ -0,0 +1,17 @@
package org.dromara.common.sse.enums;
import lombok.Getter;
public class SseEnums {
@Getter
public enum Event {
MESSAGE("message"),
HAZARD("hazard");
private String event;
Event(String event) {
this.event = event;
}
}
}

@ -28,14 +28,14 @@ public class SseTopicListener implements ApplicationRunner, Ordered {
@Override
public void run(ApplicationArguments args) throws Exception {
sseEmitterManager.subscribeMessage((message) -> {
log.info("SSE主题订阅收到消息session keys={} message={}", message.getUserIds(), message.getMessage());
log.info("SSE主题订阅收到消息session keys={} event={} message={}", message.getUserIds(), message.getEvent(), message.getMessage());
// 如果key不为空就按照key发消息 如果为空就群发
if (CollUtil.isNotEmpty(message.getUserIds())) {
message.getUserIds().forEach(key -> {
sseEmitterManager.sendMessage(key, message.getMessage());
sseEmitterManager.sendMessage(key, message.getEvent(), message.getMessage());
});
} else {
sseEmitterManager.sendMessage(message.getMessage());
sseEmitterManager.sendMessage(message.getEvent(), message.getMessage());
}
});
log.info("初始化SSE主题订阅监听器成功");

@ -31,11 +31,11 @@ public class SseMessageUtils {
* @param userId id
* @param message
*/
public static void sendMessage(Long userId, String message) {
public static void sendMessage(Long userId, String event, String message) {
if (!isEnable()) {
return;
}
MANAGER.sendMessage(userId, message);
MANAGER.sendMessage(userId, event, message);
}
/**
@ -43,11 +43,11 @@ public class SseMessageUtils {
*
* @param message
*/
public static void sendMessage(String message) {
public static void sendMessage(String event, String message) {
if (!isEnable()) {
return;
}
MANAGER.sendMessage(message);
MANAGER.sendMessage(event, message);
}
/**
@ -67,11 +67,11 @@ public class SseMessageUtils {
*
* @param message
*/
public static void publishAll(String message) {
public static void publishAll(String event, String message) {
if (!isEnable()) {
return;
}
MANAGER.publishAll(message);
MANAGER.publishAll(event, message);
}
/**

@ -64,7 +64,7 @@ public class SysNoticeController extends BaseController {
return R.fail();
}
String type = dictService.getDictLabel("sys_notice_type", notice.getNoticeType());
SseMessageUtils.publishAll("[" + type + "] " + notice.getNoticeTitle());
SseMessageUtils.publishAll("message", "[" + type + "] " + notice.getNoticeTitle());
return R.ok();
}

Loading…
Cancel
Save