feat: partially support sync filters

Timo Kösters 2022-01-05 18:15:00 +01:00
parent 68e910bb77
commit 1bd9fd74b3
GPG key ID: 356E705610F626D5
5 changed files with 133 additions and 61 deletions

@@ -1,6 +1,10 @@
 use crate::{database::DatabaseGuard, ConduitResult, Database, Error, Result, Ruma, RumaResponse};
 use ruma::{
-    api::client::r0::{sync::sync_events, uiaa::UiaaResponse},
+    api::client::r0::{
+        filter::{IncomingFilterDefinition, LazyLoadOptions},
+        sync::sync_events,
+        uiaa::UiaaResponse,
+    },
     events::{
         room::member::{MembershipState, RoomMemberEventContent},
         AnySyncEphemeralRoomEvent, EventType,
@@ -77,34 +81,32 @@ pub async fn sync_events_route(
         Entry::Vacant(v) => {
             let (tx, rx) = tokio::sync::watch::channel(None);
+            v.insert((body.since.clone(), rx.clone()));
             tokio::spawn(sync_helper_wrapper(
                 Arc::clone(&arc_db),
                 sender_user.clone(),
                 sender_device.clone(),
-                body.since.clone(),
-                body.full_state,
-                body.timeout,
+                body,
                 tx,
             ));
-            v.insert((body.since.clone(), rx)).1.clone()
+            rx
         }
         Entry::Occupied(mut o) => {
             if o.get().0 != body.since {
                 let (tx, rx) = tokio::sync::watch::channel(None);
+                o.insert((body.since.clone(), rx.clone()));
                 tokio::spawn(sync_helper_wrapper(
                     Arc::clone(&arc_db),
                     sender_user.clone(),
                     sender_device.clone(),
-                    body.since.clone(),
-                    body.full_state,
-                    body.timeout,
+                    body,
                     tx,
                 ));
-                o.insert((body.since.clone(), rx.clone()));
                 rx
             } else {
                 o.get().1.clone()
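
For context, this route dedupes concurrent /sync requests per (user, device, since) through a tokio watch channel: the first request registers a receiver and spawns the worker, later identical requests just subscribe to it. Below is a minimal, self-contained sketch of that pattern only; the names and the String payload are illustrative, not Conduit's actual types.

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // One channel per pending sync; the worker publishes the finished response.
    let (tx, rx) = watch::channel::<Option<String>>(None);

    let worker = tokio::spawn(async move {
        // ... build the sync response here ...
        let _ = tx.send(Some("sync response".to_owned()));
    });

    // A second request with the same `since` token would clone `rx` and wait.
    let mut subscriber = rx.clone();
    subscriber.changed().await.expect("sender still alive");
    println!("{:?}", *subscriber.borrow());

    worker.await.unwrap();
}
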
@@ -135,18 +137,16 @@ async fn sync_helper_wrapper(
     db: Arc<DatabaseGuard>,
     sender_user: Box<UserId>,
     sender_device: Box<DeviceId>,
-    since: Option<String>,
-    full_state: bool,
-    timeout: Option<Duration>,
+    body: sync_events::IncomingRequest,
     tx: Sender<Option<ConduitResult<sync_events::Response>>>,
 ) {
+    let since = body.since.clone();
     let r = sync_helper(
         Arc::clone(&db),
         sender_user.clone(),
         sender_device.clone(),
-        since.clone(),
-        full_state,
-        timeout,
+        body,
     )
     .await;
@@ -179,9 +179,7 @@ async fn sync_helper(
     db: Arc<DatabaseGuard>,
     sender_user: Box<UserId>,
     sender_device: Box<DeviceId>,
-    since: Option<String>,
-    full_state: bool,
-    timeout: Option<Duration>,
+    body: sync_events::IncomingRequest,
     // bool = caching allowed
 ) -> Result<(sync_events::Response, bool), Error> {
     // TODO: match body.set_presence {
@@ -193,8 +191,26 @@ async fn sync_helper(
     let next_batch = db.globals.current_count()?;
     let next_batch_string = next_batch.to_string();
+    // Load filter
+    let filter = match body.filter {
+        None => IncomingFilterDefinition::default(),
+        Some(sync_events::IncomingFilter::FilterDefinition(filter)) => filter,
+        Some(sync_events::IncomingFilter::FilterId(filter_id)) => db
+            .users
+            .get_filter(&sender_user, &filter_id)?
+            .unwrap_or_default(),
+    };
+    let (lazy_load_enabled, lazy_load_send_redundant) = match filter.room.state.lazy_load_options {
+        LazyLoadOptions::Enabled {
+            include_redundant_members: redundant,
+        } => (true, redundant),
+        _ => (false, false),
+    };
     let mut joined_rooms = BTreeMap::new();
-    let since = since
+    let since = body
+        .since
+        .clone()
         .and_then(|string| string.parse().ok())
         .unwrap_or(0);
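
The two booleans come from ruma's LazyLoadOptions in api::client::r0::filter. As a reference point, here is a sketch of how the match above collapses that option into flags; the local enum is a stand-in with the shape I believe ruma uses, not code from this commit.

// Local stand-in for ruma::api::client::r0::filter::LazyLoadOptions (assumed shape).
enum LazyLoadOptions {
    Disabled,
    Enabled { include_redundant_members: bool },
}

/// Collapse the filter's lazy-load setting into the two flags used by sync_helper.
fn lazy_load_flags(opts: &LazyLoadOptions) -> (bool, bool) {
    match opts {
        LazyLoadOptions::Enabled {
            include_redundant_members,
        } => (true, *include_redundant_members),
        LazyLoadOptions::Disabled => (false, false),
    }
}

fn main() {
    assert_eq!(
        lazy_load_flags(&LazyLoadOptions::Enabled { include_redundant_members: true }),
        (true, true)
    );
    assert_eq!(lazy_load_flags(&LazyLoadOptions::Disabled), (false, false));
}
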
@@ -374,8 +390,10 @@ async fn sync_helper(
                         .expect("state events have state keys");
                     if pdu.kind != EventType::RoomMember {
                         state_events.push(pdu);
-                    } else if full_state || timeline_users.contains(state_key) {
-                        // TODO: check filter: is ll enabled?
+                    } else if !lazy_load_enabled
+                        || body.full_state
+                        || timeline_users.contains(state_key)
+                    {
                         lazy_loaded.push(
                             UserId::parse(state_key.as_ref())
                                 .expect("they are in timeline_users, so they should be correct"),
@@ -432,15 +450,6 @@ async fn sync_helper(
                 let since_state_ids = db.rooms.state_full_ids(since_shortstatehash)?;
-                /*
-                let state_events = if joined_since_last_sync || full_state {
-                    current_state_ids
-                        .iter()
-                        .map(|(_, id)| db.rooms.get_pdu(id))
-                        .filter_map(|r| r.ok().flatten())
-                        .collect::<Vec<_>>()
-                } else {
-                */
                 let mut state_events = Vec::new();
                 let mut lazy_loaded = Vec::new();
@@ -459,7 +468,7 @@ async fn sync_helper(
                         .expect("state events have state keys");
                     if pdu.kind != EventType::RoomMember {
-                        if full_state || since_state_ids.get(&key) != Some(&id) {
+                        if body.full_state || since_state_ids.get(&key) != Some(&id) {
                             state_events.push(pdu);
                         }
                         continue;
@@ -469,16 +478,16 @@ async fn sync_helper(
                     let state_key_userid = UserId::parse(state_key.as_ref())
                         .expect("they are in timeline_users, so they should be correct");
-                    if full_state || since_state_ids.get(&key) != Some(&id) {
+                    if body.full_state || since_state_ids.get(&key) != Some(&id) {
                         lazy_loaded.push(state_key_userid);
                         state_events.push(pdu);
                     } else if timeline_users.contains(state_key)
-                        && !db.rooms.lazy_load_was_sent_before(
+                        && (!db.rooms.lazy_load_was_sent_before(
                             &sender_user,
                             &sender_device,
                             &room_id,
                             &state_key_userid,
-                        )?
+                        )? || lazy_load_send_redundant)
                     {
                         lazy_loaded.push(state_key_userid);
                         state_events.push(pdu);
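
For incremental syncs the commit adds a second escape hatch: a member whose state did not change since the last sync is still (re)sent if it shows up in the timeline and either it has not been lazy-load-sent to this device before or the filter asked for redundant members. A hypothetical predicate capturing that logic (parameter names are mine, not Conduit's):

/// `state_changed`: the member event differs from the `since` snapshot.
/// `sent_before`: what db.rooms.lazy_load_was_sent_before reports for this device.
fn send_member_on_incremental_sync(
    full_state: bool,
    state_changed: bool,
    in_timeline: bool,
    sent_before: bool,
    send_redundant: bool,
) -> bool {
    full_state || state_changed || (in_timeline && (!sent_before || send_redundant))
}

fn main() {
    // Unchanged member, already sent, but include_redundant_members = true: resend.
    assert!(send_member_on_incremental_sync(false, false, true, true, true));
    // Unchanged member, already sent, redundant members not requested: skip.
    assert!(!send_member_on_incremental_sync(false, false, true, true, false));
}
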
@@ -858,7 +867,7 @@ async fn sync_helper(
     };
     // TODO: Retry the endpoint instead of returning (waiting for #118)
-    if !full_state
+    if !body.full_state
         && response.rooms.is_empty()
         && response.presence.is_empty()
         && response.account_data.is_empty()
@@ -867,7 +876,7 @@ async fn sync_helper(
     {
         // Hang a few seconds so requests are not spammed
         // Stop hanging if new info arrives
-        let mut duration = timeout.unwrap_or_default();
+        let mut duration = body.timeout.unwrap_or_default();
         if duration.as_secs() > 30 {
             duration = Duration::from_secs(30);
         }
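
The long-poll timeout now also comes from the request body; the logic simply clamps whatever the client asked for to 30 seconds. An equivalent one-liner, as a sketch (the function name is hypothetical):

use std::time::Duration;

/// Honor the client's ?timeout= but never hang longer than 30 seconds
/// before answering with an empty sync.
fn clamp_sync_timeout(timeout: Option<Duration>) -> Duration {
    timeout.unwrap_or_default().min(Duration::from_secs(30))
}

fn main() {
    assert_eq!(clamp_sync_timeout(None), Duration::from_secs(0));
    assert_eq!(
        clamp_sync_timeout(Some(Duration::from_secs(60))),
        Duration::from_secs(30)
    );
}
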