From 20fdc2203d948d333a29cb12138958658261bd5a Mon Sep 17 00:00:00 2001 From: Shibo Lyu Date: Tue, 3 Sep 2024 23:57:37 +0800 Subject: [PATCH] refactor: use websocket --- src/lib/blah/connection/chatServer.ts | 76 +++++++++++++++++---------- 1 file changed, 47 insertions(+), 29 deletions(-) diff --git a/src/lib/blah/connection/chatServer.ts b/src/lib/blah/connection/chatServer.ts index ce59f42..705a706 100644 --- a/src/lib/blah/connection/chatServer.ts +++ b/src/lib/blah/connection/chatServer.ts @@ -4,14 +4,19 @@ import type { BlahKeyPair, BlahSignedPayload } from '../crypto'; import type { BlahAuth, BlahMessage, BlahRoomInfo, BlahUserJoinMessage } from '../structures'; import { BlahError } from './error'; +const RECONNECT_TIMEOUT = 1500; +const RECONNECT_MAX_TRIES = 5; + export class BlahChatServerConnection { private static commonHeaders = { 'x-blah-client': `Weblah/${version}` }; private endpoint: string; private keypair?: BlahKeyPair; - private eventSources: Map = new Map(); - private messageListeners: Map void>> = new Map(); + private webSocket: WebSocket | null = null; + private messageListeners: Map) => void>> = + new Map(); + private webSocketRetryTimeout: number | null = null; constructor(endpoint: string, keypair?: BlahKeyPair) { this.endpoint = endpoint; @@ -104,53 +109,66 @@ export class BlahChatServerConnection { return items; } - private createEventSource(roomId: string): EventSource { - const source = new EventSource(`${this.endpoint}/room/${roomId}/event`); - const onSourceError = (e: Event) => { - console.error('EventSource error:', e); - this.eventSources.delete(roomId); - // Retry - this.eventSources.set(roomId, this.createEventSource(roomId)); + private createWebSocket(remainingTries: number = RECONNECT_MAX_TRIES - 1): WebSocket { + const socket = new WebSocket(`${this.endpoint}/ws`); + const onSocketClose = (e: Event) => { + console.error('WebSocket error or closed:', e); + this.webSocket?.close(); + if (remainingTries > 0) + this.webSocketRetryTimeout = setTimeout( + () => (this.webSocket = this.createWebSocket(remainingTries - 1)), + RECONNECT_TIMEOUT + ); }; - source.addEventListener('error', onSourceError); + socket.addEventListener('close', onSocketClose); - // Attach back any existing listeners - const listeners = this.messageListeners.get(roomId) ?? new Set(); - listeners.forEach((listener) => source?.addEventListener('message', listener)); + socket.addEventListener('open', async () => { + if (this.keypair) { + const { Authorization } = await this.generateAuthHeader(); + socket.send(Authorization); + } + }); - return source; + socket.addEventListener('message', (event) => { + const frameJson: { chat: BlahSignedPayload } | { lagged: boolean } = JSON.parse( + event.data + ); + + if ('chat' in frameJson) { + const message = frameJson.chat; + const listeners = this.messageListeners.get(message.signee.payload.room); + if (listeners) for (const listener of listeners) listener(message); + } else { + console.log('Unknown WebSocket frame:', frameJson); + } + }); + + return socket; } subscribeRoom( roomId: string, onNewMessage: (message: BlahSignedPayload) => void ): { unsubscribe: () => void } { - let source = this.eventSources.get(roomId); - if (!source) { - source = this.createEventSource(roomId); - } + if (!this.webSocket) this.webSocket = this.createWebSocket(); - const listener = (event: MessageEvent) => { - const message = JSON.parse(event.data) as BlahSignedPayload; - onNewMessage(message); - }; - - source.addEventListener('message', listener); const listeners = this.messageListeners.get(roomId) ?? new Set(); - listeners.add(listener); + listeners.add(onNewMessage); this.messageListeners.set(roomId, listeners); return { unsubscribe: () => { - source?.removeEventListener('message', listener); const listeners = this.messageListeners.get(roomId) ?? new Set(); - listeners.delete(listener); + listeners.delete(onNewMessage); if (listeners.size === 0) { - source?.close(); - this.eventSources.delete(roomId); this.messageListeners.delete(roomId); } + if (this.messageListeners.size === 0) { + if (this.webSocketRetryTimeout) clearTimeout(this.webSocketRetryTimeout); + this.webSocket?.close(); + this.webSocket = null; + } } }; }