From 04ff3853e6f8d5a1d48686c1af23e7e8e519e9db Mon Sep 17 00:00:00 2001 From: Aleksandr Date: Tue, 7 Oct 2025 03:33:54 +0300 Subject: [PATCH] add dumb logging --- Cargo.toml | 11 +++- src/cli/mod.rs | 3 + src/component_configs.rs | 12 +++- src/lib.rs | 2 + src/logging/logforth.rs | 137 +++++++++++++++++++++++++++++++++++++++ src/logging/mod.rs | 4 ++ src/supervisor/mod.rs | 48 ++++++++------ 7 files changed, 195 insertions(+), 22 deletions(-) create mode 100644 src/logging/logforth.rs create mode 100644 src/logging/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 9c9f4a4..5863e0e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,8 +16,12 @@ cli = [ "figment/yaml", "figment/json", "figment/toml", - "figment/env" + "figment/env", + "logforth", + "dep:fastrace" ] +enable-tracing = ["fastrace/enable"] +logforth = ["dep:logforth", "dep:opentelemetry-otlp"] tokio = ["dep:tokio"] # Very long running. get_time_test = [] @@ -63,3 +67,8 @@ eyre = { version = "0.6.12" } num_cpus = { version = "1.17.0", optional = true } either = "1.15.0" humantime = "2.3.0" + +log = { version = "0.4.28", features = ["kv", "kv_serde", "std", "serde"] } +logforth = { version = "0.28.1", default-features = false, features = ["append-fastrace", "append-file", "bridge-log", "layout-text", "layout-json", "append-async", "append-opentelemetry", "diagnostic-fastrace", "rustls"], optional = true } +opentelemetry-otlp = { version = "0.31.0", default-features = false, features = ["logs", "metrics", "http-proto", "http-json"], optional = true } +fastrace = { version = "0.7.14", optional = true } diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 001b618..9a39558 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -149,6 +149,8 @@ impl App { } let component_configs = ComponentConfigs::new(map); + crate::logging::logforth::setup(component_configs.get()); + let rt_config = component_configs.get::(); let rt = match rt_config.flavor { @@ -184,5 +186,6 @@ impl Default for App { }; this.optional::("system.rt") + .optional::("system.logging") } } diff --git a/src/component_configs.rs b/src/component_configs.rs index 66fde80..3fe0420 100644 --- a/src/component_configs.rs +++ b/src/component_configs.rs @@ -1,8 +1,8 @@ use crate::{collections::HashMap, str::CompactString, trait_set}; use std::{ - mem, any::{Any, TypeId}, + fmt, mem, ops::Deref, sync::{ Arc, Mutex, @@ -91,11 +91,21 @@ pub struct ComponentConfig { handle: Arc, } +impl fmt::Debug for ComponentConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + ::fmt(&**self, f) + } +} + impl ComponentConfig { pub fn section(&self) -> &str { &self.handle.section } + pub fn outdated(&self) -> bool { + self.generation != self.handle.generation.load(Ordering::Relaxed) + } + /// Update component config. Returns previous config if something's changed. pub fn try_update(&mut self) -> Option { let generation = self.handle.generation.load(Ordering::Relaxed); diff --git a/src/lib.rs b/src/lib.rs index 81ef92e..1a9e94b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,6 +28,8 @@ pub use trait_set::trait_set; #[cfg(feature = "cli")] pub mod cli; +pub mod logging; + pub mod array; pub mod error; pub mod utils; diff --git a/src/logging/logforth.rs b/src/logging/logforth.rs new file mode 100644 index 0000000..f50bb27 --- /dev/null +++ b/src/logging/logforth.rs @@ -0,0 +1,137 @@ +use std::{collections::HashMap, fmt, path::PathBuf}; + +use logforth::{ + Filter, + filter::FilterResult, + layout::JsonLayout, + record::{LevelFilter, Metadata}, +}; + +use opentelemetry_otlp::WithExportConfig as _; + +use crate::{component_configs::ComponentConfig, data}; + +#[data(crate = crate, copy)] +#[derive(Default)] +pub enum LogLevel { + Debug, + Warn, + Error, + #[default] + Info, + Trace, +} + +impl From for log::Level { + fn from(value: LogLevel) -> Self { + use LogLevel::*; + use log::Level as D; + + match value { + Info => D::Info, + Warn => D::Warn, + Trace => D::Trace, + Error => D::Error, + Debug => D::Debug, + } + } +} + +#[data(crate = crate)] +pub enum Append { + File { + base_dir: PathBuf, + file_name: String, + }, + Otel { + name: String, + endpoint: String, + #[serde(default)] + labels: HashMap, + }, + Stderr, + Stdout, +} + +impl Append { + pub fn to_appender(&self) -> Box { + use logforth::append; + + match self { + Self::File { + base_dir, + file_name, + } => Box::new( + append::file::FileBuilder::new(base_dir.clone(), file_name.clone()) + .filename_suffix("log") + .layout(JsonLayout::default()) + .build() + .unwrap(), + ), + Self::Otel { + name, + endpoint, + labels, + } => { + let log_exporter = opentelemetry_otlp::LogExporter::builder() + .with_http() + .with_endpoint(endpoint.as_str()) + .build() + .unwrap(); + Box::new( + append::opentelemetry::OpentelemetryLogBuilder::new(name.clone(), log_exporter) + .labels(labels.iter().map(|(k, v)| (k.clone(), v.clone()))) + .build(), + ) + } + Self::Stderr => Box::new(append::Stderr::default()), + Self::Stdout => Box::new(append::Stdout::default()), + } + } +} + +#[data(crate = crate)] +pub struct Config { + #[serde(default)] + pub default_level: LogLevel, + #[serde(default)] + pub levels: HashMap, + #[serde(default = "Config::default_append")] + pub append: Vec, +} + +impl Config { + fn default_append() -> Vec { + vec![Append::Stderr] + } +} + +impl Default for Config { + fn default() -> Self { + Self { + default_level: LogLevel::default(), + levels: HashMap::default(), + append: Self::default_append(), + } + } +} + +pub fn setup(cfg: ComponentConfig) { + use logforth::{ + append::{self, asynchronous as async_}, + diagnostic, + }; + + logforth::starter_log::builder() + .dispatch(move |d| { + let mut appenders = async_::AsyncBuilder::new("system.logger"); + for appender in cfg.append.iter() { + appenders = appenders.append(appender.to_appender()); + } + + d.diagnostic(diagnostic::FastraceDiagnostic::default()) + .append(append::FastraceEvent::default()) + .append(appenders.build()) + }) + .apply(); +} diff --git a/src/logging/mod.rs b/src/logging/mod.rs new file mode 100644 index 0000000..1c39afe --- /dev/null +++ b/src/logging/mod.rs @@ -0,0 +1,4 @@ +pub use log::{debug, error, info, log, log_enabled, trace, warn}; + +#[cfg(feature = "logforth")] +pub mod logforth; diff --git a/src/supervisor/mod.rs b/src/supervisor/mod.rs index 30c3bee..4ddb258 100644 --- a/src/supervisor/mod.rs +++ b/src/supervisor/mod.rs @@ -1,5 +1,6 @@ use compact_str::ToCompactString; use eyre::eyre; +use fastrace::{Span, future::FutureExt, prelude::SpanContext}; use tokio::{sync::mpsc, task::JoinSet}; use crate::{ @@ -82,6 +83,7 @@ where } pub struct Supervisor { + span: Span, set: JoinSet>, handles: HashMap, configs: ComponentConfigs, @@ -90,6 +92,7 @@ pub struct Supervisor { impl Supervisor { pub fn new(configs: ComponentConfigs) -> Self { Self { + span: Span::root("supervisor", SpanContext::random()), set: JoinSet::new(), handles: HashMap::default(), configs, @@ -108,35 +111,38 @@ impl Supervisor { let config = self.configs.get::(); let name = config.section().to_compact_string(); - self.set.spawn(async move { - let fut = f(config, slave_rx); - tokio::pin!(fut); - loop { - tokio::select! { - res = &mut fut => { - break res; - } + self.set.spawn( + async move { + let fut = f(config, slave_rx); + tokio::pin!(fut); + loop { + tokio::select! { + res = &mut fut => { + break res; + } - cmd = rx.recv(), if !rx.is_closed() => { - let Some(cmd) = cmd else { - continue; - }; + cmd = rx.recv(), if !rx.is_closed() => { + let Some(cmd) = cmd else { + continue; + }; - match cmd { - SupervisorCommand::Ping { pong } => { - pong.wake(); - } + match cmd { + SupervisorCommand::Ping { pong } => { + pong.wake(); + } - SupervisorCommand::User(u) => { - if stx.send(u).await.is_err() { - rx.close(); + SupervisorCommand::User(u) => { + if stx.send(u).await.is_err() { + rx.close(); + } } } } } } } - }); + .in_span(Span::enter_with_parent(name.clone(), &self.span)), + ); let hash_map::Entry::Vacant(vac) = self.handles.entry(name.clone()) else { panic!("{name:?} already exists") @@ -169,6 +175,8 @@ impl Supervisor { } } + fastrace::flush(); + Ok(()) } }