vnj/src/daemon.rs
2024-11-17 18:43:15 +03:00

124 lines
3.6 KiB
Rust

use std::{
future::Future,
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
signal::unix::{signal, SignalKind},
time,
};
use crate::notifies::Notifies;
use crate::state::State;
async fn sleep_or_hang(deadline: Option<Instant>) {
if let Some(deadline) = deadline {
time::sleep_until(deadline.into()).await
} else {
// Never wake-up.
struct Never;
impl Future for Never {
type Output = ();
fn poll(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
std::task::Poll::Pending
}
}
Never.await
}
}
/// Runs the daemon loop that hands due queue entries to pending subscribers.
///
/// Every iteration first computes when the closest queue entry is due (only
/// when there is at least one listener), then waits for whichever of the
/// following happens first. The `select!` is `biased`, so ready branches are
/// checked in this order:
///
/// 1. `SIGHUP` is received — the queue is reloaded via `reload_queue`;
/// 2. `notifies.update` fires — loop again to recompute the wake-up time;
/// 3. `notifies.new_subscriber` fires — loop again to recompute the wake-up time;
/// 4. the wake-up deadline passes — the due update is popped from the queue
///    and sent to every pending subscriber.
///
/// # Errors
///
/// Returns an error if the `SIGHUP` handler cannot be installed or if
/// reloading the queue fails. Otherwise the loop never exits.
pub async fn run(state: State, notifies: Arc<Notifies>) -> eyre::Result<()> {
    tracing::info!("started daemon");

    // Installing a signal handler is an OS-level operation that can fail;
    // report that to the caller instead of panicking.
    let mut on_disk_update = eyre::WrapErr::wrap_err(
        signal(SignalKind::hangup()),
        "failed to install the SIGHUP handler",
    )?;

    loop {
        let closest_wake_up = 'w: {
            let app = state.app.read();
            if !app.subs.has_listeners() {
                tracing::info!("the queue has no listeners, so it's pointless to schedule wake-up");
                break 'w None;
            }

            if let Some(entry) = app.queue.closest() {
                let now = jiff::Zoned::now();
                let nanos = now.duration_until(&entry.post_at).as_nanos();
                // An entry that is already overdue yields a negative value:
                // clamp it to zero. A value too large for `u64` saturates
                // rather than silently wrapping into a bogus short sleep.
                let sleep_nanos = u64::try_from(nanos.max(0)).unwrap_or(u64::MAX);
                Some(Duration::from_nanos(sleep_nanos))
            } else {
                None
            }
        }
        .map(|dur| {
            tracing::info!(after = ?dur, "the next wake-up is scheduled");
            Instant::now() + dur
        });

        tokio::select! {
            biased;

            _ = on_disk_update.recv() => {
                tracing::info!("got sighup, reloading queue...");
                let mut app = state.app.write();
                app.reload_queue()?;
            }
            _reload = notifies.update.notified() => {
                tracing::info!("the database was updated, woke up");
                continue;
            }
            _new_sub = notifies.new_subscriber.notified() => {
                tracing::debug!("got new subscriber");
                continue;
            }
            _woke_for_update = sleep_or_hang(closest_wake_up) => {
                let mut app = state.app.write();
                // Reborrow through the guard once so that `queue` and `subs`
                // can be borrowed as separate fields below.
                let app = &mut *app;

                let Some(item) = app.queue.closest_mut() else {
                    no_updates();
                    continue;
                };

                // Check for waiters *before* touching the queue: if nobody is
                // interested in the update, it must not be consumed.
                if !app.subs.has_listeners() {
                    tracing::warn!("got no subscribers for the update");
                    continue;
                }

                // Only now pop the update out of the queue. The order is
                // critical: popping first would eat the update without
                // notifying anyone.
                let now = jiff::Zoned::now();
                let Some(upd) = item.try_pop(&now) else {
                    no_updates();
                    continue;
                };

                let pending_subs = app.subs.consume();
                for sub in pending_subs {
                    if let Err(e) = sub.send(upd.novel.clone()) {
                        tracing::warn!(due_to = e, "one subscriber lost the update");
                    }
                }

                app.mark_posted_until(now);
            }
        }
    }
}
/// Logs, at info level, that there was no update to deliver for this period.
fn no_updates() {
tracing::info!("got no updates for this period");
}