import { createEventSource } from './api.js'; /** * 全局 SSE 管理器 * 负责管理 EventSource 连接,并将事件分发到注册的监听器 */ class SSEManager { constructor() { this.eventSource = null; this.listeners = new Map(); this.isConnected = false; this.eventBuffer = []; // 用于存储连接前产生的事件 this.maxBufferSize = 1000; } /** * 建立 SSE 连接 */ connect() { if (this.eventSource) { console.log('[SSEManager] 连接已存在,跳过'); return; } console.log('[SSEManager] 建立 SSE 连接...'); this.eventSource = createEventSource(); this.isConnected = true; this.eventSource.onmessage = (e) => { try { const data = JSON.parse(e.data); console.log('[SSEManager] 收到事件:', data.type, data); this._dispatchEvent(data); } catch (err) { console.error('[SSEManager] 解析事件数据失败:', err); } }; this.eventSource.onerror = (err) => { console.error('[SSEManager] SSE 连接错误:', err); this.isConnected = false; // 触发错误监听器 this._dispatchEvent({ type: 'sse.error', error: err }); }; this.eventSource.onopen = () => { console.log('[SSEManager] SSE 连接已打开'); this.isConnected = true; }; } /** * 关闭 SSE 连接 */ disconnect() { if (this.eventSource) { console.log('[SSEManager] 关闭 SSE 连接'); this.eventSource.close(); this.eventSource = null; this.isConnected = false; } } /** * 重新连接 SSE */ reconnect() { this.disconnect(); this.connect(); } /** * 注册事件监听器 * @param {string} eventType - 事件类型,如 'message.part.updated' * @param {Function} callback - 回调函数 * @returns {Function} 取消监听的函数 */ on(eventType, callback) { if (!this.listeners.has(eventType)) { this.listeners.set(eventType, new Set()); } this.listeners.get(eventType).add(callback); // 返回取消监听的函数 return () => { this.off(eventType, callback); }; } /** * 移除事件监听器 * @param {string} eventType - 事件类型 * @param {Function} callback - 回调函数 */ off(eventType, callback) { if (this.listeners.has(eventType)) { this.listeners.get(eventType).delete(callback); } } /** * 分发事件到所有注册的监听器 * @param {Object} data - 事件数据 */ _dispatchEvent(data) { const eventType = data.type; // 存储到缓冲区 this._bufferEvent(data); // 分发到特定类型的监听器 if (this.listeners.has(eventType)) { this.listeners.get(eventType).forEach((callback) => { try { callback(data); } catch (err) { console.error(`[SSEManager] 监听器执行失败 (${eventType}):`, err); } }); } // 分发到通配符监听器 (*) if (this.listeners.has('*')) { this.listeners.get('*').forEach((callback) => { try { callback(data); } catch (err) { console.error('[SSEManager] 通配符监听器执行失败:', err); } }); } } /** * 将事件存储到缓冲区 * @param {Object} data - 事件数据 */ _bufferEvent(data) { this.eventBuffer.push({ data, timestamp: Date.now(), }); // 限制缓冲区大小 if (this.eventBuffer.length > this.maxBufferSize) { this.eventBuffer.shift(); } } /** * 获取缓冲区中的事件 * @param {string} eventType - 可选的事件类型过滤 * @param {number} since - 可选的时间戳,获取此时间之后的事件 * @returns {Array} 事件列表 */ getBufferedEvents(eventType, since) { let events = this.eventBuffer; if (since) { events = events.filter((e) => e.timestamp > since); } if (eventType) { events = events.filter((e) => e.data.type === eventType); } return events.map((e) => e.data); } /** * 清空事件缓冲区 */ clearBuffer() { this.eventBuffer = []; } /** * 获取连接状态 * @returns {boolean} */ getConnectionStatus() { return this.isConnected; } } // 导出单例实例 export const sseManager = new SSEManager(); export default sseManager;