feat: impl basic peer chat

This commit is contained in:
oxalica 2024-09-10 12:15:22 -04:00
parent 9e96927693
commit 1e944ead31
7 changed files with 361 additions and 64 deletions

View file

@ -7,6 +7,7 @@ use blah_types::Id;
pub trait IdExt {
fn gen() -> Self;
fn gen_peer_chat_rid() -> Self;
}
impl IdExt for Id {
@ -14,8 +15,15 @@ impl IdExt for Id {
let timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("after UNIX epoch");
let timestamp_ms = timestamp.as_millis() as i64;
assert!(timestamp_ms > 0);
Id(timestamp_ms << 16)
let timestamp_ms = timestamp.as_millis();
assert!(
0 < timestamp_ms && timestamp_ms < (1 << 48),
"invalid timestamp",
);
Id((timestamp_ms as i64) << 16)
}
fn gen_peer_chat_rid() -> Self {
Id(Self::gen().0 | i64::MIN)
}
}

View file

@ -11,8 +11,9 @@ use axum::routing::{get, post};
use axum::{Json, Router};
use axum_extra::extract::WithRejection as R;
use blah_types::{
ChatItem, ChatPayload, CreateRoomPayload, Id, MemberPermission, RoomAdminOp, RoomAdminPayload,
RoomAttrs, RoomMetadata, ServerPermission, Signee, UserKey, WithItemId, WithSig,
ChatItem, ChatPayload, CreateGroup, CreatePeerChat, CreateRoomPayload, Id, MemberPermission,
RoomAdminOp, RoomAdminPayload, RoomAttrs, RoomMetadata, ServerPermission, Signee, UserKey,
WithItemId, WithSig,
};
use config::ServerConfig;
use ed25519_dalek::SIGNATURE_LENGTH;
@ -177,7 +178,7 @@ async fn room_list(
until_token: None,
};
let page_len = pagination.effective_page_len(&st);
let start_rid = pagination.skip_token.unwrap_or(Id(0));
let start_rid = pagination.skip_token.unwrap_or(Id::MIN);
let query = |sql: &str, params: &[(&str, &dyn ToSql)]| -> Result<RoomList, ApiError> {
let rooms = st
@ -187,8 +188,6 @@ async fn room_list(
.query_map(params, |row| {
// TODO: Extract this into a function.
let rid = row.get("rid")?;
let title = row.get("title")?;
let attrs = row.get("attrs")?;
let last_item = row
.get::<_, Option<Id>>("cid")?
.map(|cid| {
@ -209,18 +208,16 @@ async fn room_list(
})
})
.transpose()?;
let last_seen_cid =
Some(row.get::<_, Id>("last_seen_cid")?).filter(|cid| cid.0 != 0);
let unseen_cnt = row.get("unseen_cnt").ok();
let member_permission = row.get("member_perm").ok();
Ok(RoomMetadata {
rid,
title,
attrs,
title: row.get("title")?,
attrs: row.get("attrs")?,
last_item,
last_seen_cid,
unseen_cnt,
member_permission,
last_seen_cid: Some(row.get::<_, Id>("last_seen_cid")?)
.filter(|cid| cid.0 != 0),
unseen_cnt: row.get("unseen_cnt").ok(),
member_permission: row.get("member_perm").ok(),
peer_user: row.get("peer_userkey").ok(),
})
})?
.collect::<Result<Vec<_>, _>>()?;
@ -255,12 +252,15 @@ async fn room_list(
r"
SELECT
`rid`, `title`, `attrs`, `last_seen_cid`, `room_member`.`permission` AS `member_perm`,
`cid`, `last_author`.`userkey`, `timestamp`, `nonce`, `sig`, `rich_text`
`cid`, `last_author`.`userkey`, `timestamp`, `nonce`, `sig`, `rich_text`,
`peer_user`.`userkey` AS `peer_userkey`
FROM `user`
JOIN `room_member` USING (`uid`)
JOIN `room` USING (`rid`)
LEFT JOIN `room_item` USING (`rid`)
LEFT JOIN `user` AS `last_author` ON (`last_author`.`uid` = `room_item`.`uid`)
LEFT JOIN `user` AS `peer_user` ON
(`peer_user`.`uid` = `room`.`peer1` + `room`.`peer2` - `user`.`uid`)
WHERE `user`.`userkey` = :userkey AND
`rid` > :start_rid
GROUP BY `rid` HAVING `cid` IS MAX(`cid`)
@ -281,6 +281,7 @@ async fn room_list(
SELECT
`rid`, `title`, `attrs`, `last_seen_cid`, `room_member`.`permission` AS `member_perm`,
`cid`, `last_author`.`userkey`, `timestamp`, `nonce`, `sig`, `rich_text`,
`peer_user`.`userkey` AS `peer_userkey`,
(SELECT COUNT(*)
FROM `room_item` AS `unseen_item`
WHERE `unseen_item`.`rid` = `room`.`rid` AND
@ -290,6 +291,8 @@ async fn room_list(
JOIN `room` USING (`rid`)
LEFT JOIN `room_item` USING (`rid`)
LEFT JOIN `user` AS `last_author` ON (`last_author`.`uid` = `room_item`.`uid`)
LEFT JOIN `user` AS `peer_user` ON
(`peer_user`.`uid` = `room`.`peer1` + `room`.`peer2` - `user`.`uid`)
WHERE `user`.`userkey` = :userkey AND
`rid` > :start_rid AND
`cid` > `last_seen_cid`
@ -312,10 +315,29 @@ async fn room_create(
st: ArcState,
SignedJson(params): SignedJson<CreateRoomPayload>,
) -> Result<Json<Id>, ApiError> {
let members = &params.signee.payload.members.0;
match params.signee.payload {
CreateRoomPayload::Group(op) => room_create_group(&st, params.signee.user, op).await,
CreateRoomPayload::PeerChat(op) => room_create_peer_chat(&st, params.signee.user, op).await,
}
}
async fn room_create_group(
st: &AppState,
user: UserKey,
op: CreateGroup,
) -> Result<Json<Id>, ApiError> {
if !RoomAttrs::GROUP_ATTRS.contains(op.attrs) {
return Err(error_response!(
StatusCode::BAD_REQUEST,
"deserialization",
"invalid room attributes",
));
}
let members = &op.members.0;
if !members
.iter()
.any(|m| m.user == params.signee.user && m.permission == MemberPermission::ALL)
.any(|m| m.user == user && m.permission == MemberPermission::ALL)
{
return Err(error_response!(
StatusCode::BAD_REQUEST,
@ -332,7 +354,7 @@ async fn room_create(
FROM `user`
WHERE `userkey` = ?
",
params![params.signee.user],
params![user],
|row| {
let perm = row.get::<_, ServerPermission>("permission")?;
Ok(perm.contains(ServerPermission::CREATE_ROOM))
@ -356,8 +378,8 @@ async fn room_create(
",
named_params! {
":rid": rid,
":title": params.signee.payload.title,
":attrs": params.signee.payload.attrs,
":title": op.title,
":attrs": op.attrs,
},
)?;
let mut insert_user = txn.prepare(
@ -390,6 +412,118 @@ async fn room_create(
Ok(Json(rid))
}
async fn room_create_peer_chat(
st: &AppState,
src_user: UserKey,
op: CreatePeerChat,
) -> Result<Json<Id>, ApiError> {
let tgt_user = op.peer;
if tgt_user == src_user {
return Err(error_response!(
StatusCode::NOT_IMPLEMENTED,
"not_implemented",
"self-chat is not implemented yet",
));
}
// TODO: Access control and throttling.
let mut conn = st.db.get();
let txn = conn.transaction()?;
let tgt_uid = txn
.query_row(
r"
SELECT `uid`
FROM `user`
WHERE `userkey` = :userkey AND
`permission` & :perm = :perm
",
named_params! {
":userkey": tgt_user,
":perm": ServerPermission::ACCEPT_PEER_CHAT,
},
|row| row.get::<_, i64>(0),
)
.optional()?
.ok_or_else(|| {
error_response!(
StatusCode::NOT_FOUND,
"not_found",
"peer user does not exist or disallows peer chat",
)
})?;
let src_uid = txn
.query_row(
r"
SELECT `uid` FROM `user`
WHERE `userkey` = ?
",
params![src_user],
|row| row.get::<_, i64>(0),
)
.optional()?;
let src_uid = match src_uid {
Some(uid) => uid,
None => {
txn.execute(
r"
INSERT INTO `user` (`userkey`)
VALUES (?)
",
params![src_user],
)?;
txn.last_insert_rowid()
}
};
let (peer1, peer2) = if src_uid <= tgt_uid {
(src_uid, tgt_uid)
} else {
(tgt_uid, src_uid)
};
let rid = Id::gen_peer_chat_rid();
let updated = txn.execute(
r"
INSERT INTO `room` (`rid`, `attrs`, `peer1`, `peer2`)
VALUES (:rid, :attrs, :peer1, :peer2)
ON CONFLICT (`peer1`, `peer2`) WHERE `rid` < 0 DO NOTHING
",
named_params! {
":rid": rid,
":attrs": RoomAttrs::PEER_CHAT,
":peer1": peer1,
":peer2": peer2,
},
)?;
if updated == 0 {
return Err(error_response!(
StatusCode::CONFLICT,
"exists",
"room already exists"
));
}
{
let mut stmt = txn.prepare(
r"
INSERT INTO `room_member` (`rid`, `uid`, `permission`)
VALUES (:rid, :uid, :perm)
",
)?;
// TODO: Limit permission of the src user?
for uid in [peer1, peer2] {
stmt.execute(named_params! {
":rid": rid,
":uid": uid,
":perm": MemberPermission::MAX_PEER_CHAT,
})?;
}
}
txn.commit()?;
Ok(Json(rid))
}
/// Pagination query parameters.
///
/// Field names are inspired by Microsoft's design, which is an extension to OData spec.
@ -444,7 +578,7 @@ async fn room_get_metadata(
let conn = st.db.get();
let (title, attrs) = get_room_if_readable(&conn, rid, auth.into_optional()?.as_ref(), |row| {
Ok((
row.get::<_, String>("title")?,
row.get::<_, Option<String>>("title")?,
row.get::<_, RoomAttrs>("attrs")?,
))
})?;
@ -459,6 +593,7 @@ async fn room_get_metadata(
last_seen_cid: None,
unseen_cnt: None,
member_permission: None,
peer_user: None,
}))
}
@ -624,8 +759,8 @@ fn query_room_items(
.query_and_then(
named_params! {
":rid": rid,
":after_cid": pagination.until_token.unwrap_or(Id(-1)),
":before_cid": pagination.skip_token.unwrap_or(Id(i64::MAX)),
":after_cid": pagination.until_token.unwrap_or(Id::MIN),
":before_cid": pagination.skip_token.unwrap_or(Id::MAX),
":limit": page_len,
},
|row| {