feat(types,webapi): impl id_key/act_key for all APIs and update docs

This commit is contained in:
oxalica 2024-09-17 17:31:20 -04:00
parent fb76756482
commit cb72d049e0
10 changed files with 426 additions and 330 deletions

View file

@ -1,12 +1,17 @@
use std::borrow::Borrow;
use std::ops::DerefMut;
use std::path::PathBuf;
use anyhow::{ensure, Context, Result};
use axum::http::StatusCode;
use blah_types::{ServerPermission, UserKey};
use parking_lot::Mutex;
use rusqlite::{params, Connection, OpenFlags};
use rusqlite::{params, Connection, OpenFlags, OptionalExtension};
use serde::Deserialize;
use serde_inline_default::serde_inline_default;
use crate::ApiError;
const DEFAULT_DATABASE_PATH: &str = "/var/lib/blahd/db.sqlite";
static INIT_SQL: &str = include_str!("../schema.sql");
@ -97,6 +102,31 @@ impl Database {
}
}
pub trait ConnectionExt: Borrow<Connection> {
fn get_user(&self, user: &UserKey) -> Result<(i64, ServerPermission), ApiError> {
self.borrow()
.query_row(
r"
SELECT `uid`, `permission`
FROM `valid_user_act_key`
WHERE (`id_key`, `act_key`) = (?, ?)
",
params![user.id_key, user.act_key],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.optional()?
.ok_or_else(|| {
error_response!(
StatusCode::NOT_FOUND,
"not_found",
"the user does not exist",
)
})
}
}
impl ConnectionExt for Connection {}
#[test]
fn init_sql_valid() {
let conn = Connection::open_in_memory().unwrap();

View file

@ -7,20 +7,20 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use anyhow::{bail, Context as _, Result};
use anyhow::{anyhow, bail, Context as _, Result};
use axum::extract::ws::{Message, WebSocket};
use blah_types::{AuthPayload, Signed, SignedChatMsg};
use futures_util::future::Either;
use futures_util::stream::SplitSink;
use futures_util::{stream_select, SinkExt as _, Stream, StreamExt};
use parking_lot::Mutex;
use rusqlite::{params, OptionalExtension};
use serde::{de, Deserialize, Serialize};
use serde_inline_default::serde_inline_default;
use tokio::sync::broadcast;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tokio_stream::wrappers::BroadcastStream;
use crate::database::ConnectionExt;
use crate::AppState;
#[derive(Debug, Deserialize)]
@ -143,19 +143,13 @@ pub async fn handle_ws(st: Arc<AppState>, ws: &mut WebSocket) -> Result<Infallib
let auth = serde_json::from_str::<Signed<AuthPayload>>(&payload)?;
st.verify_signed_data(&auth)?;
st.db
let (uid, _) = st
.db
.get()
.query_row(
r"
SELECT `uid`
FROM `user`
WHERE `userkey` = ?
",
params![auth.signee.user],
|row| row.get::<_, u64>(0),
)
.optional()?
.context("invalid user")?
.get_user(&auth.signee.user)
.map_err(|err| anyhow!("{}", err.message))?;
// FIXME: Consistency of id's sign.
uid as u64
};
tracing::debug!(%uid, "user connected");

View file

@ -15,6 +15,7 @@ use blah_types::{
RoomAdminPayload, RoomAttrs, RoomMetadata, ServerPermission, Signed, SignedChatMsg, Signee,
UserKey, UserRegisterPayload, WithMsgId,
};
use database::ConnectionExt;
use ed25519_dalek::SIGNATURE_LENGTH;
use id::IdExt;
use middleware::{Auth, MaybeAuth, ResultExt as _, SignedJson};
@ -188,10 +189,10 @@ async fn user_get(
.query_row(
"
SELECT 1
FROM `user`
WHERE `userkey` = ?
FROM `valid_user_act_key`
WHERE (`id_key`, `act_key`) = (?, ?)
",
params![user],
params![user.id_key, user.act_key],
|_| Ok(()),
)
.optional()?,
@ -271,7 +272,10 @@ async fn room_list(
signee: Signee {
nonce: row.get("nonce")?,
timestamp: row.get("timestamp")?,
user: row.get("userkey")?,
user: UserKey {
act_key: row.get("act_key")?,
id_key: row.get("id_key")?,
},
payload: ChatPayload {
rich_text: row.get("rich_text")?,
room: rid,
@ -290,7 +294,7 @@ async fn room_list(
.filter(|cid| cid.0 != 0),
unseen_cnt: row.get("unseen_cnt").ok(),
member_permission: row.get("member_perm").ok(),
peer_user: row.get("peer_userkey").ok(),
peer_user: row.get("peer_id_key").ok(),
})
})?
.collect::<Result<Vec<_>, _>>()?;
@ -303,7 +307,8 @@ async fn room_list(
ListRoomFilter::Public => query(
r"
SELECT `rid`, `title`, `attrs`, 0 AS `last_seen_cid`,
`cid`, `last_author`.`userkey`, `timestamp`, `nonce`, `sig`, `rich_text`
`cid`, `timestamp`, `nonce`, `sig`, `rich_text`,
`last_author`.`id_key`, `msg`.`act_key`
FROM `room`
LEFT JOIN `msg` USING (`rid`)
LEFT JOIN `user` AS `last_author` USING (`uid`)
@ -325,16 +330,17 @@ async fn room_list(
r"
SELECT
`rid`, `title`, `attrs`, `last_seen_cid`, `room_member`.`permission` AS `member_perm`,
`cid`, `last_author`.`userkey`, `timestamp`, `nonce`, `sig`, `rich_text`,
`peer_user`.`userkey` AS `peer_userkey`
FROM `user`
`cid`, `timestamp`, `nonce`, `sig`, `rich_text`,
`last_author`.`id_key`, `msg`.`act_key`,
`peer_user`.`id_key` AS `peer_id_key`
FROM `valid_user_act_key` AS `me`
JOIN `room_member` USING (`uid`)
JOIN `room` USING (`rid`)
LEFT JOIN `msg` USING (`rid`)
LEFT JOIN `user` AS `last_author` ON (`last_author`.`uid` = `msg`.`uid`)
LEFT JOIN `user` AS `peer_user` ON
(`peer_user`.`uid` = `room`.`peer1` + `room`.`peer2` - `user`.`uid`)
WHERE `user`.`userkey` = :userkey AND
(`peer_user`.`uid` = `room`.`peer1` + `room`.`peer2` - `me`.`uid`)
WHERE (`me`.`id_key`, `me`.`act_key`) = (:id_key, :act_key) AND
`rid` > :start_rid
GROUP BY `rid` HAVING `cid` IS MAX(`cid`)
ORDER BY `rid` ASC
@ -343,7 +349,8 @@ async fn room_list(
named_params! {
":start_rid": start_rid,
":page_len": page_len,
":userkey": user,
":id_key": user.id_key,
":act_key": user.act_key,
},
)
}
@ -353,20 +360,21 @@ async fn room_list(
r"
SELECT
`rid`, `title`, `attrs`, `last_seen_cid`, `room_member`.`permission` AS `member_perm`,
`cid`, `last_author`.`userkey`, `timestamp`, `nonce`, `sig`, `rich_text`,
`peer_user`.`userkey` AS `peer_userkey`,
`cid`, `timestamp`, `nonce`, `sig`, `rich_text`,
`last_author`.`id_key`, `msg`.`act_key`,
`peer_user`.`id_key` AS `peer_id_key`,
(SELECT COUNT(*)
FROM `msg` AS `unseen_msg`
WHERE `unseen_msg`.`rid` = `room`.`rid` AND
`last_seen_cid` < `unseen_msg`.`cid`) AS `unseen_cnt`
FROM `user`
FROM `valid_user_act_key` AS `me`
JOIN `room_member` USING (`uid`)
JOIN `room` USING (`rid`)
LEFT JOIN `msg` USING (`rid`)
LEFT JOIN `user` AS `last_author` ON (`last_author`.`uid` = `msg`.`uid`)
LEFT JOIN `user` AS `peer_user` ON
(`peer_user`.`uid` = `room`.`peer1` + `room`.`peer2` - `user`.`uid`)
WHERE `user`.`userkey` = :userkey AND
(`peer_user`.`uid` = `room`.`peer1` + `room`.`peer2` - `me`.`uid`)
WHERE (`me`.`id_key`, `me`.`act_key`) = (:id_key, :act_key) AND
`rid` > :start_rid AND
`cid` > `last_seen_cid`
GROUP BY `rid` HAVING `cid` IS MAX(`cid`)
@ -376,7 +384,8 @@ async fn room_list(
named_params! {
":start_rid": start_rid,
":page_len": page_len,
":userkey": user,
":id_key": user.id_key,
":act_key": user.act_key,
},
)
}
@ -389,14 +398,16 @@ async fn room_create(
SignedJson(params): SignedJson<CreateRoomPayload>,
) -> Result<Json<Id>, ApiError> {
match params.signee.payload {
CreateRoomPayload::Group(op) => room_create_group(&st, params.signee.user, op).await,
CreateRoomPayload::PeerChat(op) => room_create_peer_chat(&st, params.signee.user, op).await,
CreateRoomPayload::Group(op) => room_create_group(&st, &params.signee.user, op).await,
CreateRoomPayload::PeerChat(op) => {
room_create_peer_chat(&st, &params.signee.user, op).await
}
}
}
async fn room_create_group(
st: &AppState,
user: UserKey,
user: &UserKey,
op: CreateGroup,
) -> Result<Json<Id>, ApiError> {
if !RoomAttrs::GROUP_ATTRS.contains(op.attrs) {
@ -412,10 +423,10 @@ async fn room_create_group(
.query_row(
r"
SELECT `uid`, `permission`
FROM `user`
WHERE `userkey` = ?
FROM `valid_user_act_key`
WHERE (`id_key`, `act_key`) = (?, ?)
",
params![user],
params![user.id_key, user.act_key],
|row| {
Ok((
row.get::<_, i64>("uid")?,
@ -462,11 +473,11 @@ async fn room_create_group(
async fn room_create_peer_chat(
st: &AppState,
src_user: UserKey,
src_user: &UserKey,
op: CreatePeerChat,
) -> Result<Json<Id>, ApiError> {
let tgt_user = op.peer;
if tgt_user == src_user {
let tgt_user_id_key = op.peer;
if tgt_user_id_key == src_user.id_key {
return Err(error_response!(
StatusCode::NOT_IMPLEMENTED,
"not_implemented",
@ -478,38 +489,19 @@ async fn room_create_peer_chat(
let mut conn = st.db.get();
let txn = conn.transaction()?;
let src_uid = txn
let (src_uid, _) = txn.get_user(src_user)?;
let (tgt_uid, _) = txn
.query_row(
r"
SELECT `uid` FROM `user`
WHERE `userkey` = ?
",
params![src_user],
|row| row.get::<_, i64>(0),
)
.optional()?
.ok_or_else(|| {
error_response!(
StatusCode::NOT_FOUND,
"not_found",
"the user does not exist",
)
})?;
let tgt_uid = txn
.query_row(
r"
SELECT `uid`
SELECT `uid`, `permission`
FROM `user`
WHERE `userkey` = :userkey AND
`permission` & :perm = :perm
WHERE `id_key` = ?
",
named_params! {
":userkey": tgt_user,
":perm": ServerPermission::ACCEPT_PEER_CHAT,
},
|row| row.get::<_, i64>(0),
params![tgt_user_id_key],
|row| Ok((row.get::<_, i64>(0)?, row.get::<_, ServerPermission>(1)?)),
)
.optional()?
.filter(|(_, perm)| perm.contains(ServerPermission::ACCEPT_PEER_CHAT))
.ok_or_else(|| {
error_response!(
StatusCode::NOT_FOUND,
@ -518,11 +510,8 @@ async fn room_create_peer_chat(
)
})?;
let (peer1, peer2) = if src_uid <= tgt_uid {
(src_uid, tgt_uid)
} else {
(tgt_uid, src_uid)
};
let mut peers = [src_uid, tgt_uid];
peers.sort();
let rid = Id::gen_peer_chat_rid();
let updated = txn.execute(
r"
@ -533,8 +522,8 @@ async fn room_create_peer_chat(
named_params! {
":rid": rid,
":attrs": RoomAttrs::PEER_CHAT,
":peer1": peer1,
":peer2": peer2,
":peer1": peers[0],
":peer2": peers[1],
},
)?;
if updated == 0 {
@ -553,7 +542,7 @@ async fn room_create_peer_chat(
",
)?;
// TODO: Limit permission of the src user?
for uid in [peer1, peer2] {
for uid in peers {
stmt.execute(named_params! {
":rid": rid,
":uid": uid,
@ -656,7 +645,8 @@ async fn room_get_feed(
.map(|WithMsgId { cid, msg }| {
let time = SystemTime::UNIX_EPOCH + Duration::from_secs(msg.signee.timestamp);
let author = FeedAuthor {
name: msg.signee.user.to_string(),
// TODO: Retrieve id_url as name.
name: msg.signee.user.id_key.to_string(),
};
FeedItem {
id: cid.to_string(),
@ -747,6 +737,11 @@ fn get_room_if_readable<T>(
user: Option<&UserKey>,
f: impl FnOnce(&Row<'_>) -> rusqlite::Result<T>,
) -> Result<T, ApiError> {
let (id_key, act_key) = match user {
Some(keys) => (Some(&keys.id_key), Some(&keys.act_key)),
None => (None, None),
};
conn.query_row(
r"
SELECT `title`, `attrs`
@ -755,14 +750,15 @@ fn get_room_if_readable<T>(
((`attrs` & :perm) = :perm OR
EXISTS(SELECT 1
FROM `room_member`
JOIN `user` USING (`uid`)
JOIN `valid_user_act_key` USING (`uid`)
WHERE `room_member`.`rid` = `room`.`rid` AND
`userkey` = :userkey))
(`id_key`, `act_key`) = (:id_key, :act_key)))
",
named_params! {
":rid": rid,
":perm": RoomAttrs::PUBLIC_READABLE,
":userkey": user,
":id_key": id_key,
":act_key": act_key,
},
f,
)
@ -787,7 +783,7 @@ fn query_room_msgs(
let page_len = pagination.effective_page_len(st);
let mut stmt = conn.prepare(
r"
SELECT `cid`, `timestamp`, `nonce`, `sig`, `userkey`, `sig`, `rich_text`
SELECT `cid`, `timestamp`, `nonce`, `sig`, `id_key`, `act_key`, `sig`, `rich_text`
FROM `msg`
JOIN `user` USING (`uid`)
WHERE `rid` = :rid AND
@ -813,7 +809,10 @@ fn query_room_msgs(
signee: Signee {
nonce: row.get("nonce")?,
timestamp: row.get("timestamp")?,
user: row.get("userkey")?,
user: UserKey {
id_key: row.get("id_key")?,
act_key: row.get("act_key")?,
},
payload: ChatPayload {
room: rid,
rich_text: row.get("rich_text")?,
@ -850,13 +849,14 @@ async fn room_msg_post(
r"
SELECT `uid`, `room_member`.`permission`
FROM `room_member`
JOIN `user` USING (`uid`)
JOIN `valid_user_act_key` USING (`uid`)
WHERE `rid` = :rid AND
`userkey` = :userkey
(`id_key`, `act_key`) = (:id_key, :act_key)
",
named_params! {
":rid": rid,
":userkey": &chat.signee.user,
":id_key": &chat.signee.user.id_key,
":act_key": &chat.signee.user.act_key,
},
|row| {
Ok((
@ -885,13 +885,14 @@ async fn room_msg_post(
let cid = Id::gen();
conn.execute(
r"
INSERT INTO `msg` (`cid`, `rid`, `uid`, `timestamp`, `nonce`, `sig`, `rich_text`)
VALUES (:cid, :rid, :uid, :timestamp, :nonce, :sig, :rich_text)
INSERT INTO `msg` (`cid`, `rid`, `uid`, `act_key`, `timestamp`, `nonce`, `sig`, `rich_text`)
VALUES (:cid, :rid, :uid, :act_key, :timestamp, :nonce, :sig, :rich_text)
",
named_params! {
":cid": cid,
":rid": rid,
":uid": uid,
":act_key": chat.signee.user.act_key,
":timestamp": chat.signee.timestamp,
":nonce": chat.signee.nonce,
":rich_text": &chat.signee.payload.rich_text,
@ -954,7 +955,7 @@ async fn room_admin(
match op.signee.payload.op {
RoomAdminOp::AddMember { user, permission } => {
if user != op.signee.user {
if user != op.signee.user.id_key {
return Err(error_response!(
StatusCode::NOT_IMPLEMENTED,
"not_implemented",
@ -968,17 +969,17 @@ async fn room_admin(
"invalid permission",
));
}
room_join(&st, rid, user, permission).await?;
room_join(&st, rid, &op.signee.user, permission).await?;
}
RoomAdminOp::RemoveMember { user } => {
if user != op.signee.user {
if user != op.signee.user.id_key {
return Err(error_response!(
StatusCode::NOT_IMPLEMENTED,
"not_implemented",
"only self-removing is implemented yet",
));
}
room_leave(&st, rid, user).await?;
room_leave(&st, rid, &op.signee.user).await?;
}
}
@ -988,29 +989,12 @@ async fn room_admin(
async fn room_join(
st: &AppState,
rid: Id,
user: UserKey,
user: &UserKey,
permission: MemberPermission,
) -> Result<(), ApiError> {
let mut conn = st.db.get();
let txn = conn.transaction()?;
let uid = txn
.query_row(
r"
SELECT `uid`
FROM `user`
WHERE `userkey` = ?
",
params![user],
|row| row.get::<_, i32>(0),
)
.optional()?
.ok_or_else(|| {
error_response!(
StatusCode::NOT_FOUND,
"not_found",
"the user does not exist",
)
})?;
let (uid, _) = txn.get_user(user)?;
txn.query_row(
r"
SELECT `attrs`
@ -1053,7 +1037,7 @@ async fn room_join(
Ok(())
}
async fn room_leave(st: &AppState, rid: Id, user: UserKey) -> Result<(), ApiError> {
async fn room_leave(st: &AppState, rid: Id, user: &UserKey) -> Result<(), ApiError> {
let mut conn = st.db.get();
let txn = conn.transaction()?;
@ -1062,13 +1046,13 @@ async fn room_leave(st: &AppState, rid: Id, user: UserKey) -> Result<(), ApiErro
r"
SELECT `uid`
FROM `room_member`
JOIN `user` USING (`uid`)
WHERE `rid` = :rid AND
`userkey` = :userkey
JOIN `valid_user_act_key` USING (`uid`)
WHERE (`rid`, `id_key`, `act_key`) = (:rid, :id_key, :act_key)
",
named_params! {
":rid": rid,
":userkey": user,
":id_key": user.id_key,
":act_key": user.act_key,
},
|row| row.get::<_, u64>("uid"),
)
@ -1108,12 +1092,15 @@ async fn room_msg_mark_seen(
SET `last_seen_cid` = MAX(`last_seen_cid`, :cid)
WHERE
`rid` = :rid AND
`uid` = (SELECT `uid` FROM `user` WHERE `userkey` = :userkey)
`uid` = (SELECT `uid`
FROM `valid_user_act_key`
WHERE (`id_key`, `act_key`) = (:id_key, :act_key))
",
named_params! {
":cid": cid,
":rid": rid,
":userkey": user,
":id_key": user.id_key,
":act_key": user.act_key,
},
)?;

View file

@ -4,7 +4,7 @@ use std::time::{Duration, Instant};
use anyhow::{anyhow, ensure, Context};
use axum::http::{HeaderMap, HeaderName, StatusCode};
use blah_types::{
get_timestamp, Signed, UserIdentityDesc, UserKey, UserRegisterPayload, X_BLAH_DIFFICULTY,
get_timestamp, PubKey, Signed, UserIdentityDesc, UserRegisterPayload, X_BLAH_DIFFICULTY,
X_BLAH_NONCE,
};
use http_body_util::BodyExt;
@ -270,9 +270,9 @@ pub async fn user_register(
let uid = txn
.query_row(
r"
INSERT INTO `user` (`userkey`, `last_fetch_time`, `id_desc`)
INSERT INTO `user` (`id_key`, `last_fetch_time`, `id_desc`)
VALUES (:id_key, :last_fetch_time, :id_desc)
ON CONFLICT (`userkey`) DO UPDATE SET
ON CONFLICT (`id_key`) DO UPDATE SET
`last_fetch_time` = :last_fetch_time,
`id_desc` = :id_desc
WHERE `last_fetch_time` < :last_fetch_time
@ -323,20 +323,31 @@ pub async fn user_register(
fn validate_id_desc(
id_url: &Url,
id_key: &UserKey,
id_key: &PubKey,
id_desc: &UserIdentityDesc,
now: u64,
) -> anyhow::Result<()> {
ensure!(*id_key == id_desc.id_key, "id_key mismatch");
let profile_signing_key = &id_desc.profile.signee.user;
ensure!(
*id_key == id_desc.profile.signee.user.id_key,
"profile id_key mismatch",
);
let profile_signing_key = &id_desc.profile.signee.user.act_key;
let mut profile_signed = false;
for (i, act_key) in id_desc.act_keys.iter().enumerate() {
let kdesc = &act_key.signee.payload;
for (i, signed_kdesc) in id_desc.act_keys.iter().enumerate() {
let kdesc = &signed_kdesc.signee.payload;
(|| {
ensure!(act_key.signee.user == *id_key, "not signed by id_key");
act_key.verify().context("signature verification failed")?;
// act_key itself is signed by id_key, so both are id_key here.
ensure!(
signed_kdesc.signee.user.id_key == *id_key
&& signed_kdesc.signee.user.act_key == *id_key,
"not signed by id_key",
);
signed_kdesc
.verify()
.context("signature verification failed")?;
if now < kdesc.expire_time && *profile_signing_key == kdesc.act_key {
profile_signed = true;
}