refactor: use websocket

This commit is contained in:
Shibo Lyu 2024-09-03 23:57:37 +08:00
parent f34ad89b79
commit 20fdc2203d

View file

@ -4,14 +4,19 @@ import type { BlahKeyPair, BlahSignedPayload } from '../crypto';
import type { BlahAuth, BlahMessage, BlahRoomInfo, BlahUserJoinMessage } from '../structures';
import { BlahError } from './error';
const RECONNECT_TIMEOUT = 1500;
const RECONNECT_MAX_TRIES = 5;
export class BlahChatServerConnection {
private static commonHeaders = { 'x-blah-client': `Weblah/${version}` };
private endpoint: string;
private keypair?: BlahKeyPair;
private eventSources: Map<string, EventSource> = new Map();
private messageListeners: Map<string, Set<(event: MessageEvent) => void>> = new Map();
private webSocket: WebSocket | null = null;
private messageListeners: Map<string, Set<(message: BlahSignedPayload<BlahMessage>) => void>> =
new Map();
private webSocketRetryTimeout: number | null = null;
constructor(endpoint: string, keypair?: BlahKeyPair) {
this.endpoint = endpoint;
@ -104,53 +109,66 @@ export class BlahChatServerConnection {
return items;
}
private createEventSource(roomId: string): EventSource {
const source = new EventSource(`${this.endpoint}/room/${roomId}/event`);
const onSourceError = (e: Event) => {
console.error('EventSource error:', e);
this.eventSources.delete(roomId);
// Retry
this.eventSources.set(roomId, this.createEventSource(roomId));
private createWebSocket(remainingTries: number = RECONNECT_MAX_TRIES - 1): WebSocket {
const socket = new WebSocket(`${this.endpoint}/ws`);
const onSocketClose = (e: Event) => {
console.error('WebSocket error or closed:', e);
this.webSocket?.close();
if (remainingTries > 0)
this.webSocketRetryTimeout = setTimeout(
() => (this.webSocket = this.createWebSocket(remainingTries - 1)),
RECONNECT_TIMEOUT
);
};
source.addEventListener('error', onSourceError);
socket.addEventListener('close', onSocketClose);
// Attach back any existing listeners
const listeners = this.messageListeners.get(roomId) ?? new Set();
listeners.forEach((listener) => source?.addEventListener('message', listener));
socket.addEventListener('open', async () => {
if (this.keypair) {
const { Authorization } = await this.generateAuthHeader();
socket.send(Authorization);
}
});
return source;
socket.addEventListener('message', (event) => {
const frameJson: { chat: BlahSignedPayload<BlahMessage> } | { lagged: boolean } = JSON.parse(
event.data
);
if ('chat' in frameJson) {
const message = frameJson.chat;
const listeners = this.messageListeners.get(message.signee.payload.room);
if (listeners) for (const listener of listeners) listener(message);
} else {
console.log('Unknown WebSocket frame:', frameJson);
}
});
return socket;
}
subscribeRoom(
roomId: string,
onNewMessage: (message: BlahSignedPayload<BlahMessage>) => void
): { unsubscribe: () => void } {
let source = this.eventSources.get(roomId);
if (!source) {
source = this.createEventSource(roomId);
}
if (!this.webSocket) this.webSocket = this.createWebSocket();
const listener = (event: MessageEvent) => {
const message = JSON.parse(event.data) as BlahSignedPayload<BlahMessage>;
onNewMessage(message);
};
source.addEventListener('message', listener);
const listeners = this.messageListeners.get(roomId) ?? new Set();
listeners.add(listener);
listeners.add(onNewMessage);
this.messageListeners.set(roomId, listeners);
return {
unsubscribe: () => {
source?.removeEventListener('message', listener);
const listeners = this.messageListeners.get(roomId) ?? new Set();
listeners.delete(listener);
listeners.delete(onNewMessage);
if (listeners.size === 0) {
source?.close();
this.eventSources.delete(roomId);
this.messageListeners.delete(roomId);
}
if (this.messageListeners.size === 0) {
if (this.webSocketRetryTimeout) clearTimeout(this.webSocketRetryTimeout);
this.webSocket?.close();
this.webSocket = null;
}
}
};
}