From fe587f057f49e7156bab82b917d6da636d759f8d Mon Sep 17 00:00:00 2001 From: oxalica Date: Mon, 14 Oct 2024 18:59:04 -0400 Subject: [PATCH] feat(blahd): impl optional prometheus metrics --- Cargo.lock | 121 +++++++++++++++++++++++++++++++++++--- blahd/Cargo.toml | 11 +++- blahd/config.example.toml | 15 +++++ blahd/src/bin/blahd.rs | 92 ++++++++++++++++++----------- blahd/src/config.rs | 26 +++++++- blahd/src/event.rs | 14 +++++ blahd/src/lib.rs | 5 ++ blahd/src/metric.rs | 42 +++++++++++++ flake.nix | 1 + 9 files changed, 284 insertions(+), 43 deletions(-) create mode 100644 blahd/src/metric.rs diff --git a/Cargo.lock b/Cargo.lock index 968abc8..f8457ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -217,6 +217,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-metrics" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4388453857d3d845c087a74b58e688a71388c412a436092ee15c78ce2c7bc080" +dependencies = [ + "axum", + "bytes", + "http-body", + "metrics", + "pin-project-lite", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -313,6 +328,7 @@ dependencies = [ "anyhow", "axum", "axum-extra", + "axum-metrics", "blah-types", "clap", "data-encoding", @@ -323,6 +339,8 @@ dependencies = [ "html-escape", "http-body-util", "humantime", + "metrics", + "metrics-exporter-prometheus", "mock_instant", "nix", "parking_lot", @@ -801,6 +819,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" + [[package]] name = "foreign-types" version = "0.3.2" @@ -923,7 +947,7 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap 2.5.0", + "indexmap 2.6.0", "slab", "tokio", "tokio-util", @@ -955,6 +979,15 @@ dependencies = [ "ahash", ] +[[package]] +name = "hashbrown" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +dependencies = [ + "foldhash", +] + [[package]] name = "hashlink" version = "0.9.1" @@ -1177,12 +1210,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" +checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", - "hashbrown 0.14.5", + "hashbrown 0.15.0", "serde", ] @@ -1295,6 +1328,44 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "metrics" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae428771d17306715c5091d446327d1cfdedc82185c65ba8423ab404e45bf10" +dependencies = [ + "ahash", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85b6f8152da6d7892ff1b7a1c0fa3f435e92b5918ad67035c3bb432111d9a29b" +dependencies = [ + "base64 0.22.1", + "indexmap 2.6.0", + "metrics", + "metrics-util", + "quanta", + "thiserror", +] + +[[package]] +name = "metrics-util" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15b482df36c13dd1869d73d14d28cd4855fbd6cfc32294bee109908a9f4a4ed7" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.15.0", + "metrics", + "quanta", + "sketches-ddsketch", +] + [[package]] name = "mime" version = "0.3.17" @@ -1592,6 +1663,12 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "portable-atomic" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" + [[package]] name = "powerfmt" version = "0.2.0" @@ -1616,6 +1693,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.37" @@ -1655,6 +1747,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "11.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" +dependencies = [ + "bitflags", +] + [[package]] name = "rayon" version = "1.10.0" @@ -2079,7 +2180,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.5.0", + "indexmap 2.6.0", "serde", "serde_derive", "serde_json", @@ -2160,6 +2261,12 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "sketches-ddsketch" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e9a774a6c28142ac54bb25d25562e6bcf957493a184f15ad4eebccb23e410a" + [[package]] name = "slab" version = "0.4.9" @@ -2210,7 +2317,7 @@ dependencies = [ "bitflags", "cc", "fallible-iterator", - "indexmap 2.5.0", + "indexmap 2.6.0", "log", "memchr", "phf", @@ -2502,7 +2609,7 @@ version = "0.22.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5" dependencies = [ - "indexmap 2.5.0", + "indexmap 2.6.0", "serde", "serde_spanned", "toml_datetime", diff --git a/blahd/Cargo.toml b/blahd/Cargo.toml index 3dd72d6..a2311fc 100644 --- a/blahd/Cargo.toml +++ b/blahd/Cargo.toml @@ -4,8 +4,13 @@ version = "0.0.0" edition = "2021" [features] -default = [] +default = ["prometheus"] unsafe_use_mock_instant_for_testing = ["dep:mock_instant", "blah-types/unsafe_use_mock_instant_for_testing"] +prometheus = [ + "dep:axum-metrics", + "dep:metrics", + "dep:metrics-exporter-prometheus", +] [dependencies] anyhow = "1" @@ -44,6 +49,10 @@ url = { version = "2", features = ["serde"] } blah-types = { path = "../blah-types", features = ["rusqlite"] } +axum-metrics = { version = "0.1", optional = true } +metrics = { version = "0.24.0", optional = true } +metrics-exporter-prometheus = { version = "0.16", optional = true, default-features = false } + [build-dependencies] url = "2" diff --git a/blahd/config.example.toml b/blahd/config.example.toml index b96691a..3634237 100644 --- a/blahd/config.example.toml +++ b/blahd/config.example.toml @@ -94,3 +94,18 @@ difficulty = 16 # The challenge nonce rotation period in seconds. nonce_rotate_secs = 60 + +# If this section appears, the prometheus metrics tracking is enabled. +# The server must be built with "prometheus" feature enabled to support this. +#[metric.prometheus] + +# The listen address for the metrics server, in the same type as `[listen]`. +# GET on route `/metrics` responds in Prometheus Exposition Format. +# See more: https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format +#listen.systemd = true + +# Upkeep interval in seconds. +#upkeep_period_secs = 5 + +# The bucket width in seconds for histograms. +#bucket_duration_secs = 20 diff --git a/blahd/src/bin/blahd.rs b/blahd/src/bin/blahd.rs index d52f67c..f488223 100644 --- a/blahd/src/bin/blahd.rs +++ b/blahd/src/bin/blahd.rs @@ -3,9 +3,10 @@ use std::os::fd::{FromRawFd, OwnedFd}; use std::path::PathBuf; use std::sync::Arc; -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{bail, Context, Result}; use blahd::config::{Config, ListenConfig}; use blahd::{AppState, Database}; +use tokio::net::TcpListener; use tokio::signal::unix::{signal, SignalKind}; /// Blah Chat Server @@ -57,47 +58,72 @@ fn main() -> Result<()> { async fn main_serve(db: Database, config: Config) -> Result<()> { let st = AppState::new(db, config.server); - let (listener_display, listener) = match &config.listen { - ListenConfig::Address(addr) => ( - format!("address {addr:?}"), - tokio::net::TcpListener::bind(addr) - .await - .context("failed to listen on socket")?, - ), - ListenConfig::Systemd(_) => { - use rustix::net::{getsockname, SocketAddrAny}; + let mut listen_fds = sd_notify::listen_fds() + .context("failed to get fds from sd_listen_fds(3)")? + // SAFETY: `fd` is valid by sd_listen_fds(3) protocol. + .map(|fd| unsafe { Some(OwnedFd::from_raw_fd(fd)) }) + .collect::>(); - let [fd] = sd_notify::listen_fds() - .context("failed to get fds from sd_listen_fds(3)")? - .collect::>() - .try_into() - .map_err(|_| anyhow!("expecting exactly one fd from LISTEN_FDS"))?; - // SAFETY: `fd` is valid by sd_listen_fds(3) protocol. - let listener = unsafe { OwnedFd::from_raw_fd(fd) }; - - let addr = getsockname(&listener).context("failed to getsockname")?; - match addr { - SocketAddrAny::V4(_) | SocketAddrAny::V6(_) => { - let listener = std::net::TcpListener::from(listener); - listener - .set_nonblocking(true) - .context("failed to set socket non-blocking")?; - let listener = tokio::net::TcpListener::from_std(listener) - .context("failed to register async socket")?; - (format!("tcp socket {addr:?} from LISTEN_FDS"), listener) + let mut get_listener = move |name: &'static str, fd_idx: usize, listen: ListenConfig| { + let fd = listen_fds.get_mut(0).and_then(|opt| opt.take()); + async move { + match listen { + ListenConfig::Address(addr) => { + tracing::info!("serve {name} on address {addr:?}"); + TcpListener::bind(addr) + .await + .context("failed to listen on socket") + } + ListenConfig::Systemd(_) => { + use rustix::net::{getsockname, SocketAddrAny}; + + let fd = fd.with_context(|| format!("missing LISTEN_FDS[{fd_idx}]"))?; + let addr = getsockname(&fd).context("failed to getsockname")?; + match addr { + SocketAddrAny::V4(_) | SocketAddrAny::V6(_) => { + let listener = std::net::TcpListener::from(fd); + listener + .set_nonblocking(true) + .context("failed to set socket non-blocking")?; + let listener = TcpListener::from_std(listener) + .context("failed to register async socket")?; + tracing::info!( + "serve {name} on tcp socket {addr:?} from LISTEN_FDS[{fd_idx}" + ); + Ok(listener) + } + // Unix socket support for axum is currently overly complex. + // WAIT: https://github.com/tokio-rs/axum/pull/2479 + _ => bail!("unsupported socket type from LISTEN_FDS[{fd_idx}]: {addr:?}"), + } } - // Unix socket support for axum is currently overly complex. - // WAIT: https://github.com/tokio-rs/axum/pull/2479 - _ => bail!("unsupported socket type from LISTEN_FDS: {addr:?}"), } } }; - tracing::info!("listening on {listener_display}"); - let router = blahd::router(Arc::new(st)); + #[cfg(not(feature = "prometheus"))] + anyhow::ensure!( + config.metric.is_none(), + "metric support is disabled at compile time", + ); + + #[cfg(feature = "prometheus")] + let router = if let Some(metric_config) = &config.metric { + let blahd::config::MetricConfig::Prometheus(prom_config) = metric_config; + let metric_listener = get_listener("metrics", 1, prom_config.listen.clone()).await?; + let (metric_router, recorder, upkeeper) = blahd::metrics_router(metric_config); + metrics::set_global_recorder(recorder).expect("only set once"); + tokio::spawn(upkeeper); + tokio::spawn(axum::serve(metric_listener, metric_router).into_future()); + router.layer(axum_metrics::MetricLayer::default()) + } else { + router + }; + let mut sigterm = signal(SignalKind::terminate()).context("failed to listen on SIGTERM")?; + let listener = get_listener("main", 0, config.listen.clone()).await?; let service = axum::serve(listener, router) .with_graceful_shutdown(async move { sigterm.recv().await; diff --git a/blahd/src/config.rs b/blahd/src/config.rs index 644c787..abcd696 100644 --- a/blahd/src/config.rs +++ b/blahd/src/config.rs @@ -1,5 +1,8 @@ -use serde::{Deserialize, Serialize}; +use std::num::NonZero; + +use serde::Deserialize; use serde_constant::ConstBool; +use serde_inline_default::serde_inline_default; use crate::{database, ServerConfig}; @@ -10,15 +13,34 @@ pub struct Config { pub database: database::Config, pub listen: ListenConfig, pub server: ServerConfig, + #[serde(default)] + pub metric: Option, } -#[derive(Debug, Clone, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "snake_case")] pub enum ListenConfig { Address(String), Systemd(ConstBool), } +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum MetricConfig { + Prometheus(#[serde(default)] PrometheusConfig), +} + +#[serde_inline_default] +#[derive(Debug, Clone, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct PrometheusConfig { + pub listen: ListenConfig, + #[serde_inline_default(5.try_into().expect("not zero"))] + pub upkeep_period_secs: NonZero, + #[serde_inline_default(20.try_into().expect("not zero"))] + pub bucket_duration_secs: NonZero, +} + #[cfg(test)] mod tests { use super::*; diff --git a/blahd/src/event.rs b/blahd/src/event.rs index 3967387..d28a10a 100644 --- a/blahd/src/event.rs +++ b/blahd/src/event.rs @@ -138,6 +138,20 @@ impl Stream for UserEventReceiver { // TODO: Authenticate via HTTP query? pub async fn get_ws(st: ArcState, ws: WebSocketUpgrade) -> Response { ws.on_upgrade(move |mut socket| async move { + #[cfg(feature = "prometheus")] + let _guard = { + struct DecOnDrop(metrics::Gauge); + impl Drop for DecOnDrop { + fn drop(&mut self) { + self.0.decrement(1); + } + } + + let gauge = metrics::gauge!("ws_connections_in_flight"); + gauge.increment(1); + DecOnDrop(gauge) + }; + match handle_ws(st.0, &mut socket).await { #[allow( unreachable_patterns, diff --git a/blahd/src/lib.rs b/blahd/src/lib.rs index 3cd0039..0be2c71 100644 --- a/blahd/src/lib.rs +++ b/blahd/src/lib.rs @@ -44,6 +44,11 @@ mod id; mod register; mod utils; +#[cfg(feature = "prometheus")] +mod metric; +#[cfg(feature = "prometheus")] +pub use metric::metrics_router; + pub use database::{Config as DatabaseConfig, Database}; pub use middleware::ApiError; diff --git a/blahd/src/metric.rs b/blahd/src/metric.rs new file mode 100644 index 0000000..a47d46b --- /dev/null +++ b/blahd/src/metric.rs @@ -0,0 +1,42 @@ +use std::time::Duration; + +use axum::Router; +use futures_util::future::BoxFuture; +use metrics::Recorder; +use metrics_exporter_prometheus::PrometheusBuilder; +use tokio::time::{interval, MissedTickBehavior}; + +use crate::config::MetricConfig; + +type DynRecorder = Box; +type UpkeeperTask = BoxFuture<'static, ()>; + +pub fn metrics_router(config: &MetricConfig) -> (Router, DynRecorder, UpkeeperTask) { + let MetricConfig::Prometheus(config) = config; + + let recorder = PrometheusBuilder::new() + .set_bucket_duration(Duration::from_secs( + config.bucket_duration_secs.get().into(), + )) + .expect("not zero") + .build_recorder(); + + let handle_render = recorder.handle(); + let get_metrics = || async move { handle_render.render() }; + + let upkeeper = Box::pin({ + let handle_upkeep = recorder.handle(); + let upkeep_period = Duration::from_secs(config.upkeep_period_secs.get().into()); + async move { + let mut interval = interval(upkeep_period); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + loop { + interval.tick().await; + handle_upkeep.run_upkeep(); + } + } + }) as _; + + let router = Router::new().route("/metrics", axum::routing::get(get_metrics)); + (router, Box::new(recorder) as _, upkeeper) +} diff --git a/flake.nix b/flake.nix index 0058d3e..db9117c 100644 --- a/flake.nix +++ b/flake.nix @@ -63,6 +63,7 @@ rec { ++ [ "--package=blahd" "--package=blahctl" + "--features=prometheus" ]; # Intentionally omit the socket unit. It is trivial but