Refactor appservices, pusher, timeline, transactionids, users

This commit is contained in:
Timo Kösters 2022-08-07 19:42:22 +02:00 committed by Nyaaori
parent 01bf348811
commit f56424bc8d
No known key found for this signature in database
GPG key ID: E7819C3ED4D1F82E
18 changed files with 546 additions and 4950 deletions

View file

@ -1,4 +1,14 @@
mod data;
pub use data::Data;
use crate::service::*;
pub struct Service<D: Data> {
db: D,
}
impl Service<_> {
/*
/// Checks if a room exists.
#[tracing::instrument(skip(self))]
pub fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Option<Arc<PduEvent>>> {
@ -20,38 +30,15 @@
.next()
.transpose()
}
*/
#[tracing::instrument(skip(self))]
pub fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result<u64> {
match self
.lasttimelinecount_cache
.lock()
.unwrap()
.entry(room_id.to_owned())
{
hash_map::Entry::Vacant(v) => {
if let Some(last_count) = self
.pdus_until(&sender_user, &room_id, u64::MAX)?
.filter_map(|r| {
// Filter out buggy events
if r.is_err() {
error!("Bad pdu in pdus_since: {:?}", r);
}
r.ok()
})
.map(|(pduid, _)| self.pdu_count(&pduid))
.next()
{
Ok(*v.insert(last_count?))
} else {
Ok(0)
}
}
hash_map::Entry::Occupied(o) => Ok(*o.get()),
}
self.db.last_timeline_count(sender_user: &UserId, room_id: &RoomId)
}
// TODO Is this the same as the function above?
/*
#[tracing::instrument(skip(self))]
pub fn latest_pdu_count(&self, room_id: &RoomId) -> Result<u64> {
let prefix = self
@ -71,33 +58,16 @@
.transpose()
.map(|op| op.unwrap_or_default())
}
*/
/// Returns the `count` of this pdu's id.
pub fn get_pdu_count(&self, event_id: &EventId) -> Result<Option<u64>> {
self.eventid_pduid
.get(event_id.as_bytes())?
.map(|pdu_id| self.pdu_count(&pdu_id))
.transpose()
self.db.get_pdu_count(event_id)
}
/// Returns the json of a pdu.
pub fn get_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> {
self.eventid_pduid
.get(event_id.as_bytes())?
.map_or_else(
|| self.eventid_outlierpdu.get(event_id.as_bytes()),
|pduid| {
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
Error::bad_database("Invalid pduid in eventid_pduid.")
})?))
},
)?
.map(|pdu| {
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
})
.transpose()
self.db.get_pdu_json(event_id)
}
/// Returns the json of a pdu.
@ -105,122 +75,49 @@
&self,
event_id: &EventId,
) -> Result<Option<CanonicalJsonObject>> {
self.eventid_pduid
.get(event_id.as_bytes())?
.map(|pduid| {
self.pduid_pdu
.get(&pduid)?
.ok_or_else(|| Error::bad_database("Invalid pduid in eventid_pduid."))
})
.transpose()?
.map(|pdu| {
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
})
.transpose()
self.db.get_non_outlier_pdu(event_id)
}
/// Returns the pdu's id.
pub fn get_pdu_id(&self, event_id: &EventId) -> Result<Option<Vec<u8>>> {
self.eventid_pduid.get(event_id.as_bytes())
self.db.get_pdu_id(event_id)
}
/// Returns the pdu.
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
pub fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
self.eventid_pduid
.get(event_id.as_bytes())?
.map(|pduid| {
self.pduid_pdu
.get(&pduid)?
.ok_or_else(|| Error::bad_database("Invalid pduid in eventid_pduid."))
})
.transpose()?
.map(|pdu| {
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
})
.transpose()
self.db.get_non_outlier_pdu(event_id)
}
/// Returns the pdu.
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
pub fn get_pdu(&self, event_id: &EventId) -> Result<Option<Arc<PduEvent>>> {
if let Some(p) = self.pdu_cache.lock().unwrap().get_mut(event_id) {
return Ok(Some(Arc::clone(p)));
}
if let Some(pdu) = self
.eventid_pduid
.get(event_id.as_bytes())?
.map_or_else(
|| self.eventid_outlierpdu.get(event_id.as_bytes()),
|pduid| {
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
Error::bad_database("Invalid pduid in eventid_pduid.")
})?))
},
)?
.map(|pdu| {
serde_json::from_slice(&pdu)
.map_err(|_| Error::bad_database("Invalid PDU in db."))
.map(Arc::new)
})
.transpose()?
{
self.pdu_cache
.lock()
.unwrap()
.insert(event_id.to_owned(), Arc::clone(&pdu));
Ok(Some(pdu))
} else {
Ok(None)
}
self.db.get_pdu(event_id)
}
/// Returns the pdu.
///
/// This does __NOT__ check the outliers `Tree`.
pub fn get_pdu_from_id(&self, pdu_id: &[u8]) -> Result<Option<PduEvent>> {
self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| {
Ok(Some(
serde_json::from_slice(&pdu)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
))
})
self.db.get_pdu_from_id(pdu_id)
}
/// Returns the pdu as a `BTreeMap<String, CanonicalJsonValue>`.
pub fn get_pdu_json_from_id(&self, pdu_id: &[u8]) -> Result<Option<CanonicalJsonObject>> {
self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| {
Ok(Some(
serde_json::from_slice(&pdu)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
))
})
self.db.get_pdu_json_from_id(pdu_id)
}
/// Returns the `count` of this pdu's id.
pub fn pdu_count(&self, pdu_id: &[u8]) -> Result<u64> {
utils::u64_from_bytes(&pdu_id[pdu_id.len() - size_of::<u64>()..])
.map_err(|_| Error::bad_database("PDU has invalid count bytes."))
self.db.pdu_count(pdu_id)
}
/// Removes a pdu and creates a new one with the same id.
#[tracing::instrument(skip(self))]
fn replace_pdu(&self, pdu_id: &[u8], pdu: &PduEvent) -> Result<()> {
if self.pduid_pdu.get(pdu_id)?.is_some() {
self.pduid_pdu.insert(
pdu_id,
&serde_json::to_vec(pdu).expect("PduEvent::to_vec always works"),
)?;
Ok(())
} else {
Err(Error::BadRequest(
ErrorKind::NotFound,
"PDU does not exist.",
))
}
self.db.pdu_count(pdu_id, pdu: &PduEvent)
}
/// Creates a new persisted data unit and adds it to a room.
@ -803,7 +700,6 @@
}
/// Returns an iterator over all PDUs in a room.
#[tracing::instrument(skip(self))]
pub fn all_pdus<'a>(
&'a self,
user_id: &UserId,
@ -814,37 +710,13 @@
/// Returns an iterator over all events in a room that happened after the event with id `since`
/// in chronological order.
#[tracing::instrument(skip(self))]
pub fn pdus_since<'a>(
&'a self,
user_id: &UserId,
room_id: &RoomId,
since: u64,
) -> Result<impl Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a> {
let prefix = self
.get_shortroomid(room_id)?
.expect("room exists")
.to_be_bytes()
.to_vec();
// Skip the first pdu if it's exactly at since, because we sent that last time
let mut first_pdu_id = prefix.clone();
first_pdu_id.extend_from_slice(&(since + 1).to_be_bytes());
let user_id = user_id.to_owned();
Ok(self
.pduid_pdu
.iter_from(&first_pdu_id, false)
.take_while(move |(k, _)| k.starts_with(&prefix))
.map(move |(pdu_id, v)| {
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id {
pdu.remove_transaction_id()?;
}
Ok((pdu_id, pdu))
}))
self.db.pdus_since(user_id, room_id, since)
}
/// Returns an iterator over all events and their tokens in a room that happened before the
@ -856,32 +728,7 @@
room_id: &RoomId,
until: u64,
) -> Result<impl Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a> {
// Create the first part of the full pdu id
let prefix = self
.get_shortroomid(room_id)?
.expect("room exists")
.to_be_bytes()
.to_vec();
let mut current = prefix.clone();
current.extend_from_slice(&(until.saturating_sub(1)).to_be_bytes()); // -1 because we don't want event at `until`
let current: &[u8] = &current;
let user_id = user_id.to_owned();
Ok(self
.pduid_pdu
.iter_from(current, true)
.take_while(move |(k, _)| k.starts_with(&prefix))
.map(move |(pdu_id, v)| {
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id {
pdu.remove_transaction_id()?;
}
Ok((pdu_id, pdu))
}))
self.db.pdus_until(user_id, room_id, until)
}
/// Returns an iterator over all events and their token in a room that happened after the event
@ -893,32 +740,7 @@
room_id: &RoomId,
from: u64,
) -> Result<impl Iterator<Item = Result<(Vec<u8>, PduEvent)>> + 'a> {
// Create the first part of the full pdu id
let prefix = self
.get_shortroomid(room_id)?
.expect("room exists")
.to_be_bytes()
.to_vec();
let mut current = prefix.clone();
current.extend_from_slice(&(from + 1).to_be_bytes()); // +1 so we don't send the base event
let current: &[u8] = &current;
let user_id = user_id.to_owned();
Ok(self
.pduid_pdu
.iter_from(current, false)
.take_while(move |(k, _)| k.starts_with(&prefix))
.map(move |(pdu_id, v)| {
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id {
pdu.remove_transaction_id()?;
}
Ok((pdu_id, pdu))
}))
self.db.pdus_after(user_id, room_id, from)
}
/// Replace a PDU with the redacted form.