diff --git a/Cargo.toml b/Cargo.toml index 3d7d4da..9c9f4a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,13 +7,16 @@ edition = "2024" members = ["macros", "utils"] [features] -default = ["tokio"] +default = [] cli = [ "tokio/rt-multi-thread", "dep:clap", - "dep:figment", "dep:color-eyre", "dep:num_cpus", + "figment/yaml", + "figment/json", + "figment/toml", + "figment/env" ] tokio = ["dep:tokio"] # Very long running. @@ -53,7 +56,7 @@ blake3 = "1.8.2" slotmap = { version = "1.0.7", features = ["serde"] } clap = { version = "4.5.47", features = ["derive"], optional = true } -figment = { version = "0.10.19", features = ["json", "yaml", "env", "toml"], optional = true } +figment = { version = "0.10.19" } tokio = { version = "1.47.1", features = ["rt", "sync", "macros"], optional = true } color-eyre = { version = "0.6.5", optional = true } eyre = { version = "0.6.12" } diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 4e08f49..001b618 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -1,4 +1,9 @@ -use std::{num::NonZeroUsize, path::PathBuf}; +use std::{ + any::{TypeId, type_name}, + num::NonZeroUsize, + path::PathBuf, + sync::{Arc, Mutex}, +}; use clap::{Args, Parser}; @@ -7,24 +12,39 @@ use figment::{ Figment, providers::{Env, Format, Json, Toml, YamlExtended}, }; +use hashbrown::hash_map; -use tokio::runtime as rt; +use crate::{collections::HashMap, str::CompactString}; + +use crate::component_configs::{ + ComponentConfigs, ConfigHandle, DeserializeCfg, ErasedValue, IsComponentConfig, +}; + +use tokio::runtime; + +pub mod rt; + +#[derive(Debug, Clone, Copy, clap::Args)] +pub struct NoArgs {} #[derive(Parser)] struct AppArgs { #[clap(long, short)] /// Config file(s) for the application. config: Vec, - #[clap(long, short)] - /// Number of worker threads app will consume. - worker_threads: Option, #[clap(flatten)] args: A, } +struct ComponentConfigSetup { + section: CompactString, + deserialize: DeserializeCfg, +} + pub struct App { figment: Figment, + component_configs: HashMap, } impl App { @@ -34,19 +54,70 @@ impl App { self } - pub fn run( + pub fn optional(mut self, section: &str) -> Self { + let hash_map::Entry::Vacant(vacant) = self.component_configs.entry(TypeId::of::()) + else { + panic!("{} already exists", type_name::()) + }; + + vacant.insert(ComponentConfigSetup { + section: section.into(), + deserialize: |path, figment| { + let res: figment::Result = figment.extract_inner_lossy(path); + let res = match res { + Ok(o) => Ok(o), + Err(e) => match &e.kind { + figment::error::Kind::MissingField(field) if field == path => { + Ok(T::default()) + } + _ => Err(e).wrap_err("failed to deserialize section"), + }, + }; + + res.map(|x| { + let x: ErasedValue = Arc::new(Mutex::new(x)); + x + }) + }, + }); + + self + } + + pub fn require(mut self, section: &str) -> Self { + let hash_map::Entry::Vacant(vacant) = self.component_configs.entry(TypeId::of::()) + else { + panic!("{} already exists", type_name::()) + }; + + vacant.insert(ComponentConfigSetup { + section: section.into(), + deserialize: |path, figment| { + let res: T = figment + .extract_inner_lossy(path) + .wrap_err("failed to deserialize")?; + Ok(Arc::new(Mutex::new(res))) + }, + }); + self + } + + pub fn run( self, - f: impl Send + 'static + AsyncFnOnce(A, C) -> eyre::Result<()>, + f: impl Send + 'static + AsyncFnOnce(A, ComponentConfigs) -> eyre::Result<()>, ) -> eyre::Result<()> where A: Args, - C: serde::de::DeserializeOwned, { - let Self { mut figment } = self; + const ONE: NonZeroUsize = NonZeroUsize::new(1).unwrap(); + + let Self { + mut figment, + component_configs, + } = self; color_eyre::install().wrap_err("failed to install color-eyre")?; let AppArgs { config: configs, - worker_threads, args, } = AppArgs::::parse(); @@ -55,7 +126,10 @@ impl App { let component = config.components().next_back().ok_or_else(|| { eyre::eyre!("{} must contain at least one component", config.display()) })?; - let last = component.as_os_str().as_encoded_bytes(); + let last = component + .as_os_str() + .as_encoded_bytes() + .to_ascii_lowercase(); if config.is_dir() { todo!("populate everything from the directory") @@ -68,12 +142,27 @@ impl App { } } - let config: C = figment.extract_lossy().wrap_err("failed to load config")?; - let rt = match worker_threads.map_or(num_cpus::get(), NonZeroUsize::get) { - 0 | 1 => rt::Builder::new_current_thread(), - n => { - let mut b = rt::Builder::new_multi_thread(); - b.worker_threads(n); + let mut map = HashMap::default(); + for (type_id, setup) in component_configs { + let handle = ConfigHandle::initial(setup.section, setup.deserialize, &figment)?; + map.insert(type_id, Arc::new(handle)); + } + + let component_configs = ComponentConfigs::new(map); + let rt_config = component_configs.get::(); + + let rt = match rt_config.flavor { + rt::Flavor::MultiThreaded { + threads: rt::ThreadsCount::Specific(ONE), + } + | rt::Flavor::SingleThreaded => runtime::Builder::new_current_thread(), + rt::Flavor::MultiThreaded { threads } => { + let mut b = runtime::Builder::new_current_thread(); + b.worker_threads(match threads { + rt::ThreadsCount::Specific(s) => s.get(), + rt::ThreadsCount::ThreadPerCpu => num_cpus::get(), + }); + b } } @@ -81,14 +170,19 @@ impl App { .build() .wrap_err("failed to create tokio runtime")?; - rt.block_on(f(args, config)) + drop(rt_config); + + rt.block_on(f(args, component_configs)) } } impl Default for App { fn default() -> Self { - Self { + let this = Self { + component_configs: HashMap::default(), figment: Figment::new(), - } + }; + + this.optional::("system.rt") } } diff --git a/src/cli/rt.rs b/src/cli/rt.rs new file mode 100644 index 0000000..edac716 --- /dev/null +++ b/src/cli/rt.rs @@ -0,0 +1,37 @@ +use std::num::NonZeroUsize; + +use crate::data; + +#[data(crate = crate)] +#[derive(Default)] +#[serde(default)] +pub struct RtConfig { + pub flavor: Flavor, +} + +#[data(crate = crate, copy)] +#[derive(Default)] +pub enum ThreadsCount { + #[default] + ThreadPerCpu, + #[serde(untagged)] + Specific(NonZeroUsize), +} + +#[data(crate = crate, copy)] +pub enum Flavor { + SingleThreaded, + #[serde(rename = "worker_threads")] + MultiThreaded { + #[serde(default)] + threads: ThreadsCount, + }, +} + +impl Default for Flavor { + fn default() -> Self { + Self::MultiThreaded { + threads: ThreadsCount::default(), + } + } +} diff --git a/src/component_configs.rs b/src/component_configs.rs new file mode 100644 index 0000000..66fde80 --- /dev/null +++ b/src/component_configs.rs @@ -0,0 +1,124 @@ +use crate::{collections::HashMap, str::CompactString, trait_set}; + +use std::{ + mem, + any::{Any, TypeId}, + ops::Deref, + sync::{ + Arc, Mutex, + atomic::{AtomicUsize, Ordering}, + }, +}; + +use eyre::Context; +use figment::Figment; + +pub(crate) type ErasedValue = Arc>; +pub(crate) type DeserializeCfg = fn(&str, &Figment) -> eyre::Result; + +trait_set! { + pub trait IsComponentConfig = Any + 'static + Clone + Send + Sync + serde::de::DeserializeOwned + serde::Serialize; +} + +pub(crate) struct ConfigHandle { + section: CompactString, + recent: ErasedValue, + generation: AtomicUsize, + deserialize: DeserializeCfg, +} + +impl ConfigHandle { + pub(crate) fn initial( + section: CompactString, + deserialize: DeserializeCfg, + figment: &Figment, + ) -> eyre::Result { + let recent = + deserialize(§ion, figment).wrap_err("failed to deserialize config at start")?; + + Ok(Self { + section, + recent, + deserialize, + generation: AtomicUsize::new(0), + }) + } + + pub(crate) fn read(&self) -> Option<(T, usize)> { + let cfg = self.recent.lock().unwrap(); + let generation = self.generation.load(Ordering::Relaxed); + + cfg.downcast_ref::().map(|r| (r.clone(), generation)) + } +} + +#[derive(Clone)] +pub struct ComponentConfigs { + map: Arc>>, +} + +impl ComponentConfigs { + pub(crate) fn new(map: HashMap>) -> Self { + Self { map: Arc::new(map) } + } + + #[track_caller] + pub fn get(&self) -> ComponentConfig { + self.try_get::().unwrap_or_else(|| { + panic!( + "configuration for {} does not exist", + std::any::type_name::() + ) + }) + } + + pub fn try_get(&self) -> Option> { + let type_id = TypeId::of::(); + self.map.get(&type_id).cloned().map(|handle| { + let (cfg, generation) = handle.read::().unwrap(); + ComponentConfig { + cfg, + generation, + handle, + } + }) + } +} + +pub struct ComponentConfig { + cfg: T, + generation: usize, + handle: Arc, +} + +impl ComponentConfig { + pub fn section(&self) -> &str { + &self.handle.section + } + + /// Update component config. Returns previous config if something's changed. + pub fn try_update(&mut self) -> Option { + let generation = self.handle.generation.load(Ordering::Relaxed); + + if generation == self.generation { + None + } else { + Some(self.force_update()) + } + } + + /// Update component config forcibly. + pub fn force_update(&mut self) -> T { + let (cfg, generation) = self.handle.read::().unwrap(); + self.generation = generation; + mem::replace(&mut self.cfg, cfg) + } +} + +impl Deref for ComponentConfig { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.cfg + } +} diff --git a/src/lib.rs b/src/lib.rs index 8ad8a4e..81ef92e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,6 +48,9 @@ pub mod collections; pub mod handling; pub mod hash; +pub mod component_configs; +pub mod signal; + pub use either::{self, Either, Left, Right}; pub use paste::paste; diff --git a/src/signal.rs b/src/signal.rs new file mode 100644 index 0000000..d1311d1 --- /dev/null +++ b/src/signal.rs @@ -0,0 +1,7 @@ +use crate::data; + +#[data(crate = crate, copy)] +pub enum Signal { + /// SIGHUP. + HangUp, +} diff --git a/src/supervisor/mod.rs b/src/supervisor/mod.rs index 62841fd..30c3bee 100644 --- a/src/supervisor/mod.rs +++ b/src/supervisor/mod.rs @@ -1,8 +1,10 @@ +use compact_str::ToCompactString; use eyre::eyre; use tokio::{sync::mpsc, task::JoinSet}; use crate::{ collections::{HashMap, hash_map}, + component_configs::{ComponentConfig, ComponentConfigs, IsComponentConfig}, data, fut::Fut, str::CompactString, @@ -65,29 +67,49 @@ impl Handle { } } -#[derive(Default)] +pub trait Supervised: + Send + Sync + 'static + Fn(ComponentConfig, SlaveRx) -> Self::Fut +{ + type Fut: 'static + Fut>; +} + +impl Supervised for F +where + F: Send + Sync + 'static + Fn(ComponentConfig, SlaveRx) -> FFut, + FFut: 'static + Fut>, +{ + type Fut = FFut; +} + pub struct Supervisor { set: JoinSet>, handles: HashMap, + configs: ComponentConfigs, } impl Supervisor { - pub fn empty() -> Self { - Self::default() + pub fn new(configs: ComponentConfigs) -> Self { + Self { + set: JoinSet::new(), + handles: HashMap::default(), + configs, + } } - pub fn add(mut self, name: &str, f: F) -> Self + pub fn add(mut self, f: impl Supervised) -> Self where - F: Send + 'static + FnOnce(SlaveRx) -> FFut, - FFut: 'static + Fut>, + T: IsComponentConfig, { let (tx, mut rx) = mpsc::channel(8); let (stx, srx) = mpsc::channel(8); let handle = Handle { tx }; let slave_rx = SlaveRx { rx: srx }; + let config = self.configs.get::(); + let name = config.section().to_compact_string(); + self.set.spawn(async move { - let fut = f(slave_rx); + let fut = f(config, slave_rx); tokio::pin!(fut); loop { tokio::select! { @@ -116,7 +138,7 @@ impl Supervisor { } }); - let hash_map::Entry::Vacant(vac) = self.handles.entry(name.into()) else { + let hash_map::Entry::Vacant(vac) = self.handles.entry(name.clone()) else { panic!("{name:?} already exists") }; diff --git a/utils/src/sync/sema.rs b/utils/src/sync/sema.rs new file mode 100644 index 0000000..e69de29