关注我的公众号:【编程朝花夕拾】,可获取首发内容。
01 引言
SSE是一种用于实现服务器向客户端实时单向推送数据的Web技术。基于HTTP协议,允许服务器将数据以事件流(Event Stream)的形式发送给客户端。客户端通过建立持久的HTTP连接,并监听事件流,可以实时接收服务器推送的数据。
因为是Web技术,且通过Http协议的建立连接。所以它的适用性更强,基础的网络环境都可支持。本文采用SSE技术代替WebSocket,看看实现的效果。
至于SSE的详细介绍,我之前写过一篇文章,可以参考:
02 Maven依赖的引入
由于SSE是Web技术,所以只需要引入Web即可,无需第三方依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
03 SSE的最佳实践
SSE不需要额外的配置,就像正常的Http请求一样丝滑。
3.1 SSE建立连接
public static final ConcurrentHashMap<String, SseEmitter> SSE_MAP = new ConcurrentHashMap<>();
@GetMapping("/connect")
public SseEmitter sseEmitter() throws IOException {
log.info("SSE连接建立成功......");
SseEmitter emitter = new SseEmitter(0L);
// 链接建立完成之后,需要发送消息给客户端
String sseEmitterId = UUID.randomUUID().toString();
SSE_MAP.put(sseEmitterId, emitter);
// 发送消息,发送客户端的唯一标识会给客户端
Map<String, Object> msgMap = new HashMap<>();
msgMap.put("clientId", sseEmitterId);
emitter.send(SseEmitter.event().data(JSON.toJSONString(msgMap)));
// 发送进入直播间的消息
noticeClient(sseEmitterId, sseEmitterId.substring(0, 5) + "*** 进入了直播间");
emitter.onCompletion(() -> {
log.info("SSE操作完毕,连接关闭");
SSE_MAP.remove(sseEmitterId);
// 发送离开直播间的消息
noticeClient(sseEmitterId, sseEmitterId.substring(0, 5) + "*** 离开了直播间");
});
return emitter;
}
代码解读:
建立连接就是返回一个SseEmitter
对象即可,初次连接直接new
一个新对象即可。构造函数可以设置超时时间,超过时间自动断开连接,重连之后又是新的客户端。案例中设置成0,表示永不超时。
为了标记客户端的唯一性,特地使用UUID作为客户端额唯一标识。并发送给客户端,客户端将唯一标识放在Header请求头中。
建立连接后,需要通知各个客户端,用户进入直播间。
最后监听当前客户端关闭后,需要通知所有客户端,有用户离开了直播间。
3.2 发送评论
/**
* 处理发送的消息
*
**/
@GetMapping("/handleMessage")
public String handleMessage(String msg, HttpServletRequest request) {
noticeClient(request.getHeader("x-client-id"), doSensitivewords(msg));
return "发送成功";
}
private void noticeClient(String sseEmitterId, String message) {
// 发送消息,在线人数加1
Map<String, Object> msgMap = new HashMap<>();
msgMap.put("msg", message);
msgMap.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
msgMap.put("sessionId", sseEmitterId.substring(0, 5)+ "***");
msgMap.put("count", SSE_MAP.values().size());
SSE_MAP.values().forEach(item -> {
try {
item.send(SseEmitter.event().data(JSON.toJSONString(msgMap)));
log.info("发送消息完成");
} catch (IOException e) {
log.error("发送消息异常......", e);
}
});
}
代码解读:
从Header中获取客户端的唯一标识,然后针对发送的信息进行敏感词过滤。然后通知所有的客户端,将信息展示在直播间。
3.3 关闭客户端
@GetMapping("/closeWindow")
public String closeWindow(HttpServletRequest request) {
String sseEmitterId = request.getHeader("x-client-id");
SSE_MAP.remove(sseEmitterId);
noticeClient(sseEmitterId, sseEmitterId.substring(0, 5) + "*** 离开了直播间");
return "sseEmitterId 连接关闭了";
}
代码解读:
直接删除当前刚关闭的客户端,并通知其他客户端有人离开了直播间。
注意: SSE客户端并没有监听客户端关闭的函数,这里使用的是监听页面的关闭
3.4 客户端的创建
简单一览SSE客户端的API:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>模拟直播室实时评论的功能</title>
</head>
<body>
<div style="width: 700px; margin: 0 auto;">
<div>
<h1>直播间</h1>
<div>
<span>直播间人数:</span>
<span style="color: blue" id="count">1</span>
</div>
<div id="message" style="height: 300px; overflow: auto; border: 1px solid #ccc;"></div>
</div>
<div style="margin-top: 10px;">
<input type="text" id="messageInput">
<button onclick="sendMessage()">发送评论</button>
</div>
</div>
</body>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
<script>
// 建立连接
let source = new EventSource("/sse/connect");
let client_id;
// 打来链接
source.onopen = function(event) {
console.log("SSE is open now.");
};
// 处理消息
source.onmessage = function(event) {
let message = event.data;
console.log(event);
console.log("Received message: " + message);
let json = JSON.parse(message);
let clientId = json.clientId
if (clientId) {
client_id = clientId;
}else {
$("#message").append('<p><span style="color: blueviolet">'+json.time+ '('+json.sessionId+'):</span>'+ json.msg + '</p>');
$("#count").text(json.count);
}
};
source.onerror = function(event) {
if (event.eventPhase === EventSource.CLOSED) {
console.log('Connection closed, will not attempt to reconnect');
source.close();
}
}
// 监控页面是否关闭
window.onbeforeunload = function(e) {
fetch("/sse/closeWindow", {
headers: {
"x-client-id": client_id
}
})
.then(response => response.text())
.then(data => console.log(data))
.catch(error => console.error('Error:', error));
};
// 发送消息
function sendMessage() {
let message = $("#messageInput").val();
$("#messageInput").val("");
fetch("/sse/handleMessage?msg=" + message, {
headers: {
"x-client-id": client_id
}
})
.then(response => response.text())
.then(data => console.log(data))
.catch(error => console.error('Error:', error));
}
</script>
</html>
该客户端和基于WebSocket的的客户端类似。但是SSE无法直接向服务端的SSE发送信息,需要借助额外的请求,模拟发送信息,然后由服务端推送消息给每一个客户端。
onopen
、onmessage
、onerror
是SSE客户端提供的监听方法,分别监听建立连接、服务端推送消息以及服务端异常时的回调方法。
window.onbeforeunload
为Window窗口的关闭的监控方法,和SSE无关。在这里模拟客户端关闭的场景。
3.5 测试
进入直播间
实时评论以及敏感词检测
离开直播间
04 小结
在使用的SSE
时,明显感觉配置上比WebSocket
要简单很多,代码量也就少了很多。但是SSE
没有监听客户端关闭的回调,且无法判断当前客户端是否为同一客户端。
SSE
有自动超时、重连的机制,这个是WebSocket
没有的。
关于直播间评、直播室、群聊这类业务功能,可以的实现的技术很多。如何根据业务场景选择合适的技术,以及在使用过程中如何处理空闲的资源等,都在考验着程序员、架构师的能力。