Improve sync performance with more caching and wrapping things in Arcs to avoid copies

This commit is contained in:
Timo Kösters 2023-06-27 13:06:55 +02:00
parent 7c6d25dcd1
commit be877ef719
No known key found for this signature in database
GPG key ID: 0B25E636FBA7E4CB
12 changed files with 112 additions and 98 deletions

View file

@ -90,7 +90,7 @@ impl Services {
state_compressor: rooms::state_compressor::Service {
db,
stateinfo_cache: Mutex::new(LruCache::new(
(100.0 * config.conduit_cache_capacity_modifier) as usize,
(1000.0 * config.conduit_cache_capacity_modifier) as usize,
)),
},
timeline: rooms::timeline::Service {

View file

@ -774,15 +774,17 @@ impl Service {
});
info!("Compressing state at event");
let state_ids_compressed = state_at_incoming_event
.iter()
.map(|(shortstatekey, id)| {
services()
.rooms
.state_compressor
.compress_state_event(*shortstatekey, id)
})
.collect::<Result<_>>()?;
let state_ids_compressed = Arc::new(
state_at_incoming_event
.iter()
.map(|(shortstatekey, id)| {
services()
.rooms
.state_compressor
.compress_state_event(*shortstatekey, id)
})
.collect::<Result<_>>()?,
);
if incoming_pdu.state_key.is_some() {
info!("Preparing for stateres to derive new room state");
@ -886,7 +888,7 @@ impl Service {
room_id: &RoomId,
room_version_id: &RoomVersionId,
incoming_state: HashMap<u64, Arc<EventId>>,
) -> Result<HashSet<CompressedStateEvent>> {
) -> Result<Arc<HashSet<CompressedStateEvent>>> {
info!("Loading current room state ids");
let current_sstatehash = services()
.rooms
@ -966,7 +968,7 @@ impl Service {
})
.collect::<Result<_>>()?;
Ok(new_room_state)
Ok(Arc::new(new_room_state))
}
/// Find the event and auth it. Once the event is validated (steps 1 - 8)

View file

@ -32,11 +32,11 @@ impl Service {
&self,
room_id: &RoomId,
shortstatehash: u64,
statediffnew: HashSet<CompressedStateEvent>,
_statediffremoved: HashSet<CompressedStateEvent>,
statediffnew: Arc<HashSet<CompressedStateEvent>>,
_statediffremoved: Arc<HashSet<CompressedStateEvent>>,
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<()> {
for event_id in statediffnew.into_iter().filter_map(|new| {
for event_id in statediffnew.iter().filter_map(|new| {
services()
.rooms
.state_compressor
@ -107,7 +107,7 @@ impl Service {
&self,
event_id: &EventId,
room_id: &RoomId,
state_ids_compressed: HashSet<CompressedStateEvent>,
state_ids_compressed: Arc<HashSet<CompressedStateEvent>>,
) -> Result<u64> {
let shorteventid = services()
.rooms
@ -152,9 +152,9 @@ impl Service {
.copied()
.collect();
(statediffnew, statediffremoved)
(Arc::new(statediffnew), Arc::new(statediffremoved))
} else {
(state_ids_compressed, HashSet::new())
(state_ids_compressed, Arc::new(HashSet::new()))
};
services().rooms.state_compressor.save_state_from_diff(
shortstatehash,
@ -234,8 +234,8 @@ impl Service {
services().rooms.state_compressor.save_state_from_diff(
shortstatehash,
statediffnew,
statediffremoved,
Arc::new(statediffnew),
Arc::new(statediffremoved),
2,
states_parents,
)?;
@ -396,7 +396,7 @@ impl Service {
.1;
Ok(full_state
.into_iter()
.iter()
.filter_map(|compressed| {
services()
.rooms

View file

@ -1,12 +1,12 @@
use std::collections::HashSet;
use std::{collections::HashSet, sync::Arc};
use super::CompressedStateEvent;
use crate::Result;
pub struct StateDiff {
pub parent: Option<u64>,
pub added: HashSet<CompressedStateEvent>,
pub removed: HashSet<CompressedStateEvent>,
pub added: Arc<HashSet<CompressedStateEvent>>,
pub removed: Arc<HashSet<CompressedStateEvent>>,
}
pub trait Data: Send + Sync {

View file

@ -20,10 +20,10 @@ pub struct Service {
LruCache<
u64,
Vec<(
u64, // sstatehash
HashSet<CompressedStateEvent>, // full state
HashSet<CompressedStateEvent>, // added
HashSet<CompressedStateEvent>, // removed
u64, // sstatehash
Arc<HashSet<CompressedStateEvent>>, // full state
Arc<HashSet<CompressedStateEvent>>, // added
Arc<HashSet<CompressedStateEvent>>, // removed
)>,
>,
>,
@ -39,10 +39,10 @@ impl Service {
shortstatehash: u64,
) -> Result<
Vec<(
u64, // sstatehash
HashSet<CompressedStateEvent>, // full state
HashSet<CompressedStateEvent>, // added
HashSet<CompressedStateEvent>, // removed
u64, // sstatehash
Arc<HashSet<CompressedStateEvent>>, // full state
Arc<HashSet<CompressedStateEvent>>, // added
Arc<HashSet<CompressedStateEvent>>, // removed
)>,
> {
if let Some(r) = self
@ -62,13 +62,19 @@ impl Service {
if let Some(parent) = parent {
let mut response = self.load_shortstatehash_info(parent)?;
let mut state = response.last().unwrap().1.clone();
let mut state = (*response.last().unwrap().1).clone();
state.extend(added.iter().copied());
let removed = (*removed).clone();
for r in &removed {
state.remove(r);
}
response.push((shortstatehash, state, added, removed));
response.push((shortstatehash, Arc::new(state), added, Arc::new(removed)));
self.stateinfo_cache
.lock()
.unwrap()
.insert(shortstatehash, response.clone());
Ok(response)
} else {
@ -135,14 +141,14 @@ impl Service {
pub fn save_state_from_diff(
&self,
shortstatehash: u64,
statediffnew: HashSet<CompressedStateEvent>,
statediffremoved: HashSet<CompressedStateEvent>,
statediffnew: Arc<HashSet<CompressedStateEvent>>,
statediffremoved: Arc<HashSet<CompressedStateEvent>>,
diff_to_sibling: usize,
mut parent_states: Vec<(
u64, // sstatehash
HashSet<CompressedStateEvent>, // full state
HashSet<CompressedStateEvent>, // added
HashSet<CompressedStateEvent>, // removed
u64, // sstatehash
Arc<HashSet<CompressedStateEvent>>, // full state
Arc<HashSet<CompressedStateEvent>>, // added
Arc<HashSet<CompressedStateEvent>>, // removed
)>,
) -> Result<()> {
let diffsum = statediffnew.len() + statediffremoved.len();
@ -152,29 +158,29 @@ impl Service {
// To many layers, we have to go deeper
let parent = parent_states.pop().unwrap();
let mut parent_new = parent.2;
let mut parent_removed = parent.3;
let mut parent_new = (*parent.2).clone();
let mut parent_removed = (*parent.3).clone();
for removed in statediffremoved {
if !parent_new.remove(&removed) {
for removed in statediffremoved.iter() {
if !parent_new.remove(removed) {
// It was not added in the parent and we removed it
parent_removed.insert(removed);
parent_removed.insert(removed.clone());
}
// Else it was added in the parent and we removed it again. We can forget this change
}
for new in statediffnew {
if !parent_removed.remove(&new) {
for new in statediffnew.iter() {
if !parent_removed.remove(new) {
// It was not touched in the parent and we added it
parent_new.insert(new);
parent_new.insert(new.clone());
}
// Else it was removed in the parent and we added it again. We can forget this change
}
self.save_state_from_diff(
shortstatehash,
parent_new,
parent_removed,
Arc::new(parent_new),
Arc::new(parent_removed),
diffsum,
parent_states,
)?;
@ -205,29 +211,29 @@ impl Service {
if diffsum * diffsum >= 2 * diff_to_sibling * parent_diff {
// Diff too big, we replace above layer(s)
let mut parent_new = parent.2;
let mut parent_removed = parent.3;
let mut parent_new = (*parent.2).clone();
let mut parent_removed = (*parent.3).clone();
for removed in statediffremoved {
if !parent_new.remove(&removed) {
for removed in statediffremoved.iter() {
if !parent_new.remove(removed) {
// It was not added in the parent and we removed it
parent_removed.insert(removed);
parent_removed.insert(removed.clone());
}
// Else it was added in the parent and we removed it again. We can forget this change
}
for new in statediffnew {
if !parent_removed.remove(&new) {
for new in statediffnew.iter() {
if !parent_removed.remove(new) {
// It was not touched in the parent and we added it
parent_new.insert(new);
parent_new.insert(new.clone());
}
// Else it was removed in the parent and we added it again. We can forget this change
}
self.save_state_from_diff(
shortstatehash,
parent_new,
parent_removed,
Arc::new(parent_new),
Arc::new(parent_removed),
diffsum,
parent_states,
)?;
@ -250,11 +256,11 @@ impl Service {
pub fn save_state(
&self,
room_id: &RoomId,
new_state_ids_compressed: HashSet<CompressedStateEvent>,
new_state_ids_compressed: Arc<HashSet<CompressedStateEvent>>,
) -> Result<(
u64,
HashSet<CompressedStateEvent>,
HashSet<CompressedStateEvent>,
Arc<HashSet<CompressedStateEvent>>,
Arc<HashSet<CompressedStateEvent>>,
)> {
let previous_shortstatehash = services().rooms.state.get_room_shortstatehash(room_id)?;
@ -271,7 +277,11 @@ impl Service {
.get_or_create_shortstatehash(&state_hash)?;
if Some(new_shortstatehash) == previous_shortstatehash {
return Ok((new_shortstatehash, HashSet::new(), HashSet::new()));
return Ok((
new_shortstatehash,
Arc::new(HashSet::new()),
Arc::new(HashSet::new()),
));
}
let states_parents = previous_shortstatehash
@ -290,9 +300,9 @@ impl Service {
.copied()
.collect();
(statediffnew, statediffremoved)
(Arc::new(statediffnew), Arc::new(statediffremoved))
} else {
(new_state_ids_compressed, HashSet::new())
(new_state_ids_compressed, Arc::new(HashSet::new()))
};
if !already_existed {

View file

@ -946,7 +946,7 @@ impl Service {
pdu: &PduEvent,
pdu_json: CanonicalJsonObject,
new_room_leaves: Vec<OwnedEventId>,
state_ids_compressed: HashSet<CompressedStateEvent>,
state_ids_compressed: Arc<HashSet<CompressedStateEvent>>,
soft_fail: bool,
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<Option<Vec<u8>>> {