diff --git a/Cargo.toml b/Cargo.toml index 5863e0e..f2e5504 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ cli = [ ] enable-tracing = ["fastrace/enable"] logforth = ["dep:logforth", "dep:opentelemetry-otlp"] -tokio = ["dep:tokio"] +tokio = ["dep:tokio", "dep:fastrace"] # Very long running. get_time_test = [] @@ -72,3 +72,4 @@ log = { version = "0.4.28", features = ["kv", "kv_serde", "std", "serde"] } logforth = { version = "0.28.1", default-features = false, features = ["append-fastrace", "append-file", "bridge-log", "layout-text", "layout-json", "append-async", "append-opentelemetry", "diagnostic-fastrace", "rustls"], optional = true } opentelemetry-otlp = { version = "0.31.0", default-features = false, features = ["logs", "metrics", "http-proto", "http-json"], optional = true } fastrace = { version = "0.7.14", optional = true } +pin-project-lite = "0.2.16" diff --git a/src/logging/logforth.rs b/src/logging/logforth.rs index f50bb27..3e24dc2 100644 --- a/src/logging/logforth.rs +++ b/src/logging/logforth.rs @@ -1,17 +1,12 @@ -use std::{collections::HashMap, fmt, path::PathBuf}; +use std::{collections::HashMap, path::PathBuf}; -use logforth::{ - Filter, - filter::FilterResult, - layout::JsonLayout, - record::{LevelFilter, Metadata}, -}; +use logforth::{Filter, filter::FilterResult, layout::JsonLayout, record::Metadata}; use opentelemetry_otlp::WithExportConfig as _; -use crate::{component_configs::ComponentConfig, data}; +use crate::{component_configs::ComponentConfig, data, supervisor, utils::unlikely}; -#[data(crate = crate, copy)] +#[data(crate = crate, copy, display(name))] #[derive(Default)] pub enum LogLevel { Debug, @@ -20,19 +15,21 @@ pub enum LogLevel { #[default] Info, Trace, + Disabled, } -impl From for log::Level { - fn from(value: LogLevel) -> Self { +impl LogLevel { + fn into_logforth(self) -> Option { use LogLevel::*; - use log::Level as D; + use logforth::record::Level as D; - match value { - Info => D::Info, - Warn => D::Warn, - Trace => D::Trace, - Error => D::Error, - Debug => D::Debug, + match self { + Info => Some(D::Info), + Warn => Some(D::Warn), + Trace => Some(D::Trace), + Error => Some(D::Error), + Debug => Some(D::Debug), + Disabled => None, } } } @@ -92,15 +89,23 @@ impl Append { #[data(crate = crate)] pub struct Config { + #[serde(default = "Config::default_enabled")] + pub enabled: bool, #[serde(default)] pub default_level: LogLevel, #[serde(default)] - pub levels: HashMap, + pub components: HashMap, + #[serde(default)] + pub targets: HashMap, #[serde(default = "Config::default_append")] pub append: Vec, } impl Config { + fn default_enabled() -> bool { + true + } + fn default_append() -> Vec { vec![Append::Stderr] } @@ -110,12 +115,123 @@ impl Default for Config { fn default() -> Self { Self { default_level: LogLevel::default(), - levels: HashMap::default(), + targets: HashMap::default(), + enabled: Self::default_enabled(), + components: HashMap::default(), append: Self::default_append(), } } } +#[derive(Debug)] +struct ComponentDiagnostics {} + +impl logforth::Diagnostic for ComponentDiagnostics { + fn visit(&self, visitor: &mut dyn logforth::kv::Visitor) -> Result<(), logforth::Error> { + use logforth::kv::{Key, Value}; + + if let Some(r) = supervisor::scope::with(|scope| { + visitor.visit( + Key::new("system_component"), + Value::from_str(scope.component()), + ) + }) { + r + } else { + Ok(()) + } + } +} + +#[derive(Debug)] +struct LogFilter { + cfg: ComponentConfig, +} + +impl Filter for LogFilter { + fn enabled( + &self, + metadata: &Metadata, + diags: &[Box], + ) -> FilterResult { + use logforth::kv; + + // Reject if logs are turned off entirely. + if unlikely(!self.cfg.enabled) { + return FilterResult::Reject; + } + // Reject if logs for target are disabled or would be filtered out by level. + if let Some(max_level) = self + .cfg + .targets + .get(metadata.target()) + .map(|lvl| lvl.into_logforth()) + { + if let Some(max_level) = max_level { + if metadata.level() > max_level { + return FilterResult::Reject; + } + } else { + return FilterResult::Reject; + } + } + + #[derive(Debug)] + struct DiagVisitor<'a> { + max_level: Option, + map: &'a HashMap, + has_component: bool, + } + + impl kv::Visitor for DiagVisitor<'_> { + fn visit(&mut self, key: kv::Key, value: kv::Value) -> Result<(), logforth::Error> { + if key == kv::Key::new("system_component") { + self.has_component |= true; + if let Some(component) = value.to_str() { + self.max_level = self.map.get(&*component).copied(); + } + } + + Ok(()) + } + } + + let mut visitor = DiagVisitor { + max_level: None, + has_component: false, + map: &self.cfg.components, + }; + for diag in diags { + _ = diag.visit(&mut visitor); + if visitor.has_component { + break; + } + } + + let max_level: Option = if let Some(max_level) = visitor.max_level + { + // There's a system_component field and level is configured explicitly. + max_level.into_logforth() + } else if visitor.has_component { + // There's system_component field, but level isn't configured explicitly. + self.cfg.default_level.into_logforth() + } else { + // There's no system_component field. + self.cfg + .targets + .get(metadata.target()) + .copied() + .and_then(|l| l.into_logforth()) + }; + + if max_level.is_some_and(|lvl| metadata.level() <= lvl) { + FilterResult::Accept + } else { + FilterResult::Reject + } + } +} + pub fn setup(cfg: ComponentConfig) { use logforth::{ append::{self, asynchronous as async_}, @@ -129,7 +245,9 @@ pub fn setup(cfg: ComponentConfig) { appenders = appenders.append(appender.to_appender()); } - d.diagnostic(diagnostic::FastraceDiagnostic::default()) + d.diagnostic(ComponentDiagnostics {}) + .filter(LogFilter { cfg }) + .diagnostic(diagnostic::FastraceDiagnostic::default()) .append(append::FastraceEvent::default()) .append(appenders.build()) }) diff --git a/src/supervisor/mod.rs b/src/supervisor/mod.rs index 4ddb258..60006af 100644 --- a/src/supervisor/mod.rs +++ b/src/supervisor/mod.rs @@ -12,6 +12,10 @@ use crate::{ sync::notifies::mpsc::{Tx, make as make_notify}, }; +use self::scope::InScopeExt as _; + +pub mod scope; + pub struct SlaveRx { rx: mpsc::Receiver, } @@ -111,9 +115,12 @@ impl Supervisor { let config = self.configs.get::(); let name = config.section().to_compact_string(); + let component_name = name.clone(); self.set.spawn( async move { - let fut = f(config, slave_rx); + let scope = scope::Scope::new(&component_name); + let fut = f(config, slave_rx).in_scope(scope.into()); + tokio::pin!(fut); loop { tokio::select! { @@ -163,17 +170,21 @@ impl Supervisor { } pub async fn wait_for_completion(mut self) -> eyre::Result<()> { - while let Some(res) = self.set.join_next().await { - let error = match res { - Err(e) => Some(eyre!("failed to join task: {e}")), - Ok(Err(e)) => Some(e), - Ok(Ok(())) => None, - }; + async move { + while let Some(res) = self.set.join_next().await { + let error = match res { + Err(e) => Some(eyre!("failed to join task: {e}")), + Ok(Err(e)) => Some(e), + Ok(Ok(())) => None, + }; - if let Some(error) = error { - eprintln!("failed to join task: {error:#}"); + if let Some(error) = error { + log::error!(error:%; "failed to join component"); + } } } + .in_scope(scope::Scope::new("system.supervisor").into()) + .await; fastrace::flush(); diff --git a/src/supervisor/scope.rs b/src/supervisor/scope.rs new file mode 100644 index 0000000..fa06114 --- /dev/null +++ b/src/supervisor/scope.rs @@ -0,0 +1,74 @@ +use std::{ + cell::RefCell, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + thread_local, +}; + +use pin_project_lite::pin_project; + +thread_local! { + static SCOPE: RefCell>> = RefCell::new(None); +} + +pin_project! { + pub struct FutureInScope { + scope: Arc, + #[pin] + fut: F, + } +} + +impl Future for FutureInScope { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + SCOPE.with_borrow_mut(|opt| match opt { + Some(arc) => { + if !Arc::ptr_eq(arc, &*this.scope) { + *arc = Arc::clone(&*this.scope); + } + } + None => *opt = Some(Arc::clone(&this.scope)), + }); + + this.fut.poll(cx) + } +} + +pub struct Scope { + component: String, +} + +impl Scope { + pub fn new(component: &str) -> Self { + Self { + component: component.into(), + } + } + + pub fn component(&self) -> &str { + self.component.as_str() + } +} + +pub trait InScopeExt { + type InScopeFuture: Future; + + fn in_scope(self, scope: Arc) -> Self::InScopeFuture; +} + +impl InScopeExt for Fut { + type InScopeFuture = FutureInScope; + + fn in_scope(self, scope: Arc) -> Self::InScopeFuture { + FutureInScope { scope, fut: self } + } +} + +pub fn with(f: impl FnOnce(&Scope) -> O) -> Option { + SCOPE.with_borrow(|scope| scope.as_deref().map(f)) +}