431 errors left

This commit is contained in:
Timo Kösters 2022-10-05 09:34:25 +02:00 committed by Nyaaori
parent bd8b616ca0
commit 8708cd3b63
No known key found for this signature in database
GPG key ID: E7819C3ED4D1F82E
32 changed files with 640 additions and 973 deletions

View file

@ -196,7 +196,7 @@ pub async fn get_content_thumbnail_route(
.upload_thumbnail(
mxc,
&None,
&get_thumbnail_response.content_type,
&get_thumbnail_response.content_type.as_deref(),
body.width.try_into().expect("all UInts are valid u32s"),
body.height.try_into().expect("all UInts are valid u32s"),
&get_thumbnail_response.file,

View file

@ -481,7 +481,7 @@ async fn join_room_by_id_helper(
let (make_join_response, remote_server) = make_join_response_and_server?;
let room_version = match make_join_response.room_version {
Some(room_version) if services().rooms.is_supported_version(&room_version) => room_version,
Some(room_version) if services().rooms.metadata.is_supported_version(&room_version) => room_version,
_ => return Err(Error::BadServerResponse("Room version is not supported")),
};
@ -591,7 +591,7 @@ async fn join_room_by_id_helper(
Error::BadServerResponse("Invalid PDU in send_join response.")
})?;
services().rooms.add_pdu_outlier(&event_id, &value)?;
services().rooms.outlier.add_pdu_outlier(&event_id, &value)?;
if let Some(state_key) = &pdu.state_key {
let shortstatekey = services().rooms.short.get_or_create_shortstatekey(
&pdu.kind.to_string().into(),
@ -621,14 +621,6 @@ async fn join_room_by_id_helper(
return Err(Error::BadServerResponse("State contained no create event."));
}
services().rooms.state.force_state(
room_id,
state
.into_iter()
.map(|(k, id)| services().rooms.compress_state_event(k, &id))
.collect::<Result<_>>()?,
)?;
for result in send_join_response
.room_state
.auth_chain
@ -640,14 +632,21 @@ async fn join_room_by_id_helper(
Err(_) => continue,
};
services().rooms.add_pdu_outlier(&event_id, &value)?;
services().rooms.outlier.add_pdu_outlier(&event_id, &value)?;
}
let shortstatehash = services().rooms.state.set_event_state(
event_id,
room_id,
state
.into_iter()
.map(|(k, id)| services().rooms.state_compressor.compress_state_event(k, &id))
.collect::<Result<_>>()?,
)?;
// We append to state before appending the pdu, so we don't have a moment in time with the
// pdu without it's state. This is okay because append_pdu can't fail.
let statehashid = services().rooms.append_to_state(&parsed_pdu)?;
services().rooms.append_pdu(
services().rooms.timeline.append_pdu(
&parsed_pdu,
join_event,
iter::once(&*parsed_pdu.event_id),
@ -655,7 +654,9 @@ async fn join_room_by_id_helper(
// We set the room state after inserting the pdu, so that we never have a moment in time
// where events in the current room state do not exist
services().rooms.set_room_state(room_id, statehashid)?;
services().rooms.state.set_room_state(room_id, shortstatehash)?;
let statehashid = services().rooms.state.append_to_state(&parsed_pdu)?;
} else {
let event = RoomMemberEventContent {
membership: MembershipState::Join,
@ -668,7 +669,7 @@ async fn join_room_by_id_helper(
join_authorized_via_users_server: None,
};
services().rooms.build_and_append_pdu(
services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: RoomEventType::RoomMember,
content: to_raw_value(&event).expect("event is valid, we just created it"),
@ -678,7 +679,6 @@ async fn join_room_by_id_helper(
},
sender_user,
room_id,
services(),
&state_lock,
)?;
}
@ -786,7 +786,7 @@ pub(crate) async fn invite_helper<'a>(
unsigned: None,
state_key: Some(user_id.to_string()),
redacts: None,
}, sender_user, room_id, &state_lock);
}, sender_user, room_id, &state_lock)?;
let invite_room_state = services().rooms.state.calculate_invite_state(&pdu)?;
@ -811,7 +811,7 @@ pub(crate) async fn invite_helper<'a>(
create_invite::v2::Request {
room_id,
event_id: expected_event_id,
room_version: &services().state.get_room_version(&room_id)?,
room_version: &services().rooms.state.get_room_version(&room_id)?,
event: &PduEvent::convert_to_outgoing_federation_event(pdu_json.clone()),
invite_room_state: &invite_room_state,
},
@ -846,7 +846,7 @@ pub(crate) async fn invite_helper<'a>(
)
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Origin field is invalid."))?;
let pdu_id = services().rooms.event_handler.handle_incoming_pdu(
let pdu_id: Vec<u8> = services().rooms.event_handler.handle_incoming_pdu(
&origin,
&event_id,
room_id,
@ -854,13 +854,7 @@ pub(crate) async fn invite_helper<'a>(
true,
&pub_key_map,
)
.await
.map_err(|_| {
Error::BadRequest(
ErrorKind::InvalidParam,
"Error while handling incoming PDU.",
)
})?
.await?
.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Could not accept incoming PDU as timeline event.",
@ -868,6 +862,7 @@ pub(crate) async fn invite_helper<'a>(
let servers = services()
.rooms
.state_cache
.room_servers(room_id)
.filter_map(|r| r.ok())
.filter(|server| &**server != services().globals.server_name());
@ -877,7 +872,7 @@ pub(crate) async fn invite_helper<'a>(
return Ok(());
}
if !services().rooms.is_joined(sender_user, &room_id)? {
if !services().rooms.state_cache.is_joined(sender_user, &room_id)? {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"You don't have permission to view this room.",
@ -894,7 +889,7 @@ pub(crate) async fn invite_helper<'a>(
);
let state_lock = mutex_state.lock().await;
services().rooms.build_and_append_pdu(
services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: RoomEventType::RoomMember,
content: to_raw_value(&RoomMemberEventContent {
@ -926,8 +921,9 @@ pub(crate) async fn invite_helper<'a>(
pub async fn leave_all_rooms(user_id: &UserId) -> Result<()> {
let all_rooms = services()
.rooms
.state_cache
.rooms_joined(user_id)
.chain(services().rooms.rooms_invited(user_id).map(|t| t.map(|(r, _)| r)))
.chain(services().rooms.state_cache.rooms_invited(user_id).map(|t| t.map(|(r, _)| r)))
.collect::<Vec<_>>();
for room_id in all_rooms {
@ -955,7 +951,7 @@ pub async fn leave_room(
let last_state = services().rooms.state_cache
.invite_state(user_id, room_id)?
.map_or_else(|| services().rooms.left_state(user_id, room_id), |s| Ok(Some(s)))?;
.map_or_else(|| services().rooms.state_cache.left_state(user_id, room_id), |s| Ok(Some(s)))?;
// We always drop the invite, we can't rely on other servers
services().rooms.state_cache.update_membership(
@ -978,7 +974,7 @@ pub async fn leave_room(
let state_lock = mutex_state.lock().await;
let mut event: RoomMemberEventContent = serde_json::from_str(
services().rooms.state.room_state_get(room_id, &StateEventType::RoomMember, user_id.as_str())?
services().rooms.state_accessor.room_state_get(room_id, &StateEventType::RoomMember, user_id.as_str())?
.ok_or(Error::BadRequest(
ErrorKind::BadState,
"Cannot leave a room you are not a member of.",
@ -1017,6 +1013,7 @@ async fn remote_leave_room(
let invite_state = services()
.rooms
.state_cache
.invite_state(user_id, room_id)?
.ok_or(Error::BadRequest(
ErrorKind::BadState,

View file

@ -68,7 +68,7 @@ pub async fn send_message_event_route(
let mut unsigned = BTreeMap::new();
unsigned.insert("transaction_id".to_owned(), body.txn_id.to_string().into());
let event_id = services().rooms.build_and_append_pdu(
let event_id = services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: body.event_type.to_string().into(),
content: serde_json::from_str(body.body.body.json().get())
@ -108,7 +108,7 @@ pub async fn get_message_events_route(
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let sender_device = body.sender_device.as_ref().expect("user is authenticated");
if !services().rooms.is_joined(sender_user, &body.room_id)? {
if !services().rooms.state_cache.is_joined(sender_user, &body.room_id)? {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"You don't have permission to view this room.",
@ -129,7 +129,7 @@ pub async fn get_message_events_route(
let to = body.to.as_ref().map(|t| t.parse());
services().rooms
.lazy_load_confirm_delivery(sender_user, sender_device, &body.room_id, from)?;
.lazy_loading.lazy_load_confirm_delivery(sender_user, sender_device, &body.room_id, from)?;
// Use limit or else 10
let limit = body.limit.try_into().map_or(10_usize, |l: u32| l as usize);
@ -144,12 +144,13 @@ pub async fn get_message_events_route(
get_message_events::v3::Direction::Forward => {
let events_after: Vec<_> = services()
.rooms
.timeline
.pdus_after(sender_user, &body.room_id, from)?
.take(limit)
.filter_map(|r| r.ok()) // Filter out buggy events
.filter_map(|(pdu_id, pdu)| {
services().rooms
.pdu_count(&pdu_id)
.timeline.pdu_count(&pdu_id)
.map(|pdu_count| (pdu_count, pdu))
.ok()
})
@ -157,7 +158,7 @@ pub async fn get_message_events_route(
.collect();
for (_, event) in &events_after {
if !services().rooms.lazy_load_was_sent_before(
if !services().rooms.lazy_loading.lazy_load_was_sent_before(
sender_user,
sender_device,
&body.room_id,
@ -181,11 +182,13 @@ pub async fn get_message_events_route(
get_message_events::v3::Direction::Backward => {
let events_before: Vec<_> = services()
.rooms
.timeline
.pdus_until(sender_user, &body.room_id, from)?
.take(limit)
.filter_map(|r| r.ok()) // Filter out buggy events
.filter_map(|(pdu_id, pdu)| {
services().rooms
.timeline
.pdu_count(&pdu_id)
.map(|pdu_count| (pdu_count, pdu))
.ok()
@ -194,7 +197,7 @@ pub async fn get_message_events_route(
.collect();
for (_, event) in &events_before {
if !services().rooms.lazy_load_was_sent_before(
if !services().rooms.lazy_loading.lazy_load_was_sent_before(
sender_user,
sender_device,
&body.room_id,
@ -220,7 +223,7 @@ pub async fn get_message_events_route(
resp.state = Vec::new();
for ll_id in &lazy_loaded {
if let Some(member_event) =
services().rooms
services().rooms.state_accessor
.room_state_get(&body.room_id, &StateEventType::RoomMember, ll_id.as_str())?
{
resp.state.push(member_event.to_state_event());
@ -228,7 +231,7 @@ pub async fn get_message_events_route(
}
if let Some(next_token) = next_token {
services().rooms.lazy_load_mark_sent(
services().rooms.lazy_loading.lazy_load_mark_sent(
sender_user,
sender_device,
&body.room_id,

View file

@ -10,10 +10,10 @@ pub async fn set_presence_route(
) -> Result<set_presence::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
for room_id in services().rooms.rooms_joined(sender_user) {
for room_id in services().rooms.state_cache.rooms_joined(sender_user) {
let room_id = room_id?;
services().rooms.edus.update_presence(
services().rooms.edus.presence.update_presence(
sender_user,
&room_id,
ruma::events::presence::PresenceEvent {
@ -51,13 +51,14 @@ pub async fn get_presence_route(
for room_id in services()
.rooms
.get_shared_rooms(vec![sender_user.clone(), body.user_id.clone()])?
.user.get_shared_rooms(vec![sender_user.clone(), body.user_id.clone()])?
{
let room_id = room_id?;
if let Some(presence) = services()
.rooms
.edus
.presence
.get_last_presence_event(sender_user, &room_id)?
{
presence_event = Some(presence);

View file

@ -30,6 +30,7 @@ pub async fn set_displayname_route(
// Send a new membership event and presence update into all joined rooms
let all_rooms_joined: Vec<_> = services()
.rooms
.state_cache
.rooms_joined(sender_user)
.filter_map(|r| r.ok())
.map(|room_id| {
@ -40,6 +41,7 @@ pub async fn set_displayname_route(
displayname: body.displayname.clone(),
..serde_json::from_str(
services().rooms
.state_accessor
.room_state_get(
&room_id,
&StateEventType::RoomMember,
@ -80,10 +82,11 @@ pub async fn set_displayname_route(
let _ = services()
.rooms
.timeline
.build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock);
// Presence update
services().rooms.edus.update_presence(
services().rooms.edus.presence.update_presence(
sender_user,
&room_id,
ruma::events::presence::PresenceEvent {
@ -155,6 +158,7 @@ pub async fn set_avatar_url_route(
// Send a new membership event and presence update into all joined rooms
let all_joined_rooms: Vec<_> = services()
.rooms
.state_cache
.rooms_joined(sender_user)
.filter_map(|r| r.ok())
.map(|room_id| {
@ -165,6 +169,7 @@ pub async fn set_avatar_url_route(
avatar_url: body.avatar_url.clone(),
..serde_json::from_str(
services().rooms
.state_accessor
.room_state_get(
&room_id,
&StateEventType::RoomMember,
@ -205,10 +210,11 @@ pub async fn set_avatar_url_route(
let _ = services()
.rooms
.timeline
.build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock);
// Presence update
services().rooms.edus.update_presence(
services().rooms.edus.presence.update_presence(
sender_user,
&room_id,
ruma::events::presence::PresenceEvent {
@ -226,7 +232,6 @@ pub async fn set_avatar_url_route(
},
sender: sender_user.clone(),
},
&services().globals,
)?;
}

View file

@ -31,15 +31,15 @@ pub async fn set_read_marker_route(
)?;
if let Some(event) = &body.read_receipt {
services().rooms.edus.private_read_set(
services().rooms.edus.read_receipt.private_read_set(
&body.room_id,
sender_user,
services().rooms.get_pdu_count(event)?.ok_or(Error::BadRequest(
services().rooms.timeline.get_pdu_count(event)?.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Event does not exist.",
))?,
)?;
services().rooms
services().rooms.user
.reset_notification_counts(sender_user, &body.room_id)?;
let mut user_receipts = BTreeMap::new();
@ -56,7 +56,7 @@ pub async fn set_read_marker_route(
let mut receipt_content = BTreeMap::new();
receipt_content.insert(event.to_owned(), receipts);
services().rooms.edus.readreceipt_update(
services().rooms.edus.read_receipt.readreceipt_update(
sender_user,
&body.room_id,
ruma::events::receipt::ReceiptEvent {
@ -77,17 +77,18 @@ pub async fn create_receipt_route(
) -> Result<create_receipt::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
services().rooms.edus.private_read_set(
services().rooms.edus.read_receipt.private_read_set(
&body.room_id,
sender_user,
services().rooms
.timeline
.get_pdu_count(&body.event_id)?
.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Event does not exist.",
))?,
)?;
services().rooms
services().rooms.user
.reset_notification_counts(sender_user, &body.room_id)?;
let mut user_receipts = BTreeMap::new();
@ -103,7 +104,7 @@ pub async fn create_receipt_route(
let mut receipt_content = BTreeMap::new();
receipt_content.insert(body.event_id.to_owned(), receipts);
services().rooms.edus.readreceipt_update(
services().rooms.edus.read_receipt.readreceipt_update(
sender_user,
&body.room_id,
ruma::events::receipt::ReceiptEvent {
@ -112,7 +113,5 @@ pub async fn create_receipt_route(
},
)?;
services().flush()?;
Ok(create_receipt::v3::Response {})
}

View file

@ -29,7 +29,7 @@ pub async fn redact_event_route(
);
let state_lock = mutex_state.lock().await;
let event_id = services().rooms.build_and_append_pdu(
let event_id = services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: RoomEventType::RoomRedaction,
content: to_raw_value(&RoomRedactionEventContent {

View file

@ -14,7 +14,7 @@ pub async fn report_event_route(
) -> Result<report_content::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let pdu = match services().rooms.get_pdu(&body.event_id)? {
let pdu = match services().rooms.timeline.get_pdu(&body.event_id)? {
Some(pdu) => pdu,
_ => {
return Err(Error::BadRequest(

View file

@ -54,7 +54,7 @@ pub async fn create_room_route(
let room_id = RoomId::new(services().globals.server_name());
services().rooms.get_or_create_shortroomid(&room_id)?;
services().rooms.short.get_or_create_shortroomid(&room_id)?;
let mutex_state = Arc::clone(
services().globals
@ -162,7 +162,7 @@ pub async fn create_room_route(
}
// 1. The room create event
services().rooms.build_and_append_pdu(
services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: RoomEventType::RoomCreate,
content: to_raw_value(&content).expect("event is valid, we just created it"),
@ -176,7 +176,7 @@ pub async fn create_room_route(
)?;
// 2. Let the room creator join
services().rooms.build_and_append_pdu(
services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: RoomEventType::RoomMember,
content: to_raw_value(&RoomMemberEventContent {
@ -237,7 +237,7 @@ pub async fn create_room_route(
}
}
services().rooms.build_and_append_pdu(
services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: RoomEventType::RoomPowerLevels,
content: to_raw_value(&power_levels_content)
@ -253,7 +253,7 @@ pub async fn create_room_route(
// 4. Canonical room alias
if let Some(room_alias_id) = &alias {
services().rooms.build_and_append_pdu(
services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: RoomEventType::RoomCanonicalAlias,
content: to_raw_value(&RoomCanonicalAliasEventContent {
@ -274,7 +274,7 @@ pub async fn create_room_route(
// 5. Events set by preset
// 5.1 Join Rules
services().rooms.build_and_append_pdu(
services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: RoomEventType::RoomJoinRules,
content: to_raw_value(&RoomJoinRulesEventContent::new(match preset {
@ -293,7 +293,7 @@ pub async fn create_room_route(
)?;
// 5.2 History Visibility
services().rooms.build_and_append_pdu(
services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: RoomEventType::RoomHistoryVisibility,
content: to_raw_value(&RoomHistoryVisibilityEventContent::new(
@ -310,7 +310,7 @@ pub async fn create_room_route(
)?;
// 5.3 Guest Access
services().rooms.build_and_append_pdu(
services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: RoomEventType::RoomGuestAccess,
content: to_raw_value(&RoomGuestAccessEventContent::new(match preset {
@ -344,12 +344,12 @@ pub async fn create_room_route(
}
services().rooms
.build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock)?;
.timeline.build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock)?;
}
// 7. Events implied by name and topic
if let Some(name) = &body.name {
services().rooms.build_and_append_pdu(
services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: RoomEventType::RoomName,
content: to_raw_value(&RoomNameEventContent::new(Some(name.clone())))
@ -365,7 +365,7 @@ pub async fn create_room_route(
}
if let Some(topic) = &body.topic {
services().rooms.build_and_append_pdu(
services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: RoomEventType::RoomTopic,
content: to_raw_value(&RoomTopicEventContent {
@ -390,11 +390,11 @@ pub async fn create_room_route(
// Homeserver specific stuff
if let Some(alias) = alias {
services().rooms.set_alias(&alias, Some(&room_id))?;
services().rooms.alias.set_alias(&alias, &room_id)?;
}
if body.visibility == room::Visibility::Public {
services().rooms.set_public(&room_id, true)?;
services().rooms.directory.set_public(&room_id)?;
}
info!("{} created a room", sender_user);
@ -412,7 +412,7 @@ pub async fn get_room_event_route(
) -> Result<get_room_event::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
if !services().rooms.is_joined(sender_user, &body.room_id)? {
if !services().rooms.state_cache.is_joined(sender_user, &body.room_id)? {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"You don't have permission to view this room.",
@ -422,6 +422,7 @@ pub async fn get_room_event_route(
Ok(get_room_event::v3::Response {
event: services()
.rooms
.timeline
.get_pdu(&body.event_id)?
.ok_or(Error::BadRequest(ErrorKind::NotFound, "Event not found."))?
.to_room_event(),
@ -438,7 +439,7 @@ pub async fn get_room_aliases_route(
) -> Result<aliases::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
if !services().rooms.is_joined(sender_user, &body.room_id)? {
if !services().rooms.state_cache.is_joined(sender_user, &body.room_id)? {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"You don't have permission to view this room.",
@ -448,7 +449,7 @@ pub async fn get_room_aliases_route(
Ok(aliases::v3::Response {
aliases: services()
.rooms
.room_aliases(&body.room_id)
.alias.local_aliases_for_room(&body.room_id)
.filter_map(|a| a.ok())
.collect(),
})
@ -479,7 +480,7 @@ pub async fn upgrade_room_route(
// Create a replacement room
let replacement_room = RoomId::new(services().globals.server_name());
services().rooms
.get_or_create_shortroomid(&replacement_room)?;
.short.get_or_create_shortroomid(&replacement_room)?;
let mutex_state = Arc::clone(
services().globals
@ -493,7 +494,7 @@ pub async fn upgrade_room_route(
// Send a m.room.tombstone event to the old room to indicate that it is not intended to be used any further
// Fail if the sender does not have the required permissions
let tombstone_event_id = services().rooms.build_and_append_pdu(
let tombstone_event_id = services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: RoomEventType::RoomTombstone,
content: to_raw_value(&RoomTombstoneEventContent {
@ -525,6 +526,7 @@ pub async fn upgrade_room_route(
// Get the old room creation event
let mut create_event_content = serde_json::from_str::<CanonicalJsonObject>(
services().rooms
.state_accessor
.room_state_get(&body.room_id, &StateEventType::RoomCreate, "")?
.ok_or_else(|| Error::bad_database("Found room without m.room.create event."))?
.content
@ -572,7 +574,7 @@ pub async fn upgrade_room_route(
));
}
services().rooms.build_and_append_pdu(
services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: RoomEventType::RoomCreate,
content: to_raw_value(&create_event_content)
@ -587,7 +589,7 @@ pub async fn upgrade_room_route(
)?;
// Join the new room
services().rooms.build_and_append_pdu(
services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: RoomEventType::RoomMember,
content: to_raw_value(&RoomMemberEventContent {
@ -625,12 +627,12 @@ pub async fn upgrade_room_route(
// Replicate transferable state events to the new room
for event_type in transferable_state_events {
let event_content = match services().rooms.room_state_get(&body.room_id, &event_type, "")? {
let event_content = match services().rooms.state_accessor.room_state_get(&body.room_id, &event_type, "")? {
Some(v) => v.content.clone(),
None => continue, // Skipping missing events.
};
services().rooms.build_and_append_pdu(
services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: event_type.to_string().into(),
content: event_content,
@ -645,14 +647,15 @@ pub async fn upgrade_room_route(
}
// Moves any local aliases to the new room
for alias in services().rooms.room_aliases(&body.room_id).filter_map(|r| r.ok()) {
for alias in services().rooms.alias.local_aliases_for_room(&body.room_id).filter_map(|r| r.ok()) {
services().rooms
.set_alias(&alias, Some(&replacement_room))?;
.alias.set_alias(&alias, &replacement_room)?;
}
// Get the old room power levels
let mut power_levels_event_content: RoomPowerLevelsEventContent = serde_json::from_str(
services().rooms
.state_accessor
.room_state_get(&body.room_id, &StateEventType::RoomPowerLevels, "")?
.ok_or_else(|| Error::bad_database("Found room without m.room.create event."))?
.content
@ -666,7 +669,7 @@ pub async fn upgrade_room_route(
power_levels_event_content.invite = new_level;
// Modify the power levels in the old room to prevent sending of events and inviting new users
let _ = services().rooms.build_and_append_pdu(
let _ = services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: RoomEventType::RoomPowerLevels,
content: to_raw_value(&power_levels_event_content)

View file

@ -24,6 +24,7 @@ pub async fn search_events_route(
let room_ids = filter.rooms.clone().unwrap_or_else(|| {
services().rooms
.state_cache
.rooms_joined(sender_user)
.filter_map(|r| r.ok())
.collect()
@ -34,7 +35,7 @@ pub async fn search_events_route(
let mut searches = Vec::new();
for room_id in room_ids {
if !services().rooms.is_joined(sender_user, &room_id)? {
if !services().rooms.state_cache.is_joined(sender_user, &room_id)? {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"You don't have permission to view this room.",
@ -43,6 +44,7 @@ pub async fn search_events_route(
if let Some(search) = services()
.rooms
.search
.search_pdus(&room_id, &search_criteria.search_term)?
{
searches.push(search.0.peekable());
@ -86,6 +88,7 @@ pub async fn search_events_route(
rank: None,
result: services()
.rooms
.timeline
.get_pdu_from_id(result)?
.map(|pdu| pdu.to_room_event()),
})

View file

@ -90,9 +90,10 @@ pub async fn get_state_events_route(
#[allow(clippy::blocks_in_if_conditions)]
// Users not in the room should not be able to access the state unless history_visibility is
// WorldReadable
if !services().rooms.is_joined(sender_user, &body.room_id)?
if !services().rooms.state_cache.is_joined(sender_user, &body.room_id)?
&& !matches!(
services().rooms
.state_accessor
.room_state_get(&body.room_id, &StateEventType::RoomHistoryVisibility, "")?
.map(|event| {
serde_json::from_str(event.content.get())
@ -115,6 +116,7 @@ pub async fn get_state_events_route(
Ok(get_state_events::v3::Response {
room_state: services()
.rooms
.state_accessor
.room_state_full(&body.room_id)
.await?
.values()
@ -136,10 +138,10 @@ pub async fn get_state_events_for_key_route(
#[allow(clippy::blocks_in_if_conditions)]
// Users not in the room should not be able to access the state unless history_visibility is
// WorldReadable
if !services().rooms.is_joined(sender_user, &body.room_id)?
if !services().rooms.state_cache.is_joined(sender_user, &body.room_id)?
&& !matches!(
services().rooms
.room_state_get(&body.room_id, &StateEventType::RoomHistoryVisibility, "")?
.state_accessor.room_state_get(&body.room_id, &StateEventType::RoomHistoryVisibility, "")?
.map(|event| {
serde_json::from_str(event.content.get())
.map(|e: RoomHistoryVisibilityEventContent| e.history_visibility)
@ -160,7 +162,7 @@ pub async fn get_state_events_for_key_route(
let event = services()
.rooms
.room_state_get(&body.room_id, &body.event_type, &body.state_key)?
.state_accessor.room_state_get(&body.room_id, &body.event_type, &body.state_key)?
.ok_or(Error::BadRequest(
ErrorKind::NotFound,
"State event not found.",
@ -185,10 +187,10 @@ pub async fn get_state_events_for_empty_key_route(
#[allow(clippy::blocks_in_if_conditions)]
// Users not in the room should not be able to access the state unless history_visibility is
// WorldReadable
if !services().rooms.is_joined(sender_user, &body.room_id)?
if !services().rooms.state_cache.is_joined(sender_user, &body.room_id)?
&& !matches!(
services().rooms
.room_state_get(&body.room_id, &StateEventType::RoomHistoryVisibility, "")?
.state_accessor.room_state_get(&body.room_id, &StateEventType::RoomHistoryVisibility, "")?
.map(|event| {
serde_json::from_str(event.content.get())
.map(|e: RoomHistoryVisibilityEventContent| e.history_visibility)
@ -209,7 +211,7 @@ pub async fn get_state_events_for_empty_key_route(
let event = services()
.rooms
.room_state_get(&body.room_id, &body.event_type, "")?
.state_accessor.room_state_get(&body.room_id, &body.event_type, "")?
.ok_or(Error::BadRequest(
ErrorKind::NotFound,
"State event not found.",
@ -269,7 +271,7 @@ async fn send_state_event_for_key_helper(
);
let state_lock = mutex_state.lock().await;
let event_id = services().rooms.build_and_append_pdu(
let event_id = services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: event_type.to_string().into(),
content: serde_json::from_str(json.json().get()).expect("content is valid json"),

View file

@ -172,7 +172,7 @@ async fn sync_helper(
};
// TODO: match body.set_presence {
services().rooms.edus.ping_presence(&sender_user)?;
services().rooms.edus.presence.ping_presence(&sender_user)?;
// Setup watchers, so if there's no response, we can wait for them
let watcher = services().watch(&sender_user, &sender_device);
@ -216,7 +216,7 @@ async fn sync_helper(
.filter_map(|r| r.ok()),
);
let all_joined_rooms = services().rooms.rooms_joined(&sender_user).collect::<Vec<_>>();
let all_joined_rooms = services().rooms.state_cache.rooms_joined(&sender_user).collect::<Vec<_>>();
for room_id in all_joined_rooms {
let room_id = room_id?;
@ -237,9 +237,10 @@ async fn sync_helper(
let timeline_pdus;
let limited;
if services().rooms.last_timeline_count(&sender_user, &room_id)? > since {
if services().rooms.timeline.last_timeline_count(&sender_user, &room_id)? > since {
let mut non_timeline_pdus = services()
.rooms
.timeline
.pdus_until(&sender_user, &room_id, u64::MAX)?
.filter_map(|r| {
// Filter out buggy events
@ -250,6 +251,7 @@ async fn sync_helper(
})
.take_while(|(pduid, _)| {
services().rooms
.timeline
.pdu_count(pduid)
.map_or(false, |count| count > since)
});
@ -275,6 +277,7 @@ async fn sync_helper(
|| services()
.rooms
.edus
.read_receipt
.last_privateread_update(&sender_user, &room_id)?
> since;
@ -283,24 +286,24 @@ async fn sync_helper(
timeline_users.insert(event.sender.as_str().to_owned());
}
services().rooms
services().rooms.lazy_loading
.lazy_load_confirm_delivery(&sender_user, &sender_device, &room_id, since)?;
// Database queries:
let current_shortstatehash = if let Some(s) = services().rooms.current_shortstatehash(&room_id)? {
let current_shortstatehash = if let Some(s) = services().rooms.state.get_room_shortstatehash(&room_id)? {
s
} else {
error!("Room {} has no state", room_id);
continue;
};
let since_shortstatehash = services().rooms.get_token_shortstatehash(&room_id, since)?;
let since_shortstatehash = services().rooms.user.get_token_shortstatehash(&room_id, since)?;
// Calculates joined_member_count, invited_member_count and heroes
let calculate_counts = || {
let joined_member_count = services().rooms.room_joined_count(&room_id)?.unwrap_or(0);
let invited_member_count = services().rooms.room_invited_count(&room_id)?.unwrap_or(0);
let joined_member_count = services().rooms.state_cache.room_joined_count(&room_id)?.unwrap_or(0);
let invited_member_count = services().rooms.state_cache.room_invited_count(&room_id)?.unwrap_or(0);
// Recalculate heroes (first 5 members)
let mut heroes = Vec::new();
@ -311,7 +314,7 @@ async fn sync_helper(
for hero in services()
.rooms
.all_pdus(&sender_user, &room_id)?
.timeline.all_pdus(&sender_user, &room_id)?
.filter_map(|pdu| pdu.ok()) // Ignore all broken pdus
.filter(|(_, pdu)| pdu.kind == RoomEventType::RoomMember)
.map(|(_, pdu)| {
@ -329,8 +332,8 @@ async fn sync_helper(
if matches!(
content.membership,
MembershipState::Join | MembershipState::Invite
) && (services().rooms.is_joined(&user_id, &room_id)?
|| services().rooms.is_invited(&user_id, &room_id)?)
) && (services().rooms.state_cache.is_joined(&user_id, &room_id)?
|| services().rooms.state_cache.is_invited(&user_id, &room_id)?)
{
Ok::<_, Error>(Some(state_key.clone()))
} else {
@ -371,17 +374,17 @@ async fn sync_helper(
let (joined_member_count, invited_member_count, heroes) = calculate_counts()?;
let current_state_ids = services().rooms.state_full_ids(current_shortstatehash).await?;
let current_state_ids = services().rooms.state_accessor.state_full_ids(current_shortstatehash).await?;
let mut state_events = Vec::new();
let mut lazy_loaded = HashSet::new();
let mut i = 0;
for (shortstatekey, id) in current_state_ids {
let (event_type, state_key) = services().rooms.get_statekey_from_short(shortstatekey)?;
let (event_type, state_key) = services().rooms.short.get_statekey_from_short(shortstatekey)?;
if event_type != StateEventType::RoomMember {
let pdu = match services().rooms.get_pdu(&id)? {
let pdu = match services().rooms.timeline.get_pdu(&id)? {
Some(pdu) => pdu,
None => {
error!("Pdu in state not found: {}", id);
@ -398,7 +401,7 @@ async fn sync_helper(
|| body.full_state
|| timeline_users.contains(&state_key)
{
let pdu = match services().rooms.get_pdu(&id)? {
let pdu = match services().rooms.timeline.get_pdu(&id)? {
Some(pdu) => pdu,
None => {
error!("Pdu in state not found: {}", id);
@ -420,12 +423,12 @@ async fn sync_helper(
}
// Reset lazy loading because this is an initial sync
services().rooms
services().rooms.lazy_loading
.lazy_load_reset(&sender_user, &sender_device, &room_id)?;
// The state_events above should contain all timeline_users, let's mark them as lazy
// loaded.
services().rooms.lazy_load_mark_sent(
services().rooms.lazy_loading.lazy_load_mark_sent(
&sender_user,
&sender_device,
&room_id,
@ -449,6 +452,7 @@ async fn sync_helper(
let since_sender_member: Option<RoomMemberEventContent> = services()
.rooms
.state_accessor
.state_get(
since_shortstatehash,
&StateEventType::RoomMember,
@ -467,12 +471,12 @@ async fn sync_helper(
let mut lazy_loaded = HashSet::new();
if since_shortstatehash != current_shortstatehash {
let current_state_ids = services().rooms.state_full_ids(current_shortstatehash).await?;
let since_state_ids = services().rooms.state_full_ids(since_shortstatehash).await?;
let current_state_ids = services().rooms.state_accessor.state_full_ids(current_shortstatehash).await?;
let since_state_ids = services().rooms.state_accessor.state_full_ids(since_shortstatehash).await?;
for (key, id) in current_state_ids {
if body.full_state || since_state_ids.get(&key) != Some(&id) {
let pdu = match services().rooms.get_pdu(&id)? {
let pdu = match services().rooms.timeline.get_pdu(&id)? {
Some(pdu) => pdu,
None => {
error!("Pdu in state not found: {}", id);
@ -505,14 +509,14 @@ async fn sync_helper(
continue;
}
if !services().rooms.lazy_load_was_sent_before(
if !services().rooms.lazy_loading.lazy_load_was_sent_before(
&sender_user,
&sender_device,
&room_id,
&event.sender,
)? || lazy_load_send_redundant
{
if let Some(member_event) = services().rooms.room_state_get(
if let Some(member_event) = services().rooms.state_accessor.room_state_get(
&room_id,
&StateEventType::RoomMember,
event.sender.as_str(),
@ -523,7 +527,7 @@ async fn sync_helper(
}
}
services().rooms.lazy_load_mark_sent(
services().rooms.lazy_loading.lazy_load_mark_sent(
&sender_user,
&sender_device,
&room_id,
@ -533,11 +537,12 @@ async fn sync_helper(
let encrypted_room = services()
.rooms
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
.state_accessor.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
.is_some();
let since_encryption =
services().rooms
.state_accessor
.state_get(since_shortstatehash, &StateEventType::RoomEncryption, "")?;
// Calculations:
@ -588,6 +593,7 @@ async fn sync_helper(
// If the user is in a new encrypted room, give them all joined users
device_list_updates.extend(
services().rooms
.state_cache
.room_members(&room_id)
.flatten()
.filter(|user_id| {
@ -627,6 +633,7 @@ async fn sync_helper(
let notification_count = if send_notification_counts {
Some(
services().rooms
.user
.notification_count(&sender_user, &room_id)?
.try_into()
.expect("notification count can't go that high"),
@ -638,6 +645,7 @@ async fn sync_helper(
let highlight_count = if send_notification_counts {
Some(
services().rooms
.user
.highlight_count(&sender_user, &room_id)?
.try_into()
.expect("highlight count can't go that high"),
@ -649,7 +657,7 @@ async fn sync_helper(
let prev_batch = timeline_pdus
.first()
.map_or(Ok::<_, Error>(None), |(pdu_id, _)| {
Ok(Some(services().rooms.pdu_count(pdu_id)?.to_string()))
Ok(Some(services().rooms.timeline.pdu_count(pdu_id)?.to_string()))
})?;
let room_events: Vec<_> = timeline_pdus
@ -660,15 +668,16 @@ async fn sync_helper(
let mut edus: Vec<_> = services()
.rooms
.edus
.read_receipt
.readreceipts_since(&room_id, since)
.filter_map(|r| r.ok()) // Filter out buggy events
.map(|(_, _, v)| v)
.collect();
if services().rooms.edus.last_typing_update(&room_id, &services().globals)? > since {
if services().rooms.edus.typing.last_typing_update(&room_id)? > since {
edus.push(
serde_json::from_str(
&serde_json::to_string(&services().rooms.edus.typings_all(&room_id)?)
&serde_json::to_string(&services().rooms.edus.typing.typings_all(&room_id)?)
.expect("event is valid, we just created it"),
)
.expect("event is valid, we just created it"),
@ -676,7 +685,7 @@ async fn sync_helper(
}
// Save the state after this sync so we can send the correct state diff next sync
services().rooms
services().rooms.user
.associate_token_shortstatehash(&room_id, next_batch, current_shortstatehash)?;
let joined_room = JoinedRoom {
@ -723,6 +732,7 @@ async fn sync_helper(
for (user_id, presence) in
services().rooms
.edus
.presence
.presence_since(&room_id, since)?
{
match presence_updates.entry(user_id) {
@ -755,7 +765,7 @@ async fn sync_helper(
}
let mut left_rooms = BTreeMap::new();
let all_left_rooms: Vec<_> = services().rooms.rooms_left(&sender_user).collect();
let all_left_rooms: Vec<_> = services().rooms.state_cache.rooms_left(&sender_user).collect();
for result in all_left_rooms {
let (room_id, left_state_events) = result?;
@ -773,7 +783,7 @@ async fn sync_helper(
drop(insert_lock);
}
let left_count = services().rooms.get_left_count(&room_id, &sender_user)?;
let left_count = services().rooms.state_cache.get_left_count(&room_id, &sender_user)?;
// Left before last sync
if Some(since) >= left_count {
@ -797,7 +807,7 @@ async fn sync_helper(
}
let mut invited_rooms = BTreeMap::new();
let all_invited_rooms: Vec<_> = services().rooms.rooms_invited(&sender_user).collect();
let all_invited_rooms: Vec<_> = services().rooms.state_cache.rooms_invited(&sender_user).collect();
for result in all_invited_rooms {
let (room_id, invite_state_events) = result?;
@ -815,7 +825,7 @@ async fn sync_helper(
drop(insert_lock);
}
let invite_count = services().rooms.get_invite_count(&room_id, &sender_user)?;
let invite_count = services().rooms.state_cache.get_invite_count(&room_id, &sender_user)?;
// Invited before last sync
if Some(since) >= invite_count {
@ -835,12 +845,13 @@ async fn sync_helper(
for user_id in left_encrypted_users {
let still_share_encrypted_room = services()
.rooms
.user
.get_shared_rooms(vec![sender_user.clone(), user_id.clone()])?
.filter_map(|r| r.ok())
.filter_map(|other_room_id| {
Some(
services().rooms
.room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
.state_accessor.room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
.ok()?
.is_some(),
)
@ -925,12 +936,14 @@ fn share_encrypted_room(
) -> Result<bool> {
Ok(services()
.rooms
.user
.get_shared_rooms(vec![sender_user.to_owned(), user_id.to_owned()])?
.filter_map(|r| r.ok())
.filter(|room_id| room_id != ignore_room)
.filter_map(|other_room_id| {
Some(
services().rooms
.state_accessor
.room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
.ok()?
.is_some(),

View file

@ -11,7 +11,7 @@ pub async fn create_typing_event_route(
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
if !services().rooms.is_joined(sender_user, &body.room_id)? {
if !services().rooms.state_cache.is_joined(sender_user, &body.room_id)? {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"You are not in this room.",
@ -19,14 +19,14 @@ pub async fn create_typing_event_route(
}
if let Typing::Yes(duration) = body.state {
services().rooms.edus.typing_add(
services().rooms.edus.typing.typing_add(
sender_user,
&body.room_id,
duration.as_millis() as u64 + utils::millis_since_unix_epoch(),
)?;
} else {
services().rooms
.edus
.edus.typing
.typing_remove(sender_user, &body.room_id)?;
}

View file

@ -50,11 +50,11 @@ pub async fn search_users_route(
let user_is_in_public_rooms =
services().rooms
.rooms_joined(&user_id)
.state_cache.rooms_joined(&user_id)
.filter_map(|r| r.ok())
.any(|room| {
services().rooms
.room_state_get(&room, &StateEventType::RoomJoinRules, "")
.state_accessor.room_state_get(&room, &StateEventType::RoomJoinRules, "")
.map_or(false, |event| {
event.map_or(false, |event| {
serde_json::from_str(event.content.get())
@ -71,7 +71,7 @@ pub async fn search_users_route(
let user_is_in_shared_rooms = services()
.rooms
.get_shared_rooms(vec![sender_user.clone(), user_id.clone()])
.user.get_shared_rooms(vec![sender_user.clone(), user_id.clone()])
.ok()?
.next()
.is_some();

View file

@ -14,7 +14,7 @@ pub async fn turn_server_route(
) -> Result<get_turn_server_info::v3::Response> {
let sender_user = body.sender_user.as_ref().expect("user is authenticated");
let turn_secret = services().globals.turn_secret();
let turn_secret = services().globals.turn_secret().clone();
let (username, password) = if !turn_secret.is_empty() {
let expiry = SecondsSinceUnixEpoch::from_system_time(

View file

@ -669,7 +669,7 @@ pub async fn send_transaction_message_route(
}
};
acl_check(&sender_servername, &room_id)?;
services().rooms.event_handler.acl_check(&sender_servername, &room_id)?;
let mutex = Arc::clone(
services().globals
@ -727,7 +727,7 @@ pub async fn send_transaction_message_route(
.event_ids
.iter()
.filter_map(|id| {
services().rooms.get_pdu_count(id).ok().flatten().map(|r| (id, r))
services().rooms.timeline.get_pdu_count(id).ok().flatten().map(|r| (id, r))
})
.max_by_key(|(_, count)| *count)
{
@ -744,7 +744,7 @@ pub async fn send_transaction_message_route(
content: ReceiptEventContent(receipt_content),
room_id: room_id.clone(),
};
services().rooms.edus.readreceipt_update(
services().rooms.edus.read_receipt.readreceipt_update(
&user_id,
&room_id,
event,
@ -757,15 +757,15 @@ pub async fn send_transaction_message_route(
}
}
Edu::Typing(typing) => {
if services().rooms.is_joined(&typing.user_id, &typing.room_id)? {
if services().rooms.state_cache.is_joined(&typing.user_id, &typing.room_id)? {
if typing.typing {
services().rooms.edus.typing_add(
services().rooms.edus.typing.typing_add(
&typing.user_id,
&typing.room_id,
3000 + utils::millis_since_unix_epoch(),
)?;
} else {
services().rooms.edus.typing_remove(
services().rooms.edus.typing.typing_remove(
&typing.user_id,
&typing.room_id,
)?;
@ -1031,7 +1031,7 @@ pub(crate) async fn get_auth_chain<'a>(
let mut i = 0;
for id in starting_events {
let short = services().rooms.get_or_create_shorteventid(&id)?;
let short = services().rooms.short.get_or_create_shorteventid(&id)?;
let bucket_id = (short % NUM_BUCKETS as u64) as usize;
buckets[bucket_id].insert((short, id.clone()));
i += 1;
@ -1050,7 +1050,7 @@ pub(crate) async fn get_auth_chain<'a>(
}
let chunk_key: Vec<u64> = chunk.iter().map(|(short, _)| short).copied().collect();
if let Some(cached) = services().rooms.get_auth_chain_from_cache(&chunk_key)? {
if let Some(cached) = services().rooms.auth_chain.get_auth_chain_from_cache(&chunk_key)? {
hits += 1;
full_auth_chain.extend(cached.iter().copied());
continue;
@ -1062,13 +1062,14 @@ pub(crate) async fn get_auth_chain<'a>(
let mut misses2 = 0;
let mut i = 0;
for (sevent_id, event_id) in chunk {
if let Some(cached) = services().rooms.get_auth_chain_from_cache(&[sevent_id])? {
if let Some(cached) = services().rooms.auth_chain.get_auth_chain_from_cache(&[sevent_id])? {
hits2 += 1;
chunk_cache.extend(cached.iter().copied());
} else {
misses2 += 1;
let auth_chain = Arc::new(get_auth_chain_inner(room_id, &event_id)?);
services().rooms
.auth_chain
.cache_auth_chain(vec![sevent_id], Arc::clone(&auth_chain))?;
println!(
"cache missed event {} with auth chain len {}",
@ -1091,7 +1092,7 @@ pub(crate) async fn get_auth_chain<'a>(
);
let chunk_cache = Arc::new(chunk_cache);
services().rooms
.cache_auth_chain(chunk_key, Arc::clone(&chunk_cache))?;
.auth_chain.cache_auth_chain(chunk_key, Arc::clone(&chunk_cache))?;
full_auth_chain.extend(chunk_cache.iter());
}
@ -1104,7 +1105,7 @@ pub(crate) async fn get_auth_chain<'a>(
Ok(full_auth_chain
.into_iter()
.filter_map(move |sid| services().rooms.get_eventid_from_short(sid).ok()))
.filter_map(move |sid| services().rooms.short.get_eventid_from_short(sid).ok()))
}
#[tracing::instrument(skip(event_id))]
@ -1116,14 +1117,14 @@ fn get_auth_chain_inner(
let mut found = HashSet::new();
while let Some(event_id) = todo.pop() {
match services().rooms.get_pdu(&event_id) {
match services().rooms.timeline.get_pdu(&event_id) {
Ok(Some(pdu)) => {
if pdu.room_id != room_id {
return Err(Error::BadRequest(ErrorKind::Forbidden, "Evil event in db"));
}
for auth_event in &pdu.auth_events {
let sauthevent = services()
.rooms
.rooms.short
.get_or_create_shorteventid(auth_event)?;
if !found.contains(&sauthevent) {
@ -1162,7 +1163,7 @@ pub async fn get_event_route(
.expect("server is authenticated");
let event = services()
.rooms
.rooms.timeline
.get_pdu_json(&body.event_id)?
.ok_or(Error::BadRequest(ErrorKind::NotFound, "Event not found."))?;
@ -1174,7 +1175,7 @@ pub async fn get_event_route(
let room_id = <&RoomId>::try_from(room_id_str)
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
if !services().rooms.server_in_room(sender_servername, room_id)? {
if !services().rooms.state_cache.server_in_room(sender_servername, room_id)? {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Server is not in room",
@ -1203,21 +1204,21 @@ pub async fn get_missing_events_route(
.as_ref()
.expect("server is authenticated");
if !services().rooms.server_in_room(sender_servername, &body.room_id)? {
if !services().rooms.state_cache.server_in_room(sender_servername, &body.room_id)? {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Server is not in room",
));
}
acl_check(sender_servername, &body.room_id)?;
services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?;
let mut queued_events = body.latest_events.clone();
let mut events = Vec::new();
let mut i = 0;
while i < queued_events.len() && events.len() < u64::from(body.limit) as usize {
if let Some(pdu) = services().rooms.get_pdu_json(&queued_events[i])? {
if let Some(pdu) = services().rooms.timeline.get_pdu_json(&queued_events[i])? {
let room_id_str = pdu
.get("room_id")
.and_then(|val| val.as_str())
@ -1275,17 +1276,17 @@ pub async fn get_event_authorization_route(
.as_ref()
.expect("server is authenticated");
if !services().rooms.server_in_room(sender_servername, &body.room_id)? {
if !services().rooms.state_cache.server_in_room(sender_servername, &body.room_id)? {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Server is not in room.",
));
}
acl_check(sender_servername, &body.room_id)?;
services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?;
let event = services()
.rooms
.rooms.timeline
.get_pdu_json(&body.event_id)?
.ok_or(Error::BadRequest(ErrorKind::NotFound, "Event not found."))?;
@ -1301,7 +1302,7 @@ pub async fn get_event_authorization_route(
Ok(get_event_authorization::v1::Response {
auth_chain: auth_chain_ids
.filter_map(|id| services().rooms.get_pdu_json(&id).ok()?)
.filter_map(|id| services().rooms.timeline.get_pdu_json(&id).ok()?)
.map(PduEvent::convert_to_outgoing_federation_event)
.collect(),
})
@ -1322,17 +1323,17 @@ pub async fn get_room_state_route(
.as_ref()
.expect("server is authenticated");
if !services().rooms.server_in_room(sender_servername, &body.room_id)? {
if !services().rooms.state_cache.server_in_room(sender_servername, &body.room_id)? {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Server is not in room.",
));
}
acl_check(sender_servername, &body.room_id)?;
services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?;
let shortstatehash = services()
.rooms
.rooms.state_accessor
.pdu_shortstatehash(&body.event_id)?
.ok_or(Error::BadRequest(
ErrorKind::NotFound,
@ -1340,13 +1341,13 @@ pub async fn get_room_state_route(
))?;
let pdus = services()
.rooms
.rooms.state_accessor
.state_full_ids(shortstatehash)
.await?
.into_iter()
.map(|(_, id)| {
PduEvent::convert_to_outgoing_federation_event(
services().rooms.get_pdu_json(&id).unwrap().unwrap(),
services().rooms.timeline.get_pdu_json(&id).unwrap().unwrap(),
)
})
.collect();
@ -1357,7 +1358,7 @@ pub async fn get_room_state_route(
Ok(get_room_state::v1::Response {
auth_chain: auth_chain_ids
.map(|id| {
services().rooms.get_pdu_json(&id).map(|maybe_json| {
services().rooms.timeline.get_pdu_json(&id).map(|maybe_json| {
PduEvent::convert_to_outgoing_federation_event(maybe_json.unwrap())
})
})
@ -1382,17 +1383,17 @@ pub async fn get_room_state_ids_route(
.as_ref()
.expect("server is authenticated");
if !services().rooms.server_in_room(sender_servername, &body.room_id)? {
if !services().rooms.state_cache.server_in_room(sender_servername, &body.room_id)? {
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Server is not in room.",
));
}
acl_check(sender_servername, &body.room_id)?;
services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?;
let shortstatehash = services()
.rooms
.rooms.state_accessor
.pdu_shortstatehash(&body.event_id)?
.ok_or(Error::BadRequest(
ErrorKind::NotFound,
@ -1400,7 +1401,7 @@ pub async fn get_room_state_ids_route(
))?;
let pdu_ids = services()
.rooms
.rooms.state_accessor
.state_full_ids(shortstatehash)
.await?
.into_iter()
@ -1426,7 +1427,7 @@ pub async fn create_join_event_template_route(
return Err(Error::bad_config("Federation is disabled."));
}
if !services().rooms.exists(&body.room_id)? {
if !services().rooms.metadata.exists(&body.room_id)? {
return Err(Error::BadRequest(
ErrorKind::NotFound,
"Room is unknown to this server.",
@ -1438,7 +1439,7 @@ pub async fn create_join_event_template_route(
.as_ref()
.expect("server is authenticated");
acl_check(sender_servername, &body.room_id)?;
services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?;
let mutex_state = Arc::clone(
services().globals
@ -1452,7 +1453,7 @@ pub async fn create_join_event_template_route(
// TODO: Conduit does not implement restricted join rules yet, we always reject
let join_rules_event =
services().rooms
services().rooms.state_accessor
.room_state_get(&body.room_id, &StateEventType::RoomJoinRules, "")?;
let join_rules_event_content: Option<RoomJoinRulesEventContent> = join_rules_event
@ -1477,8 +1478,8 @@ pub async fn create_join_event_template_route(
}
}
let room_version_id = services().rooms.state.get_room_version(&body.room_id);
if !body.ver.contains(room_version_id) {
let room_version_id = services().rooms.state.get_room_version(&body.room_id)?;
if !body.ver.contains(&room_version_id) {
return Err(Error::BadRequest(
ErrorKind::IncompatibleRoomVersion {
room_version: room_version_id,
@ -1505,7 +1506,7 @@ pub async fn create_join_event_template_route(
unsigned: None,
state_key: Some(body.user_id.to_string()),
redacts: None,
}, &body.user_id, &body.room_id, &state_lock);
}, &body.user_id, &body.room_id, &state_lock)?;
drop(state_lock);
@ -1524,18 +1525,18 @@ async fn create_join_event(
return Err(Error::bad_config("Federation is disabled."));
}
if !services().rooms.exists(room_id)? {
if !services().rooms.metadata.exists(room_id)? {
return Err(Error::BadRequest(
ErrorKind::NotFound,
"Room is unknown to this server.",
));
}
acl_check(sender_servername, room_id)?;
services().rooms.event_handler.acl_check(&sender_servername, room_id)?;
// TODO: Conduit does not implement restricted join rules yet, we always reject
let join_rules_event = services()
.rooms
.rooms.state_accessor
.room_state_get(room_id, &StateEventType::RoomJoinRules, "")?;
let join_rules_event_content: Option<RoomJoinRulesEventContent> = join_rules_event
@ -1562,8 +1563,8 @@ async fn create_join_event(
// We need to return the state prior to joining, let's keep a reference to that here
let shortstatehash = services()
.rooms
.current_shortstatehash(room_id)?
.rooms.state
.get_room_shortstatehash(room_id)?
.ok_or(Error::BadRequest(
ErrorKind::NotFound,
"Pdu state not found.",
@ -1602,22 +1603,15 @@ async fn create_join_event(
.or_default(),
);
let mutex_lock = mutex.lock().await;
let pdu_id = services().rooms.event_handler.handle_incoming_pdu(&origin, &event_id, room_id, value, true, &pub_key_map)
.await
.map_err(|e| {
warn!("Error while handling incoming send join PDU: {}", e);
Error::BadRequest(
ErrorKind::InvalidParam,
"Error while handling incoming PDU.",
)
})?
let pdu_id: Vec<u8> = services().rooms.event_handler.handle_incoming_pdu(&origin, &event_id, room_id, value, true, &pub_key_map)
.await?
.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Could not accept incoming PDU as timeline event.",
))?;
drop(mutex_lock);
let state_ids = services().rooms.state_full_ids(shortstatehash).await?;
let state_ids = services().rooms.state_accessor.state_full_ids(shortstatehash).await?;
let auth_chain_ids = get_auth_chain(
room_id,
state_ids.iter().map(|(_, id)| id.clone()).collect(),
@ -1626,6 +1620,7 @@ async fn create_join_event(
let servers = services()
.rooms
.state_cache
.room_servers(room_id)
.filter_map(|r| r.ok())
.filter(|server| &**server != services().globals.server_name());
@ -1634,12 +1629,12 @@ async fn create_join_event(
Ok(RoomState {
auth_chain: auth_chain_ids
.filter_map(|id| services().rooms.get_pdu_json(&id).ok().flatten())
.filter_map(|id| services().rooms.timeline.get_pdu_json(&id).ok().flatten())
.map(PduEvent::convert_to_outgoing_federation_event)
.collect(),
state: state_ids
.iter()
.filter_map(|(_, id)| services().rooms.get_pdu_json(id).ok().flatten())
.filter_map(|(_, id)| services().rooms.timeline.get_pdu_json(id).ok().flatten())
.map(PduEvent::convert_to_outgoing_federation_event)
.collect(),
})
@ -1692,7 +1687,7 @@ pub async fn create_invite_route(
.as_ref()
.expect("server is authenticated");
acl_check(sender_servername, &body.room_id)?;
services().rooms.event_handler.acl_check(&sender_servername, &body.room_id)?;
if !services().rooms.is_supported_version(&body.room_version) {
return Err(Error::BadRequest(
@ -1767,8 +1762,8 @@ pub async fn create_invite_route(
invite_state.push(pdu.to_stripped_state_event());
// If the room already exists, the remote server will notify us about the join via /send
if !services().rooms.exists(&pdu.room_id)? {
services().rooms.update_membership(
if !services().rooms.metadata.exists(&pdu.room_id)? {
services().rooms.state_cache.update_membership(
&body.room_id,
&invited_user,
MembershipState::Invite,
@ -1931,274 +1926,6 @@ pub async fn claim_keys_route(
})
}
#[tracing::instrument(skip_all)]
pub(crate) async fn fetch_required_signing_keys(
event: &BTreeMap<String, CanonicalJsonValue>,
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, Base64>>>,
) -> Result<()> {
let signatures = event
.get("signatures")
.ok_or(Error::BadServerResponse(
"No signatures in server response pdu.",
))?
.as_object()
.ok_or(Error::BadServerResponse(
"Invalid signatures object in server response pdu.",
))?;
// We go through all the signatures we see on the value and fetch the corresponding signing
// keys
for (signature_server, signature) in signatures {
let signature_object = signature.as_object().ok_or(Error::BadServerResponse(
"Invalid signatures content object in server response pdu.",
))?;
let signature_ids = signature_object.keys().cloned().collect::<Vec<_>>();
let fetch_res = fetch_signing_keys(
signature_server.as_str().try_into().map_err(|_| {
Error::BadServerResponse("Invalid servername in signatures of server response pdu.")
})?,
signature_ids,
)
.await;
let keys = match fetch_res {
Ok(keys) => keys,
Err(_) => {
warn!("Signature verification failed: Could not fetch signing key.",);
continue;
}
};
pub_key_map
.write()
.map_err(|_| Error::bad_database("RwLock is poisoned."))?
.insert(signature_server.clone(), keys);
}
Ok(())
}
// Gets a list of servers for which we don't have the signing key yet. We go over
// the PDUs and either cache the key or add it to the list that needs to be retrieved.
fn get_server_keys_from_cache(
pdu: &RawJsonValue,
servers: &mut BTreeMap<Box<ServerName>, BTreeMap<Box<ServerSigningKeyId>, QueryCriteria>>,
room_version: &RoomVersionId,
pub_key_map: &mut RwLockWriteGuard<'_, BTreeMap<String, BTreeMap<String, Base64>>>,
) -> Result<()> {
let value: CanonicalJsonObject = serde_json::from_str(pdu.get()).map_err(|e| {
error!("Invalid PDU in server response: {:?}: {:?}", pdu, e);
Error::BadServerResponse("Invalid PDU in server response")
})?;
let event_id = format!(
"${}",
ruma::signatures::reference_hash(&value, room_version)
.expect("ruma can calculate reference hashes")
);
let event_id = <&EventId>::try_from(event_id.as_str())
.expect("ruma's reference hashes are valid event ids");
if let Some((time, tries)) = services()
.globals
.bad_event_ratelimiter
.read()
.unwrap()
.get(event_id)
{
// Exponential backoff
let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
min_elapsed_duration = Duration::from_secs(60 * 60 * 24);
}
if time.elapsed() < min_elapsed_duration {
debug!("Backing off from {}", event_id);
return Err(Error::BadServerResponse("bad event, still backing off"));
}
}
let signatures = value
.get("signatures")
.ok_or(Error::BadServerResponse(
"No signatures in server response pdu.",
))?
.as_object()
.ok_or(Error::BadServerResponse(
"Invalid signatures object in server response pdu.",
))?;
for (signature_server, signature) in signatures {
let signature_object = signature.as_object().ok_or(Error::BadServerResponse(
"Invalid signatures content object in server response pdu.",
))?;
let signature_ids = signature_object.keys().cloned().collect::<Vec<_>>();
let contains_all_ids =
|keys: &BTreeMap<String, Base64>| signature_ids.iter().all(|id| keys.contains_key(id));
let origin = <&ServerName>::try_from(signature_server.as_str()).map_err(|_| {
Error::BadServerResponse("Invalid servername in signatures of server response pdu.")
})?;
if servers.contains_key(origin) || pub_key_map.contains_key(origin.as_str()) {
continue;
}
trace!("Loading signing keys for {}", origin);
let result: BTreeMap<_, _> = services()
.globals
.signing_keys_for(origin)?
.into_iter()
.map(|(k, v)| (k.to_string(), v.key))
.collect();
if !contains_all_ids(&result) {
trace!("Signing key not loaded for {}", origin);
servers.insert(origin.to_owned(), BTreeMap::new());
}
pub_key_map.insert(origin.to_string(), result);
}
Ok(())
}
pub(crate) async fn fetch_join_signing_keys(
event: &create_join_event::v2::Response,
room_version: &RoomVersionId,
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, Base64>>>,
) -> Result<()> {
let mut servers: BTreeMap<Box<ServerName>, BTreeMap<Box<ServerSigningKeyId>, QueryCriteria>> =
BTreeMap::new();
{
let mut pkm = pub_key_map
.write()
.map_err(|_| Error::bad_database("RwLock is poisoned."))?;
// Try to fetch keys, failure is okay
// Servers we couldn't find in the cache will be added to `servers`
for pdu in &event.room_state.state {
let _ = get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm);
}
for pdu in &event.room_state.auth_chain {
let _ = get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm);
}
drop(pkm);
}
if servers.is_empty() {
// We had all keys locally
return Ok(());
}
for server in services().globals.trusted_servers() {
trace!("Asking batch signing keys from trusted server {}", server);
if let Ok(keys) = services()
.sending
.send_federation_request(
server,
get_remote_server_keys_batch::v2::Request {
server_keys: servers.clone(),
},
)
.await
{
trace!("Got signing keys: {:?}", keys);
let mut pkm = pub_key_map
.write()
.map_err(|_| Error::bad_database("RwLock is poisoned."))?;
for k in keys.server_keys {
let k = k.deserialize().unwrap();
// TODO: Check signature from trusted server?
servers.remove(&k.server_name);
let result = services()
.globals
.add_signing_key(&k.server_name, k.clone())?
.into_iter()
.map(|(k, v)| (k.to_string(), v.key))
.collect::<BTreeMap<_, _>>();
pkm.insert(k.server_name.to_string(), result);
}
}
if servers.is_empty() {
return Ok(());
}
}
let mut futures: FuturesUnordered<_> = servers
.into_iter()
.map(|(server, _)| async move {
(
services().sending
.send_federation_request(
&server,
get_server_keys::v2::Request::new(),
)
.await,
server,
)
})
.collect();
while let Some(result) = futures.next().await {
if let (Ok(get_keys_response), origin) = result {
let result: BTreeMap<_, _> = services()
.globals
.add_signing_key(&origin, get_keys_response.server_key.deserialize().unwrap())?
.into_iter()
.map(|(k, v)| (k.to_string(), v.key))
.collect();
pub_key_map
.write()
.map_err(|_| Error::bad_database("RwLock is poisoned."))?
.insert(origin.to_string(), result);
}
}
Ok(())
}
/// Returns Ok if the acl allows the server
fn acl_check(server_name: &ServerName, room_id: &RoomId) -> Result<()> {
let acl_event = match services()
.rooms
.room_state_get(room_id, &StateEventType::RoomServerAcl, "")?
{
Some(acl) => acl,
None => return Ok(()),
};
let acl_event_content: RoomServerAclEventContent =
match serde_json::from_str(acl_event.content.get()) {
Ok(content) => content,
Err(_) => {
warn!("Invalid ACL event");
return Ok(());
}
};
if acl_event_content.is_allowed(server_name) {
Ok(())
} else {
Err(Error::BadRequest(
ErrorKind::Forbidden,
"Server was denied by ACL",
))
}
}
#[cfg(test)]
mod tests {
use super::{add_port_to_hostname, get_ip_with_port, FedDest};