Upgrade Ruma

Jonas Platte 2021-11-26 20:36:40 +01:00
parent 1fc616320a
commit 892a0525f2
25 changed files with 297 additions and 304 deletions
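
Most hunks below apply one conversion pattern: with this Ruma version, identifier types such as EventId, RoomId and ServerSigningKeyId are unsized, so owned values live behind Box<_>, shared values behind Arc<_>, and parsing goes through TryFrom<&str> on the boxed form. A minimal sketch of that pattern, assuming only the impls the diff itself relies on:

    use std::convert::TryFrom;
    use std::sync::Arc;
    use ruma::{EventId, RoomId};

    fn conversion_sketch(raw_event_id: &str, raw_room_id: &str) {
        // Owned identifiers are now Box<_>, parsed via TryFrom<&str>.
        let event_id = Box::<EventId>::try_from(raw_event_id).expect("valid event id");
        let _room_id = Box::<RoomId>::try_from(raw_room_id).expect("valid room id");

        // Shared ownership uses Arc<EventId>; a &EventId converts via From.
        let shared: Arc<EventId> = Arc::from(&*event_id);

        // "Cloning" an unsized id is spelled to_owned() and yields a Box again.
        let _owned_again: Box<EventId> = (*shared).to_owned();
    }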

@@ -552,7 +552,7 @@ pub fn get_server_keys_route(db: DatabaseGuard) -> Json<String> {
let mut verify_keys = BTreeMap::new();
verify_keys.insert(
ServerSigningKeyId::try_from(
Box::<ServerSigningKeyId>::try_from(
format!("ed25519:{}", db.globals.keypair().version()).as_str(),
)
.expect("found invalid server signing keys in DB"),
@@ -736,7 +736,7 @@ pub async fn send_transaction_message_route(
// 0. Check the server is in the room
let room_id = match value
.get("room_id")
.and_then(|id| RoomId::try_from(id.as_str()?).ok())
.and_then(|id| Box::<RoomId>::try_from(id.as_str()?).ok())
{
Some(id) => id,
None => {
@@ -1003,11 +1003,10 @@ pub(crate) async fn handle_incoming_pdu<'a>(
// 9. Fetch any missing prev events doing all checks listed here starting at 1. These are timeline events
let mut graph = HashMap::new();
let mut eventid_info = HashMap::new();
let mut todo_outlier_stack: Vec<_> = incoming_pdu
let mut todo_outlier_stack: Vec<Arc<EventId>> = incoming_pdu
.prev_events
.iter()
.cloned()
.map(Arc::new)
.map(|x| Arc::from(&**x))
.collect();
let mut amount = 0;
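
In the hunk above, prev_events presumably holds Box<EventId> values, so iter() yields &Box<EventId>; &**x re-borrows the unsized EventId and Arc::from copies it into a fresh Arc<EventId>. Roughly:

    use std::sync::Arc;
    use ruma::EventId;

    // Sketch only: prev_events is assumed to be a slice of Box<EventId>.
    fn to_arc_ids(prev_events: &[Box<EventId>]) -> Vec<Arc<EventId>> {
        prev_events
            .iter()                   // yields &Box<EventId>
            .map(|x| Arc::from(&**x)) // &EventId -> Arc<EventId>
            .collect()
    }
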
@@ -1027,7 +1026,7 @@ pub(crate) async fn handle_incoming_pdu<'a>(
if amount > 100 {
// Max limit reached
warn!("Max prev event limit reached!");
graph.insert((*prev_event_id).clone(), HashSet::new());
graph.insert((*prev_event_id).to_owned(), HashSet::new());
continue;
}
@@ -1038,27 +1037,27 @@ pub(crate) async fn handle_incoming_pdu<'a>(
amount += 1;
for prev_prev in &pdu.prev_events {
if !graph.contains_key(prev_prev) {
todo_outlier_stack.push(dbg!(Arc::new(prev_prev.clone())));
todo_outlier_stack.push(dbg!(Arc::from(&**prev_prev)));
}
}
graph.insert(
(*prev_event_id).clone(),
(*prev_event_id).to_owned(),
pdu.prev_events.iter().cloned().collect(),
);
} else {
// Time based check failed
graph.insert((*prev_event_id).clone(), HashSet::new());
graph.insert((*prev_event_id).to_owned(), HashSet::new());
}
eventid_info.insert(prev_event_id.clone(), (pdu, json));
} else {
// Get json failed
graph.insert((*prev_event_id).clone(), HashSet::new());
graph.insert((*prev_event_id).to_owned(), HashSet::new());
}
} else {
// Fetch and handle failed
graph.insert((*prev_event_id).clone(), HashSet::new());
graph.insert((*prev_event_id).to_owned(), HashSet::new());
}
}
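
The repeated clone() → to_owned() changes follow from the same shift: prev_event_id is now an Arc<EventId>, and the unsized EventId behind it has no Clone impl, but ToOwned yields a Box<EventId> that works as the graph key. A sketch of the assumed graph shape:

    use std::collections::{HashMap, HashSet};
    use ruma::EventId;

    // Assumed shape of the graph built above: owned Box<EventId> keys,
    // each mapping to the prev_events of the visited event.
    fn record_dead_end(
        graph: &mut HashMap<Box<EventId>, HashSet<Box<EventId>>>,
        prev_event_id: &EventId,
    ) {
        // to_owned() re-allocates the unsized EventId as a Box<EventId> key.
        graph.insert(prev_event_id.to_owned(), HashSet::new());
    }
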
@@ -1074,7 +1073,6 @@ pub(crate) async fn handle_incoming_pdu<'a>(
.get(event_id)
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts),
),
ruma::event_id!("$notimportant"),
))
})
.map_err(|_| "Error sorting prev events".to_owned())?;
@@ -1084,7 +1082,7 @@ pub(crate) async fn handle_incoming_pdu<'a>(
if errors >= 5 {
break;
}
if let Some((pdu, json)) = eventid_info.remove(&prev_id) {
if let Some((pdu, json)) = eventid_info.remove(&*prev_id) {
if pdu.origin_server_ts < first_pdu_in_room.origin_server_ts {
continue;
}
@@ -1200,8 +1198,7 @@ fn handle_outlier_pdu<'a>(
&incoming_pdu
.auth_events
.iter()
.cloned()
.map(Arc::new)
.map(|x| Arc::from(&**x))
.collect::<Vec<_>>(),
create_event,
room_id,
@@ -1331,7 +1328,7 @@ async fn upgrade_outlier_to_timeline_pdu(
let mut state_at_incoming_event = None;
if incoming_pdu.prev_events.len() == 1 {
let prev_event = &incoming_pdu.prev_events[0];
let prev_event = &*incoming_pdu.prev_events[0];
let prev_event_sstatehash = db
.rooms
.pdu_shortstatehash(prev_event)
@@ -1353,7 +1350,7 @@ async fn upgrade_outlier_to_timeline_pdu(
.get_or_create_shortstatekey(&prev_pdu.kind, state_key, &db.globals)
.map_err(|_| "Failed to create shortstatekey.".to_owned())?;
state.insert(shortstatekey, Arc::new(prev_event.clone()));
state.insert(shortstatekey, Arc::from(prev_event));
// Now it's the state after the pdu
}
@@ -1397,7 +1394,7 @@ async fn upgrade_outlier_to_timeline_pdu(
.rooms
.get_or_create_shortstatekey(&prev_event.kind, state_key, &db.globals)
.map_err(|_| "Failed to create shortstatekey.".to_owned())?;
leaf_state.insert(shortstatekey, Arc::new(prev_event.event_id.clone()));
leaf_state.insert(shortstatekey, Arc::from(&*prev_event.event_id));
// Now it's the state after the pdu
}
@@ -1410,14 +1407,14 @@ async fn upgrade_outlier_to_timeline_pdu(
.get_statekey_from_short(k)
.map_err(|_| "Failed to get_statekey_from_short.".to_owned())?;
state.insert(k, (*id).clone());
state.insert(k, (*id).to_owned());
starting_events.push(id);
}
auth_chain_sets.push(
get_auth_chain(room_id, starting_events, db)
.map_err(|_| "Failed to load auth chain.".to_owned())?
.map(|event_id| (*event_id).clone())
.map(|event_id| (*event_id).to_owned())
.collect(),
);
@@ -1444,7 +1441,7 @@ async fn upgrade_outlier_to_timeline_pdu(
.rooms
.get_or_create_shortstatekey(&event_type, &state_key, &db.globals)
.map_err(|_| "Failed to get_or_create_shortstatekey".to_owned())?;
Ok((shortstatekey, Arc::new(event_id)))
Ok((shortstatekey, Arc::from(event_id)))
})
.collect::<Result<_, String>>()?,
),
@@ -1479,8 +1476,7 @@ async fn upgrade_outlier_to_timeline_pdu(
origin,
&res.pdu_ids
.iter()
.cloned()
.map(Arc::new)
.map(|x| Arc::from(&**x))
.collect::<Vec<_>>(),
create_event,
room_id,
@@ -1488,7 +1484,7 @@ async fn upgrade_outlier_to_timeline_pdu(
)
.await;
let mut state = BTreeMap::new();
let mut state: BTreeMap<_, Arc<EventId>> = BTreeMap::new();
for (pdu, _) in state_vec {
let state_key = pdu
.state_key
@@ -1502,7 +1498,7 @@ async fn upgrade_outlier_to_timeline_pdu(
match state.entry(shortstatekey) {
btree_map::Entry::Vacant(v) => {
v.insert(Arc::new(pdu.event_id.clone()));
v.insert(Arc::from(&*pdu.event_id));
}
btree_map::Entry::Occupied(_) => return Err(
"State event's type and state_key combination exists multiple times."
@@ -1577,7 +1573,7 @@ async fn upgrade_outlier_to_timeline_pdu(
.roomid_mutex_state
.write()
.unwrap()
.entry(room_id.clone())
.entry(room_id.to_owned())
.or_default(),
);
let state_lock = mutex_state.lock().await;
@@ -1715,7 +1711,7 @@ async fn upgrade_outlier_to_timeline_pdu(
.rooms
.get_or_create_shortstatekey(&leaf_pdu.kind, state_key, &db.globals)
.map_err(|_| "Failed to create shortstatekey.".to_owned())?;
leaf_state.insert(shortstatekey, Arc::new(leaf_pdu.event_id.clone()));
leaf_state.insert(shortstatekey, Arc::from(&*leaf_pdu.event_id));
// Now it's the state after the pdu
}
@@ -1730,7 +1726,7 @@ async fn upgrade_outlier_to_timeline_pdu(
.get_or_create_shortstatekey(&incoming_pdu.kind, state_key, &db.globals)
.map_err(|_| "Failed to create shortstatekey.".to_owned())?;
state_after.insert(shortstatekey, Arc::new(incoming_pdu.event_id.clone()));
state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id));
}
fork_states.push(state_after);
@@ -1762,7 +1758,7 @@ async fn upgrade_outlier_to_timeline_pdu(
db,
)
.map_err(|_| "Failed to load auth chain.".to_owned())?
.map(|event_id| (*event_id).clone())
.map(|event_id| (*event_id).to_owned())
.collect(),
);
}
@@ -1774,7 +1770,7 @@ async fn upgrade_outlier_to_timeline_pdu(
.map(|(k, id)| {
db.rooms
.get_statekey_from_short(k)
.map(|k| (k, (*id).clone()))
.map(|k| (k, (*id).to_owned()))
})
.collect::<Result<StateMap<_>>>()
})
@@ -1874,7 +1870,8 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
let mut pdus = vec![];
for id in events {
if let Some((time, tries)) = db.globals.bad_event_ratelimiter.read().unwrap().get(id) {
if let Some((time, tries)) = db.globals.bad_event_ratelimiter.read().unwrap().get(&**id)
{
// Exponential backoff
let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
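
The lookup gains &**id because the loop variable is presumably an &Arc<EventId> while the ratelimiter map is keyed by Box<EventId>; dereferencing down to &EventId satisfies both via Borrow. A sketch with an assumed value type:

    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Instant;
    use ruma::EventId;

    // Assumed: the map is keyed by Box<EventId>; Box<T>: Borrow<T> lets it be
    // queried with a plain &EventId obtained via &**id.
    fn backoff_entry<'a>(
        map: &'a HashMap<Box<EventId>, (Instant, u32)>,
        id: &Arc<EventId>,
    ) -> Option<&'a (Instant, u32)> {
        map.get(&**id)
    }
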
@@ -1914,7 +1911,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
match crate::pdu::gen_event_id_canonical_json(&res.pdu) {
Ok(t) => t,
Err(_) => {
back_off((**id).clone());
back_off((**id).to_owned());
continue;
}
};
@@ -1939,14 +1936,14 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
Ok((pdu, json)) => (pdu, Some(json)),
Err(e) => {
warn!("Authentication of event {} failed: {:?}", id, e);
back_off((**id).clone());
back_off((**id).to_owned());
continue;
}
}
}
Err(_) => {
warn!("Failed to fetch event: {}", id);
back_off((**id).clone());
back_off((**id).to_owned());
continue;
}
}
@@ -2128,7 +2125,7 @@ fn append_incoming_pdu(
db: &Database,
pdu: &PduEvent,
pdu_json: CanonicalJsonObject,
new_room_leaves: HashSet<EventId>,
new_room_leaves: HashSet<Box<EventId>>,
state_ids_compressed: HashSet<CompressedStateEvent>,
soft_fail: bool,
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room mutex
@@ -2298,13 +2295,13 @@ fn get_auth_chain_inner(
event_id: &EventId,
db: &Database,
) -> Result<HashSet<u64>> {
let mut todo = vec![event_id.clone()];
let mut todo = vec![event_id.to_owned()];
let mut found = HashSet::new();
while let Some(event_id) = todo.pop() {
match db.rooms.get_pdu(&event_id) {
Ok(Some(pdu)) => {
if &pdu.room_id != room_id {
if pdu.room_id != room_id {
return Err(Error::BadRequest(ErrorKind::Forbidden, "Evil event in db"));
}
for auth_event in &pdu.auth_events {
@@ -2314,7 +2311,7 @@ fn get_auth_chain_inner(
if !found.contains(&sauthevent) {
found.insert(sauthevent);
todo.push(auth_event.clone());
todo.push(auth_event.to_owned());
}
}
}
@@ -2363,7 +2360,7 @@ pub fn get_event_route(
.and_then(|val| val.as_str())
.ok_or_else(|| Error::bad_database("Invalid event in database"))?;
let room_id = RoomId::try_from(room_id_str)
let room_id = Box::<RoomId>::try_from(room_id_str)
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
if !db.rooms.server_in_room(sender_servername, &room_id)? {
@@ -2417,7 +2414,7 @@ pub fn get_missing_events_route(
.and_then(|val| val.as_str())
.ok_or_else(|| Error::bad_database("Invalid event in database"))?;
let event_room_id = RoomId::try_from(room_id_str)
let event_room_id = Box::<RoomId>::try_from(room_id_str)
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
if event_room_id != body.room_id {
@@ -2436,7 +2433,7 @@ pub fn get_missing_events_route(
continue;
}
queued_events.extend_from_slice(
&serde_json::from_value::<Vec<EventId>>(
&serde_json::from_value::<Vec<Box<EventId>>>(
serde_json::to_value(pdu.get("prev_events").cloned().ok_or_else(|| {
Error::bad_database("Event in db has no prev_events field.")
})?)
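
prev_events is now read back out of the stored PDU JSON as Vec<Box<EventId>>; Box<EventId> implements Deserialize, so the array of id strings parses in one step. Roughly, and only as a sketch:

    use ruma::EventId;

    // Sketch: parse a prev_events JSON array into owned, boxed event ids.
    fn parse_prev_events(raw: serde_json::Value) -> serde_json::Result<Vec<Box<EventId>>> {
        serde_json::from_value(raw)
    }
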
@@ -2485,14 +2482,14 @@ pub fn get_event_authorization_route(
.and_then(|val| val.as_str())
.ok_or_else(|| Error::bad_database("Invalid event in database"))?;
let room_id = RoomId::try_from(room_id_str)
let room_id = Box::<RoomId>::try_from(room_id_str)
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
if !db.rooms.server_in_room(sender_servername, &room_id)? {
return Err(Error::BadRequest(ErrorKind::NotFound, "Event not found."));
}
let auth_chain_ids = get_auth_chain(&room_id, vec![Arc::new(body.event_id.clone())], &db)?;
let auth_chain_ids = get_auth_chain(&room_id, vec![Arc::from(&*body.event_id)], &db)?;
Ok(get_event_authorization::v1::Response {
auth_chain: auth_chain_ids
@@ -2550,7 +2547,7 @@ pub fn get_room_state_route(
})
.collect();
let auth_chain_ids = get_auth_chain(&body.room_id, vec![Arc::new(body.event_id.clone())], &db)?;
let auth_chain_ids = get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)], &db)?;
Ok(get_room_state::v1::Response {
auth_chain: auth_chain_ids
@@ -2606,13 +2603,13 @@ pub fn get_room_state_ids_route(
.rooms
.state_full_ids(shortstatehash)?
.into_iter()
.map(|(_, id)| (*id).clone())
.map(|(_, id)| (*id).to_owned())
.collect();
let auth_chain_ids = get_auth_chain(&body.room_id, vec![Arc::new(body.event_id.clone())], &db)?;
let auth_chain_ids = get_auth_chain(&body.room_id, vec![Arc::from(&*body.event_id)], &db)?;
Ok(get_room_state_ids::v1::Response {
auth_chain_ids: auth_chain_ids.map(|id| (*id).clone()).collect(),
auth_chain_ids: auth_chain_ids.map(|id| (*id).to_owned()).collect(),
pdu_ids,
}
.into())
@@ -2671,9 +2668,8 @@ pub fn create_join_event_template_route(
};
// If there was no create event yet, assume we are creating a version 6 room right now
let room_version_id = create_event_content.map_or(RoomVersionId::Version6, |create_event| {
create_event.room_version
});
let room_version_id =
create_event_content.map_or(RoomVersionId::V6, |create_event| create_event.room_version);
let room_version = RoomVersion::new(&room_version_id).expect("room version is supported");
if !body.ver.contains(&room_version_id) {
@@ -2726,7 +2722,7 @@ pub fn create_join_event_template_route(
}
let pdu = PduEvent {
event_id: ruma::event_id!("$thiswillbefilledinlater"),
event_id: ruma::event_id!("$thiswillbefilledinlater").to_owned(),
room_id: body.room_id.clone(),
sender: body.user_id.clone(),
origin_server_ts: utils::millis_since_unix_epoch()
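
The added .to_owned() on the placeholder suggests event_id!() now evaluates to a borrowed &EventId, while PduEvent::event_id presumably wants an owned Box<EventId>. Under those assumptions:

    use ruma::EventId;

    // Assumption from the hunk above: event_id!() yields a borrowed &EventId,
    // so an owned field takes the Box<EventId> produced by to_owned().
    fn placeholder_event_id() -> Box<EventId> {
        ruma::event_id!("$thiswillbefilledinlater").to_owned()
    }
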
@@ -2838,7 +2834,7 @@ async fn create_join_event(
.roomid_mutex_federation
.write()
.unwrap()
.entry(room_id.clone())
.entry(room_id.to_owned())
.or_default(),
);
let mutex_lock = mutex.lock().await;
@@ -2937,8 +2933,7 @@ pub async fn create_invite_route(
return Err(Error::bad_config("Federation is disabled."));
}
if body.room_version != RoomVersionId::Version5 && body.room_version != RoomVersionId::Version6
{
if body.room_version != RoomVersionId::V5 && body.room_version != RoomVersionId::V6 {
return Err(Error::BadRequest(
ErrorKind::IncompatibleRoomVersion {
room_version: body.room_version.clone(),
@@ -2959,7 +2954,7 @@ pub async fn create_invite_route(
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Failed to sign event."))?;
// Generate event id
let event_id = EventId::try_from(&*format!(
let event_id = Box::<EventId>::try_from(&*format!(
"${}",
ruma::signatures::reference_hash(&signed_event, &body.room_version)
.expect("ruma can calculate reference hashes")
@@ -2972,7 +2967,7 @@ pub async fn create_invite_route(
CanonicalJsonValue::String(event_id.into()),
);
let sender = serde_json::from_value(
let sender: Box<_> = serde_json::from_value(
signed_event
.get("sender")
.ok_or(Error::BadRequest(
@@ -2984,7 +2979,7 @@ pub async fn create_invite_route(
)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "sender is not a user id."))?;
let invited_user = serde_json::from_value(
let invited_user: Box<_> = serde_json::from_value(
signed_event
.get("state_key")
.ok_or(Error::BadRequest(
@@ -3263,7 +3258,7 @@ pub(crate) async fn fetch_required_signing_keys(
// the PDUs and either cache the key or add it to the list that needs to be retrieved.
fn get_server_keys_from_cache(
pdu: &RawJsonValue,
servers: &mut BTreeMap<Box<ServerName>, BTreeMap<ServerSigningKeyId, QueryCriteria>>,
servers: &mut BTreeMap<Box<ServerName>, BTreeMap<Box<ServerSigningKeyId>, QueryCriteria>>,
room_version: &RoomVersionId,
pub_key_map: &mut RwLockWriteGuard<'_, BTreeMap<String, BTreeMap<String, String>>>,
db: &Database,
@@ -3273,7 +3268,7 @@ fn get_server_keys_from_cache(
Error::BadServerResponse("Invalid PDU in server response")
})?;
let event_id = EventId::try_from(&*format!(
let event_id = Box::<EventId>::try_from(&*format!(
"${}",
ruma::signatures::reference_hash(&value, room_version)
.expect("ruma can calculate reference hashes")
@@ -3353,7 +3348,7 @@ pub(crate) async fn fetch_join_signing_keys(
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>,
db: &Database,
) -> Result<()> {
let mut servers: BTreeMap<Box<ServerName>, BTreeMap<ServerSigningKeyId, QueryCriteria>> =
let mut servers: BTreeMap<Box<ServerName>, BTreeMap<Box<ServerSigningKeyId>, QueryCriteria>> =
BTreeMap::new();
{
@@ -3387,10 +3382,6 @@ pub(crate) async fn fetch_join_signing_keys(
server,
get_remote_server_keys_batch::v2::Request {
server_keys: servers.clone(),
minimum_valid_until_ts: MilliSecondsSinceUnixEpoch::from_system_time(
SystemTime::now() + Duration::from_secs(60),
)
.expect("time is valid"),
},
)
.await