refactor: extract logic from chat page

This commit is contained in:
Shibo Lyu 2024-09-04 03:01:00 +08:00
parent 20fdc2203d
commit 5954928834
9 changed files with 203 additions and 104 deletions

View file

@ -1,5 +1,7 @@
import { version } from '$app/environment';
import type { BlahRichText } from '$lib/richText';
import { messageFromBlah, type Chat, type Message } from '$lib/types';
import { readable, type Readable } from 'svelte/store';
import type { BlahKeyPair, BlahSignedPayload } from '../crypto';
import type { BlahAuth, BlahMessage, BlahRoomInfo, BlahUserJoinMessage } from '../structures';
import { BlahError } from './error';
@ -11,14 +13,14 @@ export class BlahChatServerConnection {
private static commonHeaders = { 'x-blah-client': `Weblah/${version}` };
private endpoint: string;
private keypair?: BlahKeyPair;
private keypair: BlahKeyPair | null;
private webSocket: WebSocket | null = null;
private messageListeners: Map<string, Set<(message: BlahSignedPayload<BlahMessage>) => void>> =
new Map();
private webSocketRetryTimeout: number | null = null;
constructor(endpoint: string, keypair?: BlahKeyPair) {
constructor(endpoint: string, keypair: BlahKeyPair | null = null) {
this.endpoint = endpoint;
this.keypair = keypair;
}
@ -147,11 +149,29 @@ export class BlahChatServerConnection {
return socket;
}
connect() {
if (!this.webSocket) this.webSocket = this.createWebSocket();
}
disconnect() {
if (this.webSocketRetryTimeout) clearTimeout(this.webSocketRetryTimeout);
this.webSocket?.close();
this.webSocket = null;
}
changeKeyPair(keypair: BlahKeyPair | null) {
this.keypair = keypair;
if (this.webSocket) {
this.disconnect();
this.connect();
}
}
subscribeRoom(
roomId: string,
onNewMessage: (message: BlahSignedPayload<BlahMessage>) => void
): { unsubscribe: () => void } {
if (!this.webSocket) this.webSocket = this.createWebSocket();
if (!this.webSocket) throw new Error('Must connect to WebSocket before subscribing to rooms');
const listeners = this.messageListeners.get(roomId) ?? new Set();
listeners.add(onNewMessage);
@ -164,12 +184,43 @@ export class BlahChatServerConnection {
if (listeners.size === 0) {
this.messageListeners.delete(roomId);
}
if (this.messageListeners.size === 0) {
if (this.webSocketRetryTimeout) clearTimeout(this.webSocketRetryTimeout);
this.webSocket?.close();
this.webSocket = null;
}
}
};
}
chat(chatId: string): {
info: Readable<Chat>;
messages: Readable<Message[]>;
sendMessage: (brt: BlahRichText) => Promise<void>;
} {
const info = readable<Chat>(
{ server: this.endpoint, id: chatId, name: '', type: 'group' },
(set) => {
this.fetchRoomInfo(chatId).then((room) => {
set({ server: this.endpoint, id: chatId, name: room.title, type: 'group' });
});
}
);
const messages = readable<Message[]>([], (set, update) => {
this.fetchRoomHistory(chatId).then((history) =>
update((messages) => [
...history.map(messageFromBlah).toSorted((a, b) => a.date.getTime() - b.date.getTime()),
...messages
])
);
const { unsubscribe } = this.subscribeRoom(chatId, (message) => {
update((messages) => [...messages, messageFromBlah(message)]);
});
return unsubscribe;
});
const sendMessage = async (brt: BlahRichText) => {
await this.sendMessage(chatId, brt);
};
return { info, messages, sendMessage };
}
}

63
src/lib/chatServers.ts Normal file
View file

@ -0,0 +1,63 @@
import { persisted } from 'svelte-persisted-store';
import { get } from 'svelte/store';
import { BlahChatServerConnection } from './blah/connection/chatServer';
import { BlahKeyPair, type EncodedBlahKeyPair } from './blah/crypto';
import { currentKeyPair } from './keystore';
export const chatServers = persisted<string[]>('weblah-chat-servers', ['https://blah.oxa.li/api']);
class ChatServerConnectionPool {
private connections: Map<string, BlahChatServerConnection> = new Map();
private keypair: BlahKeyPair | null = null;
constructor() {
chatServers.subscribe(this.onChatServersChange.bind(this));
currentKeyPair.subscribe(this.onKeyPairChange.bind(this));
}
connectAll(keypair?: BlahKeyPair) {
for (const endpoint of get(chatServers)) {
const connection = new BlahChatServerConnection(endpoint, keypair);
this.connections.set(endpoint, connection);
connection.connect();
}
}
disconnectAll() {
for (const connection of this.connections.values()) {
connection.disconnect();
}
this.connections.clear();
}
private async onKeyPairChange(encodedKeyPair: EncodedBlahKeyPair) {
this.keypair = await BlahKeyPair.fromEncoded(encodedKeyPair);
for (const connection of this.connections.values()) {
connection.changeKeyPair(this.keypair);
}
}
private async onChatServersChange(newChatServers: string[]) {
// Disconnect from chat servers that are no longer in the list
for (const [endpoint, connection] of this.connections.entries()) {
if (!newChatServers.includes(endpoint)) {
connection.disconnect();
this.connections.delete(endpoint);
}
}
// Connect to chat servers that are in the list but not yet connected
for (const endpoint of newChatServers) {
if (!this.connections.has(endpoint)) {
const connection = new BlahChatServerConnection(endpoint, this.keypair);
this.connections.set(endpoint, connection);
connection.connect();
}
}
}
getConnection(endpoint: string): BlahChatServerConnection | null {
return this.connections.get(endpoint) ?? null;
}
}
export const chatServerConnectionPool = new ChatServerConnectionPool();