mirror of
https://github.com/Blah-IM/blahrs.git
synced 2025-05-01 00:31:09 +00:00
refactor(blahd),test: move feed generation into submod and add test
This commit is contained in:
parent
1e8c16888c
commit
94e5913513
7 changed files with 188 additions and 81 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -314,6 +314,7 @@ dependencies = [
|
||||||
"blah-types",
|
"blah-types",
|
||||||
"clap",
|
"clap",
|
||||||
"ed25519-dalek",
|
"ed25519-dalek",
|
||||||
|
"expect-test",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"hex",
|
"hex",
|
||||||
"http-body-util",
|
"http-body-util",
|
||||||
|
|
|
@ -37,6 +37,7 @@ url = { version = "2", features = ["serde"] }
|
||||||
blah-types = { path = "../blah-types", features = ["rusqlite"] }
|
blah-types = { path = "../blah-types", features = ["rusqlite"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
expect-test = "1"
|
||||||
nix = { version = "0.29", features = ["fs", "process", "signal"] }
|
nix = { version = "0.29", features = ["fs", "process", "signal"] }
|
||||||
reqwest = { version = "0.12", features = ["json"] }
|
reqwest = { version = "0.12", features = ["json"] }
|
||||||
rstest = { version = "0.22", default-features = false }
|
rstest = { version = "0.22", default-features = false }
|
||||||
|
|
|
@ -43,6 +43,12 @@ max_request_len = 4096
|
||||||
# The maximum timestamp tolerance in seconds for request validation.
|
# The maximum timestamp tolerance in seconds for request validation.
|
||||||
timestamp_tolerance_secs = 90
|
timestamp_tolerance_secs = 90
|
||||||
|
|
||||||
|
[server.feed]
|
||||||
|
# Additional page length limit for room feed.
|
||||||
|
# Feed are fetched frequently and older items are rarely useful. You do want a
|
||||||
|
# large response for feed.
|
||||||
|
max_page_len = 64
|
||||||
|
|
||||||
[server.ws]
|
[server.ws]
|
||||||
|
|
||||||
# The max waiting time for the first authentication message for websocket.
|
# The max waiting time for the first authentication message for websocket.
|
||||||
|
|
86
blahd/src/feed.rs
Normal file
86
blahd/src/feed.rs
Normal file
|
@ -0,0 +1,86 @@
|
||||||
|
//! Room feed generation.
|
||||||
|
use std::num::NonZero;
|
||||||
|
use std::time::{Duration, SystemTime};
|
||||||
|
|
||||||
|
use axum::http::header;
|
||||||
|
use axum::response::{IntoResponse, Response};
|
||||||
|
use axum::Json;
|
||||||
|
use blah_types::{SignedChatMsg, WithMsgId};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
const JSON_FEED_MIME: &str = "application/feed+json";
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[serde(default)]
|
||||||
|
pub struct Config {
|
||||||
|
pub max_page_len: NonZero<u32>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Config {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
max_page_len: 64.try_into().expect("not zero"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Ref: <https://www.jsonfeed.org/version/1.1/>
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
#[serde(tag = "version", rename = "https://jsonfeed.org/version/1.1")]
|
||||||
|
struct JsonFeed {
|
||||||
|
title: String,
|
||||||
|
feed_url: Url,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
next_url: Option<Url>,
|
||||||
|
items: Vec<JsonFeedItem>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
struct JsonFeedItem {
|
||||||
|
id: String,
|
||||||
|
content_html: String,
|
||||||
|
date_published: String,
|
||||||
|
authors: (JsonFeedAuthor,),
|
||||||
|
// I don't think there is a need to return other special fields like signatures here.
|
||||||
|
// This API is for readers only which cannot recognize them anyway.
|
||||||
|
// Our clients should already use the dedicate API (`/room/:rid/msg`).
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
struct JsonFeedAuthor {
|
||||||
|
name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn to_json_feed(
|
||||||
|
title: String,
|
||||||
|
msgs: Vec<WithMsgId<SignedChatMsg>>,
|
||||||
|
self_url: Url,
|
||||||
|
next_url: Option<Url>,
|
||||||
|
) -> Response {
|
||||||
|
let items = msgs
|
||||||
|
.into_iter()
|
||||||
|
.map(|WithMsgId { cid, msg }| {
|
||||||
|
let time = SystemTime::UNIX_EPOCH + Duration::from_secs(msg.signee.timestamp);
|
||||||
|
let author = JsonFeedAuthor {
|
||||||
|
// TODO: Retrieve id_url as name.
|
||||||
|
name: msg.signee.user.id_key.to_string(),
|
||||||
|
};
|
||||||
|
JsonFeedItem {
|
||||||
|
id: cid.to_string(),
|
||||||
|
content_html: msg.signee.payload.rich_text.html().to_string(),
|
||||||
|
date_published: humantime::format_rfc3339(time).to_string(),
|
||||||
|
authors: (author,),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let feed = JsonFeed {
|
||||||
|
title,
|
||||||
|
items,
|
||||||
|
feed_url: self_url,
|
||||||
|
next_url,
|
||||||
|
};
|
||||||
|
|
||||||
|
([(header::CONTENT_TYPE, JSON_FEED_MIME)], Json(feed)).into_response()
|
||||||
|
}
|
101
blahd/src/lib.rs
101
blahd/src/lib.rs
|
@ -3,10 +3,10 @@ use std::sync::Arc;
|
||||||
use std::time::{Duration, SystemTime};
|
use std::time::{Duration, SystemTime};
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use axum::extract::ws;
|
use axum::extract::{ws, OriginalUri};
|
||||||
use axum::extract::{Path, Query, State, WebSocketUpgrade};
|
use axum::extract::{Path, Query, State, WebSocketUpgrade};
|
||||||
use axum::http::{header, HeaderMap, HeaderName, StatusCode};
|
use axum::http::{header, HeaderMap, HeaderName, StatusCode};
|
||||||
use axum::response::{IntoResponse, Response};
|
use axum::response::Response;
|
||||||
use axum::routing::{get, post};
|
use axum::routing::{get, post};
|
||||||
use axum::{Json, Router};
|
use axum::{Json, Router};
|
||||||
use axum_extra::extract::WithRejection as R;
|
use axum_extra::extract::WithRejection as R;
|
||||||
|
@ -17,7 +17,6 @@ use blah_types::{
|
||||||
X_BLAH_NONCE,
|
X_BLAH_NONCE,
|
||||||
};
|
};
|
||||||
use database::{Transaction, TransactionOps};
|
use database::{Transaction, TransactionOps};
|
||||||
use ed25519_dalek::SIGNATURE_LENGTH;
|
|
||||||
use id::IdExt;
|
use id::IdExt;
|
||||||
use middleware::{Auth, MaybeAuth, ResultExt as _, SignedJson};
|
use middleware::{Auth, MaybeAuth, ResultExt as _, SignedJson};
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
|
@ -32,6 +31,7 @@ mod middleware;
|
||||||
pub mod config;
|
pub mod config;
|
||||||
mod database;
|
mod database;
|
||||||
mod event;
|
mod event;
|
||||||
|
mod feed;
|
||||||
mod id;
|
mod id;
|
||||||
mod register;
|
mod register;
|
||||||
mod utils;
|
mod utils;
|
||||||
|
@ -54,6 +54,8 @@ pub struct ServerConfig {
|
||||||
#[serde_inline_default(90)]
|
#[serde_inline_default(90)]
|
||||||
pub timestamp_tolerance_secs: u64,
|
pub timestamp_tolerance_secs: u64,
|
||||||
|
|
||||||
|
#[serde(default)]
|
||||||
|
pub feed: feed::Config,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub ws: event::Config,
|
pub ws: event::Config,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
@ -135,7 +137,6 @@ pub fn router(st: Arc<AppState>) -> Router {
|
||||||
.route("/room", get(room_list))
|
.route("/room", get(room_list))
|
||||||
.route("/room/create", post(room_create))
|
.route("/room/create", post(room_create))
|
||||||
.route("/room/:rid", get(room_get_metadata).delete(room_delete))
|
.route("/room/:rid", get(room_get_metadata).delete(room_delete))
|
||||||
// NB. Sync with `feed_url` and `next_url` generation.
|
|
||||||
.route("/room/:rid/feed.json", get(room_get_feed))
|
.route("/room/:rid/feed.json", get(room_get_feed))
|
||||||
.route("/room/:rid/msg", get(room_msg_list).post(room_msg_post))
|
.route("/room/:rid/msg", get(room_msg_list).post(room_msg_post))
|
||||||
.route("/room/:rid/msg/:cid/seen", post(room_msg_mark_seen))
|
.route("/room/:rid/msg/:cid/seen", post(room_msg_mark_seen))
|
||||||
|
@ -425,9 +426,23 @@ async fn room_get_metadata(
|
||||||
|
|
||||||
async fn room_get_feed(
|
async fn room_get_feed(
|
||||||
st: ArcState,
|
st: ArcState,
|
||||||
|
R(OriginalUri(req_uri), _): RE<OriginalUri>,
|
||||||
R(Path(rid), _): RE<Path<Id>>,
|
R(Path(rid), _): RE<Path<Id>>,
|
||||||
R(Query(pagination), _): RE<Query<Pagination>>,
|
R(Query(mut pagination), _): RE<Query<Pagination>>,
|
||||||
) -> Result<impl IntoResponse, ApiError> {
|
) -> Result<Response, ApiError> {
|
||||||
|
// TODO: If-None-Match.
|
||||||
|
let self_url = st
|
||||||
|
.config
|
||||||
|
.base_url
|
||||||
|
.join(req_uri.path())
|
||||||
|
.expect("base_url can be a base");
|
||||||
|
|
||||||
|
pagination.top = Some(
|
||||||
|
pagination
|
||||||
|
.effective_page_len(&st)
|
||||||
|
.min(st.config.feed.max_page_len),
|
||||||
|
);
|
||||||
|
|
||||||
let (title, msgs, skip_token) = st.db.with_read(|txn| {
|
let (title, msgs, skip_token) = st.db.with_read(|txn| {
|
||||||
let (attrs, title) = txn.get_room_having(rid, RoomAttrs::PUBLIC_READABLE)?;
|
let (attrs, title) = txn.get_room_having(rid, RoomAttrs::PUBLIC_READABLE)?;
|
||||||
// Sanity check.
|
// Sanity check.
|
||||||
|
@ -437,40 +452,13 @@ async fn room_get_feed(
|
||||||
Ok((title, msgs, skip_token))
|
Ok((title, msgs, skip_token))
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let items = msgs
|
|
||||||
.into_iter()
|
|
||||||
.map(|WithMsgId { cid, msg }| {
|
|
||||||
let time = SystemTime::UNIX_EPOCH + Duration::from_secs(msg.signee.timestamp);
|
|
||||||
let author = FeedAuthor {
|
|
||||||
// TODO: Retrieve id_url as name.
|
|
||||||
name: msg.signee.user.id_key.to_string(),
|
|
||||||
};
|
|
||||||
FeedItem {
|
|
||||||
id: cid.to_string(),
|
|
||||||
content_html: msg.signee.payload.rich_text.html().to_string(),
|
|
||||||
date_published: humantime::format_rfc3339(time).to_string(),
|
|
||||||
authors: (author,),
|
|
||||||
extra: FeedItemExtra {
|
|
||||||
timestamp: msg.signee.timestamp,
|
|
||||||
nonce: msg.signee.nonce,
|
|
||||||
sig: msg.sig,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let feed_url = st
|
|
||||||
.config
|
|
||||||
.base_url
|
|
||||||
.join(&format!("/room/{rid}/feed.json"))
|
|
||||||
.expect("base_url must be valid");
|
|
||||||
let next_url = skip_token.map(|skip_token| {
|
let next_url = skip_token.map(|skip_token| {
|
||||||
let next_params = Pagination {
|
let next_params = Pagination {
|
||||||
skip_token: Some(skip_token),
|
skip_token: Some(skip_token),
|
||||||
top: pagination.top,
|
top: pagination.top,
|
||||||
until_token: None,
|
until_token: None,
|
||||||
};
|
};
|
||||||
let mut next_url = feed_url.clone();
|
let mut next_url = self_url.clone();
|
||||||
{
|
{
|
||||||
let mut query = next_url.query_pairs_mut();
|
let mut query = next_url.query_pairs_mut();
|
||||||
let ser = serde_urlencoded::Serializer::new(&mut query);
|
let ser = serde_urlencoded::Serializer::new(&mut query);
|
||||||
|
@ -481,51 +469,8 @@ async fn room_get_feed(
|
||||||
}
|
}
|
||||||
next_url
|
next_url
|
||||||
});
|
});
|
||||||
let feed = FeedRoom {
|
|
||||||
title,
|
|
||||||
items,
|
|
||||||
next_url,
|
|
||||||
feed_url,
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok((
|
Ok(feed::to_json_feed(title, msgs, self_url, next_url))
|
||||||
[(header::CONTENT_TYPE, "application/feed+json")],
|
|
||||||
Json(feed),
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Ref: <https://www.jsonfeed.org/version/1.1/>
|
|
||||||
#[derive(Debug, Serialize)]
|
|
||||||
#[serde(tag = "version", rename = "https://jsonfeed.org/version/1.1")]
|
|
||||||
struct FeedRoom {
|
|
||||||
title: String,
|
|
||||||
feed_url: Url,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
next_url: Option<Url>,
|
|
||||||
items: Vec<FeedItem>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize)]
|
|
||||||
struct FeedItem {
|
|
||||||
id: String,
|
|
||||||
content_html: String,
|
|
||||||
date_published: String,
|
|
||||||
authors: (FeedAuthor,),
|
|
||||||
#[serde(rename = "_blah")]
|
|
||||||
extra: FeedItemExtra,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize)]
|
|
||||||
struct FeedAuthor {
|
|
||||||
name: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize)]
|
|
||||||
struct FeedItemExtra {
|
|
||||||
timestamp: u64,
|
|
||||||
nonce: u32,
|
|
||||||
#[serde(with = "hex::serde")]
|
|
||||||
sig: [u8; SIGNATURE_LENGTH],
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get room messages with pagination parameters,
|
/// Get room messages with pagination parameters,
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
use std::backtrace::Backtrace;
|
use std::backtrace::Backtrace;
|
||||||
|
use std::convert::Infallible;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
@ -59,6 +60,13 @@ impl IntoResponse for ApiError {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// For infallible extractors.
|
||||||
|
impl From<Infallible> for ApiError {
|
||||||
|
fn from(v: Infallible) -> Self {
|
||||||
|
match v {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
macro_rules! define_from_deser_rejection {
|
macro_rules! define_from_deser_rejection {
|
||||||
($($ty:ty, $name:literal;)*) => {
|
($($ty:ty, $name:literal;)*) => {
|
||||||
$(
|
$(
|
||||||
|
|
|
@ -18,6 +18,7 @@ use blah_types::{
|
||||||
};
|
};
|
||||||
use blahd::{ApiError, AppState, Database, RoomList, RoomMsgs};
|
use blahd::{ApiError, AppState, Database, RoomList, RoomMsgs};
|
||||||
use ed25519_dalek::SigningKey;
|
use ed25519_dalek::SigningKey;
|
||||||
|
use expect_test::expect;
|
||||||
use futures_util::future::BoxFuture;
|
use futures_util::future::BoxFuture;
|
||||||
use futures_util::{SinkExt, Stream, StreamExt, TryFutureExt};
|
use futures_util::{SinkExt, Stream, StreamExt, TryFutureExt};
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
|
@ -32,18 +33,22 @@ use tokio::net::TcpListener;
|
||||||
// Register API requires a non-IP hostname.
|
// Register API requires a non-IP hostname.
|
||||||
const LOCALHOST: &str = "localhost";
|
const LOCALHOST: &str = "localhost";
|
||||||
const REGISTER_DIFFICULTY: u8 = 1;
|
const REGISTER_DIFFICULTY: u8 = 1;
|
||||||
|
const BASE_URL: &str = "http://base.example.com";
|
||||||
|
|
||||||
const TIME_TOLERANCE: Duration = Duration::from_millis(100);
|
const TIME_TOLERANCE: Duration = Duration::from_millis(100);
|
||||||
const WS_CONNECT_TIMEOUT: Duration = Duration::from_millis(1500);
|
const WS_CONNECT_TIMEOUT: Duration = Duration::from_millis(1500);
|
||||||
|
|
||||||
const CONFIG: fn(u16) -> String = |port| {
|
const CONFIG: fn(u16) -> String = |_port| {
|
||||||
format!(
|
format!(
|
||||||
r#"
|
r#"
|
||||||
base_url="http://{LOCALHOST}:{port}"
|
base_url="{BASE_URL}"
|
||||||
|
|
||||||
[ws]
|
[ws]
|
||||||
auth_timeout_sec = 1
|
auth_timeout_sec = 1
|
||||||
|
|
||||||
|
[feed]
|
||||||
|
max_page_len = 2
|
||||||
|
|
||||||
[register]
|
[register]
|
||||||
enable_public = true
|
enable_public = true
|
||||||
difficulty = {REGISTER_DIFFICULTY}
|
difficulty = {REGISTER_DIFFICULTY}
|
||||||
|
@ -720,6 +725,61 @@ async fn room_chat_post_read(server: Server) {
|
||||||
assert_eq!(msgs, RoomMsgs::default());
|
assert_eq!(msgs, RoomMsgs::default());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[rstest]
|
||||||
|
#[tokio::test]
|
||||||
|
async fn room_feed(server: Server) {
|
||||||
|
// Only public readable rooms provides feed. Not even for public joinable ones.
|
||||||
|
let rid_need_join = server
|
||||||
|
.create_room(&ALICE, RoomAttrs::PUBLIC_JOINABLE, "not so public")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
server
|
||||||
|
.get::<NoContent>(&format!("/room/{rid_need_join}/feed.json"), None)
|
||||||
|
.await
|
||||||
|
.expect_api_err(StatusCode::NOT_FOUND, "room_not_found");
|
||||||
|
|
||||||
|
let rid = server
|
||||||
|
.create_room(
|
||||||
|
&ALICE,
|
||||||
|
RoomAttrs::PUBLIC_READABLE | RoomAttrs::PUBLIC_JOINABLE,
|
||||||
|
"public",
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
server
|
||||||
|
.join_room(rid, &BOB, MemberPermission::POST_CHAT)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
server.post_chat(rid, &ALICE, "a").await.unwrap();
|
||||||
|
let cid2 = server.post_chat(rid, &BOB, "b1").await.unwrap().cid;
|
||||||
|
server.post_chat(rid, &BOB, "b2").await.unwrap();
|
||||||
|
|
||||||
|
let feed = server
|
||||||
|
.get::<serde_json::Value>(&format!("/room/{rid}/feed.json"), None)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
// TODO: Ideally we should assert on the result, but it contains time and random id currently.
|
||||||
|
assert_eq!(feed["title"].as_str().unwrap(), "public");
|
||||||
|
assert_eq!(feed["items"].as_array().unwrap().len(), 2);
|
||||||
|
let feed_url = format!("{BASE_URL}/_blah/room/{rid}/feed.json");
|
||||||
|
assert_eq!(feed["feed_url"].as_str().unwrap(), feed_url,);
|
||||||
|
assert_eq!(
|
||||||
|
feed["next_url"].as_str().unwrap(),
|
||||||
|
format!("{feed_url}?skipToken={cid2}&top=2"),
|
||||||
|
);
|
||||||
|
|
||||||
|
let feed2 = server
|
||||||
|
.get::<serde_json::Value>(
|
||||||
|
&format!("/room/{rid}/feed.json?skipToken={cid2}&top=2"),
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let items = feed2["items"].as_array().unwrap();
|
||||||
|
assert_eq!(items.len(), 1);
|
||||||
|
assert_eq!(items[0]["content_html"].as_str().unwrap(), "a");
|
||||||
|
}
|
||||||
|
|
||||||
#[rstest]
|
#[rstest]
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn last_seen(server: Server) {
|
async fn last_seen(server: Server) {
|
||||||
|
@ -989,7 +1049,7 @@ async fn register_flow(server: Server) {
|
||||||
register_fast(&req)
|
register_fast(&req)
|
||||||
.await
|
.await
|
||||||
.expect_api_err(StatusCode::BAD_REQUEST, "invalid_server_url");
|
.expect_api_err(StatusCode::BAD_REQUEST, "invalid_server_url");
|
||||||
req.server_url = format!("http://{}", server.domain()).parse().unwrap();
|
req.server_url = BASE_URL.parse().unwrap();
|
||||||
|
|
||||||
register_fast(&req)
|
register_fast(&req)
|
||||||
.await
|
.await
|
||||||
|
|
Loading…
Add table
Reference in a new issue