Reorganize workspace layout and split out blahd

This commit is contained in:
oxalica 2024-08-30 19:16:44 -04:00
parent 370722731b
commit 668b873b07
10 changed files with 89 additions and 66 deletions

26
blahd/Cargo.toml Normal file
View file

@ -0,0 +1,26 @@
[package]
name = "blahd"
version = "0.0.0"
edition = "2021"
[dependencies]
anyhow = "1"
axum = { version = "0.7", features = ["tokio"] }
clap = { version = "4", features = ["derive"] }
ed25519-dalek = "2"
futures-util = "0.3"
hex = { version = "0.4", features = ["serde"] }
humantime = "2"
rusqlite = { version = "0.32", features = ["uuid"] }
sd-notify = "0.4"
serde = { version = "1", features = ["derive"] }
serde-aux = "4"
serde_json = "1"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "sync"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tower-http = { version = "0.5", features = ["cors", "limit"] }
tracing = "0.1"
tracing-subscriber = "0.3"
uuid = { version = "1", features = ["v4"] }
blah = { path = "..", features = ["rusqlite"] }

122
blahd/docs/webapi.yaml Normal file
View file

@ -0,0 +1,122 @@
openapi: 3.1.0
info:
title: Blah Chatserver Proto
version: 0.0.1
paths:
/room/create:
post:
summary: Create a new room
requestBody:
content:
application/json:
schema:
$ref: WithSig<CreateRoomPayload>
example:
sig: 99a77e836538268839ed3419c649eefb043cb51d448f641cc2a1c523811aab4aacd09f92e7c0688ffd659bfc6acb764fea79979a491e132bf6a56dd23adc1d09
signee:
nonce: 670593955
payload:
typ: create_room
attrs: 1 # PUBLIC_READABLE
title: 'hello room'
members:
- user: 83ce46ced47ec0391c64846cbb6c507250ead4985b6a044d68751edc46015dd7
permission: -1
timestamp: 1724966284
user: 83ce46ced47ec0391c64846cbb6c507250ead4985b6a044d68751edc46015dd7
responses:
200:
content:
application/json:
type: string
description: UUID of the newly created room (ruuid).
403:
description: The user does not have permission to create room.
/room/{ruuid}/feed.json:
get:
summary: JSON feed of room {ruuid}, which must be public readable
description: For human and feed reader consumption only.
responses:
200:
text/feed+json:
scheme:
$ref: 'https://www.jsonfeed.org/version/1.1/'
404:
description: Room does not exist or is private.
/room/{ruuid}/item:
get:
summary: Get chat history for room {ruuid}
description: |
Return chat items in reversed time order, up to PAGE_LEN items.
The last (oldest) chat id can be used as query parameter for the next
GET, to repeatly fetch full history.
headers:
Authorization:
description: Proof of membership for private rooms.
required: false
schema:
$ret: WithSig<AuthPayload>
parameters:
before_id:
description: Filter items before (not including) a given chat id (cid).
in: query
responses:
200:
content:
application/json:
x-description: TODO
post:
summary: Post a chat in room {ruuid}
requestBody:
content:
application/json:
schema:
$ref: WithSig<ChatPayload>
example:
sig: 99a77e836538268839ed3419c649eefb043cb51d448f641cc2a1c523811aab4aacd09f92e7c0688ffd659bfc6acb764fea79979a491e132bf6a56dd23adc1d09
signee:
nonce: 670593955
payload:
typ: chat
room: 7ed9e067-ec37-4054-9fc2-b1bd890929bd
rich_text: ["before ",["bold ",{"b":true}],["italic bold ",{"b":true,"i":true}],"end"]
timestamp: 1724966284
user: 83ce46ced47ec0391c64846cbb6c507250ead4985b6a044d68751edc46015dd7
responses:
200:
content:
application/json:
type: integer
description: Created chat id (cid).
400:
description: Body is invalid or fails the verification.
403:
description: The user does not have permission to post in this room.
404:
description: Room not found.
/room/{ruuid}/event:
get:
summary: Get an event stream for future new items.
description: |
This is a temporary interface, before a better notification system
(post notifications? websocket?) is implemented.
headers:
Authorization:
description: Proof of membership for private rooms.
required: false
schema:
$ret: WithSig<AuthPayload>
responses:
200:
content:
text/event-stream:
x-description: An event stream, each event is a JSON with type WithSig<ChatPayload>
400:
description: Body is invalid or fails the verification.
404:
description: Room not found.

32
blahd/init.sql Normal file
View file

@ -0,0 +1,32 @@
PRAGMA journal_mode=WAL;
PRAGMA foreign_keys=TRUE;
CREATE TABLE IF NOT EXISTS `user` (
`uid` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
`userkey` BLOB NOT NULL UNIQUE,
`permission` INTEGER NOT NULL DEFAULT 0
) STRICT;
CREATE TABLE IF NOT EXISTS `room` (
`rid` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
`ruuid` BLOB NOT NULL UNIQUE,
`title` TEXT NOT NULL,
`attrs` INTEGER NOT NULL
) STRICT;
CREATE TABLE IF NOT EXISTS `room_member` (
`rid` INTEGER NOT NULL REFERENCES `room` ON DELETE CASCADE,
`uid` INTEGER NOT NULL REFERENCES `user` ON DELETE RESTRICT,
`permission` INTEGER NOT NULL,
PRIMARY KEY (`rid`, `uid`)
) STRICT;
CREATE TABLE IF NOT EXISTS `room_item` (
`cid` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
`rid` INTEGER NOT NULL REFERENCES `room` ON DELETE CASCADE,
`uid` INTEGER NOT NULL REFERENCES `user` ON DELETE RESTRICT,
`timestamp` INTEGER NOT NULL,
`nonce` INTEGER NOT NULL,
`sig` BLOB NOT NULL,
`rich_text` TEXT NOT NULL
) STRICT;

580
blahd/src/main.rs Normal file
View file

@ -0,0 +1,580 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::convert::Infallible;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use anyhow::{ensure, Context, Result};
use axum::extract::{FromRequest, FromRequestParts, Path, Query, Request, State};
use axum::http::{header, request, StatusCode};
use axum::response::{sse, IntoResponse, Response};
use axum::routing::{get, post};
use axum::{async_trait, Json, Router};
use blah::types::{
AuthPayload, ChatItem, ChatPayload, CreateRoomPayload, MemberPermission, RoomAttrs,
ServerPermission, Signee, UserKey, WithSig,
};
use ed25519_dalek::SIGNATURE_LENGTH;
use rusqlite::{named_params, params, OptionalExtension, Row};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tokio_stream::StreamExt;
use uuid::Uuid;
const PAGE_LEN: usize = 64;
const EVENT_QUEUE_LEN: usize = 1024;
const MAX_BODY_LEN: usize = 4 << 10; // 4KiB
#[derive(Debug, clap::Parser)]
struct Cli {
/// Address to listen on.
#[arg(long)]
listen: String,
/// Path to the SQLite database.
#[arg(long)]
database: PathBuf,
/// The global absolute URL prefix where this service is hosted.
/// It is for link generation and must not have trailing slash.
#[arg(long)]
base_url: String,
}
fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let cli = <Cli as clap::Parser>::parse();
let db = rusqlite::Connection::open(&cli.database).context("failed to open database")?;
let st = AppState::init(&*cli.base_url, db).context("failed to initialize state")?;
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("failed to initialize tokio runtime")?
.block_on(main_async(cli, st))?;
Ok(())
}
#[derive(Debug)]
struct AppState {
conn: Mutex<rusqlite::Connection>,
room_listeners: Mutex<HashMap<u64, broadcast::Sender<Arc<ChatItem>>>>,
base_url: Box<str>,
}
impl AppState {
fn init(base_url: impl Into<Box<str>>, conn: rusqlite::Connection) -> Result<Self> {
static INIT_SQL: &str = include_str!("../init.sql");
let base_url = base_url.into();
ensure!(
!base_url.ends_with('/'),
"base_url must not has trailing slash",
);
conn.execute_batch(INIT_SQL)
.context("failed to initialize database")?;
Ok(Self {
conn: Mutex::new(conn),
room_listeners: Mutex::new(HashMap::new()),
base_url,
})
}
}
type ArcState = State<Arc<AppState>>;
async fn main_async(opt: Cli, st: AppState) -> Result<()> {
let app = Router::new()
.route("/room/create", post(room_create))
// NB. Sync with `feed_url` and `next_url` generation.
.route("/room/:ruuid/feed.json", get(room_get_feed))
.route("/room/:ruuid/event", get(room_event))
.route("/room/:ruuid/item", get(room_get_item).post(room_post_item))
.with_state(Arc::new(st))
.layer(tower_http::limit::RequestBodyLimitLayer::new(MAX_BODY_LEN))
// NB. This comes at last (outmost layer), so inner errors will still be wraped with
// correct CORS headers.
.layer(tower_http::cors::CorsLayer::permissive());
let listener = tokio::net::TcpListener::bind(&opt.listen)
.await
.context("failed to listen on socket")?;
tracing::info!("listening on {}", opt.listen);
let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]);
axum::serve(listener, app)
.await
.context("failed to serve")?;
Ok(())
}
fn from_db_error(err: rusqlite::Error) -> StatusCode {
match err {
rusqlite::Error::QueryReturnedNoRows => StatusCode::NOT_FOUND,
err => {
tracing::error!(%err, "database error");
StatusCode::INTERNAL_SERVER_ERROR
}
}
}
async fn room_create(
st: ArcState,
SignedJson(params): SignedJson<CreateRoomPayload>,
) -> Result<Json<Uuid>, StatusCode> {
let members = &params.signee.payload.members.0;
if !members
.iter()
.any(|m| m.user == params.signee.user && m.permission == MemberPermission::ALL)
{
return Err(StatusCode::BAD_REQUEST);
}
let mut conn = st.conn.lock().unwrap();
let Some(true) = conn
.query_row(
r"
SELECT `permission`
FROM `user`
WHERE `userkey` = ?
",
params![params.signee.user],
|row| {
let perm = row.get::<_, ServerPermission>("permission")?;
Ok(perm.contains(ServerPermission::CREATE_ROOM))
},
)
.optional()
.map_err(from_db_error)?
else {
return Err(StatusCode::FORBIDDEN);
};
let ruuid = Uuid::new_v4();
(|| {
let txn = conn.transaction()?;
let rid = txn.query_row(
r"
INSERT INTO `room` (`ruuid`, `title`)
VALUES (:ruuid, :title)
RETURNING `rid`
",
named_params! {
":ruuid": ruuid,
":title": params.signee.payload.title,
},
|row| row.get::<_, u64>(0),
)?;
let mut insert_user = txn.prepare(
r"
INSERT INTO `user` (`userkey`)
VALUES (?)
ON CONFLICT (`userkey`) DO NOTHING
",
)?;
let mut insert_member = txn.prepare(
r"
INSERT INTO `room_member` (`rid`, `uid`, `permission`)
SELECT :rid, `uid`, :permission
FROM `user`
WHERE `userkey` = :userkey
",
)?;
for member in members {
insert_user.execute(params![member.user])?;
insert_member.execute(named_params! {
":rid": rid,
":userkey": member.user,
":permission": member.permission,
})?;
}
drop(insert_member);
drop(insert_user);
txn.commit()?;
Ok(())
})()
.map_err(from_db_error)?;
Ok(Json(ruuid))
}
// NB. `next_url` generation depends on this structure.
#[derive(Debug, Deserialize)]
struct GetRoomItemParams {
#[serde(
default,
deserialize_with = "serde_aux::field_attributes::deserialize_number_from_string"
)]
before_id: u64,
}
async fn room_get_item(
st: ArcState,
Path(ruuid): Path<Uuid>,
params: Query<GetRoomItemParams>,
OptionalAuth(user): OptionalAuth,
) -> Result<impl IntoResponse, StatusCode> {
let (room_meta, items) =
query_room_items(&st.conn.lock().unwrap(), ruuid, user.as_ref(), &params)
.map_err(from_db_error)?;
// TODO: This format is to-be-decided. Or do we even need this interface other than
// `feed.json`?
Ok(Json((room_meta, items)))
}
async fn room_get_feed(
st: ArcState,
Path(ruuid): Path<Uuid>,
params: Query<GetRoomItemParams>,
) -> Result<impl IntoResponse, StatusCode> {
let (room_meta, items) =
query_room_items(&st.conn.lock().unwrap(), ruuid, None, &params).map_err(from_db_error)?;
let items = items
.into_iter()
.map(|(cid, item)| {
let time = SystemTime::UNIX_EPOCH + Duration::from_secs(item.signee.timestamp);
let author = FeedAuthor {
name: item.signee.user.to_string(),
};
FeedItem {
id: cid.to_string(),
content_html: item.signee.payload.rich_text.html().to_string(),
date_published: humantime::format_rfc3339(time).to_string(),
authors: (author,),
extra: FeedItemExtra {
timestamp: item.signee.timestamp,
nonce: item.signee.nonce,
sig: item.sig,
},
}
})
.collect::<Vec<_>>();
let base_url = &st.base_url;
let feed_url = format!("{base_url}/room/{ruuid}/feed.json");
let next_url = (items.len() == PAGE_LEN).then(|| {
let last_id = &items.last().expect("page size is not 0").id;
format!("{feed_url}?before_id={last_id}")
});
let feed = FeedRoom {
title: room_meta.title,
items,
next_url,
feed_url,
};
Ok((
[(header::CONTENT_TYPE, "application/feed+json")],
Json(feed),
))
}
/// Ref: https://www.jsonfeed.org/version/1.1/
#[derive(Debug, Serialize)]
#[serde(tag = "version", rename = "https://jsonfeed.org/version/1.1")]
struct FeedRoom {
title: String,
feed_url: String,
#[serde(skip_serializing_if = "Option::is_none")]
next_url: Option<String>,
items: Vec<FeedItem>,
}
#[derive(Debug, Serialize)]
struct FeedItem {
id: String,
content_html: String,
date_published: String,
authors: (FeedAuthor,),
#[serde(rename = "_blah")]
extra: FeedItemExtra,
}
#[derive(Debug, Serialize)]
struct FeedAuthor {
name: String,
}
#[derive(Debug, Serialize)]
struct FeedItemExtra {
timestamp: u64,
nonce: u32,
#[serde(with = "hex::serde")]
sig: [u8; SIGNATURE_LENGTH],
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RoomMetadata {
pub title: String,
pub attrs: RoomAttrs,
}
fn get_room_if_readable<T>(
conn: &rusqlite::Connection,
ruuid: Uuid,
user: Option<&UserKey>,
f: impl FnOnce(&Row<'_>) -> rusqlite::Result<T>,
) -> rusqlite::Result<T> {
conn.query_row(
r"
SELECT `rid`, `title`, `attrs`
FROM `room`
WHERE `ruuid` = :ruuid AND
((`attrs` & :perm) = :perm OR
EXISTS(SELECT 1
FROM `room_member`
JOIN `user` USING (`uid`)
WHERE `room_member`.`rid` = `room`.`rid` AND
`userkey` = :userkey))
",
named_params! {
":perm": RoomAttrs::PUBLIC_READABLE,
":ruuid": ruuid,
":userkey": user,
},
f,
)
}
fn query_room_items(
conn: &rusqlite::Connection,
ruuid: Uuid,
user: Option<&UserKey>,
params: &GetRoomItemParams,
) -> rusqlite::Result<(RoomMetadata, Vec<(u64, ChatItem)>)> {
let (rid, title, attrs) = get_room_if_readable(conn, ruuid, user, |row| {
Ok((
row.get::<_, u64>("rid")?,
row.get::<_, String>("title")?,
row.get::<_, RoomAttrs>("attrs")?,
))
})?;
let room_meta = RoomMetadata { title, attrs };
let mut stmt = conn.prepare(
r"
SELECT `cid`, `timestamp`, `nonce`, `sig`, `userkey`, `sig`, `rich_text`
FROM `room_item`
JOIN `user` USING (`uid`)
WHERE `rid` = :rid AND
(:before_cid = 0 OR `cid` < :before_cid)
ORDER BY `cid` DESC
LIMIT :limit
",
)?;
let items = stmt
.query_and_then(
named_params! {
":rid": rid,
":before_cid": params.before_id,
":limit": PAGE_LEN,
},
|row| {
let cid = row.get::<_, u64>("cid")?;
let item = ChatItem {
sig: row.get("sig")?,
signee: Signee {
nonce: row.get("nonce")?,
timestamp: row.get("timestamp")?,
user: row.get("userkey")?,
payload: ChatPayload {
room: ruuid,
rich_text: row.get("rich_text")?,
},
},
};
Ok((cid, item))
},
)?
.collect::<rusqlite::Result<Vec<_>>>()?;
Ok((room_meta, items))
}
/// Extractor for verified JSON payload.
#[derive(Debug)]
struct SignedJson<T>(WithSig<T>);
#[async_trait]
impl<S: Send + Sync, T: Serialize + DeserializeOwned> FromRequest<S> for SignedJson<T> {
type Rejection = Response;
async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
let Json(data) = <Json<WithSig<T>> as FromRequest<S>>::from_request(req, state)
.await
.map_err(|err| err.into_response())?;
data.verify().map_err(|err| {
tracing::debug!(%err, "unsigned payload");
StatusCode::BAD_REQUEST.into_response()
})?;
Ok(Self(data))
}
}
/// Extractor for optional verified JSON authorization header.
#[derive(Debug)]
struct OptionalAuth(Option<UserKey>);
#[async_trait]
impl<S: Send + Sync> FromRequestParts<S> for OptionalAuth {
type Rejection = StatusCode;
async fn from_request_parts(
parts: &mut request::Parts,
_state: &S,
) -> Result<Self, Self::Rejection> {
let Some(auth) = parts.headers.get(header::AUTHORIZATION) else {
return Ok(Self(None));
};
let ret = serde_json::from_slice::<WithSig<AuthPayload>>(auth.as_bytes())
.context("invalid JSON")
.and_then(|data| {
data.verify()?;
Ok(data.signee.user)
});
match ret {
Ok(user) => Ok(Self(Some(user))),
Err(err) => {
tracing::debug!(%err, "invalid authorization");
Err(StatusCode::BAD_REQUEST)
}
}
}
}
async fn room_post_item(
st: ArcState,
Path(ruuid): Path<Uuid>,
SignedJson(chat): SignedJson<ChatPayload>,
) -> Result<Json<u64>, StatusCode> {
if ruuid != chat.signee.payload.room {
return Err(StatusCode::BAD_REQUEST);
}
let (rid, cid) = {
let conn = st.conn.lock().unwrap();
let Some((rid, uid)) = conn
.query_row(
r"
SELECT `rid`, `uid`
FROM `room`
JOIN `room_member` USING (`rid`)
JOIN `user` USING (`uid`)
WHERE `ruuid` = :ruuid AND
`userkey` = :userkey AND
(`room_member`.`permission` & :perm) = :perm
",
named_params! {
":ruuid": ruuid,
":userkey": &chat.signee.user,
":perm": MemberPermission::POST_CHAT,
},
|row| Ok((row.get::<_, u64>("rid")?, row.get::<_, u64>("uid")?)),
)
.optional()
.map_err(from_db_error)?
else {
tracing::debug!("rejected post: unpermitted user {}", chat.signee.user);
return Err(StatusCode::FORBIDDEN);
};
let cid = conn
.query_row(
r"
INSERT INTO `room_item` (`rid`, `uid`, `timestamp`, `nonce`, `sig`, `rich_text`)
VALUES (:rid, :uid, :timestamp, :nonce, :sig, :rich_text)
RETURNING `cid`
",
named_params! {
":rid": rid,
":uid": uid,
":timestamp": chat.signee.timestamp,
":nonce": chat.signee.nonce,
":rich_text": &chat.signee.payload.rich_text,
":sig": chat.sig,
},
|row| row.get::<_, u64>(0),
)
.map_err(from_db_error)?;
(rid, cid)
};
{
let mut listeners = st.room_listeners.lock().unwrap();
if let Some(tx) = listeners.get(&rid) {
if tx.send(Arc::new(chat)).is_err() {
// Clean up because all receivers died.
listeners.remove(&rid);
}
}
}
Ok(Json(cid))
}
async fn room_event(
st: ArcState,
Path(ruuid): Path<Uuid>,
// TODO: There is actually no way to add headers via `EventSource` in client side.
// But this API is kinda temporary and need a better replacement anyway.
// So just only support public room for now.
OptionalAuth(user): OptionalAuth,
) -> Result<impl IntoResponse, StatusCode> {
let rid = get_room_if_readable(&st.conn.lock().unwrap(), ruuid, user.as_ref(), |row| {
row.get::<_, u64>(0)
})
.map_err(from_db_error)?;
let rx = match st.room_listeners.lock().unwrap().entry(rid) {
Entry::Occupied(ent) => ent.get().subscribe(),
Entry::Vacant(ent) => {
let (tx, rx) = broadcast::channel(EVENT_QUEUE_LEN);
ent.insert(tx);
rx
}
};
// Do clean up when this stream is closed.
struct CleanOnDrop {
st: Arc<AppState>,
rid: u64,
}
impl Drop for CleanOnDrop {
fn drop(&mut self) {
if let Ok(mut listeners) = self.st.room_listeners.lock() {
if let Some(tx) = listeners.get(&self.rid) {
if tx.receiver_count() == 0 {
listeners.remove(&self.rid);
}
}
}
}
}
let _guard = CleanOnDrop { st: st.0, rid };
let stream = tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(move |ret| {
let _guard = &_guard;
// On stream closure or lagging, close the current stream so client can retry.
let item = ret.ok()?;
let evt = sse::Event::default()
.json_data(&*item)
.expect("serialization cannot fail");
Some(Ok::<_, Infallible>(evt))
});
// NB. Send an empty event immediately to trigger client ready event.
let first_event = sse::Event::default().comment("");
let stream = futures_util::stream::iter(Some(Ok(first_event))).chain(stream);
Ok(sse::Sse::new(stream).keep_alive(sse::KeepAlive::default()))
}