add supervisor

This commit is contained in:
Aleksandr 2025-09-20 17:02:10 +03:00
parent 752c859fd7
commit 016350a480
4 changed files with 172 additions and 3 deletions

View file

@ -7,8 +7,15 @@ edition = "2024"
members = ["macros", "utils"]
[features]
default = []
cli = ["dep:clap", "dep:figment", "dep:tokio", "dep:color-eyre", "dep:num_cpus"]
default = ["tokio"]
cli = [
"tokio/rt-multi-thread",
"dep:clap",
"dep:figment",
"dep:color-eyre",
"dep:num_cpus",
]
tokio = ["dep:tokio"]
# Very long running.
get_time_test = []
@ -47,7 +54,8 @@ slotmap = { version = "1.0.7", features = ["serde"] }
clap = { version = "4.5.47", features = ["derive"], optional = true }
figment = { version = "0.10.19", features = ["json", "yaml", "env", "toml"], optional = true }
tokio = { version = "1.47.1", features = ["rt", "rt-multi-thread"], optional = true }
tokio = { version = "1.47.1", features = ["rt", "sync", "macros"], optional = true }
color-eyre = { version = "0.6.5", optional = true }
eyre = { version = "0.6.12" }
num_cpus = { version = "1.17.0", optional = true }
either = "1.15.0"

View file

@ -8,6 +8,12 @@ use std::{
#[crate::perfect_derive(Debug, Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Never<T>(PhantomData<T>);
impl<T> Never<T> {
pub const fn new() -> Self {
Self(PhantomData)
}
}
impl<T> Future for Never<T> {
type Output = T;

View file

@ -35,6 +35,8 @@ pub mod utils;
pub mod sync;
pub mod fut;
#[cfg(feature = "tokio")]
pub mod supervisor;
pub mod time;
pub mod generic;
@ -46,6 +48,7 @@ pub mod collections;
pub mod handling;
pub mod hash;
pub use either::{self, Either, Left, Right};
pub use paste::paste;
#[doc(hidden)]

152
src/supervisor/mod.rs Normal file
View file

@ -0,0 +1,152 @@
use eyre::eyre;
use tokio::{sync::mpsc, task::JoinSet};
use crate::{
collections::{HashMap, hash_map},
data,
fut::Fut,
str::CompactString,
sync::notifies::mpsc::{Tx, make as make_notify},
};
pub struct SlaveRx {
rx: mpsc::Receiver<Command>,
}
impl SlaveRx {
pub async fn recv(&mut self) -> Option<Command> {
self.rx.recv().await
}
}
pub type CommandResult<T> = Result<T, CommandError>;
#[data(crate = crate)]
pub enum Command {
Shutdown,
}
enum SupervisorCommand {
Ping { pong: Tx },
User(Command),
}
#[data(crate = crate, error)]
pub enum CommandError {
#[display("the receiver is no longer alive")]
Closed,
}
#[derive(Clone)]
pub struct Handle {
tx: mpsc::Sender<SupervisorCommand>,
}
impl Handle {
pub async fn request_shutdown(&self) -> CommandResult<()> {
self.tx
.send(SupervisorCommand::User(Command::Shutdown))
.await
.map_err(|_| CommandError::Closed)
}
pub async fn ping(&self) -> CommandResult<()> {
let (tx, rx) = make_notify();
self.tx
.send(SupervisorCommand::Ping { pong: tx })
.await
.map_err(|_| CommandError::Closed)?;
rx.await;
Ok(())
}
}
#[derive(Default)]
pub struct Supervisor {
set: JoinSet<eyre::Result<()>>,
handles: HashMap<CompactString, Handle>,
}
impl Supervisor {
pub fn empty() -> Self {
Self::default()
}
pub fn add<F, FFut>(mut self, name: &str, f: F) -> Self
where
F: Send + 'static + FnOnce(SlaveRx) -> FFut,
FFut: 'static + Fut<Output = eyre::Result<()>>,
{
let (tx, mut rx) = mpsc::channel(8);
let (stx, srx) = mpsc::channel(8);
let handle = Handle { tx };
let slave_rx = SlaveRx { rx: srx };
self.set.spawn(async move {
let fut = f(slave_rx);
tokio::pin!(fut);
loop {
tokio::select! {
res = &mut fut => {
break res;
}
cmd = rx.recv(), if !rx.is_closed() => {
let Some(cmd) = cmd else {
continue;
};
match cmd {
SupervisorCommand::Ping { pong } => {
pong.wake();
}
SupervisorCommand::User(u) => {
if stx.send(u).await.is_err() {
rx.close();
}
}
}
}
}
}
});
let hash_map::Entry::Vacant(vac) = self.handles.entry(name.into()) else {
panic!("{name:?} already exists")
};
vac.insert(handle);
self
}
pub fn iter(&self) -> impl Iterator<Item = (&str, &Handle)> + use<'_> {
self.handles.iter().map(|(n, h)| (n.as_str(), h))
}
#[track_caller]
pub fn handle(&self, h: &str) -> Handle {
self.handles.get(h).expect("handle does not exist").clone()
}
pub async fn wait_for_completion(mut self) -> eyre::Result<()> {
while let Some(res) = self.set.join_next().await {
let error = match res {
Err(e) => Some(eyre!("failed to join task: {e}")),
Ok(Err(e)) => Some(e),
Ok(Ok(())) => None,
};
if let Some(error) = error {
eprintln!("failed to join task: {error:#}");
}
}
Ok(())
}
}