From 209b7de035a0f713a5a16c8fe94832c5bc603858 Mon Sep 17 00:00:00 2001 From: Aleksandr Date: Tue, 8 Jul 2025 00:12:15 +0300 Subject: [PATCH] move to a separate crate --- Cargo.toml | 4 +- src/sync/mod.rs | 7 +- src/sync/mpsc.rs | 80 ----------------- utils/.gitignore | 1 + utils/Cargo.toml | 6 ++ utils/src/lib.rs | 1 + utils/src/sync/chan/mod.rs | 1 + utils/src/sync/chan/oneshot.rs | 67 +++++++++++++++ utils/src/sync/mod.rs | 4 + utils/src/sync/notifies/mod.rs | 1 + utils/src/sync/notifies/mpsc.rs | 148 ++++++++++++++++++++++++++++++++ {src => utils/src}/sync/spin.rs | 19 ++-- 12 files changed, 251 insertions(+), 88 deletions(-) delete mode 100644 src/sync/mpsc.rs create mode 100644 utils/.gitignore create mode 100644 utils/Cargo.toml create mode 100644 utils/src/lib.rs create mode 100644 utils/src/sync/chan/mod.rs create mode 100644 utils/src/sync/chan/oneshot.rs create mode 100644 utils/src/sync/mod.rs create mode 100644 utils/src/sync/notifies/mod.rs create mode 100644 utils/src/sync/notifies/mpsc.rs rename {src => utils/src}/sync/spin.rs (85%) diff --git a/Cargo.toml b/Cargo.toml index 8b1511f..629538f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2024" [workspace] -members = ["macros"] +members = ["macros", "utils"] [features] default = [] @@ -21,6 +21,7 @@ features = [ [dependencies] eva-macros.path = "./macros" +eva-utils.path = "./utils" serde = { version = "1.0", features = ["derive"] } @@ -42,3 +43,4 @@ bytes = { version = "1.10.1", features = ["serde"] } url = { version = "2.5.4", features = ["serde"] } blake3 = "1.8.2" slotmap = { version = "1.0.7", features = ["serde"] } +instant-acme = "0.7.2" diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 9d6d5a6..7eb7957 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,2 +1,5 @@ -pub mod mpsc; -pub mod spin; +pub use eva_utils::sync::{ + chan, + notifies, + spin, +}; diff --git a/src/sync/mpsc.rs b/src/sync/mpsc.rs deleted file mode 100644 index 1bb0c09..0000000 --- a/src/sync/mpsc.rs +++ /dev/null @@ -1,80 +0,0 @@ -use crate::sync::spin; - -use std::{pin::Pin, sync::{atomic::{AtomicBool, Ordering::Relaxed}, Arc}, task::{Context, Poll, Waker}}; - -struct Cell { - should_wake_up: AtomicBool, - waker: spin::Lock>, -} - -pub struct Rx(Arc); - -impl Rx { - /// Wait for the wake-up flag. - pub async fn wait(&mut self) { - Pin::new(self).await; - } -} - -impl Future for Rx { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use Poll::*; - - let this = &*self.0; - if this.should_wake_up.swap(false, Relaxed) { - return Ready(()); - } - - { - let Ok(mut waker) = this.waker.try_lock() else { - this.should_wake_up.store(false, Relaxed); - return Ready(()); - }; - match &mut *waker { - Some(prev) => { - prev.clone_from(cx.waker()); - } - None => { - *waker = Some(cx.waker().clone()); - } - } - } - - if this.should_wake_up.swap(false, Relaxed) { - Ready(()) - } else { - Pending - } - } -} - -#[derive(Clone)] -pub struct Tx(Arc); - -impl Tx { - /// Set wake-up flag to associated [`Rx`], if [`Rx`] is currently waiting - - /// it would be woken up. - pub fn wake(&self) { - let cell = &*self.0; - cell.should_wake_up.store(true, Relaxed); - let Ok(waker) = cell.waker.try_lock() else { - return; - }; - let Some(waker) = &*waker else { return; }; - waker.wake_by_ref(); - } -} - -/// Make rx/tx notifier pair. -pub fn make() -> (Tx, Rx) { - let cell = Arc::new(Cell { - should_wake_up: AtomicBool::new(false), - waker: spin::Lock::new(None), - }); - let tx = Tx(Arc::clone(&cell)); - let rx = Rx(cell); - - (tx, rx) -} diff --git a/utils/.gitignore b/utils/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/utils/.gitignore @@ -0,0 +1 @@ +/target diff --git a/utils/Cargo.toml b/utils/Cargo.toml new file mode 100644 index 0000000..b300883 --- /dev/null +++ b/utils/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "eva-utils" +version = "0.1.0" +edition = "2024" + +[dependencies] diff --git a/utils/src/lib.rs b/utils/src/lib.rs new file mode 100644 index 0000000..d086d5b --- /dev/null +++ b/utils/src/lib.rs @@ -0,0 +1 @@ +pub mod sync; diff --git a/utils/src/sync/chan/mod.rs b/utils/src/sync/chan/mod.rs new file mode 100644 index 0000000..3dafdb7 --- /dev/null +++ b/utils/src/sync/chan/mod.rs @@ -0,0 +1 @@ +pub mod oneshot; diff --git a/utils/src/sync/chan/oneshot.rs b/utils/src/sync/chan/oneshot.rs new file mode 100644 index 0000000..4a99b80 --- /dev/null +++ b/utils/src/sync/chan/oneshot.rs @@ -0,0 +1,67 @@ +//! # Oneshot channel. + +use std::{ + cell::UnsafeCell, + mem::MaybeUninit, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use crate::sync::notifies::mpsc; + +struct Cell { + item: UnsafeCell>, + notify: mpsc::Cell, +} + +pub struct Rx(Option>>); + +impl Future for Rx { + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use Poll::*; + + let Some(cell) = self.0.as_deref() else { + return Poll::Ready(None); + }; + + match cell.notify.poll_peek(cx.waker()) { + Ready(()) => { + let item = unsafe { &*cell.item.get() }; + let item = unsafe { item.assume_init_read() }; + + self.get_mut().0 = None; + + Poll::Ready(Some(item)) + } + + Pending => Pending, + } + } +} + +pub struct Tx(Arc>); + +impl Tx { + pub fn send(self, value: T) { + let cell = &*self.0; + let item = unsafe { &mut *cell.item.get() }; + item.write(value); + + cell.notify.wake(); + } +} + +pub fn make() -> (Tx, Rx) { + let cell = Arc::new(Cell { + item: UnsafeCell::new(MaybeUninit::uninit()), + notify: mpsc::Cell::new(), + }); + + let tx = Tx(Arc::clone(&cell)); + let rx = Rx(Some(cell)); + + (tx, rx) +} diff --git a/utils/src/sync/mod.rs b/utils/src/sync/mod.rs new file mode 100644 index 0000000..67401af --- /dev/null +++ b/utils/src/sync/mod.rs @@ -0,0 +1,4 @@ +pub mod spin; + +pub mod chan; +pub mod notifies; diff --git a/utils/src/sync/notifies/mod.rs b/utils/src/sync/notifies/mod.rs new file mode 100644 index 0000000..fac7929 --- /dev/null +++ b/utils/src/sync/notifies/mod.rs @@ -0,0 +1 @@ +pub mod mpsc; diff --git a/utils/src/sync/notifies/mpsc.rs b/utils/src/sync/notifies/mpsc.rs new file mode 100644 index 0000000..58c4c78 --- /dev/null +++ b/utils/src/sync/notifies/mpsc.rs @@ -0,0 +1,148 @@ +//! # Multi producer, single consumer notifier. + +use crate::sync::spin; + +use std::{ + pin::Pin, + sync::{ + Arc, + atomic::{AtomicBool, Ordering::Relaxed}, + }, + task::{Context, Poll, Waker}, +}; + +/// Raw primitive, which is used internally by the notify. +pub struct Cell { + should_wake_up: AtomicBool, + waker: spin::Lock>, +} + +impl Default for Cell { + fn default() -> Self { + Self::new() + } +} + +impl Cell { + pub const fn new() -> Self { + Self { + should_wake_up: AtomicBool::new(false), + waker: spin::Lock::new(None), + } + } + + /// TODO: docs. + pub fn wake(&self) { + self.should_wake_up.store(true, Relaxed); + let Ok(waker) = self.waker.try_lock() else { + return; + }; + let Some(waker) = &*waker else { + return; + }; + waker.wake_by_ref(); + } + + /// TODO: docs. + pub fn poll_peek(&self, new_waker: &Waker) -> Poll<()> { + use Poll::*; + + if self.should_wake_up.load(Relaxed) { + return Ready(()); + } + + { + let Ok(mut waker) = self.waker.try_lock() else { + return Ready(()); + }; + match &mut *waker { + Some(prev) => { + prev.clone_from(new_waker); + } + None => { + *waker = Some(new_waker.clone()); + } + } + } + + if self.should_wake_up.load(Relaxed) { + Ready(()) + } else { + Pending + } + } + + /// TODO: docs. + pub fn poll_once(&self, new_waker: &Waker) -> Poll<()> { + use Poll::*; + + if self.should_wake_up.swap(false, Relaxed) { + return Ready(()); + } + + { + let Ok(mut waker) = self.waker.try_lock() else { + self.should_wake_up.store(false, Relaxed); + return Ready(()); + }; + + match &mut *waker { + Some(prev) => { + prev.clone_from(new_waker); + } + None => { + *waker = Some(new_waker.clone()); + } + } + } + + if self.should_wake_up.swap(false, Relaxed) { + Ready(()) + } else { + Pending + } + } +} + +/// Receiving part of the mpsc notify. +pub struct Rx(Arc); + +impl Rx { + /// Wait for the wake-up flag. + pub async fn wait(&mut self) { + Pin::new(self).await; + } +} + +impl Future for Rx { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.0.poll_once(cx.waker()) + } +} + +/// Sending part of the mpsc notify. +#[derive(Clone)] +pub struct Tx(Arc); + +impl Tx { + /// Set wake-up flag to associated [`Rx`], if [`Rx`] is currently waiting - + /// it would be woken up. + pub fn wake(&self) { + let cell = &*self.0; + cell.wake(); + } +} + +/// Make rx/tx notifier pair. +pub fn make() -> (Tx, Rx) { + let cell = Arc::new(Cell { + should_wake_up: AtomicBool::new(false), + waker: spin::Lock::new(None), + }); + let tx = Tx(Arc::clone(&cell)); + let rx = Rx(cell); + + (tx, rx) +} diff --git a/src/sync/spin.rs b/utils/src/sync/spin.rs similarity index 85% rename from src/sync/spin.rs rename to utils/src/sync/spin.rs index 02fd23e..ab799e8 100644 --- a/src/sync/spin.rs +++ b/utils/src/sync/spin.rs @@ -1,15 +1,24 @@ +//! # Simple spinlock. + use std::{ - sync::atomic::{AtomicBool, Ordering}, cell::UnsafeCell, + error::Error, + fmt, hint, ops::{Deref, DerefMut}, - hint, + sync::atomic::{AtomicBool, Ordering}, }; -use crate::data; - -#[data(error, copy, display("the lock is contended"), crate = crate)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct Contended; +impl fmt::Display for Contended { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("the lock is contended") + } +} + +impl Error for Contended {} + pub struct Lock { locked: AtomicBool, data: UnsafeCell,