diff --git a/src/renderer/http/sse.js b/src/renderer/http/sse.js new file mode 100644 index 0000000..5156615 --- /dev/null +++ b/src/renderer/http/sse.js @@ -0,0 +1,189 @@ +import { createEventSource } from './api.js'; + +/** + * 全局 SSE 管理器 + * 负责管理 EventSource 连接,并将事件分发到注册的监听器 + */ +class SSEManager { + constructor() { + this.eventSource = null; + this.listeners = new Map(); + this.isConnected = false; + this.eventBuffer = []; // 用于存储连接前产生的事件 + this.maxBufferSize = 1000; + } + + /** + * 建立 SSE 连接 + */ + connect() { + if (this.eventSource) { + console.log('[SSEManager] 连接已存在,跳过'); + return; + } + + console.log('[SSEManager] 建立 SSE 连接...'); + this.eventSource = createEventSource(); + this.isConnected = true; + + this.eventSource.onmessage = (e) => { + try { + const data = JSON.parse(e.data); + console.log('[SSEManager] 收到事件:', data.type, data); + this._dispatchEvent(data); + } catch (err) { + console.error('[SSEManager] 解析事件数据失败:', err); + } + }; + + this.eventSource.onerror = (err) => { + console.error('[SSEManager] SSE 连接错误:', err); + this.isConnected = false; + // 触发错误监听器 + this._dispatchEvent({ type: 'sse.error', error: err }); + }; + + this.eventSource.onopen = () => { + console.log('[SSEManager] SSE 连接已打开'); + this.isConnected = true; + }; + } + + /** + * 关闭 SSE 连接 + */ + disconnect() { + if (this.eventSource) { + console.log('[SSEManager] 关闭 SSE 连接'); + this.eventSource.close(); + this.eventSource = null; + this.isConnected = false; + } + } + + /** + * 重新连接 SSE + */ + reconnect() { + this.disconnect(); + this.connect(); + } + + /** + * 注册事件监听器 + * @param {string} eventType - 事件类型,如 'message.part.updated' + * @param {Function} callback - 回调函数 + * @returns {Function} 取消监听的函数 + */ + on(eventType, callback) { + if (!this.listeners.has(eventType)) { + this.listeners.set(eventType, new Set()); + } + this.listeners.get(eventType).add(callback); + + // 返回取消监听的函数 + return () => { + this.off(eventType, callback); + }; + } + + /** + * 移除事件监听器 + * @param {string} eventType - 事件类型 + * @param {Function} callback - 回调函数 + */ + off(eventType, callback) { + if (this.listeners.has(eventType)) { + this.listeners.get(eventType).delete(callback); + } + } + + /** + * 分发事件到所有注册的监听器 + * @param {Object} data - 事件数据 + */ + _dispatchEvent(data) { + const eventType = data.type; + + // 存储到缓冲区 + this._bufferEvent(data); + + // 分发到特定类型的监听器 + if (this.listeners.has(eventType)) { + this.listeners.get(eventType).forEach((callback) => { + try { + callback(data); + } catch (err) { + console.error(`[SSEManager] 监听器执行失败 (${eventType}):`, err); + } + }); + } + + // 分发到通配符监听器 (*) + if (this.listeners.has('*')) { + this.listeners.get('*').forEach((callback) => { + try { + callback(data); + } catch (err) { + console.error('[SSEManager] 通配符监听器执行失败:', err); + } + }); + } + } + + /** + * 将事件存储到缓冲区 + * @param {Object} data - 事件数据 + */ + _bufferEvent(data) { + this.eventBuffer.push({ + data, + timestamp: Date.now(), + }); + + // 限制缓冲区大小 + if (this.eventBuffer.length > this.maxBufferSize) { + this.eventBuffer.shift(); + } + } + + /** + * 获取缓冲区中的事件 + * @param {string} eventType - 可选的事件类型过滤 + * @param {number} since - 可选的时间戳,获取此时间之后的事件 + * @returns {Array} 事件列表 + */ + getBufferedEvents(eventType, since) { + let events = this.eventBuffer; + + if (since) { + events = events.filter((e) => e.timestamp > since); + } + + if (eventType) { + events = events.filter((e) => e.data.type === eventType); + } + + return events.map((e) => e.data); + } + + /** + * 清空事件缓冲区 + */ + clearBuffer() { + this.eventBuffer = []; + } + + /** + * 获取连接状态 + * @returns {boolean} + */ + getConnectionStatus() { + return this.isConnected; + } +} + +// 导出单例实例 +export const sseManager = new SSEManager(); + +export default sseManager;