feat: implement /keys/changes

This commit is contained in:
timokoesters 2020-07-29 17:37:26 +02:00
parent 0693387769
commit 66bc25fcd3
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
6 changed files with 96 additions and 50 deletions

View file

@ -35,7 +35,7 @@ use ruma::{
set_room_visibility,
},
filter::{self, create_filter, get_filter},
keys::{self, claim_keys, get_keys, upload_keys},
keys::{self, claim_keys, get_key_changes, get_keys, upload_keys},
media::{create_content, get_content, get_content_thumbnail, get_media_config},
membership::{
ban_user, forget_room, get_member_events, invite_user, join_room_by_id,
@ -2552,7 +2552,11 @@ pub async fn sync_events_route(
let mut send_member_count = false;
let mut joined_since_last_sync = false;
let mut send_notification_counts = false;
for pdu in db.rooms.pdus_since(&sender_id, &room_id, since)?.filter_map(|r| r.ok()) {
for pdu in db
.rooms
.pdus_since(&sender_id, &room_id, since)?
.filter_map(|r| r.ok())
{
send_notification_counts = true;
if pdu.kind == EventType::RoomMember {
send_member_count = true;
@ -2767,7 +2771,7 @@ pub async fn sync_events_route(
// Look for device list updates in this room
device_list_updates.extend(
db.users
.keys_changed(&room_id, since)
.keys_changed(&room_id, since, None)
.filter_map(|r| r.ok()),
);
@ -3529,6 +3533,39 @@ pub fn upload_signatures_route(
Ok(upload_signatures::Response.into())
}
#[cfg_attr(
feature = "conduit_bin",
get("/_matrix/client/r0/keys/changes", data = "<body>")
)]
pub fn get_key_changes_route(
db: State<'_, Database>,
body: Ruma<get_key_changes::Request>,
) -> ConduitResult<get_key_changes::Response> {
let sender_id = body.sender_id.as_ref().expect("user is authenticated");
let mut device_list_updates = HashSet::new();
for room_id in db.rooms.rooms_joined(sender_id).filter_map(|r| r.ok()) {
device_list_updates.extend(
db.users
.keys_changed(
&room_id,
body.from.parse().map_err(|_| {
Error::BadRequest(ErrorKind::InvalidParam, "Invalid `from`.")
})?,
Some(body.to.parse().map_err(|_| {
Error::BadRequest(ErrorKind::InvalidParam, "Invalid `to`.")
})?),
)
.filter_map(|r| r.ok()),
);
}
Ok(get_key_changes::Response {
changed: device_list_updates.into_iter().collect(),
left: Vec::new(), // TODO
}
.into())
}
#[cfg_attr(feature = "conduit_bin", get("/_matrix/client/r0/pushers"))]
pub fn pushers_route() -> ConduitResult<get_pushers::Response> {
Ok(get_pushers::Response {

View file

@ -611,8 +611,8 @@ impl Rooms {
self.pdus_since(user_id, room_id, 0)
}
/// Returns an iterator over all events in a room that happened after the event with id `since`
/// in reverse-chronological order.
/// Returns a double-ended iterator over all events in a room that happened after the event with id `since`
/// in chronological order.
pub fn pdus_since(
&self,
user_id: &UserId,
@ -624,7 +624,7 @@ impl Rooms {
// Skip the first pdu if it's exactly at since, because we sent that last time
let mut first_pdu_id = prefix.clone();
first_pdu_id.extend_from_slice(&(since+1).to_be_bytes());
first_pdu_id.extend_from_slice(&(since + 1).to_be_bytes());
let mut last_pdu_id = prefix.clone();
last_pdu_id.extend_from_slice(&u64::MAX.to_be_bytes());

View file

@ -352,7 +352,8 @@ impl RoomEdus {
.ok()?,
))
})
.take_while(|(_, timestamp)| current_timestamp - timestamp > 5 * 60_000) // 5 Minutes
.take_while(|(_, timestamp)| current_timestamp - timestamp > 5 * 60_000)
// 5 Minutes
{
self.userid_lastpresenceupdate.remove(&user_id_bytes)?;

View file

@ -9,7 +9,7 @@ use ruma::{
},
},
events::{AnyToDeviceEvent, EventType},
DeviceId, Raw, UserId, RoomId,
DeviceId, Raw, RoomId, UserId,
};
use std::{collections::BTreeMap, convert::TryFrom, mem, time::SystemTime};
@ -389,8 +389,7 @@ impl Users {
key.push(0xff);
key.extend_from_slice(&count);
self.keychangeid_userid
.insert(key, &*user_id.to_string())?;
self.keychangeid_userid.insert(key, &*user_id.to_string())?;
}
Ok(())
@ -497,8 +496,7 @@ impl Users {
key.push(0xff);
key.extend_from_slice(&count);
self.keychangeid_userid
.insert(key, &*user_id.to_string())?;
self.keychangeid_userid.insert(key, &*user_id.to_string())?;
}
Ok(())
@ -556,14 +554,23 @@ impl Users {
Ok(())
}
pub fn keys_changed(&self, room_id: &RoomId, since: u64) -> impl Iterator<Item = Result<UserId>> {
pub fn keys_changed(
&self,
room_id: &RoomId,
from: u64,
to: Option<u64>,
) -> impl Iterator<Item = Result<UserId>> {
let mut prefix = room_id.to_string().as_bytes().to_vec();
prefix.push(0xff);
let mut start = prefix.clone();
start.extend_from_slice(&(since + 1).to_be_bytes());
start.extend_from_slice(&(from + 1).to_be_bytes());
let mut end = prefix.clone();
end.extend_from_slice(&to.unwrap_or(u64::MAX).to_be_bytes());
self.keychangeid_userid
.range(start..)
.range(start..end)
.filter_map(|r| r.ok())
.take_while(move |(k, _)| k.starts_with(&prefix))
.map(|(_, bytes)| {

View file

@ -108,6 +108,7 @@ fn setup_rocket() -> rocket::Rocket {
client_server::options_route,
client_server::upload_signing_keys_route,
client_server::upload_signatures_route,
client_server::get_key_changes_route,
client_server::pushers_route,
client_server::set_pushers_route,
//server_server::well_known_server,