move to a separate crate

This commit is contained in:
Aleksandr 2025-07-08 00:12:15 +03:00
parent dc7b1d1f41
commit 209b7de035
12 changed files with 251 additions and 88 deletions

1
utils/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/target

6
utils/Cargo.toml Normal file
View file

@ -0,0 +1,6 @@
[package]
name = "eva-utils"
version = "0.1.0"
edition = "2024"
[dependencies]

1
utils/src/lib.rs Normal file
View file

@ -0,0 +1 @@
pub mod sync;

View file

@ -0,0 +1 @@
pub mod oneshot;

View file

@ -0,0 +1,67 @@
//! # Oneshot channel.
use std::{
cell::UnsafeCell,
mem::MaybeUninit,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use crate::sync::notifies::mpsc;
struct Cell<T> {
item: UnsafeCell<MaybeUninit<T>>,
notify: mpsc::Cell,
}
pub struct Rx<T>(Option<Arc<Cell<T>>>);
impl<T> Future for Rx<T> {
type Output = Option<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use Poll::*;
let Some(cell) = self.0.as_deref() else {
return Poll::Ready(None);
};
match cell.notify.poll_peek(cx.waker()) {
Ready(()) => {
let item = unsafe { &*cell.item.get() };
let item = unsafe { item.assume_init_read() };
self.get_mut().0 = None;
Poll::Ready(Some(item))
}
Pending => Pending,
}
}
}
pub struct Tx<T>(Arc<Cell<T>>);
impl<T> Tx<T> {
pub fn send(self, value: T) {
let cell = &*self.0;
let item = unsafe { &mut *cell.item.get() };
item.write(value);
cell.notify.wake();
}
}
pub fn make<T>() -> (Tx<T>, Rx<T>) {
let cell = Arc::new(Cell {
item: UnsafeCell::new(MaybeUninit::uninit()),
notify: mpsc::Cell::new(),
});
let tx = Tx(Arc::clone(&cell));
let rx = Rx(Some(cell));
(tx, rx)
}

4
utils/src/sync/mod.rs Normal file
View file

@ -0,0 +1,4 @@
pub mod spin;
pub mod chan;
pub mod notifies;

View file

@ -0,0 +1 @@
pub mod mpsc;

View file

@ -0,0 +1,148 @@
//! # Multi producer, single consumer notifier.
use crate::sync::spin;
use std::{
pin::Pin,
sync::{
Arc,
atomic::{AtomicBool, Ordering::Relaxed},
},
task::{Context, Poll, Waker},
};
/// Raw primitive, which is used internally by the notify.
pub struct Cell {
should_wake_up: AtomicBool,
waker: spin::Lock<Option<Waker>>,
}
impl Default for Cell {
fn default() -> Self {
Self::new()
}
}
impl Cell {
pub const fn new() -> Self {
Self {
should_wake_up: AtomicBool::new(false),
waker: spin::Lock::new(None),
}
}
/// TODO: docs.
pub fn wake(&self) {
self.should_wake_up.store(true, Relaxed);
let Ok(waker) = self.waker.try_lock() else {
return;
};
let Some(waker) = &*waker else {
return;
};
waker.wake_by_ref();
}
/// TODO: docs.
pub fn poll_peek(&self, new_waker: &Waker) -> Poll<()> {
use Poll::*;
if self.should_wake_up.load(Relaxed) {
return Ready(());
}
{
let Ok(mut waker) = self.waker.try_lock() else {
return Ready(());
};
match &mut *waker {
Some(prev) => {
prev.clone_from(new_waker);
}
None => {
*waker = Some(new_waker.clone());
}
}
}
if self.should_wake_up.load(Relaxed) {
Ready(())
} else {
Pending
}
}
/// TODO: docs.
pub fn poll_once(&self, new_waker: &Waker) -> Poll<()> {
use Poll::*;
if self.should_wake_up.swap(false, Relaxed) {
return Ready(());
}
{
let Ok(mut waker) = self.waker.try_lock() else {
self.should_wake_up.store(false, Relaxed);
return Ready(());
};
match &mut *waker {
Some(prev) => {
prev.clone_from(new_waker);
}
None => {
*waker = Some(new_waker.clone());
}
}
}
if self.should_wake_up.swap(false, Relaxed) {
Ready(())
} else {
Pending
}
}
}
/// Receiving part of the mpsc notify.
pub struct Rx(Arc<Cell>);
impl Rx {
/// Wait for the wake-up flag.
pub async fn wait(&mut self) {
Pin::new(self).await;
}
}
impl Future for Rx {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.poll_once(cx.waker())
}
}
/// Sending part of the mpsc notify.
#[derive(Clone)]
pub struct Tx(Arc<Cell>);
impl Tx {
/// Set wake-up flag to associated [`Rx`], if [`Rx`] is currently waiting -
/// it would be woken up.
pub fn wake(&self) {
let cell = &*self.0;
cell.wake();
}
}
/// Make rx/tx notifier pair.
pub fn make() -> (Tx, Rx) {
let cell = Arc::new(Cell {
should_wake_up: AtomicBool::new(false),
waker: spin::Lock::new(None),
});
let tx = Tx(Arc::clone(&cell));
let rx = Rx(cell);
(tx, rx)
}

92
utils/src/sync/spin.rs Normal file
View file

@ -0,0 +1,92 @@
//! # Simple spinlock.
use std::{
cell::UnsafeCell,
error::Error,
fmt, hint,
ops::{Deref, DerefMut},
sync::atomic::{AtomicBool, Ordering},
};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Contended;
impl fmt::Display for Contended {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("the lock is contended")
}
}
impl Error for Contended {}
pub struct Lock<T: ?Sized> {
locked: AtomicBool,
data: UnsafeCell<T>,
}
unsafe impl<T: ?Sized> Send for Lock<T> {}
unsafe impl<T: ?Sized> Sync for Lock<T> {}
impl<T: Default> Default for Lock<T> {
fn default() -> Self {
Self::new(T::default())
}
}
impl<T> Lock<T> {
pub const fn new(data: T) -> Self {
Self {
locked: AtomicBool::new(false),
data: UnsafeCell::new(data),
}
}
/// Get exclusive access, spin until it's available.
pub fn lock(&self) -> Guard<'_, T> {
loop {
match self.try_lock() {
Ok(g) => return g,
Err(Contended) => {}
}
hint::spin_loop();
}
}
/// Try get exclusive access without spinning.
pub fn try_lock(&self) -> Result<Guard<'_, T>, Contended> {
if self.locked.swap(true, Ordering::Acquire) {
Err(Contended)
} else {
Ok(Guard {
ref_: unsafe { &mut *self.data.get() },
unlock: &self.locked,
})
}
}
}
#[derive(Debug)]
pub struct Guard<'a, T: ?Sized> {
ref_: &'a mut T,
unlock: &'a AtomicBool,
}
impl<'a, T: ?Sized> Deref for Guard<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&*self.ref_
}
}
impl<'a, T: ?Sized> DerefMut for Guard<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut *self.ref_
}
}
impl<'a, T: ?Sized> Drop for Guard<'a, T> {
fn drop(&mut self) {
self.unlock.store(false, Ordering::Release);
}
}