mirror of
https://github.com/Blah-IM/blahrs.git
synced 2025-08-18 10:12:38 +00:00
refactor(*): use term msg
to replace item
- `Msg` or `msg` is now the canonical term for the substructure in a room. It includes a `chat` subtype and (in the future) other administration subtypes like member joining or leaving. - `Message` or `message` can used in human oriented context like docs and comments, but only when it is unambiguous. - `message` is not chosen in code because it's hard to type (at least for me!), and have ambiguous meaning of: - "Human readable text" in context of `ApiError`'s field. - "A unit of data transfer, datagram" in context of WebSocket Message. - `item` is not chosen because it is overly generic.
This commit is contained in:
parent
4acc103afa
commit
73eb441a26
8 changed files with 171 additions and 175 deletions
|
@ -10,7 +10,7 @@ static INIT_SQL: &str = include_str!("../schema.sql");
|
|||
|
||||
// Simple and stupid version check for now.
|
||||
// `echo -n 'blahd-database-0' | sha256sum | head -c5` || version
|
||||
const APPLICATION_ID: i32 = 0xd9e_8403;
|
||||
const APPLICATION_ID: i32 = 0xd9e_8404;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Database {
|
||||
|
|
|
@ -8,7 +8,7 @@ use std::task::{Context, Poll};
|
|||
|
||||
use anyhow::{bail, Context as _, Result};
|
||||
use axum::extract::ws::{Message, WebSocket};
|
||||
use blah_types::{AuthPayload, ChatItem, WithSig};
|
||||
use blah_types::{AuthPayload, SignedChatMsg, WithSig};
|
||||
use futures_util::future::Either;
|
||||
use futures_util::stream::SplitSink;
|
||||
use futures_util::{stream_select, SinkExt as _, Stream, StreamExt};
|
||||
|
@ -28,8 +28,8 @@ pub enum Incoming {}
|
|||
#[derive(Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum Outgoing<'a> {
|
||||
/// A chat message from a joined room.
|
||||
Chat(&'a ChatItem),
|
||||
/// A message from a joined room.
|
||||
Msg(&'a SignedChatMsg),
|
||||
/// The receiver is too slow to receive and some events and are dropped.
|
||||
// FIXME: Should we indefinitely buffer them or just disconnect the client instead?
|
||||
Lagged,
|
||||
|
@ -71,11 +71,11 @@ impl WsSenderWrapper<'_, '_> {
|
|||
}
|
||||
}
|
||||
|
||||
type UserEventSender = broadcast::Sender<Arc<ChatItem>>;
|
||||
type UserEventSender = broadcast::Sender<Arc<SignedChatMsg>>;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct UserEventReceiver {
|
||||
rx: BroadcastStream<Arc<ChatItem>>,
|
||||
rx: BroadcastStream<Arc<SignedChatMsg>>,
|
||||
st: Arc<AppState>,
|
||||
uid: u64,
|
||||
}
|
||||
|
@ -93,7 +93,7 @@ impl Drop for UserEventReceiver {
|
|||
}
|
||||
|
||||
impl Stream for UserEventReceiver {
|
||||
type Item = Result<Arc<ChatItem>, BroadcastStreamRecvError>;
|
||||
type Item = Result<Arc<SignedChatMsg>, BroadcastStreamRecvError>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
self.rx.poll_next_unpin(cx)
|
||||
|
@ -155,7 +155,7 @@ pub async fn handle_ws(st: Arc<AppState>, ws: &mut WebSocket) -> Result<Infallib
|
|||
Either::Left(msg) => match serde_json::from_str::<Incoming>(&msg?)? {},
|
||||
Either::Right(ret) => {
|
||||
let msg = match &ret {
|
||||
Ok(chat) => Outgoing::Chat(chat),
|
||||
Ok(chat) => Outgoing::Msg(chat),
|
||||
Err(BroadcastStreamRecvError::Lagged(_)) => Outgoing::Lagged,
|
||||
};
|
||||
// TODO: Concurrent send.
|
||||
|
|
100
blahd/src/lib.rs
100
blahd/src/lib.rs
|
@ -11,9 +11,9 @@ use axum::routing::{get, post};
|
|||
use axum::{Json, Router};
|
||||
use axum_extra::extract::WithRejection as R;
|
||||
use blah_types::{
|
||||
ChatItem, ChatPayload, CreateGroup, CreatePeerChat, CreateRoomPayload, Id, MemberPermission,
|
||||
RoomAdminOp, RoomAdminPayload, RoomAttrs, RoomMetadata, ServerPermission, Signee, UserKey,
|
||||
WithItemId, WithSig,
|
||||
ChatPayload, CreateGroup, CreatePeerChat, CreateRoomPayload, Id, MemberPermission, RoomAdminOp,
|
||||
RoomAdminPayload, RoomAttrs, RoomMetadata, ServerPermission, SignedChatMsg, Signee, UserKey,
|
||||
WithMsgId, WithSig,
|
||||
};
|
||||
use config::ServerConfig;
|
||||
use ed25519_dalek::SIGNATURE_LENGTH;
|
||||
|
@ -100,8 +100,8 @@ pub fn router(st: Arc<AppState>) -> Router {
|
|||
.route("/room/:rid", get(room_get_metadata))
|
||||
// NB. Sync with `feed_url` and `next_url` generation.
|
||||
.route("/room/:rid/feed.json", get(room_get_feed))
|
||||
.route("/room/:rid/item", get(room_item_list).post(room_item_post))
|
||||
.route("/room/:rid/item/:cid/seen", post(room_item_mark_seen))
|
||||
.route("/room/:rid/msg", get(room_msg_list).post(room_msg_post))
|
||||
.route("/room/:rid/msg/:cid/seen", post(room_msg_mark_seen))
|
||||
.route("/room/:rid/admin", post(room_admin))
|
||||
.layer(tower_http::limit::RequestBodyLimitLayer::new(
|
||||
st.config.max_request_len,
|
||||
|
@ -188,12 +188,12 @@ async fn room_list(
|
|||
.query_map(params, |row| {
|
||||
// TODO: Extract this into a function.
|
||||
let rid = row.get("rid")?;
|
||||
let last_item = row
|
||||
let last_msg = row
|
||||
.get::<_, Option<Id>>("cid")?
|
||||
.map(|cid| {
|
||||
Ok::<_, rusqlite::Error>(WithItemId {
|
||||
Ok::<_, rusqlite::Error>(WithMsgId {
|
||||
cid,
|
||||
item: ChatItem {
|
||||
msg: SignedChatMsg {
|
||||
sig: row.get("sig")?,
|
||||
signee: Signee {
|
||||
nonce: row.get("nonce")?,
|
||||
|
@ -212,7 +212,7 @@ async fn room_list(
|
|||
rid,
|
||||
title: row.get("title")?,
|
||||
attrs: row.get("attrs")?,
|
||||
last_item,
|
||||
last_msg,
|
||||
last_seen_cid: Some(row.get::<_, Id>("last_seen_cid")?)
|
||||
.filter(|cid| cid.0 != 0),
|
||||
unseen_cnt: row.get("unseen_cnt").ok(),
|
||||
|
@ -232,7 +232,7 @@ async fn room_list(
|
|||
SELECT `rid`, `title`, `attrs`, 0 AS `last_seen_cid`,
|
||||
`cid`, `last_author`.`userkey`, `timestamp`, `nonce`, `sig`, `rich_text`
|
||||
FROM `room`
|
||||
LEFT JOIN `room_item` USING (`rid`)
|
||||
LEFT JOIN `msg` USING (`rid`)
|
||||
LEFT JOIN `user` AS `last_author` USING (`uid`)
|
||||
WHERE `rid` > :start_rid AND
|
||||
(`attrs` & :perm) = :perm
|
||||
|
@ -257,8 +257,8 @@ async fn room_list(
|
|||
FROM `user`
|
||||
JOIN `room_member` USING (`uid`)
|
||||
JOIN `room` USING (`rid`)
|
||||
LEFT JOIN `room_item` USING (`rid`)
|
||||
LEFT JOIN `user` AS `last_author` ON (`last_author`.`uid` = `room_item`.`uid`)
|
||||
LEFT JOIN `msg` USING (`rid`)
|
||||
LEFT JOIN `user` AS `last_author` ON (`last_author`.`uid` = `msg`.`uid`)
|
||||
LEFT JOIN `user` AS `peer_user` ON
|
||||
(`peer_user`.`uid` = `room`.`peer1` + `room`.`peer2` - `user`.`uid`)
|
||||
WHERE `user`.`userkey` = :userkey AND
|
||||
|
@ -283,14 +283,14 @@ async fn room_list(
|
|||
`cid`, `last_author`.`userkey`, `timestamp`, `nonce`, `sig`, `rich_text`,
|
||||
`peer_user`.`userkey` AS `peer_userkey`,
|
||||
(SELECT COUNT(*)
|
||||
FROM `room_item` AS `unseen_item`
|
||||
WHERE `unseen_item`.`rid` = `room`.`rid` AND
|
||||
`last_seen_cid` < `unseen_item`.`cid`) AS `unseen_cnt`
|
||||
FROM `msg` AS `unseen_msg`
|
||||
WHERE `unseen_msg`.`rid` = `room`.`rid` AND
|
||||
`last_seen_cid` < `unseen_msg`.`cid`) AS `unseen_cnt`
|
||||
FROM `user`
|
||||
JOIN `room_member` USING (`uid`)
|
||||
JOIN `room` USING (`rid`)
|
||||
LEFT JOIN `room_item` USING (`rid`)
|
||||
LEFT JOIN `user` AS `last_author` ON (`last_author`.`uid` = `room_item`.`uid`)
|
||||
LEFT JOIN `msg` USING (`rid`)
|
||||
LEFT JOIN `user` AS `last_author` ON (`last_author`.`uid` = `msg`.`uid`)
|
||||
LEFT JOIN `user` AS `peer_user` ON
|
||||
(`peer_user`.`uid` = `room`.`peer1` + `room`.`peer2` - `user`.`uid`)
|
||||
WHERE `user`.`userkey` = :userkey AND
|
||||
|
@ -536,7 +536,7 @@ struct Pagination {
|
|||
/// Maximum page size.
|
||||
top: Option<NonZeroUsize>,
|
||||
/// Only return items before (excluding) this token.
|
||||
/// Useful for `room_item_list` to pass `last_seen_cid` without over-fetching.
|
||||
/// Useful for `room_msg_list` to pass `last_seen_cid` without over-fetching.
|
||||
until_token: Option<Id>,
|
||||
}
|
||||
|
||||
|
@ -550,24 +550,24 @@ impl Pagination {
|
|||
}
|
||||
|
||||
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct RoomItems {
|
||||
pub items: Vec<WithItemId<ChatItem>>,
|
||||
pub struct RoomMsgs {
|
||||
pub msgs: Vec<WithMsgId<SignedChatMsg>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub skip_token: Option<Id>,
|
||||
}
|
||||
|
||||
async fn room_item_list(
|
||||
async fn room_msg_list(
|
||||
st: ArcState,
|
||||
R(Path(rid), _): RE<Path<Id>>,
|
||||
R(Query(pagination), _): RE<Query<Pagination>>,
|
||||
auth: MaybeAuth,
|
||||
) -> Result<Json<RoomItems>, ApiError> {
|
||||
let (items, skip_token) = {
|
||||
) -> Result<Json<RoomMsgs>, ApiError> {
|
||||
let (msgs, skip_token) = {
|
||||
let conn = st.db.get();
|
||||
get_room_if_readable(&conn, rid, auth.into_optional()?.as_ref(), |_row| Ok(()))?;
|
||||
query_room_items(&st, &conn, rid, pagination)?
|
||||
query_room_msgs(&st, &conn, rid, pagination)?
|
||||
};
|
||||
Ok(Json(RoomItems { items, skip_token }))
|
||||
Ok(Json(RoomMsgs { msgs, skip_token }))
|
||||
}
|
||||
|
||||
async fn room_get_metadata(
|
||||
|
@ -589,7 +589,7 @@ async fn room_get_metadata(
|
|||
attrs,
|
||||
|
||||
// TODO: Should we include these here?
|
||||
last_item: None,
|
||||
last_msg: None,
|
||||
last_seen_cid: None,
|
||||
unseen_cnt: None,
|
||||
member_permission: None,
|
||||
|
@ -603,28 +603,28 @@ async fn room_get_feed(
|
|||
R(Query(pagination), _): RE<Query<Pagination>>,
|
||||
) -> Result<impl IntoResponse, ApiError> {
|
||||
let title;
|
||||
let (items, skip_token) = {
|
||||
let (msgs, skip_token) = {
|
||||
let conn = st.db.get();
|
||||
title = get_room_if_readable(&conn, rid, None, |row| row.get::<_, String>("title"))?;
|
||||
query_room_items(&st, &conn, rid, pagination)?
|
||||
query_room_msgs(&st, &conn, rid, pagination)?
|
||||
};
|
||||
|
||||
let items = items
|
||||
let items = msgs
|
||||
.into_iter()
|
||||
.map(|WithItemId { cid, item }| {
|
||||
let time = SystemTime::UNIX_EPOCH + Duration::from_secs(item.signee.timestamp);
|
||||
.map(|WithMsgId { cid, msg }| {
|
||||
let time = SystemTime::UNIX_EPOCH + Duration::from_secs(msg.signee.timestamp);
|
||||
let author = FeedAuthor {
|
||||
name: item.signee.user.to_string(),
|
||||
name: msg.signee.user.to_string(),
|
||||
};
|
||||
FeedItem {
|
||||
id: cid.to_string(),
|
||||
content_html: item.signee.payload.rich_text.html().to_string(),
|
||||
content_html: msg.signee.payload.rich_text.html().to_string(),
|
||||
date_published: humantime::format_rfc3339(time).to_string(),
|
||||
authors: (author,),
|
||||
extra: FeedItemExtra {
|
||||
timestamp: item.signee.timestamp,
|
||||
nonce: item.signee.nonce,
|
||||
sig: item.sig,
|
||||
timestamp: msg.signee.timestamp,
|
||||
nonce: msg.signee.nonce,
|
||||
sig: msg.sig,
|
||||
},
|
||||
}
|
||||
})
|
||||
|
@ -734,19 +734,19 @@ fn get_room_if_readable<T>(
|
|||
})
|
||||
}
|
||||
|
||||
/// Get room items with pagination parameters,
|
||||
/// return a page of items and the next skip_token if this is not the last page.
|
||||
fn query_room_items(
|
||||
/// Get room messages with pagination parameters,
|
||||
/// return a page of messages and the next `skip_token` if this is not the last page.
|
||||
fn query_room_msgs(
|
||||
st: &AppState,
|
||||
conn: &Connection,
|
||||
rid: Id,
|
||||
pagination: Pagination,
|
||||
) -> Result<(Vec<WithItemId<ChatItem>>, Option<Id>), ApiError> {
|
||||
) -> Result<(Vec<WithMsgId<SignedChatMsg>>, Option<Id>), ApiError> {
|
||||
let page_len = pagination.effective_page_len(st);
|
||||
let mut stmt = conn.prepare(
|
||||
r"
|
||||
SELECT `cid`, `timestamp`, `nonce`, `sig`, `userkey`, `sig`, `rich_text`
|
||||
FROM `room_item`
|
||||
FROM `msg`
|
||||
JOIN `user` USING (`uid`)
|
||||
WHERE `rid` = :rid AND
|
||||
:after_cid < `cid` AND
|
||||
|
@ -755,7 +755,7 @@ fn query_room_items(
|
|||
LIMIT :limit
|
||||
",
|
||||
)?;
|
||||
let items = stmt
|
||||
let msgs = stmt
|
||||
.query_and_then(
|
||||
named_params! {
|
||||
":rid": rid,
|
||||
|
@ -764,9 +764,9 @@ fn query_room_items(
|
|||
":limit": page_len,
|
||||
},
|
||||
|row| {
|
||||
Ok(WithItemId {
|
||||
Ok(WithMsgId {
|
||||
cid: row.get("cid")?,
|
||||
item: ChatItem {
|
||||
msg: SignedChatMsg {
|
||||
sig: row.get("sig")?,
|
||||
signee: Signee {
|
||||
nonce: row.get("nonce")?,
|
||||
|
@ -783,12 +783,12 @@ fn query_room_items(
|
|||
)?
|
||||
.collect::<rusqlite::Result<Vec<_>>>()?;
|
||||
let skip_token =
|
||||
(items.len() == page_len).then(|| items.last().expect("page must not be empty").cid);
|
||||
(msgs.len() == page_len).then(|| msgs.last().expect("page must not be empty").cid);
|
||||
|
||||
Ok((items, skip_token))
|
||||
Ok((msgs, skip_token))
|
||||
}
|
||||
|
||||
async fn room_item_post(
|
||||
async fn room_msg_post(
|
||||
st: ArcState,
|
||||
R(Path(rid), _): RE<Path<Id>>,
|
||||
SignedJson(chat): SignedJson<ChatPayload>,
|
||||
|
@ -836,14 +836,14 @@ async fn room_item_post(
|
|||
return Err(error_response!(
|
||||
StatusCode::FORBIDDEN,
|
||||
"permission_denied",
|
||||
"the user does not have permission to post item in the room",
|
||||
"the user does not have permission to post in the room",
|
||||
));
|
||||
}
|
||||
|
||||
let cid = Id::gen();
|
||||
conn.execute(
|
||||
r"
|
||||
INSERT INTO `room_item` (`cid`, `rid`, `uid`, `timestamp`, `nonce`, `sig`, `rich_text`)
|
||||
INSERT INTO `msg` (`cid`, `rid`, `uid`, `timestamp`, `nonce`, `sig`, `rich_text`)
|
||||
VALUES (:cid, :rid, :uid, :timestamp, :nonce, :sig, :rich_text)
|
||||
",
|
||||
named_params! {
|
||||
|
@ -1048,7 +1048,7 @@ async fn room_leave(st: &AppState, rid: Id, user: UserKey) -> Result<(), ApiErro
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn room_item_mark_seen(
|
||||
async fn room_msg_mark_seen(
|
||||
st: ArcState,
|
||||
R(Path((rid, cid)), _): RE<Path<(Id, u64)>>,
|
||||
Auth(user): Auth,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue