From dc7b1d1f410708932982c70336a81f6270caa315 Mon Sep 17 00:00:00 2001 From: Aleksandr Date: Mon, 7 Jul 2025 23:34:20 +0300 Subject: [PATCH] notifies --- src/lib.rs | 2 ++ src/sync/mod.rs | 2 ++ src/sync/mpsc.rs | 80 ++++++++++++++++++++++++++++++++++++++++++++++ src/sync/spin.rs | 83 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 167 insertions(+) create mode 100644 src/sync/mod.rs create mode 100644 src/sync/mpsc.rs create mode 100644 src/sync/spin.rs diff --git a/src/lib.rs b/src/lib.rs index 00d1928..7e43cb1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,6 +29,8 @@ pub mod array; pub mod error; pub mod utils; +pub mod sync; + pub mod fut; pub mod time; diff --git a/src/sync/mod.rs b/src/sync/mod.rs new file mode 100644 index 0000000..9d6d5a6 --- /dev/null +++ b/src/sync/mod.rs @@ -0,0 +1,2 @@ +pub mod mpsc; +pub mod spin; diff --git a/src/sync/mpsc.rs b/src/sync/mpsc.rs new file mode 100644 index 0000000..1bb0c09 --- /dev/null +++ b/src/sync/mpsc.rs @@ -0,0 +1,80 @@ +use crate::sync::spin; + +use std::{pin::Pin, sync::{atomic::{AtomicBool, Ordering::Relaxed}, Arc}, task::{Context, Poll, Waker}}; + +struct Cell { + should_wake_up: AtomicBool, + waker: spin::Lock>, +} + +pub struct Rx(Arc); + +impl Rx { + /// Wait for the wake-up flag. + pub async fn wait(&mut self) { + Pin::new(self).await; + } +} + +impl Future for Rx { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use Poll::*; + + let this = &*self.0; + if this.should_wake_up.swap(false, Relaxed) { + return Ready(()); + } + + { + let Ok(mut waker) = this.waker.try_lock() else { + this.should_wake_up.store(false, Relaxed); + return Ready(()); + }; + match &mut *waker { + Some(prev) => { + prev.clone_from(cx.waker()); + } + None => { + *waker = Some(cx.waker().clone()); + } + } + } + + if this.should_wake_up.swap(false, Relaxed) { + Ready(()) + } else { + Pending + } + } +} + +#[derive(Clone)] +pub struct Tx(Arc); + +impl Tx { + /// Set wake-up flag to associated [`Rx`], if [`Rx`] is currently waiting - + /// it would be woken up. + pub fn wake(&self) { + let cell = &*self.0; + cell.should_wake_up.store(true, Relaxed); + let Ok(waker) = cell.waker.try_lock() else { + return; + }; + let Some(waker) = &*waker else { return; }; + waker.wake_by_ref(); + } +} + +/// Make rx/tx notifier pair. +pub fn make() -> (Tx, Rx) { + let cell = Arc::new(Cell { + should_wake_up: AtomicBool::new(false), + waker: spin::Lock::new(None), + }); + let tx = Tx(Arc::clone(&cell)); + let rx = Rx(cell); + + (tx, rx) +} diff --git a/src/sync/spin.rs b/src/sync/spin.rs new file mode 100644 index 0000000..02fd23e --- /dev/null +++ b/src/sync/spin.rs @@ -0,0 +1,83 @@ +use std::{ + sync::atomic::{AtomicBool, Ordering}, + cell::UnsafeCell, + ops::{Deref, DerefMut}, + hint, +}; + +use crate::data; + +#[data(error, copy, display("the lock is contended"), crate = crate)] +pub struct Contended; + +pub struct Lock { + locked: AtomicBool, + data: UnsafeCell, +} + +unsafe impl Send for Lock {} +unsafe impl Sync for Lock {} + +impl Default for Lock { + fn default() -> Self { + Self::new(T::default()) + } +} + +impl Lock { + pub const fn new(data: T) -> Self { + Self { + locked: AtomicBool::new(false), + data: UnsafeCell::new(data), + } + } + + /// Get exclusive access, spin until it's available. + pub fn lock(&self) -> Guard<'_, T> { + loop { + match self.try_lock() { + Ok(g) => return g, + Err(Contended) => {} + } + hint::spin_loop(); + } + } + + /// Try get exclusive access without spinning. + pub fn try_lock(&self) -> Result, Contended> { + if self.locked.swap(true, Ordering::Acquire) { + Err(Contended) + } else { + Ok(Guard { + ref_: unsafe { &mut *self.data.get() }, + unlock: &self.locked, + }) + } + } +} + +#[derive(Debug)] +pub struct Guard<'a, T: ?Sized> { + ref_: &'a mut T, + unlock: &'a AtomicBool, +} + +impl<'a, T: ?Sized> Deref for Guard<'a, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + &*self.ref_ + } +} + +impl<'a, T: ?Sized> DerefMut for Guard<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut *self.ref_ + } +} + +impl<'a, T: ?Sized> Drop for Guard<'a, T> { + fn drop(&mut self) { + self.unlock.store(false, Ordering::Release); + } +}