notifies/src/spmc.rs
2025-05-02 20:37:44 +03:00

120 lines
3.1 KiB
Rust

//! # Single producer, multi-consumer notify
//!
//! See [`Master`] for details.
use crate::spsc;
/// Single-producer, multi-consumer notifier.
///
/// Internally this is just a collection of [`spsc`] masters, one for each
/// slave created through [`Master::make_slave`].
#[derive(Default)]
pub struct Master {
    /// The master halves, stored in the order their slaves were registered.
    masters: Vec<spsc::Master>,
}
impl Master {
    /// Construct a master with no registered slaves.
    ///
    /// Usable in `const` contexts.
    pub const fn new() -> Self {
        Self { masters: Vec::new() }
    }

    /// Construct an empty master with storage preallocated for `cap` slaves.
    pub fn with_capacity(cap: usize) -> Self {
        let masters = Vec::with_capacity(cap);
        Self { masters }
    }
}
impl Master {
    /// Register a new slave and return its handle.
    ///
    /// A fresh [`spsc`] pair is created; the master half is stored here and
    /// the slave half is returned. Dropping the slave does nothing on this
    /// side: space reclamation is not implemented for now.
    pub fn make_slave(&mut self) -> spsc::Slave {
        let (notifier, handle) = spsc::make();
        self.masters.push(notifier);
        handle
    }

    /// Number of slaves registered on this master.
    pub fn len(&self) -> usize {
        self.masters.len()
    }

    /// Returns `true` when no slave has been registered, i.e. when
    /// [`Master::len`] is `0`.
    pub fn is_empty(&self) -> bool {
        self.masters.is_empty()
    }

    /// Notify every registered slave.
    ///
    /// In tokio terms, this stores a permit for every slave, so the next
    /// await of an [`spsc::Slave`] wakes up immediately. Only one permit is
    /// stored.
    pub fn notify_waiters(&mut self) {
        for notifier in &mut self.masters {
            notifier.notify();
        }
    }
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::{sync::mpsc, time::timeout};

    use super::*;

    /// Receiving end of the channel on which workers report their ids.
    type Rx = mpsc::UnboundedReceiver<usize>;

    /// Worker body: wait for a notification, call `tx`, repeat forever.
    async fn task(mut slave: spsc::Slave, tx: impl Fn() + Send + 'static) {
        loop {
            (&mut slave).await;
            tx();
        }
    }

    /// Each `notify_waiters` call must make every worker report its id
    /// exactly once, and no report may arrive after the last round.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_wakeups() {
        const WORKERS: usize = 2000;
        const TRIES: usize = 4;
        const TIMEOUT: Duration = Duration::from_millis(500);

        // Expected ids once a round's reports have been sorted.
        let correct: Vec<usize> = (0..WORKERS).collect();
        // Scratch buffer reused across rounds to avoid reallocating.
        let mut buf = Vec::with_capacity(WORKERS);

        /// Collect `correct.len()` reports from `rx` (panicking if any of
        /// them takes longer than `TIMEOUT`) and check that, once sorted,
        /// they equal `correct`.
        async fn case(rx: &mut Rx, correct: &[usize], buf: &mut Vec<usize>) {
            // Safe reset of the buffer; keeps the allocated capacity.
            buf.clear();
            for _ in 0..correct.len() {
                let item = timeout(TIMEOUT, rx.recv())
                    .await
                    .expect("timed out")
                    .expect("channel closed: every sender was dropped");
                buf.push(item);
            }
            // Workers report in arbitrary order, so sort before comparing.
            buf.sort_unstable();
            assert_eq!(buf, correct);
        }

        let mut master = Master::default();
        let (tx, mut rx) = mpsc::unbounded_channel();
        for worker_id in 0..WORKERS {
            let slave = master.make_slave();
            let tx = tx.clone();
            tokio::spawn(task(slave, move || tx.send(worker_id).unwrap()));
        }

        for try_no in 0..TRIES {
            master.notify_waiters();
            eprint!("try#{try_no}...");
            case(&mut rx, &correct, &mut buf).await;
            eprintln!(" success");
        }

        // No notification is pending any more, so no worker may report again.
        assert!(timeout(Duration::from_millis(100), rx.recv())
            .await
            .is_err());
    }
}