mirror of
https://github.com/Blah-IM/blahrs.git
synced 2025-05-01 00:31:09 +00:00
Initial impl
This commit is contained in:
parent
ae7a2b09fc
commit
593da123b6
10 changed files with 3018 additions and 1 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -1 +1,3 @@
|
||||||
/target
|
/target
|
||||||
|
*.sqlite*
|
||||||
|
*.key
|
||||||
|
|
2107
Cargo.lock
generated
2107
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
23
Cargo.toml
23
Cargo.toml
|
@ -3,4 +3,27 @@ name = "blah"
|
||||||
version = "0.0.0"
|
version = "0.0.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
|
# TODO: Shrink dependencies.
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
anyhow = "1.0.86"
|
||||||
|
axum = { version = "0.7.5", features = ["tokio"] }
|
||||||
|
bitflags = { version = "2.6.0", features = ["serde"] }
|
||||||
|
clap = { version = "4.5.16", features = ["derive"] }
|
||||||
|
ed25519-dalek = { version = "2.1.1", features = ["digest", "serde"] }
|
||||||
|
hex = { version = "0.4.3", features = ["serde"] }
|
||||||
|
humantime = "2.1.0"
|
||||||
|
rand_core = "0.6.4"
|
||||||
|
rusqlite = { version = "0.32.1", features = ["uuid"] }
|
||||||
|
sd-notify = "0.4.2"
|
||||||
|
serde = { version = "1.0.209", features = ["derive"] }
|
||||||
|
serde-aux = "4.5.0"
|
||||||
|
serde-constant = "0.1.0"
|
||||||
|
serde_json = "1.0.127"
|
||||||
|
tokio = { version = "1.39.3", features = ["macros", "rt-multi-thread", "sync"] }
|
||||||
|
tokio-stream = { version = "0.1.15", features = ["sync"] }
|
||||||
|
tracing = "0.1.40"
|
||||||
|
tracing-subscriber = "0.3.18"
|
||||||
|
uuid = { version = "1.10.0", features = ["serde", "v4"] }
|
||||||
|
|
||||||
|
[workspace]
|
||||||
|
members = [ "./blahctl" ]
|
||||||
|
|
16
blahctl/Cargo.toml
Normal file
16
blahctl/Cargo.toml
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
[package]
|
||||||
|
name = "blahctl"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
anyhow = "1.0.86"
|
||||||
|
bitflags = "2.6.0"
|
||||||
|
blah.path = ".."
|
||||||
|
clap = { version = "4.5.16", features = ["derive"] }
|
||||||
|
ed25519-dalek = { version = "2.1.1", features = ["pkcs8", "pem", "rand_core"] }
|
||||||
|
rand = "0.8.5"
|
||||||
|
reqwest = { version = "0.12.7", features = ["json"] }
|
||||||
|
rusqlite = "0.32.1"
|
||||||
|
tokio = { version = "1.39.3", features = ["rt", "macros"] }
|
||||||
|
uuid = "1.10.0"
|
225
blahctl/src/main.rs
Normal file
225
blahctl/src/main.rs
Normal file
|
@ -0,0 +1,225 @@
|
||||||
|
use std::io::Write;
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::{fs, io};
|
||||||
|
|
||||||
|
use anyhow::{Context, Result};
|
||||||
|
use bitflags::Flags;
|
||||||
|
use blah::types::{ChatPayload, CreateRoomPayload, ServerPermission, UserKey, WithSig};
|
||||||
|
use ed25519_dalek::pkcs8::spki::der::pem::LineEnding;
|
||||||
|
use ed25519_dalek::pkcs8::{DecodePrivateKey, DecodePublicKey, EncodePrivateKey, EncodePublicKey};
|
||||||
|
use ed25519_dalek::{SigningKey, VerifyingKey};
|
||||||
|
use rand::rngs::OsRng;
|
||||||
|
use reqwest::Url;
|
||||||
|
use rusqlite::{named_params, Connection};
|
||||||
|
use tokio::runtime::Runtime;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
/// NB. Sync with docs of [`User::url`].
|
||||||
|
const KEY_URL_SUBPATH: &str = "/.well-known/blah/key";
|
||||||
|
|
||||||
|
#[derive(Debug, clap::Parser)]
|
||||||
|
struct Cli {
|
||||||
|
#[command(subcommand)]
|
||||||
|
command: Command,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, clap::Subcommand)]
|
||||||
|
enum Command {
|
||||||
|
/// Generate a keypair.
|
||||||
|
GenerateKey {
|
||||||
|
/// The output path to store secret key.
|
||||||
|
#[arg(long, short)]
|
||||||
|
output: PathBuf,
|
||||||
|
},
|
||||||
|
/// Database manipulation.
|
||||||
|
Database {
|
||||||
|
/// The path to the database.
|
||||||
|
#[arg(long = "db")]
|
||||||
|
database: PathBuf,
|
||||||
|
|
||||||
|
#[command(subcommand)]
|
||||||
|
command: DbCommand,
|
||||||
|
},
|
||||||
|
/// Access the API endpoint.
|
||||||
|
Api {
|
||||||
|
/// The URL to the API endpoint.
|
||||||
|
#[arg(long)]
|
||||||
|
url: Url,
|
||||||
|
|
||||||
|
#[command(subcommand)]
|
||||||
|
command: ApiCommand,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, clap::Subcommand)]
|
||||||
|
enum DbCommand {
|
||||||
|
/// Set user property, possibly adding new users.
|
||||||
|
SetUser {
|
||||||
|
#[command(flatten)]
|
||||||
|
user: User,
|
||||||
|
|
||||||
|
#[arg(long, value_parser = flag_parser::<ServerPermission>)]
|
||||||
|
permission: ServerPermission,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
fn flag_parser<T: Flags>(s: &str) -> clap::error::Result<T> {
|
||||||
|
bitflags::parser::from_str_strict(s)
|
||||||
|
.map_err(|_| clap::error::Error::new(clap::error::ErrorKind::InvalidValue))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, clap::Subcommand)]
|
||||||
|
enum ApiCommand {
|
||||||
|
/// Create a room with the given user as the only owner.
|
||||||
|
CreateRoom {
|
||||||
|
#[arg(long, short = 'f')]
|
||||||
|
private_key_file: PathBuf,
|
||||||
|
|
||||||
|
#[arg(long)]
|
||||||
|
title: String,
|
||||||
|
},
|
||||||
|
PostChat {
|
||||||
|
#[arg(long, short = 'f')]
|
||||||
|
private_key_file: PathBuf,
|
||||||
|
|
||||||
|
#[arg(long)]
|
||||||
|
room: Uuid,
|
||||||
|
|
||||||
|
#[arg(long)]
|
||||||
|
text: String,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// This should be an enum but clap does not support it on `Args` yet.
|
||||||
|
// See: https://github.com/clap-rs/clap/issues/2621
|
||||||
|
#[derive(Debug, clap::Args)]
|
||||||
|
#[clap(group = clap::ArgGroup::new("user").required(true).multiple(false))]
|
||||||
|
struct User {
|
||||||
|
/// The path to a user public key.
|
||||||
|
#[arg(long, short = 'f', group = "user")]
|
||||||
|
public_key_file: Option<PathBuf>,
|
||||||
|
|
||||||
|
/// The user domain where `/.well-known/blah/key` is hosted.
|
||||||
|
#[arg(long, group = "user")]
|
||||||
|
url: Option<Url>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl User {
|
||||||
|
async fn fetch_key(&self) -> Result<UserKey> {
|
||||||
|
let rawkey = if let Some(path) = &self.public_key_file {
|
||||||
|
fs::read_to_string(path).context("failed to read key file")?
|
||||||
|
} else if let Some(url) = &self.url {
|
||||||
|
let url = url.join(KEY_URL_SUBPATH)?;
|
||||||
|
reqwest::get(url).await?.error_for_status()?.text().await?
|
||||||
|
} else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
let key = VerifyingKey::from_public_key_pem(&rawkey)
|
||||||
|
.context("invalid key")?
|
||||||
|
.to_bytes();
|
||||||
|
Ok(UserKey(key))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static INIT_SQL: &str = include_str!("../../init.sql");
|
||||||
|
|
||||||
|
fn main() -> Result<()> {
|
||||||
|
let cli = <Cli as clap::Parser>::parse();
|
||||||
|
|
||||||
|
match cli.command {
|
||||||
|
Command::GenerateKey { output } => {
|
||||||
|
let privkey = SigningKey::generate(&mut OsRng);
|
||||||
|
let pubkey_doc = privkey.verifying_key().to_public_key_pem(LineEnding::LF)?;
|
||||||
|
privkey.write_pkcs8_pem_file(&output, LineEnding::LF)?;
|
||||||
|
io::stdout().write_all(pubkey_doc.as_bytes())?;
|
||||||
|
}
|
||||||
|
Command::Database { database, command } => {
|
||||||
|
let conn = Connection::open(database).context("failed to open database")?;
|
||||||
|
conn.execute_batch(INIT_SQL)
|
||||||
|
.context("failed to initialize database")?;
|
||||||
|
main_db(conn, command)?;
|
||||||
|
}
|
||||||
|
Command::Api { url, command } => build_rt()?.block_on(main_api(url, command))?,
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_rt() -> Result<Runtime> {
|
||||||
|
tokio::runtime::Builder::new_current_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()
|
||||||
|
.context("failed to initialize tokio runtime")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main_db(conn: Connection, command: DbCommand) -> Result<()> {
|
||||||
|
match command {
|
||||||
|
DbCommand::SetUser { user, permission } => {
|
||||||
|
let userkey = build_rt()?.block_on(user.fetch_key())?;
|
||||||
|
|
||||||
|
conn.execute(
|
||||||
|
r"
|
||||||
|
INSERT
|
||||||
|
INTO `user` (`userkey`, `permission`)
|
||||||
|
VALUES (:userkey, :permission)
|
||||||
|
ON CONFLICT (`userkey`) DO UPDATE SET
|
||||||
|
`permission` = :permission
|
||||||
|
",
|
||||||
|
named_params! {
|
||||||
|
":userkey": userkey,
|
||||||
|
":permission": permission,
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn load_signing_key(path: &Path) -> Result<SigningKey> {
|
||||||
|
let pem = fs::read_to_string(path).context("failed to read private key file")?;
|
||||||
|
SigningKey::from_pkcs8_pem(&pem).context("failed to parse private key")
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn main_api(api_url: Url, command: ApiCommand) -> Result<()> {
|
||||||
|
let client = reqwest::Client::new();
|
||||||
|
match command {
|
||||||
|
ApiCommand::CreateRoom {
|
||||||
|
private_key_file,
|
||||||
|
title,
|
||||||
|
} => {
|
||||||
|
let key = load_signing_key(&private_key_file)?;
|
||||||
|
let payload = WithSig::sign(&key, &mut OsRng, CreateRoomPayload { title })?;
|
||||||
|
|
||||||
|
let ret = client
|
||||||
|
.post(api_url.join("/room/create")?)
|
||||||
|
.json(&payload)
|
||||||
|
.send()
|
||||||
|
.await?
|
||||||
|
.error_for_status()?
|
||||||
|
.text()
|
||||||
|
.await?;
|
||||||
|
println!("{ret}");
|
||||||
|
}
|
||||||
|
ApiCommand::PostChat {
|
||||||
|
private_key_file,
|
||||||
|
room,
|
||||||
|
text,
|
||||||
|
} => {
|
||||||
|
let key = load_signing_key(&private_key_file)?;
|
||||||
|
let payload = ChatPayload { room, text };
|
||||||
|
let payload = WithSig::sign(&key, &mut OsRng, payload)?;
|
||||||
|
|
||||||
|
let ret = client
|
||||||
|
.post(api_url.join(&format!("/room/{room}/item"))?)
|
||||||
|
.json(&payload)
|
||||||
|
.send()
|
||||||
|
.await?
|
||||||
|
.error_for_status()?
|
||||||
|
.text()
|
||||||
|
.await?;
|
||||||
|
println!("{ret}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
31
init.sql
Normal file
31
init.sql
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
PRAGMA journal_mode=WAL;
|
||||||
|
PRAGMA foreign_keys=TRUE;
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS `user` (
|
||||||
|
`uid` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||||
|
`userkey` BLOB NOT NULL UNIQUE,
|
||||||
|
`permission` INTEGER NOT NULL DEFAULT 0
|
||||||
|
) STRICT;
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS `room` (
|
||||||
|
`rid` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||||
|
`ruuid` BLOB NOT NULL UNIQUE,
|
||||||
|
`title` TEXT NOT NULL
|
||||||
|
) STRICT;
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS `room_member` (
|
||||||
|
`rid` INTEGER NOT NULL REFERENCES `room` ON DELETE CASCADE,
|
||||||
|
`uid` INTEGER NOT NULL REFERENCES `user` ON DELETE RESTRICT,
|
||||||
|
`permission` INTEGER NOT NULL,
|
||||||
|
PRIMARY KEY (`rid`, `uid`)
|
||||||
|
) STRICT;
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS `room_item` (
|
||||||
|
`cid` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||||
|
`rid` INTEGER NOT NULL REFERENCES `room` ON DELETE CASCADE,
|
||||||
|
`uid` INTEGER NOT NULL REFERENCES `user` ON DELETE RESTRICT,
|
||||||
|
`timestamp` INTEGER NOT NULL,
|
||||||
|
`nonce` INTEGER NOT NULL,
|
||||||
|
`sig` BLOB NOT NULL,
|
||||||
|
`message` TEXT NOT NULL
|
||||||
|
) STRICT;
|
7
shell.nix
Normal file
7
shell.nix
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
with import <nixpkgs> { };
|
||||||
|
mkShell {
|
||||||
|
nativeBuildInputs = [ pkg-config sqlite-interactive ];
|
||||||
|
buildInputs = [ openssl.dev sqlite.dev ];
|
||||||
|
|
||||||
|
env.RUST_LOG = "debug";
|
||||||
|
}
|
1
src/lib.rs
Normal file
1
src/lib.rs
Normal file
|
@ -0,0 +1 @@
|
||||||
|
pub mod types;
|
438
src/main.rs
438
src/main.rs
|
@ -1 +1,437 @@
|
||||||
fn main() {}
|
use std::collections::hash_map::Entry;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::convert::Infallible;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::{Duration, SystemTime};
|
||||||
|
|
||||||
|
use anyhow::{ensure, Context, Result};
|
||||||
|
use axum::extract::{FromRequest, Path, Query, Request, State};
|
||||||
|
use axum::http::{header, StatusCode};
|
||||||
|
use axum::response::{sse, IntoResponse, Response};
|
||||||
|
use axum::routing::{get, post};
|
||||||
|
use axum::{async_trait, Json, Router};
|
||||||
|
use blah::types::{
|
||||||
|
ChatItem, ChatPayload, CreateRoomPayload, RoomPermission, ServerPermission, UserKey, WithSig,
|
||||||
|
};
|
||||||
|
use ed25519_dalek::SIGNATURE_LENGTH;
|
||||||
|
use rusqlite::{named_params, params, OptionalExtension};
|
||||||
|
use serde::de::DeserializeOwned;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
use tokio_stream::StreamExt;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
const PAGE_LEN: usize = 1024;
|
||||||
|
const EVENT_QUEUE_LEN: usize = 1024;
|
||||||
|
|
||||||
|
#[derive(Debug, clap::Parser)]
|
||||||
|
struct Cli {
|
||||||
|
/// Address to listen on.
|
||||||
|
#[arg(long)]
|
||||||
|
listen: String,
|
||||||
|
|
||||||
|
/// Path to the SQLite database.
|
||||||
|
#[arg(long)]
|
||||||
|
database: PathBuf,
|
||||||
|
|
||||||
|
/// The global absolute URL prefix where this service is hosted.
|
||||||
|
/// It is for link generation and must not have trailing slash.
|
||||||
|
#[arg(long)]
|
||||||
|
base_url: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() -> Result<()> {
|
||||||
|
tracing_subscriber::fmt::init();
|
||||||
|
let cli = <Cli as clap::Parser>::parse();
|
||||||
|
|
||||||
|
let db = rusqlite::Connection::open(&cli.database).context("failed to open database")?;
|
||||||
|
let st = AppState::init(&*cli.base_url, db).context("failed to initialize state")?;
|
||||||
|
|
||||||
|
tokio::runtime::Builder::new_multi_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()
|
||||||
|
.context("failed to initialize tokio runtime")?
|
||||||
|
.block_on(main_async(cli, st))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct AppState {
|
||||||
|
conn: Mutex<rusqlite::Connection>,
|
||||||
|
room_listeners: Mutex<HashMap<u64, broadcast::Sender<Arc<ChatItem>>>>,
|
||||||
|
|
||||||
|
base_url: Box<str>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AppState {
|
||||||
|
fn init(base_url: impl Into<Box<str>>, conn: rusqlite::Connection) -> Result<Self> {
|
||||||
|
static INIT_SQL: &str = include_str!("../init.sql");
|
||||||
|
|
||||||
|
let base_url = base_url.into();
|
||||||
|
ensure!(
|
||||||
|
!base_url.ends_with('/'),
|
||||||
|
"base_url must not has trailing slash",
|
||||||
|
);
|
||||||
|
|
||||||
|
conn.execute_batch(INIT_SQL)
|
||||||
|
.context("failed to initialize database")?;
|
||||||
|
Ok(Self {
|
||||||
|
conn: Mutex::new(conn),
|
||||||
|
room_listeners: Mutex::new(HashMap::new()),
|
||||||
|
base_url,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type ArcState = State<Arc<AppState>>;
|
||||||
|
|
||||||
|
async fn main_async(opt: Cli, st: AppState) -> Result<()> {
|
||||||
|
let app = Router::new()
|
||||||
|
.route("/room/create", post(room_create))
|
||||||
|
// NB. Sync with `feed_url` and `next_url` generation.
|
||||||
|
.route("/room/:ruuid/feed.json", get(room_get_feed))
|
||||||
|
.route("/room/:ruuid/event", get(room_event))
|
||||||
|
.route("/room/:ruuid/item", post(room_post_item))
|
||||||
|
.with_state(Arc::new(st));
|
||||||
|
|
||||||
|
let listener = tokio::net::TcpListener::bind(&opt.listen)
|
||||||
|
.await
|
||||||
|
.context("failed to listen on socket")?;
|
||||||
|
|
||||||
|
tracing::info!("listening on {}", opt.listen);
|
||||||
|
let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]);
|
||||||
|
|
||||||
|
axum::serve(listener, app)
|
||||||
|
.await
|
||||||
|
.context("failed to serve")?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn from_db_error(err: rusqlite::Error) -> StatusCode {
|
||||||
|
match err {
|
||||||
|
rusqlite::Error::QueryReturnedNoRows => StatusCode::NOT_FOUND,
|
||||||
|
err => {
|
||||||
|
tracing::error!(%err, "database error");
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn room_create(
|
||||||
|
st: ArcState,
|
||||||
|
SignedJson(params): SignedJson<CreateRoomPayload>,
|
||||||
|
) -> Result<Json<Uuid>, StatusCode> {
|
||||||
|
let mut conn = st.conn.lock().unwrap();
|
||||||
|
let Some((uid, _perm)) = conn
|
||||||
|
.query_row(
|
||||||
|
r"
|
||||||
|
SELECT `uid`, `permission`
|
||||||
|
FROM `user`
|
||||||
|
WHERE `userkey` = ?
|
||||||
|
",
|
||||||
|
params![params.signee.user],
|
||||||
|
|row| {
|
||||||
|
let uid = row.get::<_, u64>("uid")?;
|
||||||
|
let perm = row.get::<_, ServerPermission>("permission")?;
|
||||||
|
Ok((uid, perm))
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.optional()
|
||||||
|
.map_err(from_db_error)?
|
||||||
|
.filter(|(_, perm)| perm.contains(ServerPermission::CREATE_ROOM))
|
||||||
|
else {
|
||||||
|
return Err(StatusCode::UNAUTHORIZED);
|
||||||
|
};
|
||||||
|
|
||||||
|
let ruuid = Uuid::new_v4();
|
||||||
|
|
||||||
|
(|| {
|
||||||
|
let txn = conn.transaction()?;
|
||||||
|
let rid = txn.query_row(
|
||||||
|
r"
|
||||||
|
INSERT INTO `room` (`ruuid`, `title`)
|
||||||
|
VALUES (:ruuid, :title)
|
||||||
|
RETURNING `rid`
|
||||||
|
",
|
||||||
|
named_params! {
|
||||||
|
":ruuid": ruuid,
|
||||||
|
":title": params.signee.payload.title,
|
||||||
|
},
|
||||||
|
|row| row.get::<_, u64>(0),
|
||||||
|
)?;
|
||||||
|
txn.execute(
|
||||||
|
r"
|
||||||
|
INSERT INTO `room_member` (`rid`, `uid`, `permission`)
|
||||||
|
VALUES (:rid, :uid, :permission)
|
||||||
|
",
|
||||||
|
named_params! {
|
||||||
|
":rid": rid,
|
||||||
|
":uid": uid,
|
||||||
|
":permission": RoomPermission::ALL,
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
txn.commit()?;
|
||||||
|
Ok(())
|
||||||
|
})()
|
||||||
|
.map_err(from_db_error)?;
|
||||||
|
|
||||||
|
Ok(Json(ruuid))
|
||||||
|
}
|
||||||
|
|
||||||
|
// NB. `next_url` generation depends on this structure.
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
struct GetRoomParams {
|
||||||
|
#[serde(
|
||||||
|
default,
|
||||||
|
deserialize_with = "serde_aux::field_attributes::deserialize_number_from_string"
|
||||||
|
)]
|
||||||
|
before_id: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn room_get_feed(
|
||||||
|
st: ArcState,
|
||||||
|
Path(ruuid): Path<Uuid>,
|
||||||
|
params: Query<GetRoomParams>,
|
||||||
|
) -> Result<impl IntoResponse, StatusCode> {
|
||||||
|
let room_feed = query_feed(&st.conn.lock().unwrap(), &st.base_url, ruuid, ¶ms)
|
||||||
|
.map_err(from_db_error)?;
|
||||||
|
Ok((
|
||||||
|
[(header::CONTENT_TYPE, "application/feed+json")],
|
||||||
|
Json(room_feed),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Ref: https://www.jsonfeed.org/version/1.1/
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
#[serde(tag = "version", rename = "https://jsonfeed.org/version/1.1")]
|
||||||
|
struct FeedRoom {
|
||||||
|
title: String,
|
||||||
|
feed_url: String,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
next_url: Option<String>,
|
||||||
|
items: Vec<FeedItem>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
struct FeedItem {
|
||||||
|
id: String,
|
||||||
|
content_text: String,
|
||||||
|
date_published: String,
|
||||||
|
authors: (FeedAuthor,),
|
||||||
|
#[serde(rename = "_blah")]
|
||||||
|
extra: FeedItemExtra,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
struct FeedAuthor {
|
||||||
|
name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
struct FeedItemExtra {
|
||||||
|
timestamp: u64,
|
||||||
|
nonce: u32,
|
||||||
|
#[serde(with = "hex::serde")]
|
||||||
|
sig: [u8; SIGNATURE_LENGTH],
|
||||||
|
}
|
||||||
|
|
||||||
|
fn query_feed(
|
||||||
|
conn: &rusqlite::Connection,
|
||||||
|
base_url: &str,
|
||||||
|
ruuid: Uuid,
|
||||||
|
params: &GetRoomParams,
|
||||||
|
) -> rusqlite::Result<FeedRoom> {
|
||||||
|
let (rid, title) = conn.query_row(
|
||||||
|
r"
|
||||||
|
SELECT `rid`, `title`
|
||||||
|
FROM `room`
|
||||||
|
WHERE `ruuid` = ?
|
||||||
|
",
|
||||||
|
params![ruuid],
|
||||||
|
|row| {
|
||||||
|
let rid = row.get::<_, u64>("rid")?;
|
||||||
|
let title = row.get::<_, String>("title")?;
|
||||||
|
Ok((rid, title))
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let mut stmt = conn.prepare(
|
||||||
|
r"
|
||||||
|
SELECT `cid`, `timestamp`, `nonce`, `sig`, `userkey`, `sig`, `message`
|
||||||
|
FROM `room_item`
|
||||||
|
JOIN `user` USING (`uid`)
|
||||||
|
WHERE `rid` = :rid AND
|
||||||
|
(:before_cid = 0 OR `cid` < :before_cid)
|
||||||
|
ORDER BY `cid` DESC
|
||||||
|
LIMIT :limit
|
||||||
|
",
|
||||||
|
)?;
|
||||||
|
let items = stmt
|
||||||
|
.query_and_then(
|
||||||
|
named_params! {
|
||||||
|
":rid": rid,
|
||||||
|
":before_cid": params.before_id,
|
||||||
|
":limit": PAGE_LEN,
|
||||||
|
},
|
||||||
|
|row| {
|
||||||
|
let timestamp = row.get::<_, u64>("timestamp")?;
|
||||||
|
let time = SystemTime::UNIX_EPOCH + Duration::from_secs(timestamp);
|
||||||
|
let author = FeedAuthor {
|
||||||
|
name: row.get::<_, UserKey>("userkey")?.to_string(),
|
||||||
|
};
|
||||||
|
Ok(FeedItem {
|
||||||
|
id: row.get::<_, u64>("cid")?.to_string(),
|
||||||
|
content_text: row.get("message")?,
|
||||||
|
date_published: humantime::format_rfc3339(time).to_string(),
|
||||||
|
authors: (author,),
|
||||||
|
extra: FeedItemExtra {
|
||||||
|
timestamp,
|
||||||
|
nonce: row.get("nonce")?,
|
||||||
|
sig: row.get("sig")?,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
},
|
||||||
|
)?
|
||||||
|
.collect::<rusqlite::Result<Vec<_>>>()?;
|
||||||
|
|
||||||
|
let feed_url = format!("{base_url}/room/{ruuid}/feed.json");
|
||||||
|
let next_url = (items.len() == PAGE_LEN).then(|| {
|
||||||
|
let last_id = &items.last().expect("page size is not 0").id;
|
||||||
|
format!("{feed_url}?before_id={last_id}")
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(FeedRoom {
|
||||||
|
title,
|
||||||
|
items,
|
||||||
|
next_url,
|
||||||
|
feed_url,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extractor for verified JSON payload.
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct SignedJson<T>(WithSig<T>);
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<S: Send + Sync, T: Serialize + DeserializeOwned> FromRequest<S> for SignedJson<T> {
|
||||||
|
type Rejection = Response;
|
||||||
|
|
||||||
|
async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
|
||||||
|
let Json(data) = <Json<WithSig<T>> as FromRequest<S>>::from_request(req, state)
|
||||||
|
.await
|
||||||
|
.map_err(|err| err.into_response())?;
|
||||||
|
data.verify().map_err(|err| {
|
||||||
|
tracing::debug!(%err, "unsigned payload");
|
||||||
|
StatusCode::BAD_REQUEST.into_response()
|
||||||
|
})?;
|
||||||
|
Ok(Self(data))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn room_post_item(
|
||||||
|
st: ArcState,
|
||||||
|
Path(ruuid): Path<Uuid>,
|
||||||
|
SignedJson(chat): SignedJson<ChatPayload>,
|
||||||
|
) -> Result<Json<u64>, StatusCode> {
|
||||||
|
if ruuid != chat.signee.payload.room {
|
||||||
|
return Err(StatusCode::BAD_REQUEST);
|
||||||
|
}
|
||||||
|
|
||||||
|
let (rid, cid) = {
|
||||||
|
let conn = st.conn.lock().unwrap();
|
||||||
|
let Some((rid, uid)) = conn
|
||||||
|
.query_row(
|
||||||
|
r"
|
||||||
|
SELECT `rid`, `uid`
|
||||||
|
FROM `room`
|
||||||
|
JOIN `room_member` USING (`rid`)
|
||||||
|
JOIN `user` USING (`uid`)
|
||||||
|
WHERE `ruuid` = :ruuid AND
|
||||||
|
`userkey` = :userkey
|
||||||
|
",
|
||||||
|
named_params! {
|
||||||
|
":ruuid": ruuid,
|
||||||
|
":userkey": &chat.signee.user,
|
||||||
|
},
|
||||||
|
|row| Ok((row.get::<_, u64>("rid")?, row.get::<_, u64>("uid")?)),
|
||||||
|
)
|
||||||
|
.optional()
|
||||||
|
.map_err(from_db_error)?
|
||||||
|
else {
|
||||||
|
tracing::debug!("rejected post: unpermitted user {:?}", chat.signee.user);
|
||||||
|
return Err(StatusCode::UNAUTHORIZED);
|
||||||
|
};
|
||||||
|
|
||||||
|
let cid = conn
|
||||||
|
.query_row(
|
||||||
|
r"
|
||||||
|
INSERT INTO `room_item` (`rid`, `uid`, `timestamp`, `nonce`, `sig`, `message`)
|
||||||
|
VALUES (:rid, :uid, :timestamp, :nonce, :sig, :message)
|
||||||
|
RETURNING `cid`
|
||||||
|
",
|
||||||
|
named_params! {
|
||||||
|
":rid": rid,
|
||||||
|
":uid": uid,
|
||||||
|
":timestamp": chat.signee.timestamp,
|
||||||
|
":nonce": chat.signee.nonce,
|
||||||
|
":message": &chat.signee.payload.text,
|
||||||
|
":sig": chat.sig,
|
||||||
|
},
|
||||||
|
|row| row.get::<_, u64>(0),
|
||||||
|
)
|
||||||
|
.map_err(from_db_error)?;
|
||||||
|
(rid, cid)
|
||||||
|
};
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut listeners = st.room_listeners.lock().unwrap();
|
||||||
|
if let Some(tx) = listeners.get(&rid) {
|
||||||
|
if tx.send(Arc::new(chat)).is_err() {
|
||||||
|
// Clean up because all receivers died.
|
||||||
|
listeners.remove(&rid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Json(cid))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn room_event(
|
||||||
|
Path(ruuid): Path<Uuid>,
|
||||||
|
st: ArcState,
|
||||||
|
) -> Result<impl IntoResponse, StatusCode> {
|
||||||
|
let rid = st
|
||||||
|
.conn
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.query_row(
|
||||||
|
r"
|
||||||
|
SELECT `rid`
|
||||||
|
FROM `room`
|
||||||
|
WHERE `ruuid` = ?
|
||||||
|
",
|
||||||
|
params![ruuid],
|
||||||
|
|row| row.get::<_, u64>(0),
|
||||||
|
)
|
||||||
|
.map_err(from_db_error)?;
|
||||||
|
|
||||||
|
let rx = match st.room_listeners.lock().unwrap().entry(rid) {
|
||||||
|
Entry::Occupied(ent) => ent.get().subscribe(),
|
||||||
|
Entry::Vacant(ent) => {
|
||||||
|
let (tx, rx) = broadcast::channel(EVENT_QUEUE_LEN);
|
||||||
|
ent.insert(tx);
|
||||||
|
rx
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let stream = tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(|ret| {
|
||||||
|
// On stream closure or lagging, close the current stream so client can retry.
|
||||||
|
let item = ret.ok()?;
|
||||||
|
let evt = sse::Event::default()
|
||||||
|
.json_data(&*item)
|
||||||
|
.expect("serialization cannot fail");
|
||||||
|
Some(Ok::<_, Infallible>(evt))
|
||||||
|
});
|
||||||
|
Ok(sse::Sse::new(stream).keep_alive(sse::KeepAlive::default()))
|
||||||
|
}
|
||||||
|
|
169
src/types.rs
Normal file
169
src/types.rs
Normal file
|
@ -0,0 +1,169 @@
|
||||||
|
use std::fmt;
|
||||||
|
// NB. All structs here that are part of signee must be lexically sorted, as RFC8785.
|
||||||
|
use std::time::SystemTime;
|
||||||
|
|
||||||
|
use anyhow::{ensure, Context};
|
||||||
|
use bitflags::bitflags;
|
||||||
|
use ed25519_dalek::{
|
||||||
|
Signature, Signer, SigningKey, VerifyingKey, PUBLIC_KEY_LENGTH, SIGNATURE_LENGTH,
|
||||||
|
};
|
||||||
|
use rand_core::RngCore;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
const TIMESTAMP_TOLERENCE: u64 = 90;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(transparent)]
|
||||||
|
pub struct UserKey(#[serde(with = "hex::serde")] pub [u8; PUBLIC_KEY_LENGTH]);
|
||||||
|
|
||||||
|
impl fmt::Display for UserKey {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
let mut buf = [0u8; PUBLIC_KEY_LENGTH * 2];
|
||||||
|
hex::encode_to_slice(self.0, &mut buf).unwrap();
|
||||||
|
f.write_str(std::str::from_utf8(&buf).unwrap())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
#[serde(deny_unknown_fields)]
|
||||||
|
pub struct WithSig<T> {
|
||||||
|
// sorted
|
||||||
|
#[serde(with = "hex::serde")]
|
||||||
|
pub sig: [u8; SIGNATURE_LENGTH],
|
||||||
|
pub signee: Signee<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
#[serde(deny_unknown_fields)]
|
||||||
|
pub struct Signee<T> {
|
||||||
|
// sorted
|
||||||
|
pub nonce: u32,
|
||||||
|
pub payload: T,
|
||||||
|
pub timestamp: u64,
|
||||||
|
pub user: UserKey,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_timestamp() -> u64 {
|
||||||
|
SystemTime::now()
|
||||||
|
.duration_since(SystemTime::UNIX_EPOCH)
|
||||||
|
.expect("after UNIX epoch")
|
||||||
|
.as_secs()
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Serialize> WithSig<T> {
|
||||||
|
pub fn sign(key: &SigningKey, rng: &mut impl RngCore, payload: T) -> anyhow::Result<Self> {
|
||||||
|
let signee = Signee {
|
||||||
|
nonce: rng.next_u32(),
|
||||||
|
payload,
|
||||||
|
timestamp: get_timestamp(),
|
||||||
|
user: UserKey(key.verifying_key().to_bytes()),
|
||||||
|
};
|
||||||
|
let canonical_signee = serde_json::to_vec(&signee).context("failed to serialize")?;
|
||||||
|
let sig = key.try_sign(&canonical_signee)?.to_bytes();
|
||||||
|
Ok(Self { sig, signee })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn verify(&self) -> anyhow::Result<()> {
|
||||||
|
ensure!(
|
||||||
|
self.signee.timestamp.abs_diff(get_timestamp()) < TIMESTAMP_TOLERENCE,
|
||||||
|
"invalid timestamp"
|
||||||
|
);
|
||||||
|
|
||||||
|
let canonical_signee = serde_json::to_vec(&self.signee).context("failed to serialize")?;
|
||||||
|
let sig = Signature::from_bytes(&self.sig);
|
||||||
|
VerifyingKey::from_bytes(&self.signee.user.0)?.verify_strict(&canonical_signee, &sig)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME: `deny_unknown_fields` breaks this.
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
#[serde(tag = "typ", rename = "chat")]
|
||||||
|
pub struct ChatPayload {
|
||||||
|
// sorted
|
||||||
|
pub room: Uuid,
|
||||||
|
pub text: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type ChatItem = WithSig<ChatPayload>;
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
#[serde(tag = "typ", rename = "create_room")]
|
||||||
|
pub struct CreateRoomPayload {
|
||||||
|
pub title: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
#[serde(deny_unknown_fields, tag = "typ", rename_all = "snake_case")]
|
||||||
|
pub enum RoomAdminPayload {
|
||||||
|
AddMember {
|
||||||
|
// sorted
|
||||||
|
permission: RoomPermission,
|
||||||
|
room: Uuid,
|
||||||
|
user: UserKey,
|
||||||
|
},
|
||||||
|
// TODO: CRUD
|
||||||
|
}
|
||||||
|
|
||||||
|
bitflags! {
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct ServerPermission: u64 {
|
||||||
|
const CREATE_ROOM = 1 << 0;
|
||||||
|
|
||||||
|
const ALL = !0;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct RoomPermission: u64 {
|
||||||
|
const PUSH_CHAT = 1 << 0;
|
||||||
|
const ADD_MEMBER = 1 << 1;
|
||||||
|
|
||||||
|
const ALL = !0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mod sql_impl {
|
||||||
|
use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef};
|
||||||
|
use rusqlite::{Result, ToSql};
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
impl ToSql for UserKey {
|
||||||
|
fn to_sql(&self) -> Result<ToSqlOutput<'_>> {
|
||||||
|
// TODO: Extensive key format?
|
||||||
|
self.0.to_sql()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromSql for UserKey {
|
||||||
|
fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
|
||||||
|
let rawkey = <[u8; PUBLIC_KEY_LENGTH]>::column_result(value)?;
|
||||||
|
let key = VerifyingKey::from_bytes(&rawkey)
|
||||||
|
.map_err(|err| FromSqlError::Other(format!("invalid pubkey: {err}").into()))?;
|
||||||
|
Ok(UserKey(key.to_bytes()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
macro_rules! impl_u64_flag {
|
||||||
|
($($name:ident),*) => {
|
||||||
|
$(
|
||||||
|
impl ToSql for $name {
|
||||||
|
fn to_sql(&self) -> Result<ToSqlOutput<'_>> {
|
||||||
|
// Cast out the sign.
|
||||||
|
Ok((self.bits() as i64).into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromSql for $name {
|
||||||
|
fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
|
||||||
|
// Cast out the sign.
|
||||||
|
i64::column_result(value).map(|v| $name::from_bits_retain(v as u64))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)*
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
impl_u64_flag!(ServerPermission, RoomPermission);
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue