Sliding sync improvements and redaction fixes

This commit is contained in:
Timo Kösters 2023-09-13 20:54:53 +02:00
parent 9b55ce933a
commit 75c80df271
No known key found for this signature in database
GPG key ID: 0B25E636FBA7E4CB
11 changed files with 270 additions and 219 deletions

View file

@ -17,7 +17,11 @@ use ruma::{
DeviceKeyAlgorithm, OwnedDeviceId, OwnedUserId, UserId,
};
use serde_json::json;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::{
collections::{hash_map, BTreeMap, HashMap, HashSet},
time::{Duration, Instant},
};
use tracing::debug;
/// # `POST /_matrix/client/r0/keys/upload`
///
@ -335,31 +339,68 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>(
let mut failures = BTreeMap::new();
let back_off = |id| match services()
.globals
.bad_query_ratelimiter
.write()
.unwrap()
.entry(id)
{
hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
}
hash_map::Entry::Occupied(mut e) => *e.get_mut() = (Instant::now(), e.get().1 + 1),
};
let mut futures: FuturesUnordered<_> = get_over_federation
.into_iter()
.map(|(server, vec)| async move {
if let Some((time, tries)) = services()
.globals
.bad_query_ratelimiter
.read()
.unwrap()
.get(&*server)
{
// Exponential backoff
let mut min_elapsed_duration = Duration::from_secs(30) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
min_elapsed_duration = Duration::from_secs(60 * 60 * 24);
}
if time.elapsed() < min_elapsed_duration {
debug!("Backing off query from {:?}", server);
return (
server,
Err(Error::BadServerResponse("bad query, still backing off")),
);
}
}
let mut device_keys_input_fed = BTreeMap::new();
for (user_id, keys) in vec {
device_keys_input_fed.insert(user_id.to_owned(), keys.clone());
}
(
server,
services()
.sending
.send_federation_request(
tokio::time::timeout(
Duration::from_secs(25),
services().sending.send_federation_request(
server,
federation::keys::get_keys::v1::Request {
device_keys: device_keys_input_fed,
},
)
.await,
),
)
.await
.map_err(|e| Error::BadServerResponse("Query took too long")),
)
})
.collect();
while let Some((server, response)) = futures.next().await {
match response {
Ok(response) => {
Ok(Ok(response)) => {
for (user, masterkey) in response.master_keys {
let (master_key_id, mut master_key) =
services().users.parse_master_key(&user, &masterkey)?;
@ -386,7 +427,8 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>(
self_signing_keys.extend(response.self_signing_keys);
device_keys.extend(response.device_keys);
}
Err(_e) => {
_ => {
back_off(server.to_owned());
failures.insert(server.to_string(), json!({}));
}
}

View file

@ -34,6 +34,7 @@ pub async fn redact_event_route(
PduBuilder {
event_type: TimelineEventType::RoomRedaction,
content: to_raw_value(&RoomRedactionEventContent {
redacts: Some(body.event_id.clone()),
reason: body.reason.clone(),
})
.expect("event is valid, we just created it"),

View file

@ -142,8 +142,9 @@ pub async fn create_room_route(
content
}
None => {
// TODO: Add correct value for v11
let mut content = serde_json::from_str::<CanonicalJsonObject>(
to_raw_value(&RoomCreateEventContent::new(sender_user.clone()))
to_raw_value(&RoomCreateEventContent::new_v1(sender_user.clone()))
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Invalid creation content"))?
.get(),
)

View file

@ -1102,7 +1102,7 @@ async fn load_joined_room(
fn load_timeline(
sender_user: &UserId,
room_id: &RoomId,
sincecount: PduCount,
roomsincecount: PduCount,
limit: u64,
) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
let timeline_pdus;
@ -1111,7 +1111,7 @@ fn load_timeline(
.rooms
.timeline
.last_timeline_count(&sender_user, &room_id)?
> sincecount
> roomsincecount
{
let mut non_timeline_pdus = services()
.rooms
@ -1124,7 +1124,7 @@ fn load_timeline(
}
r.ok()
})
.take_while(|(pducount, _)| pducount > &sincecount);
.take_while(|(pducount, _)| pducount > &roomsincecount);
// Take the last events for the timeline
timeline_pdus = non_timeline_pdus
@ -1172,22 +1172,22 @@ fn share_encrypted_room(
pub async fn sync_events_v4_route(
body: Ruma<sync_events::v4::Request>,
) -> Result<sync_events::v4::Response, RumaResponse<UiaaResponse>> {
dbg!(&body.body);
let sender_user = body.sender_user.expect("user is authenticated");
let sender_device = body.sender_device.expect("user is authenticated");
let mut body = body.body;
// Setup watchers, so if there's no response, we can wait for them
let watcher = services().globals.watch(&sender_user, &sender_device);
let next_batch = services().globals.current_count()?;
let next_batch = services().globals.next_count()?;
let since = body
let globalsince = body
.pos
.as_ref()
.and_then(|string| string.parse().ok())
.unwrap_or(0);
let sincecount = PduCount::Normal(since);
if since == 0 {
if globalsince == 0 {
if let Some(conn_id) = &body.conn_id {
services().users.forget_sync_request_connection(
sender_user.clone(),
@ -1214,7 +1214,7 @@ pub async fn sync_events_v4_route(
if body.extensions.to_device.enabled.unwrap_or(false) {
services()
.users
.remove_to_device_events(&sender_user, &sender_device, since)?;
.remove_to_device_events(&sender_user, &sender_device, globalsince)?;
}
let mut left_encrypted_users = HashSet::new(); // Users that have left any encrypted rooms the sender was in
@ -1226,7 +1226,7 @@ pub async fn sync_events_v4_route(
device_list_changes.extend(
services()
.users
.keys_changed(sender_user.as_ref(), since, None)
.keys_changed(sender_user.as_ref(), globalsince, None)
.filter_map(|r| r.ok()),
);
@ -1242,7 +1242,7 @@ pub async fn sync_events_v4_route(
let since_shortstatehash = services()
.rooms
.user
.get_token_shortstatehash(&room_id, since)?;
.get_token_shortstatehash(&room_id, globalsince)?;
let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
.and_then(|shortstatehash| {
@ -1371,7 +1371,7 @@ pub async fn sync_events_v4_route(
device_list_changes.extend(
services()
.users
.keys_changed(room_id.as_ref(), since, None)
.keys_changed(room_id.as_ref(), globalsince, None)
.filter_map(|r| r.ok()),
);
}
@ -1408,7 +1408,7 @@ pub async fn sync_events_v4_route(
continue;
}
let mut new_known_rooms = BTreeMap::new();
let mut new_known_rooms = BTreeSet::new();
lists.insert(
list_id.clone(),
@ -1424,12 +1424,12 @@ pub async fn sync_events_v4_route(
let room_ids = all_joined_rooms
[(u64::from(r.0) as usize)..=(u64::from(r.1) as usize)]
.to_vec();
new_known_rooms.extend(room_ids.iter().cloned().map(|r| (r, true)));
new_known_rooms.extend(room_ids.iter().cloned());
for room_id in &room_ids {
let todo_room = todo_rooms.entry(room_id.clone()).or_insert((
BTreeSet::new(),
0,
true,
u64::MAX,
));
let limit = list
.room_details
@ -1440,10 +1440,14 @@ pub async fn sync_events_v4_route(
.0
.extend(list.room_details.required_state.iter().cloned());
todo_room.1 = todo_room.1.max(limit);
if known_rooms.get(&list_id).and_then(|k| k.get(room_id)) != Some(&true)
{
todo_room.2 = false;
}
// 0 means unknown because it got out of date
todo_room.2 = todo_room.2.min(
known_rooms
.get(&list_id)
.and_then(|k| k.get(room_id))
.copied()
.unwrap_or(0),
);
}
sync_events::v4::SyncOp {
op: SlidingOp::Sync,
@ -1465,26 +1469,28 @@ pub async fn sync_events_v4_route(
conn_id.clone(),
list_id,
new_known_rooms,
globalsince,
);
}
}
let mut known_subscription_rooms = BTreeMap::new();
let mut known_subscription_rooms = BTreeSet::new();
for (room_id, room) in &body.room_subscriptions {
let todo_room = todo_rooms
.entry(room_id.clone())
.or_insert((BTreeSet::new(), 0, true));
.or_insert((BTreeSet::new(), 0, u64::MAX));
let limit = room.timeline_limit.map_or(10, u64::from).min(100);
todo_room.0.extend(room.required_state.iter().cloned());
todo_room.1 = todo_room.1.max(limit);
if known_rooms
.get("subscriptions")
.and_then(|k| k.get(room_id))
!= Some(&true)
{
todo_room.2 = false;
}
known_subscription_rooms.insert(room_id.clone(), true);
// 0 means unknown because it got out of date
todo_room.2 = todo_room.2.min(
known_rooms
.get("subscriptions")
.and_then(|k| k.get(room_id))
.copied()
.unwrap_or(0),
);
known_subscription_rooms.insert(room_id.clone());
}
for r in body.unsubscribe_rooms {
@ -1499,6 +1505,7 @@ pub async fn sync_events_v4_route(
conn_id.clone(),
"subscriptions".to_owned(),
known_subscription_rooms,
globalsince,
);
}
@ -1512,12 +1519,13 @@ pub async fn sync_events_v4_route(
}
let mut rooms = BTreeMap::new();
for (room_id, (required_state_request, timeline_limit, known)) in &todo_rooms {
// TODO: per-room sync tokens
let (timeline_pdus, limited) =
load_timeline(&sender_user, &room_id, sincecount, *timeline_limit)?;
for (room_id, (required_state_request, timeline_limit, roomsince)) in &todo_rooms {
let roomsincecount = PduCount::Normal(*roomsince);
if *known && timeline_pdus.is_empty() {
let (timeline_pdus, limited) =
load_timeline(&sender_user, &room_id, roomsincecount, *timeline_limit)?;
if roomsince != &0 && timeline_pdus.is_empty() {
continue;
}
@ -1533,8 +1541,8 @@ pub async fn sync_events_v4_route(
}))
})?
.or_else(|| {
if since != 0 {
Some(since.to_string())
if roomsince != &0 {
Some(roomsince.to_string())
} else {
None
}
@ -1621,7 +1629,7 @@ pub async fn sync_events_v4_route(
.state_accessor
.get_avatar(&room_id)?
.map_or(avatar, |a| a.url),
initial: Some(!known),
initial: Some(roomsince == &0),
is_dm: None,
invite_state: None,
unread_notifications: UnreadNotificationsCount {
@ -1663,6 +1671,7 @@ pub async fn sync_events_v4_route(
.into(),
),
num_live: None, // Count events in timeline greater than global sync counter
timestamp: None,
},
);
}
@ -1680,8 +1689,8 @@ pub async fn sync_events_v4_route(
let _ = tokio::time::timeout(duration, watcher).await;
}
Ok(sync_events::v4::Response {
initial: since == 0,
Ok(dbg!(sync_events::v4::Response {
initial: globalsince == 0,
txn_id: body.txn_id.clone(),
pos: next_batch.to_string(),
lists,
@ -1712,7 +1721,7 @@ pub async fn sync_events_v4_route(
global: if body.extensions.account_data.enabled.unwrap_or(false) {
services()
.account_data
.changes_since(None, &sender_user, since)?
.changes_since(None, &sender_user, globalsince)?
.into_iter()
.filter_map(|(_, v)| {
serde_json::from_str(v.json().get())
@ -1735,5 +1744,5 @@ pub async fn sync_events_v4_route(
},
},
delta_token: None,
})
}))
}

View file

@ -84,8 +84,6 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
}
/// Returns the pdu.
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
self.eventid_pduid
.get(event_id.as_bytes())?

View file

@ -932,7 +932,7 @@ impl Service {
services().users.create(&conduit_user, None)?;
let mut content = RoomCreateEventContent::new(conduit_user.clone());
let mut content = RoomCreateEventContent::new_v1(conduit_user.clone());
content.federate = true;
content.predecessor = None;
content.room_version = services().globals.default_room_version();

View file

@ -56,6 +56,7 @@ pub struct Service {
pub unstable_room_versions: Vec<RoomVersionId>,
pub bad_event_ratelimiter: Arc<RwLock<HashMap<OwnedEventId, RateLimitState>>>,
pub bad_signature_ratelimiter: Arc<RwLock<HashMap<Vec<String>, RateLimitState>>>,
pub bad_query_ratelimiter: Arc<RwLock<HashMap<OwnedServerName, RateLimitState>>>,
pub servername_ratelimiter: Arc<RwLock<HashMap<OwnedServerName, Arc<Semaphore>>>>,
pub sync_receivers: RwLock<HashMap<(OwnedUserId, OwnedDeviceId), SyncHandle>>,
pub roomid_mutex_insert: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>,
@ -160,6 +161,7 @@ impl Service {
unstable_room_versions,
bad_event_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
bad_signature_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
bad_query_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
servername_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
roomid_mutex_state: RwLock::new(HashMap::new()),
roomid_mutex_insert: RwLock::new(HashMap::new()),

View file

@ -326,7 +326,7 @@ impl Service {
Ok(ruma::signatures::Verified::Signatures) => {
// Redact
warn!("Calculated hash does not match: {}", event_id);
match ruma::canonical_json::redact(value, room_version_id, None) {
let obj = match ruma::canonical_json::redact(value, room_version_id, None) {
Ok(obj) => obj,
Err(_) => {
return Err(Error::BadRequest(
@ -334,7 +334,17 @@ impl Service {
"Redaction failed",
))
}
};
// Skip the PDU if it is redacted and we already have it as an outlier event
if services().rooms.timeline.get_pdu_json(event_id)?.is_some() {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Event was redacted and we already knew about it",
));
}
obj
}
Ok(ruma::signatures::Verified::All) => value,
};
@ -1564,6 +1574,11 @@ impl Service {
}
};
if acl_event_content.allow.is_empty() {
// Ignore broken acl events
return Ok(());
}
if acl_event_content.is_allowed(server_name) {
Ok(())
} else {

View file

@ -1,6 +1,6 @@
mod data;
use std::{
collections::BTreeMap,
collections::{BTreeMap, BTreeSet},
mem,
sync::{Arc, Mutex},
};
@ -28,7 +28,7 @@ use crate::{services, Error, Result};
pub struct SlidingSyncCache {
lists: BTreeMap<String, SyncRequestList>,
subscriptions: BTreeMap<OwnedRoomId, sync_events::v4::RoomSubscription>,
known_rooms: BTreeMap<String, BTreeMap<OwnedRoomId, bool>>,
known_rooms: BTreeMap<String, BTreeMap<OwnedRoomId, u64>>, // For every room, the roomsince number
extensions: ExtensionsConfig,
}
@ -61,7 +61,7 @@ impl Service {
user_id: OwnedUserId,
device_id: OwnedDeviceId,
request: &mut sync_events::v4::Request,
) -> BTreeMap<String, BTreeMap<OwnedRoomId, bool>> {
) -> BTreeMap<String, BTreeMap<OwnedRoomId, u64>> {
let Some(conn_id) = request.conn_id.clone() else {
return BTreeMap::new();
};
@ -127,6 +127,7 @@ impl Service {
}
}
(_, Some(cached_filters)) => list.filters = Some(cached_filters),
(Some(list_filters), _) => list.filters = Some(list_filters.clone()),
(_, _) => {}
}
if list.bump_event_types.is_empty() {
@ -210,7 +211,8 @@ impl Service {
device_id: OwnedDeviceId,
conn_id: String,
list_id: String,
new_cached_rooms: BTreeMap<OwnedRoomId, bool>,
new_cached_rooms: BTreeSet<OwnedRoomId>,
globalsince: u64,
) {
let mut cache = self.connections.lock().unwrap();
let cached = Arc::clone(
@ -228,7 +230,20 @@ impl Service {
let cached = &mut cached.lock().unwrap();
drop(cache);
cached.known_rooms.insert(list_id, new_cached_rooms);
for (roomid, lastsince) in cached
.known_rooms
.entry(list_id.clone())
.or_default()
.iter_mut()
{
if !new_cached_rooms.contains(roomid) {
*lastsince = 0;
}
}
let list = cached.known_rooms.entry(list_id).or_default();
for roomid in new_cached_rooms {
list.insert(roomid, globalsince);
}
}
/// Check if account is deactivated