better cli setup
This commit is contained in:
parent
321ab18890
commit
8015bcc99f
8 changed files with 321 additions and 31 deletions
|
@ -7,13 +7,16 @@ edition = "2024"
|
|||
members = ["macros", "utils"]
|
||||
|
||||
[features]
|
||||
default = ["tokio"]
|
||||
default = []
|
||||
cli = [
|
||||
"tokio/rt-multi-thread",
|
||||
"dep:clap",
|
||||
"dep:figment",
|
||||
"dep:color-eyre",
|
||||
"dep:num_cpus",
|
||||
"figment/yaml",
|
||||
"figment/json",
|
||||
"figment/toml",
|
||||
"figment/env"
|
||||
]
|
||||
tokio = ["dep:tokio"]
|
||||
# Very long running.
|
||||
|
@ -53,7 +56,7 @@ blake3 = "1.8.2"
|
|||
slotmap = { version = "1.0.7", features = ["serde"] }
|
||||
|
||||
clap = { version = "4.5.47", features = ["derive"], optional = true }
|
||||
figment = { version = "0.10.19", features = ["json", "yaml", "env", "toml"], optional = true }
|
||||
figment = { version = "0.10.19" }
|
||||
tokio = { version = "1.47.1", features = ["rt", "sync", "macros"], optional = true }
|
||||
color-eyre = { version = "0.6.5", optional = true }
|
||||
eyre = { version = "0.6.12" }
|
||||
|
|
134
src/cli/mod.rs
134
src/cli/mod.rs
|
@ -1,4 +1,9 @@
|
|||
use std::{num::NonZeroUsize, path::PathBuf};
|
||||
use std::{
|
||||
any::{TypeId, type_name},
|
||||
num::NonZeroUsize,
|
||||
path::PathBuf,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use clap::{Args, Parser};
|
||||
|
||||
|
@ -7,24 +12,39 @@ use figment::{
|
|||
Figment,
|
||||
providers::{Env, Format, Json, Toml, YamlExtended},
|
||||
};
|
||||
use hashbrown::hash_map;
|
||||
|
||||
use tokio::runtime as rt;
|
||||
use crate::{collections::HashMap, str::CompactString};
|
||||
|
||||
use crate::component_configs::{
|
||||
ComponentConfigs, ConfigHandle, DeserializeCfg, ErasedValue, IsComponentConfig,
|
||||
};
|
||||
|
||||
use tokio::runtime;
|
||||
|
||||
pub mod rt;
|
||||
|
||||
#[derive(Debug, Clone, Copy, clap::Args)]
|
||||
pub struct NoArgs {}
|
||||
|
||||
#[derive(Parser)]
|
||||
struct AppArgs<A: Args> {
|
||||
#[clap(long, short)]
|
||||
/// Config file(s) for the application.
|
||||
config: Vec<PathBuf>,
|
||||
#[clap(long, short)]
|
||||
/// Number of worker threads app will consume.
|
||||
worker_threads: Option<NonZeroUsize>,
|
||||
|
||||
#[clap(flatten)]
|
||||
args: A,
|
||||
}
|
||||
|
||||
struct ComponentConfigSetup {
|
||||
section: CompactString,
|
||||
deserialize: DeserializeCfg,
|
||||
}
|
||||
|
||||
pub struct App {
|
||||
figment: Figment,
|
||||
component_configs: HashMap<TypeId, ComponentConfigSetup>,
|
||||
}
|
||||
|
||||
impl App {
|
||||
|
@ -34,19 +54,70 @@ impl App {
|
|||
self
|
||||
}
|
||||
|
||||
pub fn run<A, C>(
|
||||
pub fn optional<T: IsComponentConfig + Default>(mut self, section: &str) -> Self {
|
||||
let hash_map::Entry::Vacant(vacant) = self.component_configs.entry(TypeId::of::<T>())
|
||||
else {
|
||||
panic!("{} already exists", type_name::<T>())
|
||||
};
|
||||
|
||||
vacant.insert(ComponentConfigSetup {
|
||||
section: section.into(),
|
||||
deserialize: |path, figment| {
|
||||
let res: figment::Result<T> = figment.extract_inner_lossy(path);
|
||||
let res = match res {
|
||||
Ok(o) => Ok(o),
|
||||
Err(e) => match &e.kind {
|
||||
figment::error::Kind::MissingField(field) if field == path => {
|
||||
Ok(T::default())
|
||||
}
|
||||
_ => Err(e).wrap_err("failed to deserialize section"),
|
||||
},
|
||||
};
|
||||
|
||||
res.map(|x| {
|
||||
let x: ErasedValue = Arc::new(Mutex::new(x));
|
||||
x
|
||||
})
|
||||
},
|
||||
});
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub fn require<T: IsComponentConfig>(mut self, section: &str) -> Self {
|
||||
let hash_map::Entry::Vacant(vacant) = self.component_configs.entry(TypeId::of::<T>())
|
||||
else {
|
||||
panic!("{} already exists", type_name::<T>())
|
||||
};
|
||||
|
||||
vacant.insert(ComponentConfigSetup {
|
||||
section: section.into(),
|
||||
deserialize: |path, figment| {
|
||||
let res: T = figment
|
||||
.extract_inner_lossy(path)
|
||||
.wrap_err("failed to deserialize")?;
|
||||
Ok(Arc::new(Mutex::new(res)))
|
||||
},
|
||||
});
|
||||
self
|
||||
}
|
||||
|
||||
pub fn run<A>(
|
||||
self,
|
||||
f: impl Send + 'static + AsyncFnOnce(A, C) -> eyre::Result<()>,
|
||||
f: impl Send + 'static + AsyncFnOnce(A, ComponentConfigs) -> eyre::Result<()>,
|
||||
) -> eyre::Result<()>
|
||||
where
|
||||
A: Args,
|
||||
C: serde::de::DeserializeOwned,
|
||||
{
|
||||
let Self { mut figment } = self;
|
||||
const ONE: NonZeroUsize = NonZeroUsize::new(1).unwrap();
|
||||
|
||||
let Self {
|
||||
mut figment,
|
||||
component_configs,
|
||||
} = self;
|
||||
color_eyre::install().wrap_err("failed to install color-eyre")?;
|
||||
let AppArgs {
|
||||
config: configs,
|
||||
worker_threads,
|
||||
args,
|
||||
} = AppArgs::<A>::parse();
|
||||
|
||||
|
@ -55,7 +126,10 @@ impl App {
|
|||
let component = config.components().next_back().ok_or_else(|| {
|
||||
eyre::eyre!("{} must contain at least one component", config.display())
|
||||
})?;
|
||||
let last = component.as_os_str().as_encoded_bytes();
|
||||
let last = component
|
||||
.as_os_str()
|
||||
.as_encoded_bytes()
|
||||
.to_ascii_lowercase();
|
||||
|
||||
if config.is_dir() {
|
||||
todo!("populate everything from the directory")
|
||||
|
@ -68,12 +142,27 @@ impl App {
|
|||
}
|
||||
}
|
||||
|
||||
let config: C = figment.extract_lossy().wrap_err("failed to load config")?;
|
||||
let rt = match worker_threads.map_or(num_cpus::get(), NonZeroUsize::get) {
|
||||
0 | 1 => rt::Builder::new_current_thread(),
|
||||
n => {
|
||||
let mut b = rt::Builder::new_multi_thread();
|
||||
b.worker_threads(n);
|
||||
let mut map = HashMap::default();
|
||||
for (type_id, setup) in component_configs {
|
||||
let handle = ConfigHandle::initial(setup.section, setup.deserialize, &figment)?;
|
||||
map.insert(type_id, Arc::new(handle));
|
||||
}
|
||||
|
||||
let component_configs = ComponentConfigs::new(map);
|
||||
let rt_config = component_configs.get::<rt::RtConfig>();
|
||||
|
||||
let rt = match rt_config.flavor {
|
||||
rt::Flavor::MultiThreaded {
|
||||
threads: rt::ThreadsCount::Specific(ONE),
|
||||
}
|
||||
| rt::Flavor::SingleThreaded => runtime::Builder::new_current_thread(),
|
||||
rt::Flavor::MultiThreaded { threads } => {
|
||||
let mut b = runtime::Builder::new_current_thread();
|
||||
b.worker_threads(match threads {
|
||||
rt::ThreadsCount::Specific(s) => s.get(),
|
||||
rt::ThreadsCount::ThreadPerCpu => num_cpus::get(),
|
||||
});
|
||||
|
||||
b
|
||||
}
|
||||
}
|
||||
|
@ -81,14 +170,19 @@ impl App {
|
|||
.build()
|
||||
.wrap_err("failed to create tokio runtime")?;
|
||||
|
||||
rt.block_on(f(args, config))
|
||||
drop(rt_config);
|
||||
|
||||
rt.block_on(f(args, component_configs))
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for App {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
let this = Self {
|
||||
component_configs: HashMap::default(),
|
||||
figment: Figment::new(),
|
||||
}
|
||||
};
|
||||
|
||||
this.optional::<rt::RtConfig>("system.rt")
|
||||
}
|
||||
}
|
||||
|
|
37
src/cli/rt.rs
Normal file
37
src/cli/rt.rs
Normal file
|
@ -0,0 +1,37 @@
|
|||
use std::num::NonZeroUsize;
|
||||
|
||||
use crate::data;
|
||||
|
||||
#[data(crate = crate)]
|
||||
#[derive(Default)]
|
||||
#[serde(default)]
|
||||
pub struct RtConfig {
|
||||
pub flavor: Flavor,
|
||||
}
|
||||
|
||||
#[data(crate = crate, copy)]
|
||||
#[derive(Default)]
|
||||
pub enum ThreadsCount {
|
||||
#[default]
|
||||
ThreadPerCpu,
|
||||
#[serde(untagged)]
|
||||
Specific(NonZeroUsize),
|
||||
}
|
||||
|
||||
#[data(crate = crate, copy)]
|
||||
pub enum Flavor {
|
||||
SingleThreaded,
|
||||
#[serde(rename = "worker_threads")]
|
||||
MultiThreaded {
|
||||
#[serde(default)]
|
||||
threads: ThreadsCount,
|
||||
},
|
||||
}
|
||||
|
||||
impl Default for Flavor {
|
||||
fn default() -> Self {
|
||||
Self::MultiThreaded {
|
||||
threads: ThreadsCount::default(),
|
||||
}
|
||||
}
|
||||
}
|
124
src/component_configs.rs
Normal file
124
src/component_configs.rs
Normal file
|
@ -0,0 +1,124 @@
|
|||
use crate::{collections::HashMap, str::CompactString, trait_set};
|
||||
|
||||
use std::{
|
||||
mem,
|
||||
any::{Any, TypeId},
|
||||
ops::Deref,
|
||||
sync::{
|
||||
Arc, Mutex,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
},
|
||||
};
|
||||
|
||||
use eyre::Context;
|
||||
use figment::Figment;
|
||||
|
||||
pub(crate) type ErasedValue = Arc<Mutex<dyn Send + Sync + Any + 'static>>;
|
||||
pub(crate) type DeserializeCfg = fn(&str, &Figment) -> eyre::Result<ErasedValue>;
|
||||
|
||||
trait_set! {
|
||||
pub trait IsComponentConfig = Any + 'static + Clone + Send + Sync + serde::de::DeserializeOwned + serde::Serialize;
|
||||
}
|
||||
|
||||
pub(crate) struct ConfigHandle {
|
||||
section: CompactString,
|
||||
recent: ErasedValue,
|
||||
generation: AtomicUsize,
|
||||
deserialize: DeserializeCfg,
|
||||
}
|
||||
|
||||
impl ConfigHandle {
|
||||
pub(crate) fn initial(
|
||||
section: CompactString,
|
||||
deserialize: DeserializeCfg,
|
||||
figment: &Figment,
|
||||
) -> eyre::Result<Self> {
|
||||
let recent =
|
||||
deserialize(§ion, figment).wrap_err("failed to deserialize config at start")?;
|
||||
|
||||
Ok(Self {
|
||||
section,
|
||||
recent,
|
||||
deserialize,
|
||||
generation: AtomicUsize::new(0),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn read<T: IsComponentConfig>(&self) -> Option<(T, usize)> {
|
||||
let cfg = self.recent.lock().unwrap();
|
||||
let generation = self.generation.load(Ordering::Relaxed);
|
||||
|
||||
cfg.downcast_ref::<T>().map(|r| (r.clone(), generation))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ComponentConfigs {
|
||||
map: Arc<HashMap<TypeId, Arc<ConfigHandle>>>,
|
||||
}
|
||||
|
||||
impl ComponentConfigs {
|
||||
pub(crate) fn new(map: HashMap<TypeId, Arc<ConfigHandle>>) -> Self {
|
||||
Self { map: Arc::new(map) }
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
pub fn get<T: IsComponentConfig>(&self) -> ComponentConfig<T> {
|
||||
self.try_get::<T>().unwrap_or_else(|| {
|
||||
panic!(
|
||||
"configuration for {} does not exist",
|
||||
std::any::type_name::<T>()
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn try_get<T: IsComponentConfig>(&self) -> Option<ComponentConfig<T>> {
|
||||
let type_id = TypeId::of::<T>();
|
||||
self.map.get(&type_id).cloned().map(|handle| {
|
||||
let (cfg, generation) = handle.read::<T>().unwrap();
|
||||
ComponentConfig {
|
||||
cfg,
|
||||
generation,
|
||||
handle,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ComponentConfig<T> {
|
||||
cfg: T,
|
||||
generation: usize,
|
||||
handle: Arc<ConfigHandle>,
|
||||
}
|
||||
|
||||
impl<T: IsComponentConfig> ComponentConfig<T> {
|
||||
pub fn section(&self) -> &str {
|
||||
&self.handle.section
|
||||
}
|
||||
|
||||
/// Update component config. Returns previous config if something's changed.
|
||||
pub fn try_update(&mut self) -> Option<T> {
|
||||
let generation = self.handle.generation.load(Ordering::Relaxed);
|
||||
|
||||
if generation == self.generation {
|
||||
None
|
||||
} else {
|
||||
Some(self.force_update())
|
||||
}
|
||||
}
|
||||
|
||||
/// Update component config forcibly.
|
||||
pub fn force_update(&mut self) -> T {
|
||||
let (cfg, generation) = self.handle.read::<T>().unwrap();
|
||||
self.generation = generation;
|
||||
mem::replace(&mut self.cfg, cfg)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Deref for ComponentConfig<T> {
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.cfg
|
||||
}
|
||||
}
|
|
@ -48,6 +48,9 @@ pub mod collections;
|
|||
pub mod handling;
|
||||
pub mod hash;
|
||||
|
||||
pub mod component_configs;
|
||||
pub mod signal;
|
||||
|
||||
pub use either::{self, Either, Left, Right};
|
||||
pub use paste::paste;
|
||||
|
||||
|
|
7
src/signal.rs
Normal file
7
src/signal.rs
Normal file
|
@ -0,0 +1,7 @@
|
|||
use crate::data;
|
||||
|
||||
#[data(crate = crate, copy)]
|
||||
pub enum Signal {
|
||||
/// SIGHUP.
|
||||
HangUp,
|
||||
}
|
|
@ -1,8 +1,10 @@
|
|||
use compact_str::ToCompactString;
|
||||
use eyre::eyre;
|
||||
use tokio::{sync::mpsc, task::JoinSet};
|
||||
|
||||
use crate::{
|
||||
collections::{HashMap, hash_map},
|
||||
component_configs::{ComponentConfig, ComponentConfigs, IsComponentConfig},
|
||||
data,
|
||||
fut::Fut,
|
||||
str::CompactString,
|
||||
|
@ -65,29 +67,49 @@ impl Handle {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub trait Supervised<T>:
|
||||
Send + Sync + 'static + Fn(ComponentConfig<T>, SlaveRx) -> Self::Fut
|
||||
{
|
||||
type Fut: 'static + Fut<Output = eyre::Result<()>>;
|
||||
}
|
||||
|
||||
impl<T, F, FFut> Supervised<T> for F
|
||||
where
|
||||
F: Send + Sync + 'static + Fn(ComponentConfig<T>, SlaveRx) -> FFut,
|
||||
FFut: 'static + Fut<Output = eyre::Result<()>>,
|
||||
{
|
||||
type Fut = FFut;
|
||||
}
|
||||
|
||||
pub struct Supervisor {
|
||||
set: JoinSet<eyre::Result<()>>,
|
||||
handles: HashMap<CompactString, Handle>,
|
||||
configs: ComponentConfigs,
|
||||
}
|
||||
|
||||
impl Supervisor {
|
||||
pub fn empty() -> Self {
|
||||
Self::default()
|
||||
pub fn new(configs: ComponentConfigs) -> Self {
|
||||
Self {
|
||||
set: JoinSet::new(),
|
||||
handles: HashMap::default(),
|
||||
configs,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add<F, FFut>(mut self, name: &str, f: F) -> Self
|
||||
pub fn add<T>(mut self, f: impl Supervised<T>) -> Self
|
||||
where
|
||||
F: Send + 'static + FnOnce(SlaveRx) -> FFut,
|
||||
FFut: 'static + Fut<Output = eyre::Result<()>>,
|
||||
T: IsComponentConfig,
|
||||
{
|
||||
let (tx, mut rx) = mpsc::channel(8);
|
||||
let (stx, srx) = mpsc::channel(8);
|
||||
let handle = Handle { tx };
|
||||
let slave_rx = SlaveRx { rx: srx };
|
||||
|
||||
let config = self.configs.get::<T>();
|
||||
let name = config.section().to_compact_string();
|
||||
|
||||
self.set.spawn(async move {
|
||||
let fut = f(slave_rx);
|
||||
let fut = f(config, slave_rx);
|
||||
tokio::pin!(fut);
|
||||
loop {
|
||||
tokio::select! {
|
||||
|
@ -116,7 +138,7 @@ impl Supervisor {
|
|||
}
|
||||
});
|
||||
|
||||
let hash_map::Entry::Vacant(vac) = self.handles.entry(name.into()) else {
|
||||
let hash_map::Entry::Vacant(vac) = self.handles.entry(name.clone()) else {
|
||||
panic!("{name:?} already exists")
|
||||
};
|
||||
|
||||
|
|
0
utils/src/sync/sema.rs
Normal file
0
utils/src/sync/sema.rs
Normal file
Loading…
Add table
Add a link
Reference in a new issue