From 016350a480b2f26ea338c5111ff946d1c88b4bb0 Mon Sep 17 00:00:00 2001 From: Aleksandr Date: Sat, 20 Sep 2025 17:02:10 +0300 Subject: [PATCH] add supervisor --- Cargo.toml | 14 +++- src/fut.rs | 6 ++ src/lib.rs | 3 + src/supervisor/mod.rs | 152 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 172 insertions(+), 3 deletions(-) create mode 100644 src/supervisor/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 18b27cd..a6afb09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,8 +7,15 @@ edition = "2024" members = ["macros", "utils"] [features] -default = [] -cli = ["dep:clap", "dep:figment", "dep:tokio", "dep:color-eyre", "dep:num_cpus"] +default = ["tokio"] +cli = [ + "tokio/rt-multi-thread", + "dep:clap", + "dep:figment", + "dep:color-eyre", + "dep:num_cpus", +] +tokio = ["dep:tokio"] # Very long running. get_time_test = [] @@ -47,7 +54,8 @@ slotmap = { version = "1.0.7", features = ["serde"] } clap = { version = "4.5.47", features = ["derive"], optional = true } figment = { version = "0.10.19", features = ["json", "yaml", "env", "toml"], optional = true } -tokio = { version = "1.47.1", features = ["rt", "rt-multi-thread"], optional = true } +tokio = { version = "1.47.1", features = ["rt", "sync", "macros"], optional = true } color-eyre = { version = "0.6.5", optional = true } eyre = { version = "0.6.12" } num_cpus = { version = "1.17.0", optional = true } +either = "1.15.0" diff --git a/src/fut.rs b/src/fut.rs index 8bbc24d..c9066f2 100644 --- a/src/fut.rs +++ b/src/fut.rs @@ -8,6 +8,12 @@ use std::{ #[crate::perfect_derive(Debug, Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct Never(PhantomData); +impl Never { + pub const fn new() -> Self { + Self(PhantomData) + } +} + impl Future for Never { type Output = T; diff --git a/src/lib.rs b/src/lib.rs index 8d8870a..8ad8a4e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,6 +35,8 @@ pub mod utils; pub mod sync; pub mod fut; +#[cfg(feature = "tokio")] +pub mod supervisor; pub mod time; pub mod generic; @@ -46,6 +48,7 @@ pub mod collections; pub mod handling; pub mod hash; +pub use either::{self, Either, Left, Right}; pub use paste::paste; #[doc(hidden)] diff --git a/src/supervisor/mod.rs b/src/supervisor/mod.rs new file mode 100644 index 0000000..62841fd --- /dev/null +++ b/src/supervisor/mod.rs @@ -0,0 +1,152 @@ +use eyre::eyre; +use tokio::{sync::mpsc, task::JoinSet}; + +use crate::{ + collections::{HashMap, hash_map}, + data, + fut::Fut, + str::CompactString, + sync::notifies::mpsc::{Tx, make as make_notify}, +}; + +pub struct SlaveRx { + rx: mpsc::Receiver, +} + +impl SlaveRx { + pub async fn recv(&mut self) -> Option { + self.rx.recv().await + } +} + +pub type CommandResult = Result; + +#[data(crate = crate)] +pub enum Command { + Shutdown, +} + +enum SupervisorCommand { + Ping { pong: Tx }, + + User(Command), +} + +#[data(crate = crate, error)] +pub enum CommandError { + #[display("the receiver is no longer alive")] + Closed, +} + +#[derive(Clone)] +pub struct Handle { + tx: mpsc::Sender, +} + +impl Handle { + pub async fn request_shutdown(&self) -> CommandResult<()> { + self.tx + .send(SupervisorCommand::User(Command::Shutdown)) + .await + .map_err(|_| CommandError::Closed) + } + + pub async fn ping(&self) -> CommandResult<()> { + let (tx, rx) = make_notify(); + + self.tx + .send(SupervisorCommand::Ping { pong: tx }) + .await + .map_err(|_| CommandError::Closed)?; + + rx.await; + + Ok(()) + } +} + +#[derive(Default)] +pub struct Supervisor { + set: JoinSet>, + handles: HashMap, +} + +impl Supervisor { + pub fn empty() -> Self { + Self::default() + } + + pub fn add(mut self, name: &str, f: F) -> Self + where + F: Send + 'static + FnOnce(SlaveRx) -> FFut, + FFut: 'static + Fut>, + { + let (tx, mut rx) = mpsc::channel(8); + let (stx, srx) = mpsc::channel(8); + let handle = Handle { tx }; + let slave_rx = SlaveRx { rx: srx }; + + self.set.spawn(async move { + let fut = f(slave_rx); + tokio::pin!(fut); + loop { + tokio::select! { + res = &mut fut => { + break res; + } + + cmd = rx.recv(), if !rx.is_closed() => { + let Some(cmd) = cmd else { + continue; + }; + + match cmd { + SupervisorCommand::Ping { pong } => { + pong.wake(); + } + + SupervisorCommand::User(u) => { + if stx.send(u).await.is_err() { + rx.close(); + } + } + } + } + } + } + }); + + let hash_map::Entry::Vacant(vac) = self.handles.entry(name.into()) else { + panic!("{name:?} already exists") + }; + + vac.insert(handle); + + self + } + + pub fn iter(&self) -> impl Iterator + use<'_> { + self.handles.iter().map(|(n, h)| (n.as_str(), h)) + } + + #[track_caller] + pub fn handle(&self, h: &str) -> Handle { + self.handles.get(h).expect("handle does not exist").clone() + } + + pub async fn wait_for_completion(mut self) -> eyre::Result<()> { + while let Some(res) = self.set.join_next().await { + let error = match res { + Err(e) => Some(eyre!("failed to join task: {e}")), + Ok(Err(e)) => Some(e), + Ok(Ok(())) => None, + }; + + if let Some(error) = error { + eprintln!("failed to join task: {error:#}"); + } + } + + Ok(()) + } +}