feat: ask for backfill

Timo Kösters 2023-02-20 22:59:45 +01:00
parent 23b18d71ee
commit 7bdd9660aa
No known key found for this signature in database
GPG key ID: 0B25E636FBA7E4CB
12 changed files with 502 additions and 269 deletions


@@ -7,6 +7,8 @@ use tracing::error;
use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
use service::rooms::timeline::PduCount;
impl service::rooms::timeline::Data for KeyValueDatabase {
fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Option<Arc<PduEvent>>> {
let prefix = services()
@@ -30,7 +32,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
.transpose()
}
fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result<u64> {
fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result<PduCount> {
match self
.lasttimelinecount_cache
.lock()
@@ -39,20 +41,18 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
{
hash_map::Entry::Vacant(v) => {
if let Some(last_count) = self
.pdus_until(sender_user, room_id, u64::MAX)?
.filter_map(|r| {
.pdus_until(sender_user, room_id, PduCount::max())?
.find_map(|r| {
// Filter out buggy events
if r.is_err() {
error!("Bad pdu in pdus_since: {:?}", r);
}
r.ok()
})
.map(|(pduid, _)| self.pdu_count(&pduid))
.next()
{
Ok(*v.insert(last_count?))
Ok(*v.insert(last_count.0))
} else {
Ok(0)
Ok(PduCount::Normal(0))
}
}
hash_map::Entry::Occupied(o) => Ok(*o.get()),
@@ -60,11 +60,23 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
}
/// Returns the `count` of this pdu's id.
fn get_pdu_count(&self, event_id: &EventId) -> Result<Option<u64>> {
self.eventid_pduid
fn get_pdu_count(&self, event_id: &EventId) -> Result<Option<PduCount>> {
Ok(self
.eventid_pduid
.get(event_id.as_bytes())?
.map(|pdu_id| self.pdu_count(&pdu_id))
.transpose()
.map(|pdu_id| Ok::<_, Error>(PduCount::Normal(pdu_count(&pdu_id)?)))
.transpose()?
.map_or_else(
|| {
Ok::<_, Error>(
self.eventid_backfillpduid
.get(event_id.as_bytes())?
.map(|pdu_id| Ok::<_, Error>(PduCount::Backfilled(pdu_count(&pdu_id)?)))
.transpose()?,
)
},
|x| Ok(Some(x)),
)?)
}
/// Returns the json of a pdu.
@@ -182,12 +194,6 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
})
}
/// Returns the `count` of this pdu's id.
fn pdu_count(&self, pdu_id: &[u8]) -> Result<u64> {
utils::u64_from_bytes(&pdu_id[pdu_id.len() - size_of::<u64>()..])
.map_err(|_| Error::bad_database("PDU has invalid count bytes."))
}
fn append_pdu(
&self,
pdu_id: &[u8],
@@ -203,7 +209,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
self.lasttimelinecount_cache
.lock()
.unwrap()
.insert(pdu.room_id.clone(), count);
.insert(pdu.room_id.clone(), PduCount::Normal(count));
self.eventid_pduid.insert(pdu.event_id.as_bytes(), pdu_id)?;
self.eventid_outlierpdu.remove(pdu.event_id.as_bytes())?;
@@ -211,6 +217,24 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
Ok(())
}
fn prepend_backfill_pdu(
&self,
pdu_id: &[u8],
event_id: &EventId,
json: &CanonicalJsonObject,
) -> Result<()> {
self.pduid_backfillpdu.insert(
pdu_id,
&serde_json::to_vec(json).expect("CanonicalJsonObject is always valid JSON"),
)?;
self.eventid_backfillpduid
.insert(event_id.as_bytes(), pdu_id)?;
self.eventid_outlierpdu.remove(event_id.as_bytes())?;
Ok(())
}
/// Removes a pdu and creates a new one with the same id.
fn replace_pdu(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<()> {
if self.pduid_pdu.get(pdu_id)?.is_some() {
@@ -227,51 +251,14 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
}
}
/// Returns an iterator over all events in a room that happened after the event with id `since`
/// in chronological order.
fn pdus_since<'a>(
&'a self,
user_id: &UserId,
room_id: &RoomId,
since: u64,
) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a>> {
let prefix = services()
.rooms
.short
.get_shortroomid(room_id)?
.expect("room exists")
.to_be_bytes()
.to_vec();
// Skip the first pdu if it's exactly at since, because we sent that last time
let mut first_pdu_id = prefix.clone();
first_pdu_id.extend_from_slice(&(since + 1).to_be_bytes());
let user_id = user_id.to_owned();
Ok(Box::new(
self.pduid_pdu
.iter_from(&first_pdu_id, false)
.take_while(move |(k, _)| k.starts_with(&prefix))
.map(move |(pdu_id, v)| {
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id {
pdu.remove_transaction_id()?;
}
Ok((pdu_id, pdu))
}),
))
}
/// Returns an iterator over all events and their tokens in a room that happened before the
/// event with id `until` in reverse-chronological order.
fn pdus_until<'a>(
&'a self,
user_id: &UserId,
room_id: &RoomId,
until: u64,
) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a>> {
until: PduCount,
) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>> {
// Create the first part of the full pdu id
let prefix = services()
.rooms
@@ -281,34 +268,63 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
.to_be_bytes()
.to_vec();
let mut current = prefix.clone();
current.extend_from_slice(&(until.saturating_sub(1)).to_be_bytes()); // -1 because we don't want event at `until`
let current: &[u8] = &current;
let mut current_backfill = prefix.clone();
// +1 so we don't send the base event
let backfill_count = match until {
PduCount::Backfilled(x) => x + 1,
PduCount::Normal(_) => 0,
};
current_backfill.extend_from_slice(&backfill_count.to_be_bytes());
let user_id = user_id.to_owned();
let user_id2 = user_id.to_owned();
let prefix2 = prefix.clone();
Ok(Box::new(
self.pduid_pdu
.iter_from(current, true)
.take_while(move |(k, _)| k.starts_with(&prefix))
.map(move |(pdu_id, v)| {
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id {
pdu.remove_transaction_id()?;
}
Ok((pdu_id, pdu))
}),
))
let backfill_iter = self
.pduid_backfillpdu
.iter_from(&current_backfill, false)
.take_while(move |(k, _)| k.starts_with(&prefix))
.map(move |(pdu_id, v)| {
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id {
pdu.remove_transaction_id()?;
}
let count = PduCount::Backfilled(pdu_count(&pdu_id)?);
Ok((count, pdu))
});
match until {
PduCount::Backfilled(_) => Ok(Box::new(backfill_iter)),
PduCount::Normal(x) => {
let mut current_normal = prefix2.clone();
// -1 so we don't send the base event
current_normal.extend_from_slice(&x.saturating_sub(1).to_be_bytes());
let normal_iter = self
.pduid_pdu
.iter_from(&current_normal, true)
.take_while(move |(k, _)| k.starts_with(&prefix2))
.map(move |(pdu_id, v)| {
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id2 {
pdu.remove_transaction_id()?;
}
let count = PduCount::Normal(pdu_count(&pdu_id)?);
Ok((count, pdu))
});
Ok(Box::new(normal_iter.chain(backfill_iter)))
}
}
}
fn pdus_after<'a>(
&'a self,
user_id: &UserId,
room_id: &RoomId,
from: u64,
) -> Result<Box<dyn Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a>> {
from: PduCount,
) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>> {
// Create the first part of the full pdu id
let prefix = services()
.rooms
@@ -318,26 +334,55 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
.to_be_bytes()
.to_vec();
let mut current = prefix.clone();
current.extend_from_slice(&(from + 1).to_be_bytes()); // +1 so we don't send the base event
let current: &[u8] = &current;
let mut current_normal = prefix.clone();
// +1 so we don't send the base event
let normal_count = match from {
PduCount::Normal(x) => x + 1,
PduCount::Backfilled(_) => 0,
};
current_normal.extend_from_slice(&normal_count.to_be_bytes());
let user_id = user_id.to_owned();
let user_id2 = user_id.to_owned();
let prefix2 = prefix.clone();
Ok(Box::new(
self.pduid_pdu
.iter_from(current, false)
.take_while(move |(k, _)| k.starts_with(&prefix))
.map(move |(pdu_id, v)| {
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id {
pdu.remove_transaction_id()?;
}
Ok((pdu_id, pdu))
}),
))
let normal_iter = self
.pduid_pdu
.iter_from(&current_normal, false)
.take_while(move |(k, _)| k.starts_with(&prefix))
.map(move |(pdu_id, v)| {
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id {
pdu.remove_transaction_id()?;
}
let count = PduCount::Normal(pdu_count(&pdu_id)?);
Ok((count, pdu))
});
match from {
PduCount::Normal(_) => Ok(Box::new(normal_iter)),
PduCount::Backfilled(x) => {
let mut current_backfill = prefix2.clone();
// -1 so we don't send the base event
current_backfill.extend_from_slice(&x.saturating_sub(1).to_be_bytes());
let backfill_iter = self
.pduid_backfillpdu
.iter_from(&current_backfill, true)
.take_while(move |(k, _)| k.starts_with(&prefix2))
.map(move |(pdu_id, v)| {
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id2 {
pdu.remove_transaction_id()?;
}
let count = PduCount::Backfilled(pdu_count(&pdu_id)?);
Ok((count, pdu))
});
Ok(Box::new(backfill_iter.chain(normal_iter)))
}
}
}
fn increment_notification_counts(
@@ -368,3 +413,9 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
Ok(())
}
}
/// Returns the `count` of this pdu's id.
fn pdu_count(pdu_id: &[u8]) -> Result<u64> {
utils::u64_from_bytes(&pdu_id[pdu_id.len() - size_of::<u64>()..])
.map_err(|_| Error::bad_database("PDU has invalid count bytes."))
}
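The trait methods above are now written in terms of PduCount, which this file only imports from service::rooms::timeline; the type's definition is not part of the hunks shown here. As a rough orientation sketch only, assuming the definition follows the usage above (Normal(u64) and Backfilled(u64) variants, a PduCount::max() constructor, and backfilled events ordering before all normal ones, with larger backfill counts lying further in the past), it could look roughly like this:

use std::cmp::Ordering;

// Sketch of a PduCount-like token; an assumption based on how the trait
// methods above use it, not the actual definition from this commit.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PduCount {
    // Events fetched via backfill; a higher count is further in the past.
    Backfilled(u64),
    // Locally appended timeline events; a higher count is newer.
    Normal(u64),
}

impl PduCount {
    pub fn min() -> Self {
        Self::Backfilled(u64::MAX)
    }
    pub fn max() -> Self {
        Self::Normal(u64::MAX)
    }
}

impl Ord for PduCount {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            // Every backfilled event is older than every normal event.
            (Self::Backfilled(_), Self::Normal(_)) => Ordering::Less,
            (Self::Normal(_), Self::Backfilled(_)) => Ordering::Greater,
            (Self::Normal(s), Self::Normal(o)) => s.cmp(o),
            // Backfill counts grow towards the past, so compare them reversed.
            (Self::Backfilled(s), Self::Backfilled(o)) => o.cmp(s),
        }
    }
}

impl PartialOrd for PduCount {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

Under such an ordering, the tokens yielded by pdus_until keep decreasing even after the normal iterator is exhausted and the chained backfill iterator takes over, which is the monotonic paging behaviour the callers rely on.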


@@ -1,7 +1,10 @@
pub mod abstraction;
pub mod key_value;
use crate::{services, utils, Config, Error, PduEvent, Result, Services, SERVICES};
use crate::{
service::rooms::timeline::PduCount, services, utils, Config, Error, PduEvent, Result, Services,
SERVICES,
};
use abstraction::{KeyValueDatabaseEngine, KvTree};
use directories::ProjectDirs;
use lru_cache::LruCache;
@@ -71,7 +74,9 @@ pub struct KeyValueDatabase {
//pub rooms: rooms::Rooms,
pub(super) pduid_pdu: Arc<dyn KvTree>, // PduId = ShortRoomId + Count
pub(super) pduid_backfillpdu: Arc<dyn KvTree>, // PduId = ShortRoomId + Count
pub(super) eventid_pduid: Arc<dyn KvTree>,
pub(super) eventid_backfillpduid: Arc<dyn KvTree>,
pub(super) roomid_pduleaves: Arc<dyn KvTree>,
pub(super) alias_roomid: Arc<dyn KvTree>,
pub(super) aliasid_alias: Arc<dyn KvTree>, // AliasId = RoomId + Count
@@ -161,7 +166,7 @@ pub struct KeyValueDatabase {
pub(super) shortstatekey_cache: Mutex<LruCache<u64, (StateEventType, String)>>,
pub(super) our_real_users_cache: RwLock<HashMap<OwnedRoomId, Arc<HashSet<OwnedUserId>>>>,
pub(super) appservice_in_room_cache: RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>,
pub(super) lasttimelinecount_cache: Mutex<HashMap<OwnedRoomId, u64>>,
pub(super) lasttimelinecount_cache: Mutex<HashMap<OwnedRoomId, PduCount>>,
}
impl KeyValueDatabase {
@@ -292,7 +297,9 @@ impl KeyValueDatabase {
presenceid_presence: builder.open_tree("presenceid_presence")?,
userid_lastpresenceupdate: builder.open_tree("userid_lastpresenceupdate")?,
pduid_pdu: builder.open_tree("pduid_pdu")?,
pduid_backfillpdu: builder.open_tree("pduid_backfillpdu")?,
eventid_pduid: builder.open_tree("eventid_pduid")?,
eventid_backfillpduid: builder.open_tree("eventid_backfillpduid")?,
roomid_pduleaves: builder.open_tree("roomid_pduleaves")?,
alias_roomid: builder.open_tree("alias_roomid")?,
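The two new trees opened here, pduid_backfillpdu and eventid_backfillpduid, mirror pduid_pdu and eventid_pduid for events that arrive via backfill. As the struct comments above note, both pdu-id trees are keyed by ShortRoomId + Count, which is what the iterators in the first file rely on when they build prefixes and what pdu_count decodes from the trailing bytes of a key. A hypothetical helper (not part of this commit, name assumed) showing that shared key layout:

// Hypothetical illustration of the key layout shared by pduid_pdu and
// pduid_backfillpdu: a big-endian ShortRoomId followed by a big-endian count.
// Which tree the key is written to is what marks an event as backfilled.
fn make_pdu_id(shortroomid: u64, count: u64) -> Vec<u8> {
    let mut pdu_id = shortroomid.to_be_bytes().to_vec();
    pdu_id.extend_from_slice(&count.to_be_bytes());
    pdu_id
}

Reading the count back out is what the free pdu_count function at the end of the first file does: it slices off the final size_of::<u64>() bytes and decodes them as a big-endian u64.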