implement

This commit is contained in:
Aleksandr 2025-05-02 20:37:44 +03:00
parent d58b347140
commit 050eedc7c8
7 changed files with 341 additions and 0 deletions

22
Cargo.toml Normal file
View file

@ -0,0 +1,22 @@
[package]
name = "notifies"
version = "0.1.0"
edition = "2021"
authors = ["nerodono <nerodono0@gmail.com>"]
description = "various efficient async notifies"
homepage = "https://git.viende.su/VienDesu/notifies"
repository = "https://git.viende.su/VienDesu/notifies"
license = "MIT"
keywords = ["async", "atomic", "waker"]
[dependencies]
[dev-dependencies]
tokio = { version = "1.44.2", features = [
"rt",
"rt-multi-thread",
"sync",
"macros",
"time",
] }

View file

@ -1,2 +1,5 @@
# notifies
Various efficient notifies for your async code.

10
src/lib.rs Normal file
View file

@ -0,0 +1,10 @@
//! # Notifies
//!
//! - [`spmc`] - single producer, multi consumer
//! - [`spsc`] - single producer, single consumer
pub mod spmc;
pub mod spsc;
mod utils;
mod volmutex;

125
src/spmc.rs Normal file
View file

@ -0,0 +1,125 @@
//! # Single producer, multi-consumer notify
//!
//! See [`Master`] for more info on it.
use crate::spsc;
/// Single producer, multi-consumer notify. Essentially just collection of
/// [`spsc`] masters.
#[derive(Default)]
pub struct Master {
masters: Vec<spsc::Master>,
}
impl Master {
/// Create master.
pub const fn new() -> Self {
Self {
masters: Vec::new(),
}
}
/// Preallocate storage for `cap` slaves.
pub fn with_capacity(cap: usize) -> Self {
Self {
masters: Vec::with_capacity(cap),
}
}
/// Reserve space for at least `additional` more slaves.
pub fn reserve(&mut self, additional: usize) {
self.masters.reserve(additional);
}
}
impl Master {
/// Register slave and return handle to it. Dropping slave
/// does nothing, as for now, space reclamation is not implemented.
pub fn make_slave(&mut self) -> spsc::Slave {
let (master, slave) = spsc::make();
self.masters.push(master);
slave
}
/// Get number of currently registered slaves.
pub fn len(&self) -> usize {
self.masters.len()
}
/// if [`Master::len`] == 0.
pub fn is_empty(&self) -> bool {
self.masters.is_empty()
}
/// Notify every registered slave. In tokio terms, this method stores
/// permit for every slave, thus next await of [`spsc::Slave`] will wake-up
/// immediately. Only one permit is stored.
pub fn notify_waiters(&mut self) {
for master in self.masters.iter_mut() {
master.notify();
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use tokio::{sync::mpsc, time::timeout};
use super::*;
type Rx = mpsc::UnboundedReceiver<usize>;
async fn task(mut slave: spsc::Slave, tx: impl Fn() + Send + 'static) {
loop {
(&mut slave).await;
tx();
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_wakeups() {
const WORKERS: usize = 2000;
const TRIES: usize = 4;
const TIMEOUT: Duration = Duration::from_millis(500);
let correct: Vec<usize> = (0..WORKERS).collect();
let mut buf = Vec::with_capacity(WORKERS);
async fn case(rx: &mut Rx, correct: &[usize], buf: &mut Vec<usize>) {
unsafe { buf.set_len(0) };
for _ in 0..correct.len() {
let item = timeout(TIMEOUT, rx.recv())
.await
.expect("timed out")
.unwrap();
buf.push(item);
}
buf.sort_unstable();
assert_eq!(buf, correct);
}
let mut master = Master::default();
let (tx, mut rx) = mpsc::unbounded_channel();
for worker_id in 0..WORKERS {
let slave = master.make_slave();
let tx = tx.clone();
tokio::spawn(task(slave, move || tx.send(worker_id).unwrap()));
}
for try_no in 0..TRIES {
master.notify_waiters();
eprint!("try#{try_no}...");
case(&mut rx, &correct, &mut buf).await;
eprintln!(" success");
}
assert!(timeout(Duration::from_millis(100), rx.recv())
.await
.is_err());
}
}

85
src/spsc.rs Normal file
View file

@ -0,0 +1,85 @@
use std::{
future::Future,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc,
},
task::{Context, Poll, Waker},
};
use crate::{utils::try_ready, volmutex::VolMutex};
/// Notification receiver. Await it to wait for notification.
pub struct Slave(Arc<State>);
struct State {
should_wake_up: AtomicBool,
waker: VolMutex<Option<Waker>>,
}
impl Future for Slave {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let state = &*self.get_mut().0;
if state.should_wake_up.swap(false, Relaxed) {
return Poll::Ready(());
}
{
let Some(mut locked) = state.waker.try_lock() else {
state.should_wake_up.store(false, Relaxed);
return Poll::Ready(());
};
let maybe_waker = &mut *locked;
if let Some(waker) = maybe_waker {
waker.clone_from(cx.waker());
} else {
*maybe_waker = Some(cx.waker().clone());
}
}
try_ready(&state.should_wake_up)
}
}
pub struct Master {
slave: Arc<State>,
stored_waker: Option<Waker>,
}
impl Master {
/// Notify waiter. If [`Slave`] wasn't listening to the notification right now,
/// next await would return immediately. In tokio terms, wake-up permit is stored,
/// only one wake-up permit is kept.
pub fn notify(&mut self) {
self.slave.should_wake_up.store(true, Relaxed);
{
let Some(locked) = self.slave.waker.try_lock() else {
return;
};
self.stored_waker.clone_from(&*locked);
}
if let Some(waker) = &self.stored_waker {
waker.wake_by_ref();
}
}
}
/// Make master/slave pair.
pub fn make() -> (Master, Slave) {
let state = Arc::new(State {
should_wake_up: AtomicBool::new(false),
waker: VolMutex::new(None),
});
let master = Master {
slave: Arc::clone(&state),
stored_waker: None,
};
let slave = Slave(state);
(master, slave)
}

13
src/utils.rs Normal file
View file

@ -0,0 +1,13 @@
use std::{
sync::atomic::{AtomicBool, Ordering::Relaxed},
task::Poll,
};
#[inline]
pub fn try_ready(flag: &AtomicBool) -> Poll<()> {
if flag.swap(false, Relaxed) {
Poll::Ready(())
} else {
Poll::Pending
}
}

83
src/volmutex.rs Normal file
View file

@ -0,0 +1,83 @@
//! # Volunary mutex
//!
//! Simple mutex which can be acquired only in non-waiting manner.
use std::{
cell::UnsafeCell,
ops::{Deref, DerefMut},
sync::atomic::{
AtomicBool,
Ordering::{Acquire, Relaxed, Release},
},
};
pub struct VolMutex<T> {
mutex: RawVolMutex,
data: UnsafeCell<T>,
}
unsafe impl<T: Send> Send for VolMutex<T> {}
unsafe impl<T: Send> Sync for VolMutex<T> {}
impl<T> VolMutex<T> {
pub const fn new(data: T) -> Self {
Self {
mutex: RawVolMutex::new(),
data: UnsafeCell::new(data),
}
}
pub fn try_lock(&self) -> Option<VolGuard<'_, T>> {
self.mutex.try_lock().map(|guard| VolGuard {
_guard: guard,
data: &self.data,
})
}
}
pub struct VolGuard<'a, T> {
_guard: RawVolGuard<'a>,
data: &'a UnsafeCell<T>,
}
impl<T> DerefMut for VolGuard<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut *self.data.get() }
}
}
impl<T> Deref for VolGuard<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &*self.data.get() }
}
}
#[derive(Debug, Default)]
pub struct RawVolMutex {
flag: AtomicBool,
}
impl RawVolMutex {
pub const fn new() -> Self {
Self {
flag: AtomicBool::new(false),
}
}
pub fn try_lock(&self) -> Option<RawVolGuard<'_>> {
self.flag
.compare_exchange(false, true, Acquire, Relaxed)
.is_ok()
.then_some(RawVolGuard(&self.flag))
}
}
pub struct RawVolGuard<'a>(&'a AtomicBool);
impl Drop for RawVolGuard<'_> {
fn drop(&mut self) {
self.0.store(false, Release);
}
}