diff --git a/Cargo.lock b/Cargo.lock
index d15974f..198282e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -314,6 +314,7 @@ dependencies = [
  "blah-types",
  "clap",
  "ed25519-dalek",
+ "expect-test",
  "futures-util",
  "hex",
  "http-body-util",
diff --git a/blahd/Cargo.toml b/blahd/Cargo.toml
index a2b173c..9c11f5d 100644
--- a/blahd/Cargo.toml
+++ b/blahd/Cargo.toml
@@ -37,6 +37,7 @@ url = { version = "2", features = ["serde"] }
 blah-types = { path = "../blah-types", features = ["rusqlite"] }
 
 [dev-dependencies]
+expect-test = "1"
 nix = { version = "0.29", features = ["fs", "process", "signal"] }
 reqwest = { version = "0.12", features = ["json"] }
 rstest = { version = "0.22", default-features = false }
diff --git a/blahd/config.example.toml b/blahd/config.example.toml
index d22b983..c4e04e8 100644
--- a/blahd/config.example.toml
+++ b/blahd/config.example.toml
@@ -43,6 +43,12 @@ max_request_len = 4096
 
 # The maximum timestamp tolerance in seconds for request validation.
 timestamp_tolerance_secs = 90
 
+[server.feed]
+# Additional page length limit for room feeds.
+# Feeds are fetched frequently and older items are rarely useful, so you do not
+# want a large response for a feed.
+max_page_len = 64
+
 [server.ws]
 # The max waiting time for the first authentication message for websocket.
diff --git a/blahd/src/feed.rs b/blahd/src/feed.rs
new file mode 100644
index 0000000..3a2ca5d
--- /dev/null
+++ b/blahd/src/feed.rs
@@ -0,0 +1,86 @@
+//! Room feed generation.
+use std::num::NonZero;
+use std::time::{Duration, SystemTime};
+
+use axum::http::header;
+use axum::response::{IntoResponse, Response};
+use axum::Json;
+use blah_types::{SignedChatMsg, WithMsgId};
+use serde::{Deserialize, Serialize};
+use url::Url;
+
+const JSON_FEED_MIME: &str = "application/feed+json";
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(default)]
+pub struct Config {
+    pub max_page_len: NonZero<u32>,
+}
+
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            max_page_len: 64.try_into().expect("not zero"),
+        }
+    }
+}
+
+/// Ref: <https://www.jsonfeed.org/version/1.1/>
+#[derive(Debug, Serialize)]
+#[serde(tag = "version", rename = "https://jsonfeed.org/version/1.1")]
+struct JsonFeed {
+    title: String,
+    feed_url: Url,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    next_url: Option<Url>,
+    items: Vec<JsonFeedItem>,
+}
+
+#[derive(Debug, Serialize)]
+struct JsonFeedItem {
+    id: String,
+    content_html: String,
+    date_published: String,
+    authors: (JsonFeedAuthor,),
+    // I don't think there is a need to return other special fields like signatures here.
+    // This API is for feed readers only, which cannot recognize them anyway.
+    // Our clients should already use the dedicated API (`/room/:rid/msg`).
+}
+
+#[derive(Debug, Serialize)]
+struct JsonFeedAuthor {
+    name: String,
+}
+
+pub fn to_json_feed(
+    title: String,
+    msgs: Vec<WithMsgId<SignedChatMsg>>,
+    self_url: Url,
+    next_url: Option<Url>,
+) -> Response {
+    let items = msgs
+        .into_iter()
+        .map(|WithMsgId { cid, msg }| {
+            let time = SystemTime::UNIX_EPOCH + Duration::from_secs(msg.signee.timestamp);
+            let author = JsonFeedAuthor {
+                // TODO: Retrieve id_url as name.
+                name: msg.signee.user.id_key.to_string(),
+            };
+            JsonFeedItem {
+                id: cid.to_string(),
+                content_html: msg.signee.payload.rich_text.html().to_string(),
+                date_published: humantime::format_rfc3339(time).to_string(),
+                authors: (author,),
+            }
+        })
+        .collect::<Vec<_>>();
+
+    let feed = JsonFeed {
+        title,
+        items,
+        feed_url: self_url,
+        next_url,
+    };
+
+    ([(header::CONTENT_TYPE, JSON_FEED_MIME)], Json(feed)).into_response()
+}
diff --git a/blahd/src/lib.rs b/blahd/src/lib.rs
index 1b0d0ba..a09e0bc 100644
--- a/blahd/src/lib.rs
+++ b/blahd/src/lib.rs
@@ -3,10 +3,10 @@
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
 
 use anyhow::Result;
-use axum::extract::ws;
+use axum::extract::{ws, OriginalUri};
 use axum::extract::{Path, Query, State, WebSocketUpgrade};
 use axum::http::{header, HeaderMap, HeaderName, StatusCode};
-use axum::response::{IntoResponse, Response};
+use axum::response::Response;
 use axum::routing::{get, post};
 use axum::{Json, Router};
 use axum_extra::extract::WithRejection as R;
@@ -17,7 +17,6 @@ use blah_types::{
     X_BLAH_NONCE,
 };
 use database::{Transaction, TransactionOps};
-use ed25519_dalek::SIGNATURE_LENGTH;
 use id::IdExt;
 use middleware::{Auth, MaybeAuth, ResultExt as _, SignedJson};
 use parking_lot::Mutex;
@@ -32,6 +31,7 @@ mod middleware;
 pub mod config;
 mod database;
 mod event;
+mod feed;
 mod id;
 mod register;
 mod utils;
@@ -54,6 +54,8 @@ pub struct ServerConfig {
     #[serde_inline_default(90)]
     pub timestamp_tolerance_secs: u64,
 
+    #[serde(default)]
+    pub feed: feed::Config,
     #[serde(default)]
     pub ws: event::Config,
     #[serde(default)]
@@ -135,7 +137,6 @@ pub fn router(st: Arc<AppState>) -> Router {
         .route("/room", get(room_list))
         .route("/room/create", post(room_create))
         .route("/room/:rid", get(room_get_metadata).delete(room_delete))
-        // NB. Sync with `feed_url` and `next_url` generation.
         .route("/room/:rid/feed.json", get(room_get_feed))
         .route("/room/:rid/msg", get(room_msg_list).post(room_msg_post))
         .route("/room/:rid/msg/:cid/seen", post(room_msg_mark_seen))
@@ -425,9 +426,23 @@ async fn room_get_metadata(
 
 async fn room_get_feed(
     st: ArcState,
+    R(OriginalUri(req_uri), _): RE<OriginalUri>,
     R(Path(rid), _): RE<Path<Id>>,
-    R(Query(pagination), _): RE<Query<Pagination>>,
-) -> Result<impl IntoResponse> {
+    R(Query(mut pagination), _): RE<Query<Pagination>>,
+) -> Result<Response> {
+    // TODO: If-None-Match.
+    let self_url = st
+        .config
+        .base_url
+        .join(req_uri.path())
+        .expect("base_url can be a base");
+
+    pagination.top = Some(
+        pagination
+            .effective_page_len(&st)
+            .min(st.config.feed.max_page_len),
+    );
+
     let (title, msgs, skip_token) = st.db.with_read(|txn| {
         let (attrs, title) = txn.get_room_having(rid, RoomAttrs::PUBLIC_READABLE)?;
         // Sanity check.
@@ -437,40 +452,13 @@ async fn room_get_feed(
         Ok((title, msgs, skip_token))
     })?;
 
-    let items = msgs
-        .into_iter()
-        .map(|WithMsgId { cid, msg }| {
-            let time = SystemTime::UNIX_EPOCH + Duration::from_secs(msg.signee.timestamp);
-            let author = FeedAuthor {
-                // TODO: Retrieve id_url as name.
-                name: msg.signee.user.id_key.to_string(),
-            };
-            FeedItem {
-                id: cid.to_string(),
-                content_html: msg.signee.payload.rich_text.html().to_string(),
-                date_published: humantime::format_rfc3339(time).to_string(),
-                authors: (author,),
-                extra: FeedItemExtra {
-                    timestamp: msg.signee.timestamp,
-                    nonce: msg.signee.nonce,
-                    sig: msg.sig,
-                },
-            }
-        })
-        .collect::<Vec<_>>();
-
-    let feed_url = st
-        .config
-        .base_url
-        .join(&format!("/room/{rid}/feed.json"))
-        .expect("base_url must be valid");
     let next_url = skip_token.map(|skip_token| {
         let next_params = Pagination {
             skip_token: Some(skip_token),
             top: pagination.top,
             until_token: None,
         };
-        let mut next_url = feed_url.clone();
+        let mut next_url = self_url.clone();
         {
             let mut query = next_url.query_pairs_mut();
             let ser = serde_urlencoded::Serializer::new(&mut query);
@@ -481,51 +469,8 @@ async fn room_get_feed(
         }
         next_url
     });
-    let feed = FeedRoom {
-        title,
-        items,
-        next_url,
-        feed_url,
-    };
-    Ok((
-        [(header::CONTENT_TYPE, "application/feed+json")],
-        Json(feed),
-    ))
-}
-
-/// Ref: <https://www.jsonfeed.org/version/1.1/>
-#[derive(Debug, Serialize)]
-#[serde(tag = "version", rename = "https://jsonfeed.org/version/1.1")]
-struct FeedRoom {
-    title: String,
-    feed_url: Url,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    next_url: Option<Url>,
-    items: Vec<FeedItem>,
-}
-
-#[derive(Debug, Serialize)]
-struct FeedItem {
-    id: String,
-    content_html: String,
-    date_published: String,
-    authors: (FeedAuthor,),
-    #[serde(rename = "_blah")]
-    extra: FeedItemExtra,
-}
-
-#[derive(Debug, Serialize)]
-struct FeedAuthor {
-    name: String,
-}
-
-#[derive(Debug, Serialize)]
-struct FeedItemExtra {
-    timestamp: u64,
-    nonce: u32,
-    #[serde(with = "hex::serde")]
-    sig: [u8; SIGNATURE_LENGTH],
+    Ok(feed::to_json_feed(title, msgs, self_url, next_url))
 }
 
 /// Get room messages with pagination parameters,
diff --git a/blahd/src/middleware.rs b/blahd/src/middleware.rs
index f3b838e..d79b77e 100644
--- a/blahd/src/middleware.rs
+++ b/blahd/src/middleware.rs
@@ -1,4 +1,5 @@
 use std::backtrace::Backtrace;
+use std::convert::Infallible;
 use std::fmt;
 use std::sync::Arc;
 
@@ -59,6 +60,13 @@ impl IntoResponse for ApiError {
     }
 }
 
+// For infallible extractors.
+impl From<Infallible> for ApiError {
+    fn from(v: Infallible) -> Self {
+        match v {}
+    }
+}
+
 macro_rules! define_from_deser_rejection {
     ($($ty:ty, $name:literal;)*) => {
         $(
diff --git a/blahd/tests/webapi.rs b/blahd/tests/webapi.rs
index 0cd9f3d..0becfe4 100644
--- a/blahd/tests/webapi.rs
+++ b/blahd/tests/webapi.rs
@@ -18,6 +18,7 @@ use blah_types::{
 };
 use blahd::{ApiError, AppState, Database, RoomList, RoomMsgs};
 use ed25519_dalek::SigningKey;
+use expect_test::expect;
 use futures_util::future::BoxFuture;
 use futures_util::{SinkExt, Stream, StreamExt, TryFutureExt};
 use parking_lot::Mutex;
@@ -32,18 +33,22 @@ use tokio::net::TcpListener;
 // Register API requires a non-IP hostname.
 const LOCALHOST: &str = "localhost";
 const REGISTER_DIFFICULTY: u8 = 1;
+const BASE_URL: &str = "http://base.example.com";
 
 const TIME_TOLERANCE: Duration = Duration::from_millis(100);
 const WS_CONNECT_TIMEOUT: Duration = Duration::from_millis(1500);
 
-const CONFIG: fn(u16) -> String = |port| {
+const CONFIG: fn(u16) -> String = |_port| {
     format!(
         r#"
-base_url="http://{LOCALHOST}:{port}"
+base_url="{BASE_URL}"
 
 [ws]
 auth_timeout_sec = 1
 
+[feed]
+max_page_len = 2
+
 [register]
 enable_public = true
 difficulty = {REGISTER_DIFFICULTY}
@@ -720,6 +725,61 @@ async fn room_chat_post_read(server: Server) {
     assert_eq!(msgs, RoomMsgs::default());
 }
 
+#[rstest]
+#[tokio::test]
+async fn room_feed(server: Server) {
+    // Only publicly readable rooms provide feeds, not even publicly joinable ones.
+    let rid_need_join = server
+        .create_room(&ALICE, RoomAttrs::PUBLIC_JOINABLE, "not so public")
+        .await
+        .unwrap();
+    server
+        .get::<serde_json::Value>(&format!("/room/{rid_need_join}/feed.json"), None)
+        .await
+        .expect_api_err(StatusCode::NOT_FOUND, "room_not_found");
+
+    let rid = server
+        .create_room(
+            &ALICE,
+            RoomAttrs::PUBLIC_READABLE | RoomAttrs::PUBLIC_JOINABLE,
+            "public",
+        )
+        .await
+        .unwrap();
+    server
+        .join_room(rid, &BOB, MemberPermission::POST_CHAT)
+        .await
+        .unwrap();
+    server.post_chat(rid, &ALICE, "a").await.unwrap();
+    let cid2 = server.post_chat(rid, &BOB, "b1").await.unwrap().cid;
+    server.post_chat(rid, &BOB, "b2").await.unwrap();
+
+    let feed = server
+        .get::<serde_json::Value>(&format!("/room/{rid}/feed.json"), None)
+        .await
+        .unwrap();
+    // TODO: Ideally we should assert on the whole result, but it currently contains times and random ids.
+    assert_eq!(feed["title"].as_str().unwrap(), "public");
+    assert_eq!(feed["items"].as_array().unwrap().len(), 2);
+    let feed_url = format!("{BASE_URL}/_blah/room/{rid}/feed.json");
+    assert_eq!(feed["feed_url"].as_str().unwrap(), feed_url);
+    assert_eq!(
+        feed["next_url"].as_str().unwrap(),
+        format!("{feed_url}?skipToken={cid2}&top=2"),
+    );
+
+    let feed2 = server
+        .get::<serde_json::Value>(
+            &format!("/room/{rid}/feed.json?skipToken={cid2}&top=2"),
+            None,
+        )
+        .await
+        .unwrap();
+    let items = feed2["items"].as_array().unwrap();
+    assert_eq!(items.len(), 1);
+    assert_eq!(items[0]["content_html"].as_str().unwrap(), "a");
+}
+
 #[rstest]
 #[tokio::test]
 async fn last_seen(server: Server) {
@@ -989,7 +1049,7 @@ async fn register_flow(server: Server) {
     register_fast(&req)
         .await
         .expect_api_err(StatusCode::BAD_REQUEST, "invalid_server_url");
-    req.server_url = format!("http://{}", server.domain()).parse().unwrap();
+    req.server_url = BASE_URL.parse().unwrap();
     register_fast(&req)
         .await