feat: very simple sliding sync implementation
parent f8a36e7554
commit e4f769963f
12 changed files with 284 additions and 93 deletions
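Summary: this commit adds a first, deliberately minimal sliding sync endpoint (sync_events_v4_route, built on ruma's sync_events::v4 types and SlidingOp) next to the existing v3 /sync handler, and extracts the shared timeline-window logic into a new load_timeline helper so both paths use the same code. Only joined rooms are served; invite-filtered lists are skipped, and the to-device, e2ee, account-data, receipts, and typing extensions are returned empty.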
@@ -1,4 +1,6 @@
-use crate::{service::rooms::timeline::PduCount, services, Error, Result, Ruma, RumaResponse};
+use crate::{
+    service::rooms::timeline::PduCount, services, Error, PduEvent, Result, Ruma, RumaResponse,
+};
 use ruma::{
     api::client::{
         filter::{FilterDefinition, LazyLoadOptions},
@@ -8,6 +10,7 @@ use ruma::{
                 Ephemeral, Filter, GlobalAccountData, InviteState, InvitedRoom, JoinedRoom,
                 LeftRoom, Presence, RoomAccountData, RoomSummary, Rooms, State, Timeline, ToDevice,
             },
+            v4::SlidingOp,
             DeviceLists, UnreadNotificationsCount,
         },
         uiaa::UiaaResponse,
@@ -17,10 +20,10 @@ use ruma::{
         StateEventType, TimelineEventType,
     },
     serde::Raw,
-    DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UserId,
+    uint, DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId,
 };
 use std::{
-    collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
+    collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
     sync::Arc,
     time::Duration,
 };
@@ -199,7 +202,7 @@ async fn sync_helper(
     let mut joined_rooms = BTreeMap::new();
     let since = body
         .since
-        .clone()
+        .as_ref()
         .and_then(|string| string.parse().ok())
         .unwrap_or(0);
     let sincecount = PduCount::Normal(since);
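Note on the hunk above: switching from .clone() to .as_ref() parses the since token from a borrowed string instead of copying it first; the parsed value still falls back to 0 when the token is missing or malformed.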
@@ -581,43 +584,7 @@ async fn load_joined_room(
         drop(insert_lock);
     }
 
-    let timeline_pdus;
-    let limited;
-    if services()
-        .rooms
-        .timeline
-        .last_timeline_count(&sender_user, &room_id)?
-        > sincecount
-    {
-        let mut non_timeline_pdus = services()
-            .rooms
-            .timeline
-            .pdus_until(&sender_user, &room_id, PduCount::max())?
-            .filter_map(|r| {
-                // Filter out buggy events
-                if r.is_err() {
-                    error!("Bad pdu in pdus_since: {:?}", r);
-                }
-                r.ok()
-            })
-            .take_while(|(pducount, _)| pducount > &sincecount);
-
-        // Take the last 10 events for the timeline
-        timeline_pdus = non_timeline_pdus
-            .by_ref()
-            .take(10)
-            .collect::<Vec<_>>()
-            .into_iter()
-            .rev()
-            .collect::<Vec<_>>();
-
-        // The /sync response doesn't always return all messages, so we say the output is
-        // limited unless there are events in non_timeline_pdus
-        limited = non_timeline_pdus.next().is_some();
-    } else {
-        timeline_pdus = Vec::new();
-        limited = false;
-    }
+    let (timeline_pdus, limited) = load_timeline(sender_user, room_id, sincecount, 10)?;
 
     let send_notification_counts = !timeline_pdus.is_empty()
         || services()
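The 37 lines removed above are not gone: they reappear, nearly verbatim, as the load_timeline helper in the next hunk, with the hardcoded window of 10 events generalized to a limit parameter. The v3 path keeps 10 as its default via the call shown here.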
@@ -1132,6 +1099,52 @@ async fn load_joined_room(
     })
 }
 
+fn load_timeline(
+    sender_user: &UserId,
+    room_id: &RoomId,
+    sincecount: PduCount,
+    limit: u64,
+) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> {
+    let timeline_pdus;
+    let limited;
+    if services()
+        .rooms
+        .timeline
+        .last_timeline_count(&sender_user, &room_id)?
+        > sincecount
+    {
+        let mut non_timeline_pdus = services()
+            .rooms
+            .timeline
+            .pdus_until(&sender_user, &room_id, PduCount::max())?
+            .filter_map(|r| {
+                // Filter out buggy events
+                if r.is_err() {
+                    error!("Bad pdu in pdus_since: {:?}", r);
+                }
+                r.ok()
+            })
+            .take_while(|(pducount, _)| pducount > &sincecount);
+
+        // Take the last `limit` events for the timeline
+        timeline_pdus = non_timeline_pdus
+            .by_ref()
+            .take(limit as usize)
+            .collect::<Vec<_>>()
+            .into_iter()
+            .rev()
+            .collect::<Vec<_>>();
+
+        // The /sync response doesn't always return all messages, so we say the output is
+        // limited unless there are events in non_timeline_pdus
+        limited = non_timeline_pdus.next().is_some();
+    } else {
+        timeline_pdus = Vec::new();
+        limited = false;
+    }
+    Ok((timeline_pdus, limited))
+}
+
 fn share_encrypted_room(
     sender_user: &UserId,
     user_id: &UserId,
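The (Vec<(PduCount, PduEvent)>, bool) contract of load_timeline is the core of the limited flag in both sync versions: take limit events newest-first, reverse them into chronological order, then probe the iterator once more to see whether older events remain. A minimal, dependency-free sketch of that pattern, with illustrative names that are not part of the Conduit codebase:

fn take_window<I: Iterator<Item = u64>>(mut iter: I, limit: usize) -> (Vec<u64>, bool) {
    // Input is newest-first; take the `limit` newest events, then flip
    // them back into chronological order, as load_timeline does.
    let window: Vec<u64> = iter
        .by_ref()
        .take(limit)
        .collect::<Vec<_>>()
        .into_iter()
        .rev()
        .collect();
    // One more item left over means the window missed events: limited.
    let limited = iter.next().is_some();
    (window, limited)
}

fn main() {
    // Ten events, ids 0..=9, iterated newest-first; a window of 5 keeps
    // 5..=9 and leaves 0..=4 behind, so the result is limited.
    let (window, limited) = take_window((0u64..10).rev(), 5);
    assert_eq!(window, vec![5, 6, 7, 8, 9]);
    assert!(limited);
}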
@@ -1155,3 +1168,178 @@ fn share_encrypted_room(
         })
         .any(|encrypted| encrypted))
 }
+
+pub async fn sync_events_v4_route(
+    body: Ruma<sync_events::v4::Request>,
+) -> Result<sync_events::v4::Response, RumaResponse<UiaaResponse>> {
+    let sender_user = body.sender_user.expect("user is authenticated");
+    let sender_device = body.sender_device.expect("user is authenticated");
+    let body = dbg!(body.body);
+
+    // Setup watchers, so if there's no response, we can wait for them
+    let watcher = services().globals.watch(&sender_user, &sender_device);
+
+    let next_batch = services().globals.current_count()?;
+
+    let since = body
+        .pos
+        .as_ref()
+        .and_then(|string| string.parse().ok())
+        .unwrap_or(0);
+    let sincecount = PduCount::Normal(since);
+
+    let initial = since == 0;
+
+    let all_joined_rooms = services()
+        .rooms
+        .state_cache
+        .rooms_joined(&sender_user)
+        .filter_map(|r| r.ok())
+        .collect::<Vec<_>>();
+
+    let mut lists = BTreeMap::new();
+    let mut todo_rooms = BTreeMap::new(); // and required state
+
+    for (list_id, list) in body.lists {
+        if list.filters.and_then(|f| f.is_invite).unwrap_or(false) {
+            continue;
+        }
+
+        lists.insert(
+            list_id,
+            sync_events::v4::SyncList {
+                ops: list
+                    .ranges
+                    .into_iter()
+                    .map(|mut r| {
+                        r.0 =
+                            r.0.clamp(uint!(0), UInt::from(all_joined_rooms.len() as u32 - 1));
+                        r.1 =
+                            r.1.clamp(r.0, UInt::from(all_joined_rooms.len() as u32 - 1));
+                        let room_ids = all_joined_rooms
+                            [(u64::from(r.0) as usize)..=(u64::from(r.1) as usize)]
+                            .to_vec();
+                        todo_rooms.extend(room_ids.iter().cloned().map(|r| {
+                            let limit = list
+                                .room_details
+                                .timeline_limit
+                                .map_or(10, u64::from)
+                                .min(100);
+                            (r, (list.room_details.required_state.clone(), limit))
+                        }));
+                        sync_events::v4::SyncOp {
+                            op: SlidingOp::Sync,
+                            range: Some(r.clone()),
+                            index: None,
+                            room_ids,
+                            room_id: None,
+                        }
+                    })
+                    .collect(),
+                count: UInt::from(all_joined_rooms.len() as u32),
+            },
+        );
+    }
+
+    let mut rooms = BTreeMap::new();
+    for (room_id, (required_state_request, timeline_limit)) in todo_rooms {
+        let (timeline_pdus, limited) =
+            load_timeline(&sender_user, &room_id, sincecount, timeline_limit)?;
+
+        let room_events: Vec<_> = timeline_pdus
+            .iter()
+            .map(|(_, pdu)| pdu.to_sync_room_event())
+            .collect();
+
+        let required_state = required_state_request
+            .iter()
+            .map(|state| {
+                services()
+                    .rooms
+                    .state_accessor
+                    .room_state_get(&room_id, &state.0, &state.1)
+            })
+            .filter_map(|r| r.ok())
+            .filter_map(|o| o)
+            .map(|state| state.to_sync_state_event())
+            .collect();
+
+        rooms.insert(
+            room_id.clone(),
+            sync_events::v4::SlidingSyncRoom {
+                name: services().rooms.state_accessor.get_name(&room_id)?,
+                initial: Some(initial),
+                is_dm: None,
+                invite_state: None,
+                unread_notifications: UnreadNotificationsCount {
+                    highlight_count: None,
+                    notification_count: None,
+                },
+                timeline: room_events,
+                required_state,
+                prev_batch: None,
+                limited,
+                joined_count: Some(
+                    (services()
+                        .rooms
+                        .state_cache
+                        .room_joined_count(&room_id)?
+                        .unwrap_or(0) as u32)
+                        .into(),
+                ),
+                invited_count: Some(
+                    (services()
+                        .rooms
+                        .state_cache
+                        .room_invited_count(&room_id)?
+                        .unwrap_or(0) as u32)
+                        .into(),
+                ),
+                num_live: None,
+            },
+        );
+    }
+
+    if rooms
+        .iter()
+        .all(|(_, r)| r.timeline.is_empty() && r.required_state.is_empty())
+    {
+        // Hang a few seconds so requests are not spammed
+        // Stop hanging if new info arrives
+        let mut duration = body.timeout.unwrap_or(Duration::from_secs(30));
+        if duration.as_secs() > 30 {
+            duration = Duration::from_secs(30);
+        }
+        let _ = tokio::time::timeout(duration, watcher).await;
+    }
+
+    Ok(dbg!(sync_events::v4::Response {
+        initial: initial,
+        txn_id: body.txn_id.clone(),
+        pos: next_batch.to_string(),
+        lists,
+        rooms,
+        extensions: sync_events::v4::Extensions {
+            to_device: None,
+            e2ee: sync_events::v4::E2EE {
+                device_lists: DeviceLists {
+                    changed: Vec::new(),
+                    left: Vec::new(),
+                },
+                device_one_time_keys_count: BTreeMap::new(),
+                device_unused_fallback_key_types: None,
+            },
+            account_data: sync_events::v4::AccountData {
+                global: Vec::new(),
+                rooms: BTreeMap::new(),
+            },
+            receipts: sync_events::v4::Receipts {
+                rooms: BTreeMap::new(),
+            },
+            typing: sync_events::v4::Typing {
+                rooms: BTreeMap::new(),
+            },
+        },
+        delta_token: None,
+    }))
+}
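Two details of the new route deserve a closer look. First, each requested range is clamped to the bounds of the joined-rooms list before slicing out room IDs; second, when no room has timeline events or required state to report, the route long-polls on the watcher for at most 30 seconds. Below is a small, dependency-free sketch of the clamping arithmetic (illustrative names only, not Conduit API). One caveat it makes explicit: the diff's all_joined_rooms.len() as u32 - 1 underflows for a user with no joined rooms, so the sketch guards the empty case.

/// Clamp a client-requested (start, end) window to a list of `len` rooms,
/// mirroring the r.0/r.1 clamping in sync_events_v4_route above.
/// Returns None when the list is empty, the case where the diff's
/// `len as u32 - 1` would underflow.
fn clamp_range(range: (u64, u64), len: usize) -> Option<(usize, usize)> {
    if len == 0 {
        return None;
    }
    let max = (len - 1) as u64;
    let start = range.0.min(max);
    let end = range.1.clamp(start, max);
    Some((start as usize, end as usize))
}

fn main() {
    let all_joined_rooms = ["!a:example.org", "!b:example.org", "!c:example.org"];
    // A window request for rows 0..=99 is cut down to the three rooms
    // that actually exist.
    let (start, end) = clamp_range((0, 99), all_joined_rooms.len()).unwrap();
    assert_eq!(&all_joined_rooms[start..=end], &all_joined_rooms[..]);
    assert!(clamp_range((0, 99), 0).is_none());
}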