From 050eedc7c8d0d343977b28deb8d78c6252f46a23 Mon Sep 17 00:00:00 2001 From: Aleksandr Date: Fri, 2 May 2025 20:37:44 +0300 Subject: [PATCH] implement --- Cargo.toml | 22 +++++++++ README.md | 3 ++ src/lib.rs | 10 ++++ src/spmc.rs | 125 ++++++++++++++++++++++++++++++++++++++++++++++++ src/spsc.rs | 85 ++++++++++++++++++++++++++++++++ src/utils.rs | 13 +++++ src/volmutex.rs | 83 ++++++++++++++++++++++++++++++++ 7 files changed, 341 insertions(+) create mode 100644 Cargo.toml create mode 100644 src/lib.rs create mode 100644 src/spmc.rs create mode 100644 src/spsc.rs create mode 100644 src/utils.rs create mode 100644 src/volmutex.rs diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..5020f82 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "notifies" +version = "0.1.0" +edition = "2021" +authors = ["nerodono "] +description = "various efficient async notifies" +homepage = "https://git.viende.su/VienDesu/notifies" +repository = "https://git.viende.su/VienDesu/notifies" + +license = "MIT" +keywords = ["async", "atomic", "waker"] + +[dependencies] + +[dev-dependencies] +tokio = { version = "1.44.2", features = [ + "rt", + "rt-multi-thread", + "sync", + "macros", + "time", +] } diff --git a/README.md b/README.md index 159edc9..1ae5bc9 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,5 @@ # notifies +Various efficient notifies for your async code. + + diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..a1b7301 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,10 @@ +//! # Notifies +//! +//! - [`spmc`] - single producer, multi consumer +//! - [`spsc`] - single producer, single consumer + +pub mod spmc; +pub mod spsc; + +mod utils; +mod volmutex; diff --git a/src/spmc.rs b/src/spmc.rs new file mode 100644 index 0000000..7922100 --- /dev/null +++ b/src/spmc.rs @@ -0,0 +1,125 @@ +//! # Single producer, multi-consumer notify +//! +//! See [`Master`] for more info on it. + +use crate::spsc; + +/// Single producer, multi-consumer notify. Essentially just collection of +/// [`spsc`] masters. +#[derive(Default)] +pub struct Master { + masters: Vec, +} + +impl Master { + /// Create master. + pub const fn new() -> Self { + Self { + masters: Vec::new(), + } + } + + /// Preallocate storage for `cap` slaves. + pub fn with_capacity(cap: usize) -> Self { + Self { + masters: Vec::with_capacity(cap), + } + } + + /// Reserve space for at least `additional` more slaves. + pub fn reserve(&mut self, additional: usize) { + self.masters.reserve(additional); + } +} + +impl Master { + /// Register slave and return handle to it. Dropping slave + /// does nothing, as for now, space reclamation is not implemented. + pub fn make_slave(&mut self) -> spsc::Slave { + let (master, slave) = spsc::make(); + self.masters.push(master); + + slave + } + + /// Get number of currently registered slaves. + pub fn len(&self) -> usize { + self.masters.len() + } + + /// if [`Master::len`] == 0. + pub fn is_empty(&self) -> bool { + self.masters.is_empty() + } + + /// Notify every registered slave. In tokio terms, this method stores + /// permit for every slave, thus next await of [`spsc::Slave`] will wake-up + /// immediately. Only one permit is stored. + pub fn notify_waiters(&mut self) { + for master in self.masters.iter_mut() { + master.notify(); + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use tokio::{sync::mpsc, time::timeout}; + + use super::*; + + type Rx = mpsc::UnboundedReceiver; + + async fn task(mut slave: spsc::Slave, tx: impl Fn() + Send + 'static) { + loop { + (&mut slave).await; + tx(); + } + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_wakeups() { + const WORKERS: usize = 2000; + const TRIES: usize = 4; + const TIMEOUT: Duration = Duration::from_millis(500); + + let correct: Vec = (0..WORKERS).collect(); + let mut buf = Vec::with_capacity(WORKERS); + + async fn case(rx: &mut Rx, correct: &[usize], buf: &mut Vec) { + unsafe { buf.set_len(0) }; + for _ in 0..correct.len() { + let item = timeout(TIMEOUT, rx.recv()) + .await + .expect("timed out") + .unwrap(); + buf.push(item); + } + buf.sort_unstable(); + + assert_eq!(buf, correct); + } + + let mut master = Master::default(); + let (tx, mut rx) = mpsc::unbounded_channel(); + + for worker_id in 0..WORKERS { + let slave = master.make_slave(); + let tx = tx.clone(); + tokio::spawn(task(slave, move || tx.send(worker_id).unwrap())); + } + + for try_no in 0..TRIES { + master.notify_waiters(); + eprint!("try#{try_no}..."); + case(&mut rx, &correct, &mut buf).await; + eprintln!(" success"); + } + + assert!(timeout(Duration::from_millis(100), rx.recv()) + .await + .is_err()); + } +} diff --git a/src/spsc.rs b/src/spsc.rs new file mode 100644 index 0000000..b58e0e5 --- /dev/null +++ b/src/spsc.rs @@ -0,0 +1,85 @@ +use std::{ + future::Future, + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering::Relaxed}, + Arc, + }, + task::{Context, Poll, Waker}, +}; + +use crate::{utils::try_ready, volmutex::VolMutex}; + +/// Notification receiver. Await it to wait for notification. +pub struct Slave(Arc); + +struct State { + should_wake_up: AtomicBool, + waker: VolMutex>, +} + +impl Future for Slave { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let state = &*self.get_mut().0; + if state.should_wake_up.swap(false, Relaxed) { + return Poll::Ready(()); + } + + { + let Some(mut locked) = state.waker.try_lock() else { + state.should_wake_up.store(false, Relaxed); + return Poll::Ready(()); + }; + + let maybe_waker = &mut *locked; + if let Some(waker) = maybe_waker { + waker.clone_from(cx.waker()); + } else { + *maybe_waker = Some(cx.waker().clone()); + } + } + + try_ready(&state.should_wake_up) + } +} + +pub struct Master { + slave: Arc, + stored_waker: Option, +} + +impl Master { + /// Notify waiter. If [`Slave`] wasn't listening to the notification right now, + /// next await would return immediately. In tokio terms, wake-up permit is stored, + /// only one wake-up permit is kept. + pub fn notify(&mut self) { + self.slave.should_wake_up.store(true, Relaxed); + { + let Some(locked) = self.slave.waker.try_lock() else { + return; + }; + self.stored_waker.clone_from(&*locked); + } + + if let Some(waker) = &self.stored_waker { + waker.wake_by_ref(); + } + } +} + +/// Make master/slave pair. +pub fn make() -> (Master, Slave) { + let state = Arc::new(State { + should_wake_up: AtomicBool::new(false), + waker: VolMutex::new(None), + }); + let master = Master { + slave: Arc::clone(&state), + stored_waker: None, + }; + let slave = Slave(state); + + (master, slave) +} diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..e9a30d2 --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,13 @@ +use std::{ + sync::atomic::{AtomicBool, Ordering::Relaxed}, + task::Poll, +}; + +#[inline] +pub fn try_ready(flag: &AtomicBool) -> Poll<()> { + if flag.swap(false, Relaxed) { + Poll::Ready(()) + } else { + Poll::Pending + } +} diff --git a/src/volmutex.rs b/src/volmutex.rs new file mode 100644 index 0000000..a38041f --- /dev/null +++ b/src/volmutex.rs @@ -0,0 +1,83 @@ +//! # Volunary mutex +//! +//! Simple mutex which can be acquired only in non-waiting manner. + +use std::{ + cell::UnsafeCell, + ops::{Deref, DerefMut}, + sync::atomic::{ + AtomicBool, + Ordering::{Acquire, Relaxed, Release}, + }, +}; + +pub struct VolMutex { + mutex: RawVolMutex, + data: UnsafeCell, +} + +unsafe impl Send for VolMutex {} +unsafe impl Sync for VolMutex {} + +impl VolMutex { + pub const fn new(data: T) -> Self { + Self { + mutex: RawVolMutex::new(), + data: UnsafeCell::new(data), + } + } + + pub fn try_lock(&self) -> Option> { + self.mutex.try_lock().map(|guard| VolGuard { + _guard: guard, + data: &self.data, + }) + } +} + +pub struct VolGuard<'a, T> { + _guard: RawVolGuard<'a>, + data: &'a UnsafeCell, +} + +impl DerefMut for VolGuard<'_, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { &mut *self.data.get() } + } +} + +impl Deref for VolGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + unsafe { &*self.data.get() } + } +} + +#[derive(Debug, Default)] +pub struct RawVolMutex { + flag: AtomicBool, +} + +impl RawVolMutex { + pub const fn new() -> Self { + Self { + flag: AtomicBool::new(false), + } + } + + pub fn try_lock(&self) -> Option> { + self.flag + .compare_exchange(false, true, Acquire, Relaxed) + .is_ok() + .then_some(RawVolGuard(&self.flag)) + } +} + +pub struct RawVolGuard<'a>(&'a AtomicBool); + +impl Drop for RawVolGuard<'_> { + fn drop(&mut self) { + self.0.store(false, Release); + } +}