add component propagation

This commit is contained in:
Aleksandr 2025-10-07 20:33:46 +03:00
parent 04ff3853e6
commit 4e3b349e20
4 changed files with 235 additions and 31 deletions

View file

@ -22,7 +22,7 @@ cli = [
]
enable-tracing = ["fastrace/enable"]
logforth = ["dep:logforth", "dep:opentelemetry-otlp"]
tokio = ["dep:tokio"]
tokio = ["dep:tokio", "dep:fastrace"]
# Very long running.
get_time_test = []
@ -72,3 +72,4 @@ log = { version = "0.4.28", features = ["kv", "kv_serde", "std", "serde"] }
logforth = { version = "0.28.1", default-features = false, features = ["append-fastrace", "append-file", "bridge-log", "layout-text", "layout-json", "append-async", "append-opentelemetry", "diagnostic-fastrace", "rustls"], optional = true }
opentelemetry-otlp = { version = "0.31.0", default-features = false, features = ["logs", "metrics", "http-proto", "http-json"], optional = true }
fastrace = { version = "0.7.14", optional = true }
pin-project-lite = "0.2.16"

View file

@ -1,17 +1,12 @@
use std::{collections::HashMap, fmt, path::PathBuf};
use std::{collections::HashMap, path::PathBuf};
use logforth::{
Filter,
filter::FilterResult,
layout::JsonLayout,
record::{LevelFilter, Metadata},
};
use logforth::{Filter, filter::FilterResult, layout::JsonLayout, record::Metadata};
use opentelemetry_otlp::WithExportConfig as _;
use crate::{component_configs::ComponentConfig, data};
use crate::{component_configs::ComponentConfig, data, supervisor, utils::unlikely};
#[data(crate = crate, copy)]
#[data(crate = crate, copy, display(name))]
#[derive(Default)]
pub enum LogLevel {
Debug,
@ -20,19 +15,21 @@ pub enum LogLevel {
#[default]
Info,
Trace,
Disabled,
}
impl From<LogLevel> for log::Level {
fn from(value: LogLevel) -> Self {
impl LogLevel {
fn into_logforth(self) -> Option<logforth::record::Level> {
use LogLevel::*;
use log::Level as D;
use logforth::record::Level as D;
match value {
Info => D::Info,
Warn => D::Warn,
Trace => D::Trace,
Error => D::Error,
Debug => D::Debug,
match self {
Info => Some(D::Info),
Warn => Some(D::Warn),
Trace => Some(D::Trace),
Error => Some(D::Error),
Debug => Some(D::Debug),
Disabled => None,
}
}
}
@ -92,15 +89,23 @@ impl Append {
#[data(crate = crate)]
pub struct Config {
#[serde(default = "Config::default_enabled")]
pub enabled: bool,
#[serde(default)]
pub default_level: LogLevel,
#[serde(default)]
pub levels: HashMap<String, LogLevel>,
pub components: HashMap<String, LogLevel>,
#[serde(default)]
pub targets: HashMap<String, LogLevel>,
#[serde(default = "Config::default_append")]
pub append: Vec<Append>,
}
impl Config {
fn default_enabled() -> bool {
true
}
fn default_append() -> Vec<Append> {
vec![Append::Stderr]
}
@ -110,12 +115,123 @@ impl Default for Config {
fn default() -> Self {
Self {
default_level: LogLevel::default(),
levels: HashMap::default(),
targets: HashMap::default(),
enabled: Self::default_enabled(),
components: HashMap::default(),
append: Self::default_append(),
}
}
}
#[derive(Debug)]
struct ComponentDiagnostics {}
impl logforth::Diagnostic for ComponentDiagnostics {
fn visit(&self, visitor: &mut dyn logforth::kv::Visitor) -> Result<(), logforth::Error> {
use logforth::kv::{Key, Value};
if let Some(r) = supervisor::scope::with(|scope| {
visitor.visit(
Key::new("system_component"),
Value::from_str(scope.component()),
)
}) {
r
} else {
Ok(())
}
}
}
#[derive(Debug)]
struct LogFilter {
cfg: ComponentConfig<Config>,
}
impl Filter for LogFilter {
fn enabled(
&self,
metadata: &Metadata,
diags: &[Box<dyn logforth::Diagnostic>],
) -> FilterResult {
use logforth::kv;
// Reject if logs are turned off entirely.
if unlikely(!self.cfg.enabled) {
return FilterResult::Reject;
}
// Reject if logs for target are disabled or would be filtered out by level.
if let Some(max_level) = self
.cfg
.targets
.get(metadata.target())
.map(|lvl| lvl.into_logforth())
{
if let Some(max_level) = max_level {
if metadata.level() > max_level {
return FilterResult::Reject;
}
} else {
return FilterResult::Reject;
}
}
#[derive(Debug)]
struct DiagVisitor<'a> {
max_level: Option<LogLevel>,
map: &'a HashMap<String, LogLevel>,
has_component: bool,
}
impl kv::Visitor for DiagVisitor<'_> {
fn visit(&mut self, key: kv::Key, value: kv::Value) -> Result<(), logforth::Error> {
if key == kv::Key::new("system_component") {
self.has_component |= true;
if let Some(component) = value.to_str() {
self.max_level = self.map.get(&*component).copied();
}
}
Ok(())
}
}
let mut visitor = DiagVisitor {
max_level: None,
has_component: false,
map: &self.cfg.components,
};
for diag in diags {
_ = diag.visit(&mut visitor);
if visitor.has_component {
break;
}
}
let max_level: Option<logforth::record::Level> = if let Some(max_level) = visitor.max_level
{
// There's a system_component field and level is configured explicitly.
max_level.into_logforth()
} else if visitor.has_component {
// There's system_component field, but level isn't configured explicitly.
self.cfg.default_level.into_logforth()
} else {
// There's no system_component field.
self.cfg
.targets
.get(metadata.target())
.copied()
.and_then(|l| l.into_logforth())
};
if max_level.is_some_and(|lvl| metadata.level() <= lvl) {
FilterResult::Accept
} else {
FilterResult::Reject
}
}
}
pub fn setup(cfg: ComponentConfig<Config>) {
use logforth::{
append::{self, asynchronous as async_},
@ -129,7 +245,9 @@ pub fn setup(cfg: ComponentConfig<Config>) {
appenders = appenders.append(appender.to_appender());
}
d.diagnostic(diagnostic::FastraceDiagnostic::default())
d.diagnostic(ComponentDiagnostics {})
.filter(LogFilter { cfg })
.diagnostic(diagnostic::FastraceDiagnostic::default())
.append(append::FastraceEvent::default())
.append(appenders.build())
})

View file

@ -12,6 +12,10 @@ use crate::{
sync::notifies::mpsc::{Tx, make as make_notify},
};
use self::scope::InScopeExt as _;
pub mod scope;
pub struct SlaveRx {
rx: mpsc::Receiver<Command>,
}
@ -111,9 +115,12 @@ impl Supervisor {
let config = self.configs.get::<T>();
let name = config.section().to_compact_string();
let component_name = name.clone();
self.set.spawn(
async move {
let fut = f(config, slave_rx);
let scope = scope::Scope::new(&component_name);
let fut = f(config, slave_rx).in_scope(scope.into());
tokio::pin!(fut);
loop {
tokio::select! {
@ -163,17 +170,21 @@ impl Supervisor {
}
pub async fn wait_for_completion(mut self) -> eyre::Result<()> {
while let Some(res) = self.set.join_next().await {
let error = match res {
Err(e) => Some(eyre!("failed to join task: {e}")),
Ok(Err(e)) => Some(e),
Ok(Ok(())) => None,
};
async move {
while let Some(res) = self.set.join_next().await {
let error = match res {
Err(e) => Some(eyre!("failed to join task: {e}")),
Ok(Err(e)) => Some(e),
Ok(Ok(())) => None,
};
if let Some(error) = error {
eprintln!("failed to join task: {error:#}");
if let Some(error) = error {
log::error!(error:%; "failed to join component");
}
}
}
.in_scope(scope::Scope::new("system.supervisor").into())
.await;
fastrace::flush();

74
src/supervisor/scope.rs Normal file
View file

@ -0,0 +1,74 @@
use std::{
cell::RefCell,
pin::Pin,
sync::Arc,
task::{Context, Poll},
thread_local,
};
use pin_project_lite::pin_project;
thread_local! {
static SCOPE: RefCell<Option<Arc<Scope>>> = RefCell::new(None);
}
pin_project! {
pub struct FutureInScope<F> {
scope: Arc<Scope>,
#[pin]
fut: F,
}
}
impl<F: Future> Future for FutureInScope<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
SCOPE.with_borrow_mut(|opt| match opt {
Some(arc) => {
if !Arc::ptr_eq(arc, &*this.scope) {
*arc = Arc::clone(&*this.scope);
}
}
None => *opt = Some(Arc::clone(&this.scope)),
});
this.fut.poll(cx)
}
}
pub struct Scope {
component: String,
}
impl Scope {
pub fn new(component: &str) -> Self {
Self {
component: component.into(),
}
}
pub fn component(&self) -> &str {
self.component.as_str()
}
}
pub trait InScopeExt {
type InScopeFuture: Future;
fn in_scope(self, scope: Arc<Scope>) -> Self::InScopeFuture;
}
impl<Fut: Future> InScopeExt for Fut {
type InScopeFuture = FutureInScope<Fut>;
fn in_scope(self, scope: Arc<Scope>) -> Self::InScopeFuture {
FutureInScope { scope, fut: self }
}
}
pub fn with<O>(f: impl FnOnce(&Scope) -> O) -> Option<O> {
SCOPE.with_borrow(|scope| scope.as_deref().map(f))
}