notifies
This commit is contained in:
parent
210e7c438b
commit
dc7b1d1f41
4 changed files with 167 additions and 0 deletions
|
@ -29,6 +29,8 @@ pub mod array;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
pub mod utils;
|
pub mod utils;
|
||||||
|
|
||||||
|
pub mod sync;
|
||||||
|
|
||||||
pub mod fut;
|
pub mod fut;
|
||||||
pub mod time;
|
pub mod time;
|
||||||
|
|
||||||
|
|
2
src/sync/mod.rs
Normal file
2
src/sync/mod.rs
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
pub mod mpsc;
|
||||||
|
pub mod spin;
|
80
src/sync/mpsc.rs
Normal file
80
src/sync/mpsc.rs
Normal file
|
@ -0,0 +1,80 @@
|
||||||
|
use crate::sync::spin;
|
||||||
|
|
||||||
|
use std::{pin::Pin, sync::{atomic::{AtomicBool, Ordering::Relaxed}, Arc}, task::{Context, Poll, Waker}};
|
||||||
|
|
||||||
|
struct Cell {
|
||||||
|
should_wake_up: AtomicBool,
|
||||||
|
waker: spin::Lock<Option<Waker>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Rx(Arc<Cell>);
|
||||||
|
|
||||||
|
impl Rx {
|
||||||
|
/// Wait for the wake-up flag.
|
||||||
|
pub async fn wait(&mut self) {
|
||||||
|
Pin::new(self).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Future for Rx {
|
||||||
|
type Output = ();
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
use Poll::*;
|
||||||
|
|
||||||
|
let this = &*self.0;
|
||||||
|
if this.should_wake_up.swap(false, Relaxed) {
|
||||||
|
return Ready(());
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let Ok(mut waker) = this.waker.try_lock() else {
|
||||||
|
this.should_wake_up.store(false, Relaxed);
|
||||||
|
return Ready(());
|
||||||
|
};
|
||||||
|
match &mut *waker {
|
||||||
|
Some(prev) => {
|
||||||
|
prev.clone_from(cx.waker());
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
*waker = Some(cx.waker().clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if this.should_wake_up.swap(false, Relaxed) {
|
||||||
|
Ready(())
|
||||||
|
} else {
|
||||||
|
Pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Tx(Arc<Cell>);
|
||||||
|
|
||||||
|
impl Tx {
|
||||||
|
/// Set wake-up flag to associated [`Rx`], if [`Rx`] is currently waiting -
|
||||||
|
/// it would be woken up.
|
||||||
|
pub fn wake(&self) {
|
||||||
|
let cell = &*self.0;
|
||||||
|
cell.should_wake_up.store(true, Relaxed);
|
||||||
|
let Ok(waker) = cell.waker.try_lock() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let Some(waker) = &*waker else { return; };
|
||||||
|
waker.wake_by_ref();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Make rx/tx notifier pair.
|
||||||
|
pub fn make() -> (Tx, Rx) {
|
||||||
|
let cell = Arc::new(Cell {
|
||||||
|
should_wake_up: AtomicBool::new(false),
|
||||||
|
waker: spin::Lock::new(None),
|
||||||
|
});
|
||||||
|
let tx = Tx(Arc::clone(&cell));
|
||||||
|
let rx = Rx(cell);
|
||||||
|
|
||||||
|
(tx, rx)
|
||||||
|
}
|
83
src/sync/spin.rs
Normal file
83
src/sync/spin.rs
Normal file
|
@ -0,0 +1,83 @@
|
||||||
|
use std::{
|
||||||
|
sync::atomic::{AtomicBool, Ordering},
|
||||||
|
cell::UnsafeCell,
|
||||||
|
ops::{Deref, DerefMut},
|
||||||
|
hint,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::data;
|
||||||
|
|
||||||
|
#[data(error, copy, display("the lock is contended"), crate = crate)]
|
||||||
|
pub struct Contended;
|
||||||
|
|
||||||
|
pub struct Lock<T: ?Sized> {
|
||||||
|
locked: AtomicBool,
|
||||||
|
data: UnsafeCell<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl<T: ?Sized> Send for Lock<T> {}
|
||||||
|
unsafe impl<T: ?Sized> Sync for Lock<T> {}
|
||||||
|
|
||||||
|
impl<T: Default> Default for Lock<T> {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new(T::default())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Lock<T> {
|
||||||
|
pub const fn new(data: T) -> Self {
|
||||||
|
Self {
|
||||||
|
locked: AtomicBool::new(false),
|
||||||
|
data: UnsafeCell::new(data),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get exclusive access, spin until it's available.
|
||||||
|
pub fn lock(&self) -> Guard<'_, T> {
|
||||||
|
loop {
|
||||||
|
match self.try_lock() {
|
||||||
|
Ok(g) => return g,
|
||||||
|
Err(Contended) => {}
|
||||||
|
}
|
||||||
|
hint::spin_loop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Try get exclusive access without spinning.
|
||||||
|
pub fn try_lock(&self) -> Result<Guard<'_, T>, Contended> {
|
||||||
|
if self.locked.swap(true, Ordering::Acquire) {
|
||||||
|
Err(Contended)
|
||||||
|
} else {
|
||||||
|
Ok(Guard {
|
||||||
|
ref_: unsafe { &mut *self.data.get() },
|
||||||
|
unlock: &self.locked,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Guard<'a, T: ?Sized> {
|
||||||
|
ref_: &'a mut T,
|
||||||
|
unlock: &'a AtomicBool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T: ?Sized> Deref for Guard<'a, T> {
|
||||||
|
type Target = T;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&*self.ref_
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T: ?Sized> DerefMut for Guard<'a, T> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut *self.ref_
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T: ?Sized> Drop for Guard<'a, T> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.unlock.store(false, Ordering::Release);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue