Server-Sent Events (SSE) 技术解析与实战
前言
在当今互联网应用中,实时数据交互已成为关键需求。从AI聊天机器人到实时通知系统,从股票行情更新到协作编辑工具,都需要服务器能够主动向客户端推送数据。Server-Sent Events作为一种基于HTTP的单向服务器推送技术,在这些场景中展现出独特的优势。
SSE特别适合以下场景:
AI聊天机器人:服务器逐步返回AI生成的回答实时通知系统:新消息、提醒等通知推送日志监控:实时展示系统日志输出数据更新:股票价格、天气信息等实时更新效果图图片
Server-Sent Events是一种允许服务器向客户端推送实时事件的技术。与传统的HTTP请求 - 响应模式不同,SSE建立的是一个持久的HTTP连接,服务器可以在任何时候通过这个连接向客户端推送数据。
SSE具有以下特点:
基于标准HTTP协议,无需额外协议支持单向通信:仅服务器向客户端推送数据轻量级:相比WebSocket,实现更简单支持自动重连:连接断开时客户端可自动尝试重新连接支持事件类型:可以推送不同类型的事件工作原理SSE的工作流程如下:
复制
客户端 服务端
| |
|-- GET /events -------->| 建立连接
|<-- 200 OK -------------| 返回事件流
|<-- data: message1 -----| 推送消息1
|<-- data: message2 -----| 推送消息2
|<-- ... ---| 持续推送1.2.3.4.5.6.7.
SSE数据格式采用简单的文本格式,使用data:前缀表示数据内容,event:前缀表示事件类型,例如:
复制
event: message
data: Hello, this is a server-sent event!
data: This is another message without event type1.2.3.4.
技术
连接类型
双向通信
实现复杂度
浏览器支持
适用场景
SSE
单向 HTTP
否
低
良好
服务器主动推送,单向数据流
WebSocket
双向 TCP
是
中
良好
双向实时通信,高交互场景
轮询
多次 HTTP
否
低
良好
简单场景,对实时性要求不高
长轮询
单次 HTTP
否
中
良好
服务器主动推送,中等实时性要求
实现 SSE
创建 SSE 控制器以下使用预设的中文回复示例,实际应用中这里会调用AI API
复制
@RestController
@RequestMapping("/api/sse")
public class SseController {
// 存储所有活跃的SSE连接
private final Map<String, SseEmitter> clients = new HashMap<>();
// 用于处理消息生成的线程池
private final ExecutorService executor = Executors.newFixedThreadPool(10);
// 消息计数器,用于生成唯一消息ID
private final AtomicInteger messageCounter = new AtomicInteger(0);
// 超时时间设置为30分钟,避免频繁断开重连
private static final long TIMEOUT = 30 * 60 * 1000L;
@GetMapping("/connect/{clientId}")
public SseEmitter connect(@PathVariable String clientId) {
// 检查是否已有相同客户端ID的连接
if (clients.containsKey(clientId)) {
clients.get(clientId).complete();
}
// 创建新的SseEmitter实例
SseEmitter emitter = new SseEmitter(TIMEOUT);
// 添加心跳机制,每20秒发送一次心跳消息
executor.submit(() -> {
try {
while (true) {
Thread.sleep(20000);
if (emitter != null && clients.containsKey(clientId)) {
emitter.send(SseEmitter.event().name("heartbeat").data("ping"));
} else {
break;
}
}
} catch (Exception e) {
emitter.completeWithError(e);
}
});
emitter.onTimeout(() -> {
System.out.println("客户端连接超时: " + clientId);
clients.remove(clientId);
emitter.complete();
});
emitter.onError(e -> {
System.out.println("客户端连接错误: " + clientId + ", 错误: " + e.getMessage());
clients.remove(clientId);
emitter.completeWithError(e);
});
// 注册完成事件处理器
emitter.onCompletion(() -> {
System.out.println("SSE处理完成: " + clientId);
clients.remove(clientId);
});
clients.put(clientId, emitter);
sendSystemMessage(emitter, "连接已建立,您可以开始与AI助手聊天了。");
return emitter;
}
@GetMapping("/chat/{clientId}/{message}")
public void sendChatMessage(@PathVariable String clientId, @PathVariable String message) {
SseEmitter emitter = clients.get(clientId);
if (emitter == null) {
return;
}
// 在单独线程中处理消息,避免阻塞主线程
executor.submit(() -> {
try {
String response = generateAiResponse(message);
int messageId = messageCounter.incrementAndGet();
// 发送消息开始事件
emitter.send(SseEmitter.event()
.name("messageStart")
.data(MapUtil.builder()
.put("id", messageId)
.put("clientId", clientId)
.build()));
// 模拟AI逐步生成响应
String[] parts = response.split("(?<=。|!|?|\\.|!|\\?)");
for (String part : parts) {
if (part.trim().isEmpty()) continue;
// 发送消息片段
emitter.send(SseEmitter.event()
.name("messageFragment")
.data(MapUtil.builder()
.put("id", messageId)
.put("content", part)
.put("isLast", false)
.build()));
// 模拟思考时间
Thread.sleep(500 + (long)(Math.random() * 1000));
}
// 发送消息结束事件
emitter.send(SseEmitter.event()
.name("messageEnd")
.data(MapUtil.builder()
.put("id", messageId)
.put("clientId", clientId)
.build()));
} catch (Exception e) {
try {
emitter.send(SseEmitter.event()
.name("error")
.data("生成AI回复时出错: " + e.getMessage()));
} catch (IOException ex) {
ex.printStackTrace();
}
}
});
}
@PostMapping("/ack/{clientId}/{messageId}")
public void acknowledgeMessage(
@PathVariable String clientId,
@PathVariable int messageId
) {
System.out.println("收到消息确认: " + messageId + " 来自客户端: " + clientId);
// 这里可以记录消息已确认,进行相应处理
}
@GetMapping("/disconnect/{clientId}")
public void disconnect(@PathVariable String clientId) {
SseEmitter emitter = clients.get(clientId);
if (emitter != null) {
clients.remove(clientId);
emitter.complete();
}
}
private void sendSystemMessage(SseEmitter emitter, String content) {
try {
emitter.send(SseEmitter.event()
.name("systemMessage")
.data(MapUtil.of("content", content)));
} catch (IOException e) {
e.printStackTrace();
}
}
private String generateAiResponse(String prompt) {
// 实际应用中这里会调用AI API
// 这里使用预设的中文回复示例
String lowerCasePrompt = prompt.toLowerCase();
if (lowerCasePrompt.contains("你好") || lowerCasePrompt.contains("hi") || lowerCasePrompt.contains("hello")) {
return"你好!我是AI助手,很高兴为你服务。";
} elseif (lowerCasePrompt.contains("一安") || lowerCasePrompt.contains("一安未来")) {
return"一安未来公众号致力于Java,大数据;心得交流,技术分享;";
} else {
return"很有意思的问题!让我思考一下... " + prompt + " 是一个很好的话题。我可以从多个角度为你分析和解答,你是否想了解更多相关信息?";
}
}
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.83.84.85.86.87.88.89.90.91.92.93.94.95.96.97.98.99.100.101.102.103.104.105.106.107.108.109.110.111.112.113.114.115.116.117.118.119.120.121.122.123.124.125.126.127.128.129.130.131.132.133.134.135.136.137.138.139.140.141.142.143.144.145.146.147.148.149.150.151.152.153.154.155.156.157.158.159.160.161.162.163.
复制
@Configuration
public class CorsConfig {
@Bean
public CorsFilter corsFilter() {
CorsConfiguration config = new CorsConfiguration();
config.addAllowedOriginPattern("*"); // 允许所有域名进行跨域调用
config.addAllowedHeader("*"); // 允许任何请求头
config.addAllowedMethod("*"); // 允许任何方法(POST、GET等)
config.setAllowCredentials(true); // 允许携带凭证
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
source.registerCorsConfiguration("/**", config); // 对所有接口都有效
return new CorsFilter(source);
}
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.
阅读剩余
THE END