Keep track of State at event for state resolution

feat: first steps towards joining rooms over federation
Add state-res as a dependency of conduit
Add reverse_topological_power_sort before append_pdu
Implement statehashstatid_pduid tree for keeping track of state
Clean up implementation of state_hash as key for tracking state
This commit is contained in:
Devin Ragotzy 2020-08-06 08:29:59 -04:00
parent 8e55623bde
commit c4f5a0a631
24 changed files with 818 additions and 356 deletions

View file

@ -9,7 +9,7 @@ use ruma::{
events::{
ignored_user_list,
room::{
join_rules, member,
member,
power_levels::{self, PowerLevelsEventContent},
redaction,
},
@ -18,19 +18,31 @@ use ruma::{
EventId, Raw, RoomAliasId, RoomId, UserId,
};
use sled::IVec;
use state_res::{event_auth, Requester, StateEvent, StateMap, StateStore};
use std::{
collections::{BTreeMap, HashMap},
collections::{hash_map::DefaultHasher, BTreeMap, HashMap},
convert::{TryFrom, TryInto},
hash::{Hash, Hasher},
mem,
result::Result as StdResult,
};
/// The unique identifier of each state group.
///
/// This is created when a state group is added to the database by
/// hashing the entire state.
pub type StateHashId = String;
/// This identifier consists of roomId + count. It represents a
/// unique event, it will never be overwritten or removed.
pub type PduId = IVec;
pub struct Rooms {
pub edus: edus::RoomEdus,
pub(super) pduid_pdu: sled::Tree, // PduId = RoomId + Count
pub(super) eventid_pduid: sled::Tree,
pub(super) roomid_pduleaves: sled::Tree,
pub(super) roomstateid_pdu: sled::Tree, // RoomStateId = Room + StateType + StateKey
pub(super) alias_roomid: sled::Tree,
pub(super) aliasid_alias: sled::Tree, // AliasId = RoomId + Count
pub(super) publicroomids: sled::Tree,
@ -42,9 +54,263 @@ pub struct Rooms {
pub(super) userroomid_invited: sled::Tree,
pub(super) roomuserid_invited: sled::Tree,
pub(super) userroomid_left: sled::Tree,
// STATE TREES
/// This holds the full current state, including the latest event.
pub(super) roomstateid_pduid: sled::Tree, // RoomStateId = Room + StateType + StateKey
/// This holds the full room state minus the latest event.
pub(super) pduid_statehash: sled::Tree, // PDU id -> StateHash
/// Also holds the full room state minus the latest event.
pub(super) stateid_pduid: sled::Tree, // StateId = StateHash + (EventType, StateKey)
}
impl StateStore for Rooms {
fn get_event(&self, room_id: &RoomId, event_id: &EventId) -> StdResult<StateEvent, String> {
let pid = self
.eventid_pduid
.get(event_id.as_bytes())
.map_err(|e| e.to_string())?
.ok_or_else(|| "PDU via room_id and event_id not found in the db.".to_owned())?;
utils::deserialize(
&self
.pduid_pdu
.get(pid)
.map_err(|e| e.to_string())?
.ok_or_else(|| "PDU via pduid not found in db.".to_owned())?,
)
.and_then(|pdu: StateEvent| {
// conduit's PDU's always contain a room_id but some
// of ruma's do not so this must be an Option
if pdu.room_id() == Some(room_id) {
Ok(pdu)
} else {
Err(Error::bad_database("Found PDU for incorrect room in db."))
}
})
.map_err(|e| e.to_string())
}
}
// These are the methods related to STATE resolution.
impl Rooms {
/// Generates a new StateHash and associates it with the incoming event.
///
/// This adds all current state events (not including the incoming event)
/// to `stateid_pduid` and adds the incoming event to `pduid_statehash`.
/// The incoming event is the `pdu_id` passed to this method.
pub fn append_state_pdu(&self, room_id: &RoomId, pdu_id: &[u8]) -> Result<StateHashId> {
let state_hash = self.new_state_hash_id(room_id)?;
let state = self.current_state_pduids(room_id)?;
let mut key = state_hash.as_bytes().to_vec();
key.push(0xff);
// TODO eventually we could avoid writing to the DB so much on every event
// by keeping track of the delta and write that every so often
for ((ev_ty, state_key), pid) in state {
let mut state_id = key.to_vec();
state_id.extend_from_slice(ev_ty.to_string().as_bytes());
key.push(0xff);
state_id.extend_from_slice(state_key.expect("state event").as_bytes());
key.push(0xff);
self.stateid_pduid.insert(&state_id, &pid)?;
}
// This event's state does not include the event itself. `current_state_pduids`
// uses `roomstateid_pduid` before the current event is inserted to the tree so the state
// will be everything up to but not including the incoming event.
self.pduid_statehash.insert(pdu_id, state_hash.as_bytes())?;
Ok(state_hash)
}
/// Builds a `StateMap` by iterating over all keys that start
/// with `state_hash`, this gives the full state at event "x".
pub fn get_statemap_by_hash(&self, state_hash: StateHashId) -> Result<StateMap<EventId>> {
self.stateid_pduid
.scan_prefix(state_hash.as_bytes())
.values()
.map(|pduid| {
self.pduid_pdu.get(&pduid?)?.map_or_else(
|| Err(Error::bad_database("Failed to find StateMap.")),
|b| {
serde_json::from_slice::<PduEvent>(&b)
.map_err(|_| Error::bad_database("Invalid PDU in db."))
},
)
})
.map(|pdu| {
let pdu = pdu?;
Ok(((pdu.kind, pdu.state_key), pdu.event_id))
})
.collect::<Result<StateMap<_>>>()
}
// TODO make this return Result
/// Fetches the previous StateHash ID to `current`.
pub fn prev_state_hash(&self, current: StateHashId) -> Option<StateHashId> {
let mut found = false;
for pair in self.pduid_statehash.iter().rev() {
let prev = utils::string_from_bytes(&pair.ok()?.1).ok()?;
if current == prev {
found = true;
}
if current != prev && found {
return Some(prev);
}
}
None
}
/// Fetch the current State using the `roomstateid_pduid` tree.
pub fn current_state_pduids(&self, room_id: &RoomId) -> Result<StateMap<PduId>> {
// TODO this could also scan roomstateid_pduid if we passed in room_id ?
self.roomstateid_pduid
.scan_prefix(room_id.as_bytes())
.values()
.map(|pduid| {
let pduid = &pduid?;
self.pduid_pdu.get(pduid)?.map_or_else(
|| {
Err(Error::bad_database(
"Failed to find current state of pduid's.",
))
},
|b| {
Ok((
serde_json::from_slice::<PduEvent>(&b)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
pduid.clone(),
))
},
)
})
.map(|pair| {
let (pdu, id) = pair?;
Ok(((pdu.kind, pdu.state_key), id))
})
.collect::<Result<StateMap<_>>>()
}
/// Returns the last state hash key added to the db.
pub fn current_state_hash(&self, room_id: &RoomId) -> Result<StateHashId> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
// We must check here because this method is called outside and before
// `append_state_pdu` so the DB can be empty
if self.pduid_statehash.scan_prefix(prefix).next().is_none() {
// TODO use ring crate to hash
return Ok(room_id.as_str().to_owned());
}
self.pduid_statehash
.iter()
.next_back()
.map(|pair| {
utils::string_from_bytes(&pair?.1)
.map_err(|_| Error::bad_database("Invalid state hash string in db."))
})
.ok_or_else(|| Error::bad_database("No PDU's found for this room."))?
}
/// This fetches auth event_ids from the current state using the
/// full `roomstateid_pdu` tree.
pub fn get_auth_event_ids(
&self,
room_id: &RoomId,
kind: &EventType,
sender: &UserId,
state_key: Option<&str>,
content: serde_json::Value,
) -> Result<Vec<EventId>> {
let auth_events = state_res::auth_types_for_event(
kind.clone(),
sender,
state_key.map(|s| s.to_string()),
content,
);
let mut events = vec![];
for (event_type, state_key) in auth_events {
if let Some(state_key) = state_key.as_ref() {
if let Some(id) = self.room_state_get(room_id, &event_type, state_key)? {
events.push(id.event_id);
}
}
}
Ok(events)
}
// This fetches auth events from the current state using the
/// full `roomstateid_pdu` tree.
pub fn get_auth_events(
&self,
room_id: &RoomId,
kind: &EventType,
sender: &UserId,
state_key: Option<&str>,
content: serde_json::Value,
) -> Result<StateMap<PduEvent>> {
let auth_events = state_res::auth_types_for_event(
kind.clone(),
sender,
state_key.map(|s| s.to_string()),
content,
);
let mut events = StateMap::new();
for (event_type, state_key) in auth_events {
if let Some(s_key) = state_key.as_ref() {
if let Some(pdu) = self.room_state_get(room_id, &event_type, s_key)? {
events.insert((event_type, state_key), pdu);
}
}
}
Ok(events)
}
/// Generate a new StateHash.
///
/// A unique hash made from hashing the current states pduid's.
/// Because `append_state_pdu` handles the empty state db case it does not
/// have to be here.
fn new_state_hash_id(&self, room_id: &RoomId) -> Result<StateHashId> {
// Use hashed roomId as the first StateHash key for first state event in room
if self
.pduid_statehash
.scan_prefix(room_id.as_bytes())
.next()
.is_none()
{
// TODO use ring crate to hash
return Ok(room_id.as_str().to_owned());
}
let pdu_ids_to_hash = self
.pduid_statehash
.scan_prefix(room_id.as_bytes())
.values()
.next_back()
.unwrap() // We just checked if the tree was empty
.map(|hash| {
self.stateid_pduid
.scan_prefix(hash)
.values()
// pduid is roomId + count so just hash the whole thing
.map(|pid| Ok(pid?.to_vec()))
.collect::<Result<Vec<Vec<u8>>>>()
})??;
let mut hasher = DefaultHasher::new();
pdu_ids_to_hash.hash(&mut hasher);
let hash = hasher.finish().to_string();
// TODO not sure how you want to hash this
Ok(hash)
}
/// Checks if a room exists.
pub fn exists(&self, room_id: &RoomId) -> Result<bool> {
let mut prefix = room_id.to_string().as_bytes().to_vec();
@ -64,16 +330,20 @@ impl Rooms {
room_id: &RoomId,
) -> Result<HashMap<(EventType, String), PduEvent>> {
let mut hashmap = HashMap::new();
for pdu in self
.roomstateid_pdu
.scan_prefix(&room_id.to_string().as_bytes())
.values()
.map(|value| {
Ok::<_, Error>(
serde_json::from_slice::<PduEvent>(&value?)
for pdu in
self.roomstateid_pduid
.scan_prefix(&room_id.to_string().as_bytes())
.values()
.map(|value| {
Ok::<_, Error>(
serde_json::from_slice::<PduEvent>(
&self.pduid_pdu.get(value?)?.ok_or_else(|| {
Error::bad_database("PDU not found for ID in db.")
})?,
)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
)
})
)
})
{
let pdu = pdu?;
let state_key = pdu.state_key.clone().ok_or_else(|| {
@ -95,16 +365,20 @@ impl Rooms {
prefix.extend_from_slice(&event_type.to_string().as_bytes());
let mut hashmap = HashMap::new();
for pdu in self
.roomstateid_pdu
.scan_prefix(&prefix)
.values()
.map(|value| {
Ok::<_, Error>(
serde_json::from_slice::<PduEvent>(&value?)
for pdu in
self.roomstateid_pduid
.scan_prefix(&prefix)
.values()
.map(|value| {
Ok::<_, Error>(
serde_json::from_slice::<PduEvent>(
&self.pduid_pdu.get(value?)?.ok_or_else(|| {
Error::bad_database("PDU not found for ID in db.")
})?,
)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
)
})
)
})
{
let pdu = pdu?;
let state_key = pdu.state_key.clone().ok_or_else(|| {
@ -115,23 +389,28 @@ impl Rooms {
Ok(hashmap)
}
/// Returns the full room state.
/// Returns a single PDU in `room_id` with key (`event_type`, `state_key`).
pub fn room_state_get(
&self,
room_id: &RoomId,
event_type: &EventType,
state_key: &str,
) -> Result<Option<PduEvent>> {
let mut key = room_id.to_string().as_bytes().to_vec();
let mut key = room_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(&event_type.to_string().as_bytes());
key.push(0xff);
key.extend_from_slice(&state_key.as_bytes());
self.roomstateid_pdu.get(&key)?.map_or(Ok(None), |value| {
self.roomstateid_pduid.get(&key)?.map_or(Ok(None), |value| {
Ok::<_, Error>(Some(
serde_json::from_slice::<PduEvent>(&value)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
serde_json::from_slice::<PduEvent>(
&self
.pduid_pdu
.get(value)?
.ok_or_else(|| Error::bad_database("PDU not found for ID in db."))?,
)
.map_err(|_| Error::bad_database("Invalid PDU in db."))?,
))
})
}
@ -139,7 +418,7 @@ impl Rooms {
/// Returns the `count` of this pdu's id.
pub fn get_pdu_count(&self, event_id: &EventId) -> Result<Option<u64>> {
self.eventid_pduid
.get(event_id.to_string().as_bytes())?
.get(event_id.as_bytes())?
.map_or(Ok(None), |pdu_id| {
Ok(Some(
utils::u64_from_bytes(
@ -153,7 +432,7 @@ impl Rooms {
/// Returns the json of a pdu.
pub fn get_pdu_json(&self, event_id: &EventId) -> Result<Option<serde_json::Value>> {
self.eventid_pduid
.get(event_id.to_string().as_bytes())?
.get(event_id.as_bytes())?
.map_or(Ok(None), |pdu_id| {
Ok(Some(
serde_json::from_slice(&self.pduid_pdu.get(pdu_id)?.ok_or_else(|| {
@ -174,7 +453,7 @@ impl Rooms {
/// Returns the pdu.
pub fn get_pdu(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
self.eventid_pduid
.get(event_id.to_string().as_bytes())?
.get(event_id.as_bytes())?
.map_or(Ok(None), |pdu_id| {
Ok(Some(
serde_json::from_slice(&self.pduid_pdu.get(pdu_id)?.ok_or_else(|| {
@ -238,16 +517,15 @@ impl Rooms {
/// Replace the leaves of a room with a new event.
pub fn replace_pdu_leaves(&self, room_id: &RoomId, event_id: &EventId) -> Result<()> {
let mut prefix = room_id.to_string().as_bytes().to_vec();
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
for key in self.roomid_pduleaves.scan_prefix(&prefix).keys() {
self.roomid_pduleaves.remove(key?)?;
}
prefix.extend_from_slice(event_id.to_string().as_bytes());
self.roomid_pduleaves
.insert(&prefix, &*event_id.to_string())?;
prefix.extend_from_slice(event_id.as_bytes());
self.roomid_pduleaves.insert(&prefix, event_id.as_bytes())?;
Ok(())
}
@ -272,6 +550,14 @@ impl Rooms {
// TODO: Make sure this isn't called twice in parallel
let prev_events = self.get_pdu_leaves(&room_id)?;
let auth_events = self.get_auth_events(
&room_id,
&event_type,
&sender,
state_key.as_deref(),
content.clone(),
)?;
// Is the event authorized?
if let Some(state_key) = &state_key {
let power_levels = self
@ -333,138 +619,24 @@ impl Rooms {
// Don't allow encryption events when it's disabled
!globals.encryption_disabled()
}
EventType::RoomMember => {
let target_user_id = UserId::try_from(&**state_key).map_err(|_| {
Error::BadRequest(
ErrorKind::InvalidParam,
"State key of member event does not contain user id.",
)
})?;
let current_membership = self
.room_state_get(
&room_id,
&EventType::RoomMember,
&target_user_id.to_string(),
)?
.map_or(Ok::<_, Error>(member::MembershipState::Leave), |pdu| {
Ok(serde_json::from_value::<Raw<member::MemberEventContent>>(
pdu.content,
)
.expect("Raw::from_value always works.")
.deserialize()
.map_err(|_| Error::bad_database("Invalid Member event in db."))?
.membership)
})?;
let target_membership =
serde_json::from_value::<Raw<member::MemberEventContent>>(content.clone())
.expect("Raw::from_value always works.")
.deserialize()
.map_err(|_| Error::bad_database("Invalid Member event in db."))?
.membership;
let target_power = power_levels.users.get(&target_user_id).map_or_else(
|| {
if target_membership != member::MembershipState::Join {
None
} else {
Some(&power_levels.users_default)
}
},
// If it's okay, wrap with Some(_)
Some,
);
let join_rules =
self.room_state_get(&room_id, &EventType::RoomJoinRules, "")?
.map_or(Ok::<_, Error>(join_rules::JoinRule::Public), |pdu| {
Ok(serde_json::from_value::<
Raw<join_rules::JoinRulesEventContent>,
>(pdu.content)
.expect("Raw::from_value always works.")
.deserialize()
.map_err(|_| {
Error::bad_database("Database contains invalid JoinRules event")
})?
.join_rule)
})?;
if target_membership == member::MembershipState::Join {
let mut prev_events = prev_events.iter();
let prev_event = self
.get_pdu(prev_events.next().ok_or(Error::BadRequest(
ErrorKind::Unknown,
"Membership can't be the first event",
))?)?
.ok_or_else(|| {
Error::bad_database("PDU leaf points to invalid event!")
})?;
if prev_event.kind == EventType::RoomCreate
&& prev_event.prev_events.is_empty()
{
true
} else if sender != target_user_id {
false
} else if let member::MembershipState::Ban = current_membership {
false
} else {
join_rules == join_rules::JoinRule::Invite
&& (current_membership == member::MembershipState::Join
|| current_membership == member::MembershipState::Invite)
|| join_rules == join_rules::JoinRule::Public
}
} else if target_membership == member::MembershipState::Invite {
if let Some(third_party_invite_json) = content.get("third_party_invite") {
if current_membership == member::MembershipState::Ban {
false
} else {
let _third_party_invite =
serde_json::from_value::<member::ThirdPartyInvite>(
third_party_invite_json.clone(),
)
.map_err(|_| {
Error::BadRequest(
ErrorKind::InvalidParam,
"ThirdPartyInvite is invalid",
)
})?;
todo!("handle third party invites");
}
} else if sender_membership != member::MembershipState::Join
|| current_membership == member::MembershipState::Join
|| current_membership == member::MembershipState::Ban
{
false
} else {
sender_power
.filter(|&p| p >= &power_levels.invite)
.is_some()
}
} else if target_membership == member::MembershipState::Leave {
if sender == target_user_id {
current_membership == member::MembershipState::Join
|| current_membership == member::MembershipState::Invite
} else if sender_membership != member::MembershipState::Join
|| current_membership == member::MembershipState::Ban
&& sender_power.filter(|&p| p < &power_levels.ban).is_some()
{
false
} else {
sender_power.filter(|&p| p >= &power_levels.kick).is_some()
&& target_power < sender_power
}
} else if target_membership == member::MembershipState::Ban {
if sender_membership != member::MembershipState::Join {
false
} else {
sender_power.filter(|&p| p >= &power_levels.ban).is_some()
&& target_power < sender_power
}
} else {
false
}
}
EventType::RoomMember => event_auth::is_membership_change_allowed(
// TODO this is a bit of a hack but not sure how to have a type
// declared in `state_res` crate be
Requester {
prev_event_ids: prev_events.to_owned(),
room_id: &room_id,
content: &content,
state_key: Some(state_key.to_owned()),
sender: &sender,
},
&auth_events
.iter()
.map(|((ty, key), pdu)| {
Ok(((ty.clone(), key.clone()), pdu.convert_for_state_res()?))
})
.collect::<Result<StateMap<_>>>()?,
)
.ok_or(Error::Conflict("Found incoming PDU with invalid data."))?,
EventType::RoomCreate => prev_events.is_empty(),
// Not allow any of the following events if the sender is not joined.
_ if sender_membership != member::MembershipState::Join => false,
@ -474,7 +646,7 @@ impl Rooms {
>= &power_levels.state_default
}
} {
error!("Unauthorized");
error!("Unauthorized {}", event_type);
// Not authorized
return Err(Error::BadRequest(
ErrorKind::Forbidden,
@ -483,7 +655,7 @@ impl Rooms {
}
} else if !self.is_joined(&sender, &room_id)? {
// TODO: auth rules apply to all events, not only those with a state key
error!("Unauthorized");
error!("Unauthorized {}", event_type);
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Event is not authorized",
@ -524,7 +696,10 @@ impl Rooms {
depth: depth
.try_into()
.map_err(|_| Error::bad_database("Depth is invalid"))?,
auth_events: Vec::new(),
auth_events: auth_events
.into_iter()
.map(|(_, pdu)| pdu.event_id)
.collect(),
redacts: redacts.clone(),
unsigned,
hashes: ruma::events::pdu::EventHash {
@ -564,15 +739,19 @@ impl Rooms {
self.pduid_pdu.insert(&pdu_id, &*pdu_json.to_string())?;
self.eventid_pduid
.insert(pdu.event_id.to_string(), pdu_id.clone())?;
.insert(pdu.event_id.to_string(), &*pdu_id)?;
if let Some(state_key) = pdu.state_key {
let mut key = room_id.to_string().as_bytes().to_vec();
if let Some(state_key) = &pdu.state_key {
// We call this first because our StateHash relies on the
// state before the new event
self.append_state_pdu(&room_id, &pdu_id)?;
let mut key = room_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(pdu.kind.to_string().as_bytes());
key.push(0xff);
key.extend_from_slice(state_key.as_bytes());
self.roomstateid_pdu.insert(key, &*pdu_json.to_string())?;
self.roomstateid_pduid.insert(key, pdu_id.as_slice())?;
}
match event_type {