refactor: work on auth chain and state compressor
This commit is contained in:
parent
0daa3209db
commit
e045abe961
6 changed files with 81 additions and 1111 deletions
|
@ -1,319 +1,7 @@
|
||||||
|
impl service::room::auth_chain::Data for KeyValueDatabase {
|
||||||
/// Returns a stack with info on shortstatehash, full state, added diff and removed diff for the selected shortstatehash and each parent layer.
|
fn get_cached_eventid_authchain<'a>() -> Result<HashSet<u64>> {
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn load_shortstatehash_info(
|
|
||||||
&self,
|
|
||||||
shortstatehash: u64,
|
|
||||||
) -> Result<
|
|
||||||
Vec<(
|
|
||||||
u64, // sstatehash
|
|
||||||
HashSet<CompressedStateEvent>, // full state
|
|
||||||
HashSet<CompressedStateEvent>, // added
|
|
||||||
HashSet<CompressedStateEvent>, // removed
|
|
||||||
)>,
|
|
||||||
> {
|
|
||||||
if let Some(r) = self
|
|
||||||
.stateinfo_cache
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.get_mut(&shortstatehash)
|
|
||||||
{
|
|
||||||
return Ok(r.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
let value = self
|
|
||||||
.shortstatehash_statediff
|
|
||||||
.get(&shortstatehash.to_be_bytes())?
|
|
||||||
.ok_or_else(|| Error::bad_database("State hash does not exist"))?;
|
|
||||||
let parent =
|
|
||||||
utils::u64_from_bytes(&value[0..size_of::<u64>()]).expect("bytes have right length");
|
|
||||||
|
|
||||||
let mut add_mode = true;
|
|
||||||
let mut added = HashSet::new();
|
|
||||||
let mut removed = HashSet::new();
|
|
||||||
|
|
||||||
let mut i = size_of::<u64>();
|
|
||||||
while let Some(v) = value.get(i..i + 2 * size_of::<u64>()) {
|
|
||||||
if add_mode && v.starts_with(&0_u64.to_be_bytes()) {
|
|
||||||
add_mode = false;
|
|
||||||
i += size_of::<u64>();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if add_mode {
|
|
||||||
added.insert(v.try_into().expect("we checked the size above"));
|
|
||||||
} else {
|
|
||||||
removed.insert(v.try_into().expect("we checked the size above"));
|
|
||||||
}
|
|
||||||
i += 2 * size_of::<u64>();
|
|
||||||
}
|
|
||||||
|
|
||||||
if parent != 0_u64 {
|
|
||||||
let mut response = self.load_shortstatehash_info(parent)?;
|
|
||||||
let mut state = response.last().unwrap().1.clone();
|
|
||||||
state.extend(added.iter().copied());
|
|
||||||
for r in &removed {
|
|
||||||
state.remove(r);
|
|
||||||
}
|
|
||||||
|
|
||||||
response.push((shortstatehash, state, added, removed));
|
|
||||||
|
|
||||||
Ok(response)
|
|
||||||
} else {
|
|
||||||
let response = vec![(shortstatehash, added.clone(), added, removed)];
|
|
||||||
self.stateinfo_cache
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.insert(shortstatehash, response.clone());
|
|
||||||
Ok(response)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn compress_state_event(
|
|
||||||
&self,
|
|
||||||
shortstatekey: u64,
|
|
||||||
event_id: &EventId,
|
|
||||||
globals: &super::globals::Globals,
|
|
||||||
) -> Result<CompressedStateEvent> {
|
|
||||||
let mut v = shortstatekey.to_be_bytes().to_vec();
|
|
||||||
v.extend_from_slice(
|
|
||||||
&self
|
|
||||||
.get_or_create_shorteventid(event_id, globals)?
|
|
||||||
.to_be_bytes(),
|
|
||||||
);
|
|
||||||
Ok(v.try_into().expect("we checked the size above"))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns shortstatekey, event id
|
|
||||||
pub fn parse_compressed_state_event(
|
|
||||||
&self,
|
|
||||||
compressed_event: CompressedStateEvent,
|
|
||||||
) -> Result<(u64, Arc<EventId>)> {
|
|
||||||
Ok((
|
|
||||||
utils::u64_from_bytes(&compressed_event[0..size_of::<u64>()])
|
|
||||||
.expect("bytes have right length"),
|
|
||||||
self.get_eventid_from_short(
|
|
||||||
utils::u64_from_bytes(&compressed_event[size_of::<u64>()..])
|
|
||||||
.expect("bytes have right length"),
|
|
||||||
)?,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Creates a new shortstatehash that often is just a diff to an already existing
|
|
||||||
/// shortstatehash and therefore very efficient.
|
|
||||||
///
|
|
||||||
/// There are multiple layers of diffs. The bottom layer 0 always contains the full state. Layer
|
|
||||||
/// 1 contains diffs to states of layer 0, layer 2 diffs to layer 1 and so on. If layer n > 0
|
|
||||||
/// grows too big, it will be combined with layer n-1 to create a new diff on layer n-1 that's
|
|
||||||
/// based on layer n-2. If that layer is also too big, it will recursively fix above layers too.
|
|
||||||
///
|
|
||||||
/// * `shortstatehash` - Shortstatehash of this state
|
|
||||||
/// * `statediffnew` - Added to base. Each vec is shortstatekey+shorteventid
|
|
||||||
/// * `statediffremoved` - Removed from base. Each vec is shortstatekey+shorteventid
|
|
||||||
/// * `diff_to_sibling` - Approximately how much the diff grows each time for this layer
|
|
||||||
/// * `parent_states` - A stack with info on shortstatehash, full state, added diff and removed diff for each parent layer
|
|
||||||
#[tracing::instrument(skip(
|
|
||||||
self,
|
|
||||||
statediffnew,
|
|
||||||
statediffremoved,
|
|
||||||
diff_to_sibling,
|
|
||||||
parent_states
|
|
||||||
))]
|
|
||||||
pub fn save_state_from_diff(
|
|
||||||
&self,
|
|
||||||
shortstatehash: u64,
|
|
||||||
statediffnew: HashSet<CompressedStateEvent>,
|
|
||||||
statediffremoved: HashSet<CompressedStateEvent>,
|
|
||||||
diff_to_sibling: usize,
|
|
||||||
mut parent_states: Vec<(
|
|
||||||
u64, // sstatehash
|
|
||||||
HashSet<CompressedStateEvent>, // full state
|
|
||||||
HashSet<CompressedStateEvent>, // added
|
|
||||||
HashSet<CompressedStateEvent>, // removed
|
|
||||||
)>,
|
|
||||||
) -> Result<()> {
|
|
||||||
let diffsum = statediffnew.len() + statediffremoved.len();
|
|
||||||
|
|
||||||
if parent_states.len() > 3 {
|
|
||||||
// Number of layers
|
|
||||||
// To many layers, we have to go deeper
|
|
||||||
let parent = parent_states.pop().unwrap();
|
|
||||||
|
|
||||||
let mut parent_new = parent.2;
|
|
||||||
let mut parent_removed = parent.3;
|
|
||||||
|
|
||||||
for removed in statediffremoved {
|
|
||||||
if !parent_new.remove(&removed) {
|
|
||||||
// It was not added in the parent and we removed it
|
|
||||||
parent_removed.insert(removed);
|
|
||||||
}
|
|
||||||
// Else it was added in the parent and we removed it again. We can forget this change
|
|
||||||
}
|
|
||||||
|
|
||||||
for new in statediffnew {
|
|
||||||
if !parent_removed.remove(&new) {
|
|
||||||
// It was not touched in the parent and we added it
|
|
||||||
parent_new.insert(new);
|
|
||||||
}
|
|
||||||
// Else it was removed in the parent and we added it again. We can forget this change
|
|
||||||
}
|
|
||||||
|
|
||||||
self.save_state_from_diff(
|
|
||||||
shortstatehash,
|
|
||||||
parent_new,
|
|
||||||
parent_removed,
|
|
||||||
diffsum,
|
|
||||||
parent_states,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
if parent_states.is_empty() {
|
|
||||||
// There is no parent layer, create a new state
|
|
||||||
let mut value = 0_u64.to_be_bytes().to_vec(); // 0 means no parent
|
|
||||||
for new in &statediffnew {
|
|
||||||
value.extend_from_slice(&new[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
if !statediffremoved.is_empty() {
|
|
||||||
warn!("Tried to create new state with removals");
|
|
||||||
}
|
|
||||||
|
|
||||||
self.shortstatehash_statediff
|
|
||||||
.insert(&shortstatehash.to_be_bytes(), &value)?;
|
|
||||||
|
|
||||||
return Ok(());
|
|
||||||
};
|
|
||||||
|
|
||||||
// Else we have two options.
|
|
||||||
// 1. We add the current diff on top of the parent layer.
|
|
||||||
// 2. We replace a layer above
|
|
||||||
|
|
||||||
let parent = parent_states.pop().unwrap();
|
|
||||||
let parent_diff = parent.2.len() + parent.3.len();
|
|
||||||
|
|
||||||
if diffsum * diffsum >= 2 * diff_to_sibling * parent_diff {
|
|
||||||
// Diff too big, we replace above layer(s)
|
|
||||||
let mut parent_new = parent.2;
|
|
||||||
let mut parent_removed = parent.3;
|
|
||||||
|
|
||||||
for removed in statediffremoved {
|
|
||||||
if !parent_new.remove(&removed) {
|
|
||||||
// It was not added in the parent and we removed it
|
|
||||||
parent_removed.insert(removed);
|
|
||||||
}
|
|
||||||
// Else it was added in the parent and we removed it again. We can forget this change
|
|
||||||
}
|
|
||||||
|
|
||||||
for new in statediffnew {
|
|
||||||
if !parent_removed.remove(&new) {
|
|
||||||
// It was not touched in the parent and we added it
|
|
||||||
parent_new.insert(new);
|
|
||||||
}
|
|
||||||
// Else it was removed in the parent and we added it again. We can forget this change
|
|
||||||
}
|
|
||||||
|
|
||||||
self.save_state_from_diff(
|
|
||||||
shortstatehash,
|
|
||||||
parent_new,
|
|
||||||
parent_removed,
|
|
||||||
diffsum,
|
|
||||||
parent_states,
|
|
||||||
)?;
|
|
||||||
} else {
|
|
||||||
// Diff small enough, we add diff as layer on top of parent
|
|
||||||
let mut value = parent.0.to_be_bytes().to_vec();
|
|
||||||
for new in &statediffnew {
|
|
||||||
value.extend_from_slice(&new[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
if !statediffremoved.is_empty() {
|
|
||||||
value.extend_from_slice(&0_u64.to_be_bytes());
|
|
||||||
for removed in &statediffremoved {
|
|
||||||
value.extend_from_slice(&removed[..]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
self.shortstatehash_statediff
|
|
||||||
.insert(&shortstatehash.to_be_bytes(), &value)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the new shortstatehash
|
|
||||||
pub fn save_state(
|
|
||||||
room_id: &RoomId,
|
|
||||||
new_state_ids_compressed: HashSet<CompressedStateEvent>,
|
|
||||||
) -> Result<(u64,
|
|
||||||
HashSet<CompressedStateEvent>, // added
|
|
||||||
HashSet<CompressedStateEvent>)> // removed
|
|
||||||
{
|
|
||||||
let previous_shortstatehash = self.d.current_shortstatehash(room_id)?;
|
|
||||||
|
|
||||||
let state_hash = self.calculate_hash(
|
|
||||||
&new_state_ids_compressed
|
|
||||||
.iter()
|
|
||||||
.map(|bytes| &bytes[..])
|
|
||||||
.collect::<Vec<_>>(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let (new_shortstatehash, already_existed) =
|
|
||||||
self.get_or_create_shortstatehash(&state_hash, &db.globals)?;
|
|
||||||
|
|
||||||
if Some(new_shortstatehash) == previous_shortstatehash {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
let states_parents = previous_shortstatehash
|
|
||||||
.map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?;
|
|
||||||
|
|
||||||
let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last()
|
|
||||||
{
|
|
||||||
let statediffnew: HashSet<_> = new_state_ids_compressed
|
|
||||||
.difference(&parent_stateinfo.1)
|
|
||||||
.copied()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let statediffremoved: HashSet<_> = parent_stateinfo
|
|
||||||
.1
|
|
||||||
.difference(&new_state_ids_compressed)
|
|
||||||
.copied()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
(statediffnew, statediffremoved)
|
|
||||||
} else {
|
|
||||||
(new_state_ids_compressed, HashSet::new())
|
|
||||||
};
|
|
||||||
|
|
||||||
if !already_existed {
|
|
||||||
self.save_state_from_diff(
|
|
||||||
new_shortstatehash,
|
|
||||||
statediffnew.clone(),
|
|
||||||
statediffremoved,
|
|
||||||
2, // every state change is 2 event changes on average
|
|
||||||
states_parents,
|
|
||||||
)?;
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok((new_shortstatehash, statediffnew, statediffremoved))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn get_auth_chain_from_cache<'a>(
|
|
||||||
&'a self,
|
|
||||||
key: &[u64],
|
|
||||||
) -> Result<Option<Arc<HashSet<u64>>>> {
|
|
||||||
// Check RAM cache
|
|
||||||
if let Some(result) = self.auth_chain_cache.lock().unwrap().get_mut(key) {
|
|
||||||
return Ok(Some(Arc::clone(result)));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check DB cache
|
|
||||||
if key.len() == 1 {
|
|
||||||
if let Some(chain) =
|
|
||||||
self.shorteventid_authchain
|
self.shorteventid_authchain
|
||||||
.get(&key[0].to_be_bytes())?
|
.get(&shorteventid.to_be_bytes())?
|
||||||
.map(|chain| {
|
.map(|chain| {
|
||||||
chain
|
chain
|
||||||
.chunks_exact(size_of::<u64>())
|
.chunks_exact(size_of::<u64>())
|
||||||
|
@ -322,37 +10,15 @@
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
})
|
})
|
||||||
{
|
|
||||||
let chain = Arc::new(chain);
|
|
||||||
|
|
||||||
// Cache in RAM
|
|
||||||
self.auth_chain_cache
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.insert(vec![key[0]], Arc::clone(&chain));
|
|
||||||
|
|
||||||
return Ok(Some(chain));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(None)
|
fn cache_eventid_authchain<'a>(shorteventid: u64, auth_chain: &HashSet<u64>) -> Result<()> {
|
||||||
}
|
shorteventid_authchain.insert(
|
||||||
|
&shorteventid.to_be_bytes(),
|
||||||
#[tracing::instrument(skip(self))]
|
&auth_chain
|
||||||
pub fn cache_auth_chain(&self, key: Vec<u64>, chain: Arc<HashSet<u64>>) -> Result<()> {
|
|
||||||
// Persist in db
|
|
||||||
if key.len() == 1 {
|
|
||||||
self.shorteventid_authchain.insert(
|
|
||||||
&key[0].to_be_bytes(),
|
|
||||||
&chain
|
|
||||||
.iter()
|
.iter()
|
||||||
.flat_map(|s| s.to_be_bytes().to_vec())
|
.flat_map(|s| s.to_be_bytes().to_vec())
|
||||||
.collect::<Vec<u8>>(),
|
.collect::<Vec<u8>>(),
|
||||||
)?;
|
)
|
||||||
}
|
|
||||||
|
|
||||||
// Cache in RAM
|
|
||||||
self.auth_chain_cache.lock().unwrap().insert(key, chain);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,26 +1,5 @@
|
||||||
|
impl service::room::state_compressor::Data for KeyValueDatabase {
|
||||||
/// Returns a stack with info on shortstatehash, full state, added diff and removed diff for the selected shortstatehash and each parent layer.
|
fn get_statediff(shortstatehash: u64) -> Result<StateDiff> {
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn load_shortstatehash_info(
|
|
||||||
&self,
|
|
||||||
shortstatehash: u64,
|
|
||||||
) -> Result<
|
|
||||||
Vec<(
|
|
||||||
u64, // sstatehash
|
|
||||||
HashSet<CompressedStateEvent>, // full state
|
|
||||||
HashSet<CompressedStateEvent>, // added
|
|
||||||
HashSet<CompressedStateEvent>, // removed
|
|
||||||
)>,
|
|
||||||
> {
|
|
||||||
if let Some(r) = self
|
|
||||||
.stateinfo_cache
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.get_mut(&shortstatehash)
|
|
||||||
{
|
|
||||||
return Ok(r.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
let value = self
|
let value = self
|
||||||
.shortstatehash_statediff
|
.shortstatehash_statediff
|
||||||
.get(&shortstatehash.to_be_bytes())?
|
.get(&shortstatehash.to_be_bytes())?
|
||||||
|
@ -47,189 +26,18 @@
|
||||||
i += 2 * size_of::<u64>();
|
i += 2 * size_of::<u64>();
|
||||||
}
|
}
|
||||||
|
|
||||||
if parent != 0_u64 {
|
StateDiff { parent, added, removed }
|
||||||
let mut response = self.load_shortstatehash_info(parent)?;
|
|
||||||
let mut state = response.last().unwrap().1.clone();
|
|
||||||
state.extend(added.iter().copied());
|
|
||||||
for r in &removed {
|
|
||||||
state.remove(r);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
response.push((shortstatehash, state, added, removed));
|
fn save_statediff(shortstatehash: u64, diff: StateDiff) -> Result<()> {
|
||||||
|
let mut value = diff.parent.to_be_bytes().to_vec();
|
||||||
Ok(response)
|
for new in &diff.new {
|
||||||
} else {
|
|
||||||
let response = vec![(shortstatehash, added.clone(), added, removed)];
|
|
||||||
self.stateinfo_cache
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.insert(shortstatehash, response.clone());
|
|
||||||
Ok(response)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn compress_state_event(
|
|
||||||
&self,
|
|
||||||
shortstatekey: u64,
|
|
||||||
event_id: &EventId,
|
|
||||||
globals: &super::globals::Globals,
|
|
||||||
) -> Result<CompressedStateEvent> {
|
|
||||||
let mut v = shortstatekey.to_be_bytes().to_vec();
|
|
||||||
v.extend_from_slice(
|
|
||||||
&self
|
|
||||||
.get_or_create_shorteventid(event_id, globals)?
|
|
||||||
.to_be_bytes(),
|
|
||||||
);
|
|
||||||
Ok(v.try_into().expect("we checked the size above"))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns shortstatekey, event id
|
|
||||||
pub fn parse_compressed_state_event(
|
|
||||||
&self,
|
|
||||||
compressed_event: CompressedStateEvent,
|
|
||||||
) -> Result<(u64, Arc<EventId>)> {
|
|
||||||
Ok((
|
|
||||||
utils::u64_from_bytes(&compressed_event[0..size_of::<u64>()])
|
|
||||||
.expect("bytes have right length"),
|
|
||||||
self.get_eventid_from_short(
|
|
||||||
utils::u64_from_bytes(&compressed_event[size_of::<u64>()..])
|
|
||||||
.expect("bytes have right length"),
|
|
||||||
)?,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Creates a new shortstatehash that often is just a diff to an already existing
|
|
||||||
/// shortstatehash and therefore very efficient.
|
|
||||||
///
|
|
||||||
/// There are multiple layers of diffs. The bottom layer 0 always contains the full state. Layer
|
|
||||||
/// 1 contains diffs to states of layer 0, layer 2 diffs to layer 1 and so on. If layer n > 0
|
|
||||||
/// grows too big, it will be combined with layer n-1 to create a new diff on layer n-1 that's
|
|
||||||
/// based on layer n-2. If that layer is also too big, it will recursively fix above layers too.
|
|
||||||
///
|
|
||||||
/// * `shortstatehash` - Shortstatehash of this state
|
|
||||||
/// * `statediffnew` - Added to base. Each vec is shortstatekey+shorteventid
|
|
||||||
/// * `statediffremoved` - Removed from base. Each vec is shortstatekey+shorteventid
|
|
||||||
/// * `diff_to_sibling` - Approximately how much the diff grows each time for this layer
|
|
||||||
/// * `parent_states` - A stack with info on shortstatehash, full state, added diff and removed diff for each parent layer
|
|
||||||
#[tracing::instrument(skip(
|
|
||||||
self,
|
|
||||||
statediffnew,
|
|
||||||
statediffremoved,
|
|
||||||
diff_to_sibling,
|
|
||||||
parent_states
|
|
||||||
))]
|
|
||||||
pub fn save_state_from_diff(
|
|
||||||
&self,
|
|
||||||
shortstatehash: u64,
|
|
||||||
statediffnew: HashSet<CompressedStateEvent>,
|
|
||||||
statediffremoved: HashSet<CompressedStateEvent>,
|
|
||||||
diff_to_sibling: usize,
|
|
||||||
mut parent_states: Vec<(
|
|
||||||
u64, // sstatehash
|
|
||||||
HashSet<CompressedStateEvent>, // full state
|
|
||||||
HashSet<CompressedStateEvent>, // added
|
|
||||||
HashSet<CompressedStateEvent>, // removed
|
|
||||||
)>,
|
|
||||||
) -> Result<()> {
|
|
||||||
let diffsum = statediffnew.len() + statediffremoved.len();
|
|
||||||
|
|
||||||
if parent_states.len() > 3 {
|
|
||||||
// Number of layers
|
|
||||||
// To many layers, we have to go deeper
|
|
||||||
let parent = parent_states.pop().unwrap();
|
|
||||||
|
|
||||||
let mut parent_new = parent.2;
|
|
||||||
let mut parent_removed = parent.3;
|
|
||||||
|
|
||||||
for removed in statediffremoved {
|
|
||||||
if !parent_new.remove(&removed) {
|
|
||||||
// It was not added in the parent and we removed it
|
|
||||||
parent_removed.insert(removed);
|
|
||||||
}
|
|
||||||
// Else it was added in the parent and we removed it again. We can forget this change
|
|
||||||
}
|
|
||||||
|
|
||||||
for new in statediffnew {
|
|
||||||
if !parent_removed.remove(&new) {
|
|
||||||
// It was not touched in the parent and we added it
|
|
||||||
parent_new.insert(new);
|
|
||||||
}
|
|
||||||
// Else it was removed in the parent and we added it again. We can forget this change
|
|
||||||
}
|
|
||||||
|
|
||||||
self.save_state_from_diff(
|
|
||||||
shortstatehash,
|
|
||||||
parent_new,
|
|
||||||
parent_removed,
|
|
||||||
diffsum,
|
|
||||||
parent_states,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
if parent_states.is_empty() {
|
|
||||||
// There is no parent layer, create a new state
|
|
||||||
let mut value = 0_u64.to_be_bytes().to_vec(); // 0 means no parent
|
|
||||||
for new in &statediffnew {
|
|
||||||
value.extend_from_slice(&new[..]);
|
value.extend_from_slice(&new[..]);
|
||||||
}
|
}
|
||||||
|
|
||||||
if !statediffremoved.is_empty() {
|
if !diff.removed.is_empty() {
|
||||||
warn!("Tried to create new state with removals");
|
|
||||||
}
|
|
||||||
|
|
||||||
self.shortstatehash_statediff
|
|
||||||
.insert(&shortstatehash.to_be_bytes(), &value)?;
|
|
||||||
|
|
||||||
return Ok(());
|
|
||||||
};
|
|
||||||
|
|
||||||
// Else we have two options.
|
|
||||||
// 1. We add the current diff on top of the parent layer.
|
|
||||||
// 2. We replace a layer above
|
|
||||||
|
|
||||||
let parent = parent_states.pop().unwrap();
|
|
||||||
let parent_diff = parent.2.len() + parent.3.len();
|
|
||||||
|
|
||||||
if diffsum * diffsum >= 2 * diff_to_sibling * parent_diff {
|
|
||||||
// Diff too big, we replace above layer(s)
|
|
||||||
let mut parent_new = parent.2;
|
|
||||||
let mut parent_removed = parent.3;
|
|
||||||
|
|
||||||
for removed in statediffremoved {
|
|
||||||
if !parent_new.remove(&removed) {
|
|
||||||
// It was not added in the parent and we removed it
|
|
||||||
parent_removed.insert(removed);
|
|
||||||
}
|
|
||||||
// Else it was added in the parent and we removed it again. We can forget this change
|
|
||||||
}
|
|
||||||
|
|
||||||
for new in statediffnew {
|
|
||||||
if !parent_removed.remove(&new) {
|
|
||||||
// It was not touched in the parent and we added it
|
|
||||||
parent_new.insert(new);
|
|
||||||
}
|
|
||||||
// Else it was removed in the parent and we added it again. We can forget this change
|
|
||||||
}
|
|
||||||
|
|
||||||
self.save_state_from_diff(
|
|
||||||
shortstatehash,
|
|
||||||
parent_new,
|
|
||||||
parent_removed,
|
|
||||||
diffsum,
|
|
||||||
parent_states,
|
|
||||||
)?;
|
|
||||||
} else {
|
|
||||||
// Diff small enough, we add diff as layer on top of parent
|
|
||||||
let mut value = parent.0.to_be_bytes().to_vec();
|
|
||||||
for new in &statediffnew {
|
|
||||||
value.extend_from_slice(&new[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
if !statediffremoved.is_empty() {
|
|
||||||
value.extend_from_slice(&0_u64.to_be_bytes());
|
value.extend_from_slice(&0_u64.to_be_bytes());
|
||||||
for removed in &statediffremoved {
|
for removed in &diff.removed {
|
||||||
value.extend_from_slice(&removed[..]);
|
value.extend_from_slice(&removed[..]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -237,122 +45,4 @@
|
||||||
self.shortstatehash_statediff
|
self.shortstatehash_statediff
|
||||||
.insert(&shortstatehash.to_be_bytes(), &value)?;
|
.insert(&shortstatehash.to_be_bytes(), &value)?;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the new shortstatehash
|
|
||||||
pub fn save_state(
|
|
||||||
room_id: &RoomId,
|
|
||||||
new_state_ids_compressed: HashSet<CompressedStateEvent>,
|
|
||||||
) -> Result<(u64,
|
|
||||||
HashSet<CompressedStateEvent>, // added
|
|
||||||
HashSet<CompressedStateEvent>)> // removed
|
|
||||||
{
|
|
||||||
let previous_shortstatehash = self.d.current_shortstatehash(room_id)?;
|
|
||||||
|
|
||||||
let state_hash = self.calculate_hash(
|
|
||||||
&new_state_ids_compressed
|
|
||||||
.iter()
|
|
||||||
.map(|bytes| &bytes[..])
|
|
||||||
.collect::<Vec<_>>(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let (new_shortstatehash, already_existed) =
|
|
||||||
self.get_or_create_shortstatehash(&state_hash, &db.globals)?;
|
|
||||||
|
|
||||||
if Some(new_shortstatehash) == previous_shortstatehash {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
let states_parents = previous_shortstatehash
|
|
||||||
.map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?;
|
|
||||||
|
|
||||||
let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last()
|
|
||||||
{
|
|
||||||
let statediffnew: HashSet<_> = new_state_ids_compressed
|
|
||||||
.difference(&parent_stateinfo.1)
|
|
||||||
.copied()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let statediffremoved: HashSet<_> = parent_stateinfo
|
|
||||||
.1
|
|
||||||
.difference(&new_state_ids_compressed)
|
|
||||||
.copied()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
(statediffnew, statediffremoved)
|
|
||||||
} else {
|
|
||||||
(new_state_ids_compressed, HashSet::new())
|
|
||||||
};
|
|
||||||
|
|
||||||
if !already_existed {
|
|
||||||
self.save_state_from_diff(
|
|
||||||
new_shortstatehash,
|
|
||||||
statediffnew.clone(),
|
|
||||||
statediffremoved,
|
|
||||||
2, // every state change is 2 event changes on average
|
|
||||||
states_parents,
|
|
||||||
)?;
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok((new_shortstatehash, statediffnew, statediffremoved))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn get_auth_chain_from_cache<'a>(
|
|
||||||
&'a self,
|
|
||||||
key: &[u64],
|
|
||||||
) -> Result<Option<Arc<HashSet<u64>>>> {
|
|
||||||
// Check RAM cache
|
|
||||||
if let Some(result) = self.auth_chain_cache.lock().unwrap().get_mut(key) {
|
|
||||||
return Ok(Some(Arc::clone(result)));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check DB cache
|
|
||||||
if key.len() == 1 {
|
|
||||||
if let Some(chain) =
|
|
||||||
self.shorteventid_authchain
|
|
||||||
.get(&key[0].to_be_bytes())?
|
|
||||||
.map(|chain| {
|
|
||||||
chain
|
|
||||||
.chunks_exact(size_of::<u64>())
|
|
||||||
.map(|chunk| {
|
|
||||||
utils::u64_from_bytes(chunk).expect("byte length is correct")
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
})
|
|
||||||
{
|
|
||||||
let chain = Arc::new(chain);
|
|
||||||
|
|
||||||
// Cache in RAM
|
|
||||||
self.auth_chain_cache
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.insert(vec![key[0]], Arc::clone(&chain));
|
|
||||||
|
|
||||||
return Ok(Some(chain));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn cache_auth_chain(&self, key: Vec<u64>, chain: Arc<HashSet<u64>>) -> Result<()> {
|
|
||||||
// Persist in db
|
|
||||||
if key.len() == 1 {
|
|
||||||
self.shorteventid_authchain.insert(
|
|
||||||
&key[0].to_be_bytes(),
|
|
||||||
&chain
|
|
||||||
.iter()
|
|
||||||
.flat_map(|s| s.to_be_bytes().to_vec())
|
|
||||||
.collect::<Vec<u8>>(),
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cache in RAM
|
|
||||||
self.auth_chain_cache.lock().unwrap().insert(key, chain);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
4
src/service/rooms/auth_chain/data.rs
Normal file
4
src/service/rooms/auth_chain/data.rs
Normal file
|
@ -0,0 +1,4 @@
|
||||||
|
pub trait Data {
|
||||||
|
fn get_cached_eventid_authchain<'a>() -> Result<HashSet<u64>>;
|
||||||
|
fn cache_eventid_authchain<'a>(shorteventid: u64, auth_chain: &HashSet<u64>) -> Result<HashSet<u64>>;
|
||||||
|
}
|
|
@ -1,327 +1,27 @@
|
||||||
|
mod data;
|
||||||
|
pub use data::Data;
|
||||||
|
|
||||||
/// Returns a stack with info on shortstatehash, full state, added diff and removed diff for the selected shortstatehash and each parent layer.
|
use crate::service::*;
|
||||||
|
|
||||||
|
pub struct Service<D: Data> {
|
||||||
|
db: D,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Service<_> {
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn load_shortstatehash_info(
|
pub fn get_cached_eventid_authchain<'a>(
|
||||||
&self,
|
|
||||||
shortstatehash: u64,
|
|
||||||
) -> Result<
|
|
||||||
Vec<(
|
|
||||||
u64, // sstatehash
|
|
||||||
HashSet<CompressedStateEvent>, // full state
|
|
||||||
HashSet<CompressedStateEvent>, // added
|
|
||||||
HashSet<CompressedStateEvent>, // removed
|
|
||||||
)>,
|
|
||||||
> {
|
|
||||||
if let Some(r) = self
|
|
||||||
.stateinfo_cache
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.get_mut(&shortstatehash)
|
|
||||||
{
|
|
||||||
return Ok(r.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
let value = self
|
|
||||||
.shortstatehash_statediff
|
|
||||||
.get(&shortstatehash.to_be_bytes())?
|
|
||||||
.ok_or_else(|| Error::bad_database("State hash does not exist"))?;
|
|
||||||
let parent =
|
|
||||||
utils::u64_from_bytes(&value[0..size_of::<u64>()]).expect("bytes have right length");
|
|
||||||
|
|
||||||
let mut add_mode = true;
|
|
||||||
let mut added = HashSet::new();
|
|
||||||
let mut removed = HashSet::new();
|
|
||||||
|
|
||||||
let mut i = size_of::<u64>();
|
|
||||||
while let Some(v) = value.get(i..i + 2 * size_of::<u64>()) {
|
|
||||||
if add_mode && v.starts_with(&0_u64.to_be_bytes()) {
|
|
||||||
add_mode = false;
|
|
||||||
i += size_of::<u64>();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if add_mode {
|
|
||||||
added.insert(v.try_into().expect("we checked the size above"));
|
|
||||||
} else {
|
|
||||||
removed.insert(v.try_into().expect("we checked the size above"));
|
|
||||||
}
|
|
||||||
i += 2 * size_of::<u64>();
|
|
||||||
}
|
|
||||||
|
|
||||||
if parent != 0_u64 {
|
|
||||||
let mut response = self.load_shortstatehash_info(parent)?;
|
|
||||||
let mut state = response.last().unwrap().1.clone();
|
|
||||||
state.extend(added.iter().copied());
|
|
||||||
for r in &removed {
|
|
||||||
state.remove(r);
|
|
||||||
}
|
|
||||||
|
|
||||||
response.push((shortstatehash, state, added, removed));
|
|
||||||
|
|
||||||
Ok(response)
|
|
||||||
} else {
|
|
||||||
let response = vec![(shortstatehash, added.clone(), added, removed)];
|
|
||||||
self.stateinfo_cache
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.insert(shortstatehash, response.clone());
|
|
||||||
Ok(response)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn compress_state_event(
|
|
||||||
&self,
|
|
||||||
shortstatekey: u64,
|
|
||||||
event_id: &EventId,
|
|
||||||
globals: &super::globals::Globals,
|
|
||||||
) -> Result<CompressedStateEvent> {
|
|
||||||
let mut v = shortstatekey.to_be_bytes().to_vec();
|
|
||||||
v.extend_from_slice(
|
|
||||||
&self
|
|
||||||
.get_or_create_shorteventid(event_id, globals)?
|
|
||||||
.to_be_bytes(),
|
|
||||||
);
|
|
||||||
Ok(v.try_into().expect("we checked the size above"))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns shortstatekey, event id
|
|
||||||
pub fn parse_compressed_state_event(
|
|
||||||
&self,
|
|
||||||
compressed_event: CompressedStateEvent,
|
|
||||||
) -> Result<(u64, Arc<EventId>)> {
|
|
||||||
Ok((
|
|
||||||
utils::u64_from_bytes(&compressed_event[0..size_of::<u64>()])
|
|
||||||
.expect("bytes have right length"),
|
|
||||||
self.get_eventid_from_short(
|
|
||||||
utils::u64_from_bytes(&compressed_event[size_of::<u64>()..])
|
|
||||||
.expect("bytes have right length"),
|
|
||||||
)?,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Creates a new shortstatehash that often is just a diff to an already existing
|
|
||||||
/// shortstatehash and therefore very efficient.
|
|
||||||
///
|
|
||||||
/// There are multiple layers of diffs. The bottom layer 0 always contains the full state. Layer
|
|
||||||
/// 1 contains diffs to states of layer 0, layer 2 diffs to layer 1 and so on. If layer n > 0
|
|
||||||
/// grows too big, it will be combined with layer n-1 to create a new diff on layer n-1 that's
|
|
||||||
/// based on layer n-2. If that layer is also too big, it will recursively fix above layers too.
|
|
||||||
///
|
|
||||||
/// * `shortstatehash` - Shortstatehash of this state
|
|
||||||
/// * `statediffnew` - Added to base. Each vec is shortstatekey+shorteventid
|
|
||||||
/// * `statediffremoved` - Removed from base. Each vec is shortstatekey+shorteventid
|
|
||||||
/// * `diff_to_sibling` - Approximately how much the diff grows each time for this layer
|
|
||||||
/// * `parent_states` - A stack with info on shortstatehash, full state, added diff and removed diff for each parent layer
|
|
||||||
#[tracing::instrument(skip(
|
|
||||||
self,
|
|
||||||
statediffnew,
|
|
||||||
statediffremoved,
|
|
||||||
diff_to_sibling,
|
|
||||||
parent_states
|
|
||||||
))]
|
|
||||||
pub fn save_state_from_diff(
|
|
||||||
&self,
|
|
||||||
shortstatehash: u64,
|
|
||||||
statediffnew: HashSet<CompressedStateEvent>,
|
|
||||||
statediffremoved: HashSet<CompressedStateEvent>,
|
|
||||||
diff_to_sibling: usize,
|
|
||||||
mut parent_states: Vec<(
|
|
||||||
u64, // sstatehash
|
|
||||||
HashSet<CompressedStateEvent>, // full state
|
|
||||||
HashSet<CompressedStateEvent>, // added
|
|
||||||
HashSet<CompressedStateEvent>, // removed
|
|
||||||
)>,
|
|
||||||
) -> Result<()> {
|
|
||||||
let diffsum = statediffnew.len() + statediffremoved.len();
|
|
||||||
|
|
||||||
if parent_states.len() > 3 {
|
|
||||||
// Number of layers
|
|
||||||
// To many layers, we have to go deeper
|
|
||||||
let parent = parent_states.pop().unwrap();
|
|
||||||
|
|
||||||
let mut parent_new = parent.2;
|
|
||||||
let mut parent_removed = parent.3;
|
|
||||||
|
|
||||||
for removed in statediffremoved {
|
|
||||||
if !parent_new.remove(&removed) {
|
|
||||||
// It was not added in the parent and we removed it
|
|
||||||
parent_removed.insert(removed);
|
|
||||||
}
|
|
||||||
// Else it was added in the parent and we removed it again. We can forget this change
|
|
||||||
}
|
|
||||||
|
|
||||||
for new in statediffnew {
|
|
||||||
if !parent_removed.remove(&new) {
|
|
||||||
// It was not touched in the parent and we added it
|
|
||||||
parent_new.insert(new);
|
|
||||||
}
|
|
||||||
// Else it was removed in the parent and we added it again. We can forget this change
|
|
||||||
}
|
|
||||||
|
|
||||||
self.save_state_from_diff(
|
|
||||||
shortstatehash,
|
|
||||||
parent_new,
|
|
||||||
parent_removed,
|
|
||||||
diffsum,
|
|
||||||
parent_states,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
if parent_states.is_empty() {
|
|
||||||
// There is no parent layer, create a new state
|
|
||||||
let mut value = 0_u64.to_be_bytes().to_vec(); // 0 means no parent
|
|
||||||
for new in &statediffnew {
|
|
||||||
value.extend_from_slice(&new[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
if !statediffremoved.is_empty() {
|
|
||||||
warn!("Tried to create new state with removals");
|
|
||||||
}
|
|
||||||
|
|
||||||
self.shortstatehash_statediff
|
|
||||||
.insert(&shortstatehash.to_be_bytes(), &value)?;
|
|
||||||
|
|
||||||
return Ok(());
|
|
||||||
};
|
|
||||||
|
|
||||||
// Else we have two options.
|
|
||||||
// 1. We add the current diff on top of the parent layer.
|
|
||||||
// 2. We replace a layer above
|
|
||||||
|
|
||||||
let parent = parent_states.pop().unwrap();
|
|
||||||
let parent_diff = parent.2.len() + parent.3.len();
|
|
||||||
|
|
||||||
if diffsum * diffsum >= 2 * diff_to_sibling * parent_diff {
|
|
||||||
// Diff too big, we replace above layer(s)
|
|
||||||
let mut parent_new = parent.2;
|
|
||||||
let mut parent_removed = parent.3;
|
|
||||||
|
|
||||||
for removed in statediffremoved {
|
|
||||||
if !parent_new.remove(&removed) {
|
|
||||||
// It was not added in the parent and we removed it
|
|
||||||
parent_removed.insert(removed);
|
|
||||||
}
|
|
||||||
// Else it was added in the parent and we removed it again. We can forget this change
|
|
||||||
}
|
|
||||||
|
|
||||||
for new in statediffnew {
|
|
||||||
if !parent_removed.remove(&new) {
|
|
||||||
// It was not touched in the parent and we added it
|
|
||||||
parent_new.insert(new);
|
|
||||||
}
|
|
||||||
// Else it was removed in the parent and we added it again. We can forget this change
|
|
||||||
}
|
|
||||||
|
|
||||||
self.save_state_from_diff(
|
|
||||||
shortstatehash,
|
|
||||||
parent_new,
|
|
||||||
parent_removed,
|
|
||||||
diffsum,
|
|
||||||
parent_states,
|
|
||||||
)?;
|
|
||||||
} else {
|
|
||||||
// Diff small enough, we add diff as layer on top of parent
|
|
||||||
let mut value = parent.0.to_be_bytes().to_vec();
|
|
||||||
for new in &statediffnew {
|
|
||||||
value.extend_from_slice(&new[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
if !statediffremoved.is_empty() {
|
|
||||||
value.extend_from_slice(&0_u64.to_be_bytes());
|
|
||||||
for removed in &statediffremoved {
|
|
||||||
value.extend_from_slice(&removed[..]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
self.shortstatehash_statediff
|
|
||||||
.insert(&shortstatehash.to_be_bytes(), &value)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the new shortstatehash
|
|
||||||
pub fn save_state(
|
|
||||||
room_id: &RoomId,
|
|
||||||
new_state_ids_compressed: HashSet<CompressedStateEvent>,
|
|
||||||
) -> Result<(u64,
|
|
||||||
HashSet<CompressedStateEvent>, // added
|
|
||||||
HashSet<CompressedStateEvent>)> // removed
|
|
||||||
{
|
|
||||||
let previous_shortstatehash = self.d.current_shortstatehash(room_id)?;
|
|
||||||
|
|
||||||
let state_hash = self.calculate_hash(
|
|
||||||
&new_state_ids_compressed
|
|
||||||
.iter()
|
|
||||||
.map(|bytes| &bytes[..])
|
|
||||||
.collect::<Vec<_>>(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let (new_shortstatehash, already_existed) =
|
|
||||||
self.get_or_create_shortstatehash(&state_hash, &db.globals)?;
|
|
||||||
|
|
||||||
if Some(new_shortstatehash) == previous_shortstatehash {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
let states_parents = previous_shortstatehash
|
|
||||||
.map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?;
|
|
||||||
|
|
||||||
let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last()
|
|
||||||
{
|
|
||||||
let statediffnew: HashSet<_> = new_state_ids_compressed
|
|
||||||
.difference(&parent_stateinfo.1)
|
|
||||||
.copied()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let statediffremoved: HashSet<_> = parent_stateinfo
|
|
||||||
.1
|
|
||||||
.difference(&new_state_ids_compressed)
|
|
||||||
.copied()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
(statediffnew, statediffremoved)
|
|
||||||
} else {
|
|
||||||
(new_state_ids_compressed, HashSet::new())
|
|
||||||
};
|
|
||||||
|
|
||||||
if !already_existed {
|
|
||||||
self.save_state_from_diff(
|
|
||||||
new_shortstatehash,
|
|
||||||
statediffnew.clone(),
|
|
||||||
statediffremoved,
|
|
||||||
2, // every state change is 2 event changes on average
|
|
||||||
states_parents,
|
|
||||||
)?;
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok((new_shortstatehash, statediffnew, statediffremoved))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn get_auth_chain_from_cache<'a>(
|
|
||||||
&'a self,
|
&'a self,
|
||||||
key: &[u64],
|
key: &[u64],
|
||||||
) -> Result<Option<Arc<HashSet<u64>>>> {
|
) -> Result<Option<Arc<HashSet<u64>>>> {
|
||||||
// Check RAM cache
|
// Check RAM cache
|
||||||
if let Some(result) = self.auth_chain_cache.lock().unwrap().get_mut(key) {
|
if let Some(result) = self.auth_chain_cache.lock().unwrap().get_mut(key.to_be_bytes()) {
|
||||||
return Ok(Some(Arc::clone(result)));
|
return Ok(Some(Arc::clone(result)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We only save auth chains for single events in the db
|
||||||
|
if key.len == 1 {
|
||||||
// Check DB cache
|
// Check DB cache
|
||||||
if key.len() == 1 {
|
if let Some(chain) = self.db.get_cached_eventid_authchain(key[0])
|
||||||
if let Some(chain) =
|
|
||||||
self.shorteventid_authchain
|
|
||||||
.get(&key[0].to_be_bytes())?
|
|
||||||
.map(|chain| {
|
|
||||||
chain
|
|
||||||
.chunks_exact(size_of::<u64>())
|
|
||||||
.map(|chunk| {
|
|
||||||
utils::u64_from_bytes(chunk).expect("byte length is correct")
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
})
|
|
||||||
{
|
{
|
||||||
let chain = Arc::new(chain);
|
let chain = Arc::new(chain);
|
||||||
|
|
||||||
|
@ -339,20 +39,15 @@
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn cache_auth_chain(&self, key: Vec<u64>, chain: Arc<HashSet<u64>>) -> Result<()> {
|
pub fn cache_auth_chain(&self, key: Vec<u64>, auth_chain: Arc<HashSet<u64>>) -> Result<()> {
|
||||||
// Persist in db
|
// Only persist single events in db
|
||||||
if key.len() == 1 {
|
if key.len() == 1 {
|
||||||
self.shorteventid_authchain.insert(
|
self.db.cache_auth_chain(key[0], auth_chain)?;
|
||||||
&key[0].to_be_bytes(),
|
|
||||||
&chain
|
|
||||||
.iter()
|
|
||||||
.flat_map(|s| s.to_be_bytes().to_vec())
|
|
||||||
.collect::<Vec<u8>>(),
|
|
||||||
)?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cache in RAM
|
// Cache in RAM
|
||||||
self.auth_chain_cache.lock().unwrap().insert(key, chain);
|
self.auth_chain_cache.lock().unwrap().insert(key, auth_chain);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
10
src/service/rooms/state_compressor/data.rs
Normal file
10
src/service/rooms/state_compressor/data.rs
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
struct StateDiff {
|
||||||
|
parent: Option<u64>,
|
||||||
|
added: Vec<CompressedStateEvent>,
|
||||||
|
removed: Vec<CompressedStateEvent>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait Data {
|
||||||
|
fn get_statediff(shortstatehash: u64) -> Result<StateDiff>;
|
||||||
|
fn save_statediff(shortstatehash: u64, diff: StateDiff) -> Result<()>;
|
||||||
|
}
|
|
@ -1,4 +1,13 @@
|
||||||
|
mod data;
|
||||||
|
pub use data::Data;
|
||||||
|
|
||||||
|
use crate::service::*;
|
||||||
|
|
||||||
|
pub struct Service<D: Data> {
|
||||||
|
db: D,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Service<_> {
|
||||||
/// Returns a stack with info on shortstatehash, full state, added diff and removed diff for the selected shortstatehash and each parent layer.
|
/// Returns a stack with info on shortstatehash, full state, added diff and removed diff for the selected shortstatehash and each parent layer.
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn load_shortstatehash_info(
|
pub fn load_shortstatehash_info(
|
||||||
|
@ -21,31 +30,7 @@
|
||||||
return Ok(r.clone());
|
return Ok(r.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
let value = self
|
self.db.get_statediff(shortstatehash)?;
|
||||||
.shortstatehash_statediff
|
|
||||||
.get(&shortstatehash.to_be_bytes())?
|
|
||||||
.ok_or_else(|| Error::bad_database("State hash does not exist"))?;
|
|
||||||
let parent =
|
|
||||||
utils::u64_from_bytes(&value[0..size_of::<u64>()]).expect("bytes have right length");
|
|
||||||
|
|
||||||
let mut add_mode = true;
|
|
||||||
let mut added = HashSet::new();
|
|
||||||
let mut removed = HashSet::new();
|
|
||||||
|
|
||||||
let mut i = size_of::<u64>();
|
|
||||||
while let Some(v) = value.get(i..i + 2 * size_of::<u64>()) {
|
|
||||||
if add_mode && v.starts_with(&0_u64.to_be_bytes()) {
|
|
||||||
add_mode = false;
|
|
||||||
i += size_of::<u64>();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if add_mode {
|
|
||||||
added.insert(v.try_into().expect("we checked the size above"));
|
|
||||||
} else {
|
|
||||||
removed.insert(v.try_into().expect("we checked the size above"));
|
|
||||||
}
|
|
||||||
i += 2 * size_of::<u64>();
|
|
||||||
}
|
|
||||||
|
|
||||||
if parent != 0_u64 {
|
if parent != 0_u64 {
|
||||||
let mut response = self.load_shortstatehash_info(parent)?;
|
let mut response = self.load_shortstatehash_info(parent)?;
|
||||||
|
@ -170,17 +155,7 @@
|
||||||
|
|
||||||
if parent_states.is_empty() {
|
if parent_states.is_empty() {
|
||||||
// There is no parent layer, create a new state
|
// There is no parent layer, create a new state
|
||||||
let mut value = 0_u64.to_be_bytes().to_vec(); // 0 means no parent
|
self.db.save_statediff(shortstatehash, StateDiff { parent: 0, new: statediffnew, removed: statediffremoved })?;
|
||||||
for new in &statediffnew {
|
|
||||||
value.extend_from_slice(&new[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
if !statediffremoved.is_empty() {
|
|
||||||
warn!("Tried to create new state with removals");
|
|
||||||
}
|
|
||||||
|
|
||||||
self.shortstatehash_statediff
|
|
||||||
.insert(&shortstatehash.to_be_bytes(), &value)?;
|
|
||||||
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
|
@ -222,20 +197,7 @@
|
||||||
)?;
|
)?;
|
||||||
} else {
|
} else {
|
||||||
// Diff small enough, we add diff as layer on top of parent
|
// Diff small enough, we add diff as layer on top of parent
|
||||||
let mut value = parent.0.to_be_bytes().to_vec();
|
self.db.save_statediff(shortstatehash, StateDiff { parent: parent.0, new: statediffnew, removed: statediffremoved })?;
|
||||||
for new in &statediffnew {
|
|
||||||
value.extend_from_slice(&new[..]);
|
|
||||||
}
|
|
||||||
|
|
||||||
if !statediffremoved.is_empty() {
|
|
||||||
value.extend_from_slice(&0_u64.to_be_bytes());
|
|
||||||
for removed in &statediffremoved {
|
|
||||||
value.extend_from_slice(&removed[..]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
self.shortstatehash_statediff
|
|
||||||
.insert(&shortstatehash.to_be_bytes(), &value)?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -298,61 +260,4 @@
|
||||||
|
|
||||||
Ok((new_shortstatehash, statediffnew, statediffremoved))
|
Ok((new_shortstatehash, statediffnew, statediffremoved))
|
||||||
}
|
}
|
||||||
|
}
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn get_auth_chain_from_cache<'a>(
|
|
||||||
&'a self,
|
|
||||||
key: &[u64],
|
|
||||||
) -> Result<Option<Arc<HashSet<u64>>>> {
|
|
||||||
// Check RAM cache
|
|
||||||
if let Some(result) = self.auth_chain_cache.lock().unwrap().get_mut(key) {
|
|
||||||
return Ok(Some(Arc::clone(result)));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check DB cache
|
|
||||||
if key.len() == 1 {
|
|
||||||
if let Some(chain) =
|
|
||||||
self.shorteventid_authchain
|
|
||||||
.get(&key[0].to_be_bytes())?
|
|
||||||
.map(|chain| {
|
|
||||||
chain
|
|
||||||
.chunks_exact(size_of::<u64>())
|
|
||||||
.map(|chunk| {
|
|
||||||
utils::u64_from_bytes(chunk).expect("byte length is correct")
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
})
|
|
||||||
{
|
|
||||||
let chain = Arc::new(chain);
|
|
||||||
|
|
||||||
// Cache in RAM
|
|
||||||
self.auth_chain_cache
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.insert(vec![key[0]], Arc::clone(&chain));
|
|
||||||
|
|
||||||
return Ok(Some(chain));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn cache_auth_chain(&self, key: Vec<u64>, chain: Arc<HashSet<u64>>) -> Result<()> {
|
|
||||||
// Persist in db
|
|
||||||
if key.len() == 1 {
|
|
||||||
self.shorteventid_authchain.insert(
|
|
||||||
&key[0].to_be_bytes(),
|
|
||||||
&chain
|
|
||||||
.iter()
|
|
||||||
.flat_map(|s| s.to_be_bytes().to_vec())
|
|
||||||
.collect::<Vec<u8>>(),
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cache in RAM
|
|
||||||
self.auth_chain_cache.lock().unwrap().insert(key, chain);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue