Compare commits

...

2 Commits

@ -4,8 +4,10 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.utils.SpringUtils; import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.redis.utils.RedisUtils; import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.common.sse.dto.SseMessageDto; import org.dromara.common.sse.dto.SseMessageDto;
import org.dromara.common.sse.enums.SseEnums;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException; import java.io.IOException;
@ -169,13 +171,13 @@ public class SseEmitterManager {
* @param userId id * @param userId id
* @param message * @param message
*/ */
public void sendMessage(Long userId, String message) { public void sendMessage(Long userId, String event, String message) {
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId); Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
if (MapUtil.isNotEmpty(emitters)) { if (MapUtil.isNotEmpty(emitters)) {
for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) { for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
try { try {
entry.getValue().send(SseEmitter.event() entry.getValue().send(SseEmitter.event()
.name("message") .name(StringUtils.isEmpty(event)? SseEnums.Event.MESSAGE.getEvent() :event)
.data(message)); .data(message));
} catch (Exception e) { } catch (Exception e) {
SseEmitter remove = emitters.remove(entry.getKey()); SseEmitter remove = emitters.remove(entry.getKey());
@ -194,9 +196,9 @@ public class SseEmitterManager {
* *
* @param message * @param message
*/ */
public void sendMessage(String message) { public void sendMessage(String event, String message) {
for (Long userId : USER_TOKEN_EMITTERS.keySet()) { for (Long userId : USER_TOKEN_EMITTERS.keySet()) {
sendMessage(userId, message); sendMessage(userId, event, message);
} }
} }
@ -209,9 +211,10 @@ public class SseEmitterManager {
SseMessageDto broadcastMessage = new SseMessageDto(); SseMessageDto broadcastMessage = new SseMessageDto();
broadcastMessage.setMessage(sseMessageDto.getMessage()); broadcastMessage.setMessage(sseMessageDto.getMessage());
broadcastMessage.setUserIds(sseMessageDto.getUserIds()); broadcastMessage.setUserIds(sseMessageDto.getUserIds());
broadcastMessage.setEvent(sseMessageDto.getEvent());
RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}", log.info("SSE发送主题订阅消息topic:{} session keys:{} event:{} message:{}",
SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage()); SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getEvent(), sseMessageDto.getMessage());
}); });
} }
@ -220,11 +223,12 @@ public class SseEmitterManager {
* *
* @param message * @param message
*/ */
public void publishAll(String message) { public void publishAll(String event, String message) {
SseMessageDto broadcastMessage = new SseMessageDto(); SseMessageDto broadcastMessage = new SseMessageDto();
broadcastMessage.setEvent(event);
broadcastMessage.setMessage(message); broadcastMessage.setMessage(message);
RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message); log.info("SSE发送主题订阅消息topic:{} event:{} message:{}", SSE_TOPIC, event, message);
}); });
} }
} }

@ -22,6 +22,11 @@ public class SseMessageDto implements Serializable {
*/ */
private List<Long> userIds; private List<Long> userIds;
/**
*
*/
private String event;
/** /**
* *
*/ */

@ -0,0 +1,17 @@
package org.dromara.common.sse.enums;
import lombok.Getter;
public class SseEnums {
@Getter
public enum Event {
MESSAGE("message"),
HAZARD("hazard");
private String event;
Event(String event) {
this.event = event;
}
}
}

@ -28,14 +28,14 @@ public class SseTopicListener implements ApplicationRunner, Ordered {
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
sseEmitterManager.subscribeMessage((message) -> { sseEmitterManager.subscribeMessage((message) -> {
log.info("SSE主题订阅收到消息session keys={} message={}", message.getUserIds(), message.getMessage()); log.info("SSE主题订阅收到消息session keys={} event={} message={}", message.getUserIds(), message.getEvent(), message.getMessage());
// 如果key不为空就按照key发消息 如果为空就群发 // 如果key不为空就按照key发消息 如果为空就群发
if (CollUtil.isNotEmpty(message.getUserIds())) { if (CollUtil.isNotEmpty(message.getUserIds())) {
message.getUserIds().forEach(key -> { message.getUserIds().forEach(key -> {
sseEmitterManager.sendMessage(key, message.getMessage()); sseEmitterManager.sendMessage(key, message.getEvent(), message.getMessage());
}); });
} else { } else {
sseEmitterManager.sendMessage(message.getMessage()); sseEmitterManager.sendMessage(message.getEvent(), message.getMessage());
} }
}); });
log.info("初始化SSE主题订阅监听器成功"); log.info("初始化SSE主题订阅监听器成功");

@ -31,11 +31,11 @@ public class SseMessageUtils {
* @param userId id * @param userId id
* @param message * @param message
*/ */
public static void sendMessage(Long userId, String message) { public static void sendMessage(Long userId, String event, String message) {
if (!isEnable()) { if (!isEnable()) {
return; return;
} }
MANAGER.sendMessage(userId, message); MANAGER.sendMessage(userId, event, message);
} }
/** /**
@ -43,11 +43,11 @@ public class SseMessageUtils {
* *
* @param message * @param message
*/ */
public static void sendMessage(String message) { public static void sendMessage(String event, String message) {
if (!isEnable()) { if (!isEnable()) {
return; return;
} }
MANAGER.sendMessage(message); MANAGER.sendMessage(event, message);
} }
/** /**
@ -67,11 +67,11 @@ public class SseMessageUtils {
* *
* @param message * @param message
*/ */
public static void publishAll(String message) { public static void publishAll(String event, String message) {
if (!isEnable()) { if (!isEnable()) {
return; return;
} }
MANAGER.publishAll(message); MANAGER.publishAll(event, message);
} }
/** /**

@ -64,7 +64,7 @@ public class SysNoticeController extends BaseController {
return R.fail(); return R.fail();
} }
String type = dictService.getDictLabel("sys_notice_type", notice.getNoticeType()); String type = dictService.getDictLabel("sys_notice_type", notice.getNoticeType());
SseMessageUtils.publishAll("[" + type + "] " + notice.getNoticeTitle()); SseMessageUtils.publishAll("message", "[" + type + "] " + notice.getNoticeTitle());
return R.ok(); return R.ok();
} }

Loading…
Cancel
Save