feat(webapi): include cid in ServerEvent::Msg

This commit is contained in:
oxalica 2024-10-08 21:14:13 -04:00
parent 814fac1974
commit ff7fd9e4b2
6 changed files with 18 additions and 18 deletions

View file

@ -5,7 +5,7 @@ use std::fmt;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use url::Url; use url::Url;
use crate::msg::{Id, MemberPermission, RoomAttrs, SignedChatMsg, SignedChatMsgWithId}; use crate::msg::{Id, MemberPermission, RoomAttrs, SignedChatMsgWithId};
use crate::PubKey; use crate::PubKey;
/// The response object returned as body on HTTP error status. /// The response object returned as body on HTTP error status.
@ -162,8 +162,7 @@ pub struct RoomMember {
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
pub enum ServerEvent { pub enum ServerEvent {
/// A message from a joined room. /// A message from a joined room.
// FIXME: Include cid. Msg(SignedChatMsgWithId),
Msg(SignedChatMsg),
/// The receiver is too slow to receive and some events and are dropped. /// The receiver is too slow to receive and some events and are dropped.
// FIXME: Should we indefinitely buffer them or just disconnect the client instead? // FIXME: Should we indefinitely buffer them or just disconnect the client instead?
Lagged, Lagged,

View file

@ -11,7 +11,7 @@ use anyhow::{bail, Context as _, Result};
use axum::extract::ws::{close_code, CloseFrame, Message, WebSocket}; use axum::extract::ws::{close_code, CloseFrame, Message, WebSocket};
use axum::extract::WebSocketUpgrade; use axum::extract::WebSocketUpgrade;
use axum::response::Response; use axum::response::Response;
use blah_types::msg::{AuthPayload, SignedChatMsg}; use blah_types::msg::{AuthPayload, SignedChatMsgWithId};
use blah_types::server::{ClientEvent, ServerEvent}; use blah_types::server::{ClientEvent, ServerEvent};
use blah_types::Signed; use blah_types::Signed;
use futures_util::future::Either; use futures_util::future::Either;
@ -58,7 +58,7 @@ pub struct State {
} }
impl State { impl State {
pub fn on_room_msg(&self, msg: SignedChatMsg, room_members: Vec<i64>) { pub fn on_room_msg(&self, msg: SignedChatMsgWithId, room_members: Vec<i64>) {
let listeners = self.user_listeners.lock(); let listeners = self.user_listeners.lock();
let mut cnt = 0usize; let mut cnt = 0usize;
let msg = Arc::new(ServerEvent::Msg(msg)); let msg = Arc::new(ServerEvent::Msg(msg));

View file

@ -13,7 +13,7 @@ use axum_extra::extract::WithRejection as R;
use blah_types::msg::{ use blah_types::msg::{
ChatPayload, CreateGroup, CreatePeerChat, CreateRoomPayload, DeleteRoomPayload, ChatPayload, CreateGroup, CreatePeerChat, CreateRoomPayload, DeleteRoomPayload,
MemberPermission, RoomAdminOp, RoomAdminPayload, RoomAttrs, ServerPermission, MemberPermission, RoomAdminOp, RoomAdminPayload, RoomAttrs, ServerPermission,
SignedChatMsgWithId, SignedChatMsgWithId, WithMsgId,
}; };
use blah_types::server::{ use blah_types::server::{
ErrorResponseWithChallenge, RoomList, RoomMember, RoomMemberList, RoomMetadata, RoomMsgs, ErrorResponseWithChallenge, RoomList, RoomMember, RoomMemberList, RoomMetadata, RoomMsgs,
@ -460,7 +460,7 @@ async fn post_room_msg(
})?; })?;
// FIXME: Optimize this to not traverses over all members. // FIXME: Optimize this to not traverses over all members.
st.event.on_room_msg(chat, members); st.event.on_room_msg(WithMsgId::new(cid, chat), members);
Ok(Json(cid)) Ok(Json(cid))
} }

View file

@ -1525,7 +1525,7 @@ async fn event(server: Server) {
{ {
let chat = server.post_chat(rid1, &ALICE, "alice1").await.unwrap(); let chat = server.post_chat(rid1, &ALICE, "alice1").await.unwrap();
let got = ws.next().await.unwrap().unwrap(); let got = ws.next().await.unwrap().unwrap();
assert_eq!(got, ServerEvent::Msg(chat.msg)); assert_eq!(got, ServerEvent::Msg(chat));
} }
// Should receive msgs from other user. // Should receive msgs from other user.
@ -1536,7 +1536,7 @@ async fn event(server: Server) {
.unwrap(); .unwrap();
let chat = server.post_chat(rid1, &BOB, "bob1").await.unwrap(); let chat = server.post_chat(rid1, &BOB, "bob1").await.unwrap();
let got = ws.next().await.unwrap().unwrap(); let got = ws.next().await.unwrap().unwrap();
assert_eq!(got, ServerEvent::Msg(chat.msg)); assert_eq!(got, ServerEvent::Msg(chat));
} }
// Should receive msgs from new room. // Should receive msgs from new room.
@ -1547,7 +1547,7 @@ async fn event(server: Server) {
{ {
let chat = server.post_chat(rid2, &ALICE, "alice2").await.unwrap(); let chat = server.post_chat(rid2, &ALICE, "alice2").await.unwrap();
let got = ws.next().await.unwrap().unwrap(); let got = ws.next().await.unwrap().unwrap();
assert_eq!(got, ServerEvent::Msg(chat.msg)); assert_eq!(got, ServerEvent::Msg(chat));
} }
// Each streams should receive each message once. // Each streams should receive each message once.
@ -1556,9 +1556,9 @@ async fn event(server: Server) {
let chat = server.post_chat(rid1, &ALICE, "alice1").await.unwrap(); let chat = server.post_chat(rid1, &ALICE, "alice1").await.unwrap();
let got1 = ws.next().await.unwrap().unwrap(); let got1 = ws.next().await.unwrap().unwrap();
assert_eq!(got1, ServerEvent::Msg(chat.msg.clone())); assert_eq!(got1, ServerEvent::Msg(chat.clone()));
let got2 = ws2.next().await.unwrap().unwrap(); let got2 = ws2.next().await.unwrap().unwrap();
assert_eq!(got2, ServerEvent::Msg(chat.msg)); assert_eq!(got2, ServerEvent::Msg(chat));
} }
} }

View file

@ -574,7 +574,7 @@ components:
- type: object - type: object
properties: properties:
chat: chat:
$ref: '#/components/schemas/Signed-Chat' $ref: '#/components/schemas/WithMsgId-Signed-Chat'
- type: object - type: object
properties: properties:

View file

@ -331,14 +331,15 @@ async function connectServer(newServerUrl) {
}; };
ws.onmessage = async (e) => { ws.onmessage = async (e) => {
console.log('ws event', e.data); console.log('ws event', e.data);
const msg = JSON.parse(e.data); const evt = JSON.parse(e.data);
if (msg.msg !== undefined) { if (evt.msg !== undefined) {
if (msg.msg.signee.payload.room === curRoom) { if (evt.msg.signee.payload.room === curRoom) {
await showChatMsg(msg.msg); lastCid = evt.msg.cid;
await showChatMsg(evt.msg);
} else { } else {
console.log('ignore background room msg'); console.log('ignore background room msg');
} }
} else if (msg.lagged !== undefined) { } else if (evt.lagged !== undefined) {
log('some events are dropped because of queue overflow') log('some events are dropped because of queue overflow')
} else { } else {
log(`unknown ws msg: ${e.data}`); log(`unknown ws msg: ${e.data}`);