Remove StateStore trait from state-res collect events needed

This commit is contained in:
Devin Ragotzy 2021-01-06 08:52:30 -05:00 committed by Timo Kösters
parent 7c4d74bf9b
commit 8a035880f0
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
5 changed files with 170 additions and 117 deletions

View file

@ -618,7 +618,6 @@ async fn join_room_by_id_helper(
&room_id,
&control_events,
&mut event_map,
&db.rooms,
&event_ids,
);
@ -629,7 +628,6 @@ async fn join_room_by_id_helper(
&sorted_control_events,
&BTreeMap::new(), // We have no "clean/resolved" events to add (these extend the `resolved_control_events`)
&mut event_map,
&db.rooms,
)
.expect("iterative auth check failed on resolved events");
@ -654,7 +652,6 @@ async fn join_room_by_id_helper(
&events_to_sort,
power_level,
&mut event_map,
&db.rooms,
);
let resolved_events = state_res::StateResolution::iterative_auth_check(
@ -663,7 +660,6 @@ async fn join_room_by_id_helper(
&sorted_event_ids,
&resolved_control_events,
&mut event_map,
&db.rooms,
)
.expect("iterative auth check failed on resolved events");

View file

@ -67,40 +67,6 @@ pub struct Rooms {
pub(super) stateid_pduid: sled::Tree, // StateId = StateHash + Short, PduId = Count (without roomid)
}
impl StateStore<PduEvent> for Rooms {
fn get_event(&self, room_id: &RoomId, event_id: &EventId) -> state_res::Result<Arc<PduEvent>> {
let pid = self
.get_pdu_id(event_id)
.map_err(StateError::custom)?
.ok_or_else(|| {
StateError::NotFound(format!(
"PDU via room_id and event_id not found in the db: {}",
event_id.as_str()
))
})?;
serde_json::from_slice(
&self
.pduid_pdu
.get(pid)
.map_err(StateError::custom)?
.ok_or_else(|| StateError::NotFound("PDU via pduid not found in db.".into()))?,
)
.map_err(Into::into)
.and_then(|pdu: PduEvent| {
// conduit's PDU's always contain a room_id but some
// of ruma's do not so this must be an Option
if pdu.room_id() == room_id {
Ok(Arc::new(pdu))
} else {
Err(StateError::NotFound(
"Found PDU for incorrect room in db.".into(),
))
}
})
}
}
impl Rooms {
/// Builds a StateMap by iterating over all keys that start
/// with state_hash, this gives the full state for the given state_hash.
@ -222,6 +188,72 @@ impl Rooms {
Ok(events)
}
/// Returns a Vec of the related auth events to the given `event`.
///
/// A recursive list of all the auth_events going back to `RoomCreate` for each event in `event_ids`.
pub fn auth_events_full(
&self,
room_id: &RoomId,
event_ids: &[EventId],
) -> Result<Vec<PduEvent>> {
let mut result = BTreeMap::new();
let mut stack = event_ids.to_vec();
// DFS for auth event chain
while !stack.is_empty() {
let ev_id = stack.pop().unwrap();
if result.contains_key(&ev_id) {
continue;
}
if let Some(ev) = self.get_pdu(&ev_id)? {
stack.extend(ev.auth_events());
result.insert(ev.event_id().clone(), ev);
}
}
Ok(result.into_iter().map(|(_, v)| v).collect())
}
/// Returns a Vec<EventId> representing the difference in auth chains of the given `events`.
///
/// Each inner `Vec` of `event_ids` represents a state set (state at each forward extremity).
pub fn auth_chain_diff(
&self,
room_id: &RoomId,
event_ids: Vec<Vec<EventId>>,
) -> Result<Vec<EventId>> {
use std::collections::BTreeSet;
let mut chains = vec![];
for ids in event_ids {
// TODO state store `auth_event_ids` returns self in the event ids list
// when an event returns `auth_event_ids` self is not contained
let chain = self
.auth_events_full(room_id, &ids)?
.into_iter()
.map(|pdu| pdu.event_id)
.collect::<BTreeSet<_>>();
chains.push(chain);
}
if let Some(chain) = chains.first() {
let rest = chains.iter().skip(1).flatten().cloned().collect();
let common = chain.intersection(&rest).collect::<Vec<_>>();
Ok(chains
.iter()
.flatten()
.filter(|id| !common.contains(&id))
.cloned()
.collect::<BTreeSet<_>>()
.into_iter()
.collect())
} else {
Ok(vec![])
}
}
/// Generate a new StateHash.
///
/// A unique hash made from hashing all PDU ids of the state joined with 0xff.

View file

@ -603,7 +603,7 @@ pub async fn send_transaction_message_route<'a>(
};
// 4. Passes authorization rules based on the event's auth events, otherwise it is rejected.
// TODO: To me this sounds more like the auth_events should be get the pdu.auth_events not
// TODO: To me this sounds more like the auth_events should be "get the pdu.auth_events" not
// the auth events that would be correct for this pdu. Put another way we should use the auth events
// the pdu claims are its auth events
let auth_events = db.rooms.get_auth_events(
@ -637,50 +637,56 @@ pub async fn send_transaction_message_route<'a>(
);
continue;
}
// End of step 4.
let (state_at_event, incoming_auth_events): (StateMap<Arc<PduEvent>>, _) = match db
.sending
.send_federation_request(
&db.globals,
server_name.clone(),
get_room_state_ids::v1::Request {
room_id: pdu.room_id(),
event_id: pdu.event_id(),
},
)
.await
{
Ok(res) => {
let state =
fetch_events(&db, server_name.clone(), &pub_key_map, &res.pdu_ids).await?;
// Sanity check: there are no conflicting events in the state we received
let mut seen = BTreeSet::new();
for ev in &state {
// If the key is already present
if !seen.insert((&ev.kind, &ev.state_key)) {
todo!("Server sent us an invalid state")
}
}
let state = state
.into_iter()
.map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), Arc::new(pdu)))
.collect();
(
state,
fetch_events(&db, server_name.clone(), &pub_key_map, &res.auth_chain_ids)
.await?,
// Step 5. event passes auth based on state at the event
let (state_at_event, incoming_auth_events): (StateMap<Arc<PduEvent>>, Vec<Arc<PduEvent>>) =
match db
.sending
.send_federation_request(
&db.globals,
server_name.clone(),
get_room_state_ids::v1::Request {
room_id: pdu.room_id(),
event_id: pdu.event_id(),
},
)
}
Err(_) => {
resolved_map.insert(
event.event_id().clone(),
Err("Fetching state for event failed".into()),
);
continue;
}
};
.await
{
Ok(res) => {
let state =
fetch_events(&db, server_name.clone(), &pub_key_map, &res.pdu_ids).await?;
// Sanity check: there are no conflicting events in the state we received
let mut seen = BTreeSet::new();
for ev in &state {
// If the key is already present
if !seen.insert((&ev.kind, &ev.state_key)) {
todo!("Server sent us an invalid state")
}
}
let state = state
.into_iter()
.map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), Arc::new(pdu)))
.collect();
(
state,
fetch_events(&db, server_name.clone(), &pub_key_map, &res.auth_chain_ids)
.await?
.into_iter()
.map(Arc::new)
.collect(),
)
}
Err(_) => {
resolved_map.insert(
event.event_id().clone(),
Err("Fetching state for event failed".into()),
);
continue;
}
};
if !state_res::event_auth::auth_check(
&RoomVersionId::Version6,
@ -698,6 +704,7 @@ pub async fn send_transaction_message_route<'a>(
);
continue;
}
// End of step 5.
// The event could still be soft failed
append_state_soft(&db, &pdu)?;
@ -724,18 +731,30 @@ pub async fn send_transaction_message_route<'a>(
}
}
// 6.
// Step 6. event passes auth based on state of all forks and current room state
let state_at_forks = if fork_states.is_empty() {
// State is empty
Default::default()
} else if fork_states.len() == 1 {
fork_states[0].clone()
} else {
let auth_events = fork_states
.iter()
.map(|map| {
db.rooms.auth_events_full(
pdu.room_id(),
&map.values()
.map(|pdu| pdu.event_id().clone())
.collect::<Vec<_>>(),
)
})
.collect();
// Add as much as we can to the `event_map` (less DB hits)
event_map.extend(
incoming_auth_events
.into_iter()
.map(|pdu| (pdu.event_id().clone(), Arc::new(pdu))),
.map(|pdu| (pdu.event_id().clone(), pdu)),
);
event_map.extend(
state_at_event
@ -754,8 +773,8 @@ pub async fn send_transaction_message_route<'a>(
.collect::<StateMap<_>>()
})
.collect::<Vec<_>>(),
&auth_events,
&mut event_map,
&db.rooms,
) {
Ok(res) => res
.into_iter()