Merge branch 'up-ruma' into 'next'

Upgrade Ruma

See merge request famedly/conduit!210
This commit is contained in:
Timo Kösters 2021-10-15 11:38:12 +00:00
commit dc8bc4a880
39 changed files with 902 additions and 948 deletions

View file

@ -22,7 +22,7 @@ pub trait Tree: Send + Sync {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()>;
fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()>;
fn insert_batch(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()>;
fn remove(&self, key: &[u8]) -> Result<()>;
@ -35,7 +35,7 @@ pub trait Tree: Send + Sync {
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;
fn increment(&self, key: &[u8]) -> Result<Vec<u8>>;
fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()>;
fn increment_batch(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()>;
fn scan_prefix<'a>(
&'a self,

View file

@ -6,16 +6,17 @@ use std::{
use crate::{pdu::PduBuilder, Database};
use rocket::futures::{channel::mpsc, stream::StreamExt};
use ruma::{
events::{room::message, EventType},
events::{room::message::RoomMessageEventContent, EventType},
UserId,
};
use serde_json::value::to_raw_value;
use tokio::sync::{MutexGuard, RwLock, RwLockReadGuard};
use tracing::warn;
pub enum AdminCommand {
RegisterAppservice(serde_yaml::Value),
ListAppservices,
SendMessage(message::MessageEventContent),
SendMessage(RoomMessageEventContent),
}
#[derive(Clone)]
@ -58,7 +59,7 @@ impl Admin {
drop(guard);
let send_message = |message: message::MessageEventContent,
let send_message = |message: RoomMessageEventContent,
guard: RwLockReadGuard<'_, Database>,
mutex_lock: &MutexGuard<'_, ()>| {
guard
@ -66,7 +67,7 @@ impl Admin {
.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomMessage,
content: serde_json::to_value(message)
content: to_raw_value(&message)
.expect("event is valid, we just created it"),
unsigned: None,
state_key: None,
@ -106,9 +107,9 @@ impl Admin {
count,
appservices.into_iter().filter_map(|r| r.ok()).collect::<Vec<_>>().join(", ")
);
send_message(message::MessageEventContent::text_plain(output), guard, &state_lock);
send_message(RoomMessageEventContent::text_plain(output), guard, &state_lock);
} else {
send_message(message::MessageEventContent::text_plain("Failed to get appservices."), guard, &state_lock);
send_message(RoomMessageEventContent::text_plain("Failed to get appservices."), guard, &state_lock);
}
}
AdminCommand::SendMessage(message) => {

View file

@ -57,8 +57,7 @@ pub struct RotationHandler(broadcast::Sender<()>, broadcast::Receiver<()>);
impl RotationHandler {
pub fn new() -> Self {
let (s, r) = broadcast::channel::<()>(1);
let (s, r) = broadcast::channel(1);
Self(s, r)
}
@ -278,8 +277,8 @@ impl Globals {
let signingkeys = self
.server_signingkeys
.get(origin.as_bytes())?
.and_then(|bytes| serde_json::from_slice::<ServerSigningKeys>(&bytes).ok())
.map(|keys| {
.and_then(|bytes| serde_json::from_slice(&bytes).ok())
.map(|keys: ServerSigningKeys| {
let mut tree = keys.verify_keys;
tree.extend(
keys.old_verify_keys

View file

@ -81,7 +81,7 @@ impl KeyBackups {
)?;
self.backupid_etag
.insert(&key, &globals.next_count()?.to_be_bytes())?;
Ok(version.to_string())
Ok(version.to_owned())
}
pub fn get_latest_backup_version(&self, user_id: &UserId) -> Result<Option<String>> {
@ -94,15 +94,15 @@ impl KeyBackups {
.iter_from(&last_possible_key, true)
.take_while(move |(k, _)| k.starts_with(&prefix))
.next()
.map_or(Ok(None), |(key, _)| {
.map(|(key, _)| {
utils::string_from_bytes(
key.rsplit(|&b| b == 0xff)
.next()
.expect("rsplit always returns an element"),
)
.map_err(|_| Error::bad_database("backupid_algorithm key is invalid."))
.map(Some)
})
.transpose()
}
pub fn get_latest_backup(&self, user_id: &UserId) -> Result<Option<(String, BackupAlgorithm)>> {
@ -115,7 +115,7 @@ impl KeyBackups {
.iter_from(&last_possible_key, true)
.take_while(move |(k, _)| k.starts_with(&prefix))
.next()
.map_or(Ok(None), |(key, value)| {
.map(|(key, value)| {
let version = utils::string_from_bytes(
key.rsplit(|&b| b == 0xff)
.next()
@ -123,13 +123,14 @@ impl KeyBackups {
)
.map_err(|_| Error::bad_database("backupid_algorithm key is invalid."))?;
Ok(Some((
Ok((
version,
serde_json::from_slice(&value).map_err(|_| {
Error::bad_database("Algorithm in backupid_algorithm is invalid.")
})?,
)))
))
})
.transpose()
}
pub fn get_backup(&self, user_id: &UserId, version: &str) -> Result<Option<BackupAlgorithm>> {

View file

@ -125,7 +125,7 @@ impl WildCardedDomain {
}
impl std::str::FromStr for WildCardedDomain {
type Err = std::convert::Infallible;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
fn from_str(s: &str) -> Result<Self, Self::Err> {
// maybe do some domain validation?
Ok(if s.starts_with("*.") {
WildCardedDomain::WildCarded(s[1..].to_owned())
@ -136,8 +136,8 @@ impl std::str::FromStr for WildCardedDomain {
})
}
}
impl<'de> serde::de::Deserialize<'de> for WildCardedDomain {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
impl<'de> Deserialize<'de> for WildCardedDomain {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::de::Deserializer<'de>,
{

View file

@ -9,8 +9,10 @@ use ruma::{
},
IncomingResponse, OutgoingRequest, SendAccessToken,
},
events::{room::power_levels::PowerLevelsEventContent, AnySyncRoomEvent, EventType},
identifiers::RoomName,
events::{
room::{name::RoomNameEventContent, power_levels::RoomPowerLevelsEventContent},
AnySyncRoomEvent, EventType,
},
push::{Action, PushConditionRoomCtx, PushFormat, Ruleset, Tweak},
serde::Raw,
uint, RoomId, UInt, UserId,
@ -177,11 +179,11 @@ pub async fn send_push_notice(
let mut notify = None;
let mut tweaks = Vec::new();
let power_levels: PowerLevelsEventContent = db
let power_levels: RoomPowerLevelsEventContent = db
.rooms
.room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")?
.map(|ev| {
serde_json::from_value(ev.content.clone())
serde_json::from_str(ev.content.get())
.map_err(|_| Error::bad_database("invalid m.room.power_levels event"))
})
.transpose()?
@ -226,7 +228,7 @@ pub async fn send_push_notice(
pub fn get_actions<'a>(
user: &UserId,
ruleset: &'a Ruleset,
power_levels: &PowerLevelsEventContent,
power_levels: &RoomPowerLevelsEventContent,
pdu: &Raw<AnySyncRoomEvent>,
room_id: &RoomId,
db: &Database,
@ -318,16 +320,18 @@ async fn send_notice(
let user_name = db.users.displayname(&event.sender)?;
notifi.sender_display_name = user_name.as_deref();
let room_name = db
.rooms
.room_state_get(&event.room_id, &EventType::RoomName, "")?
.map(|pdu| match pdu.content.get("name") {
Some(serde_json::Value::String(s)) => {
Some(Box::<RoomName>::try_from(&**s).expect("room name is valid"))
}
_ => None,
})
.flatten();
let room_name = if let Some(room_name_pdu) =
db.rooms
.room_state_get(&event.room_id, &EventType::RoomName, "")?
{
serde_json::from_str::<RoomNameEventContent>(room_name_pdu.content.get())
.map_err(|_| Error::bad_database("Invalid room name event in database."))?
.name
} else {
None
};
notifi.room_name = room_name.as_deref();
send_request(

View file

@ -1,9 +1,11 @@
mod edus;
pub use edus::RoomEdus;
use member::MembershipState;
use crate::{pdu::PduBuilder, server_server, utils, Database, Error, PduEvent, Result};
use crate::{
pdu::{EventHash, PduBuilder},
server_server, utils, Database, Error, PduEvent, Result,
};
use lru_cache::LruCache;
use regex::Regex;
use ring::digest;
@ -11,18 +13,27 @@ use rocket::http::RawStr;
use ruma::{
api::{client::error::ErrorKind, federation},
events::{
ignored_user_list, push_rules,
direct::DirectEvent,
ignored_user_list::IgnoredUserListEvent,
push_rules::PushRulesEvent,
room::{
create::CreateEventContent, member, message, power_levels::PowerLevelsEventContent,
create::RoomCreateEventContent,
member::{MembershipState, RoomMemberEventContent},
message::RoomMessageEventContent,
power_levels::RoomPowerLevelsEventContent,
},
tag::TagEvent,
AnyStrippedStateEvent, AnySyncStateEvent, EventType,
},
push::{self, Action, Tweak},
push::{Action, Ruleset, Tweak},
serde::{CanonicalJsonObject, CanonicalJsonValue, Raw},
state_res::{self, RoomVersion, StateMap},
uint, EventId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId,
};
use serde::Deserialize;
use serde_json::value::to_raw_value;
use std::{
borrow::Cow,
collections::{BTreeMap, HashMap, HashSet},
convert::{TryFrom, TryInto},
mem::size_of,
@ -42,7 +53,7 @@ pub type StateHashId = Vec<u8>;
pub type CompressedStateEvent = [u8; 2 * size_of::<u64>()];
pub struct Rooms {
pub edus: edus::RoomEdus,
pub edus: RoomEdus,
pub(super) pduid_pdu: Arc<dyn Tree>, // PduId = ShortRoomId + Count
pub(super) eventid_pduid: Arc<dyn Tree>,
pub(super) roomid_pduleaves: Arc<dyn Tree>,
@ -210,16 +221,16 @@ impl Rooms {
self.eventid_shorteventid
.get(event_id.as_bytes())?
.map_or(Ok(None), |shorteventid| {
self.shorteventid_shortstatehash.get(&shorteventid)?.map_or(
Ok::<_, Error>(None),
|bytes| {
Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| {
self.shorteventid_shortstatehash
.get(&shorteventid)?
.map(|bytes| {
utils::u64_from_bytes(&bytes).map_err(|_| {
Error::bad_database(
"Invalid shortstatehash bytes in shorteventid_shortstatehash",
)
})?))
},
)
})
})
.transpose()
})
}
@ -243,7 +254,7 @@ impl Rooms {
kind: &EventType,
sender: &UserId,
state_key: Option<&str>,
content: &serde_json::Value,
content: &serde_json::value::RawValue,
) -> Result<StateMap<Arc<PduEvent>>> {
let shortstatehash =
if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
@ -252,7 +263,8 @@ impl Rooms {
return Ok(HashMap::new());
};
let auth_events = state_res::auth_types_for_event(kind, sender, state_key, content);
let auth_events = state_res::auth_types_for_event(kind, sender, state_key, content)
.expect("content is a valid JSON object");
let mut sauthevents = auth_events
.into_iter()
@ -360,16 +372,16 @@ impl Rooms {
let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last()
{
let statediffnew = new_state_ids_compressed
let statediffnew: HashSet<_> = new_state_ids_compressed
.difference(&parent_stateinfo.1)
.cloned()
.collect::<HashSet<_>>();
.copied()
.collect();
let statediffremoved = parent_stateinfo
let statediffremoved: HashSet<_> = parent_stateinfo
.1
.difference(&new_state_ids_compressed)
.cloned()
.collect::<HashSet<_>>();
.copied()
.collect();
(statediffnew, statediffremoved)
} else {
@ -391,37 +403,43 @@ impl Rooms {
.ok()
.map(|(_, id)| id)
}) {
if let Some(pdu) = self.get_pdu_json(&event_id)? {
if pdu.get("type").and_then(|val| val.as_str()) == Some("m.room.member") {
if let Ok(pdu) = serde_json::from_value::<PduEvent>(
serde_json::to_value(&pdu).expect("CanonicalJsonObj is a valid JsonValue"),
) {
if let Some(membership) =
pdu.content.get("membership").and_then(|membership| {
serde_json::from_value::<member::MembershipState>(
membership.clone(),
)
.ok()
})
{
if let Some(state_key) = pdu
.state_key
.and_then(|state_key| UserId::try_from(state_key).ok())
{
self.update_membership(
room_id,
&state_key,
membership,
&pdu.sender,
None,
db,
false,
)?;
}
}
}
}
let pdu = match self.get_pdu_json(&event_id)? {
Some(pdu) => pdu,
None => continue,
};
if pdu.get("type").and_then(|val| val.as_str()) != Some("m.room.member") {
continue;
}
let pdu: PduEvent = match serde_json::from_str(
&serde_json::to_string(&pdu).expect("CanonicalJsonObj can be serialized to JSON"),
) {
Ok(pdu) => pdu,
Err(_) => continue,
};
#[derive(Deserialize)]
struct ExtractMembership {
membership: MembershipState,
}
let membership = match serde_json::from_str::<ExtractMembership>(pdu.content.get()) {
Ok(e) => e.membership,
Err(_) => continue,
};
let state_key = match pdu.state_key {
Some(k) => k,
None => continue,
};
let user_id = match UserId::try_from(state_key) {
Ok(id) => id,
Err(_) => continue,
};
self.update_membership(room_id, &user_id, membership, &pdu.sender, None, db, false)?;
}
self.update_joined_count(room_id, db)?;
@ -483,7 +501,7 @@ impl Rooms {
if parent != 0_u64 {
let mut response = self.load_shortstatehash_info(parent)?;
let mut state = response.last().unwrap().1.clone();
state.extend(added.iter().cloned());
state.extend(added.iter().copied());
for r in &removed {
state.remove(r);
}
@ -965,7 +983,8 @@ impl Rooms {
pub fn get_pdu_count(&self, event_id: &EventId) -> Result<Option<u64>> {
self.eventid_pduid
.get(event_id.as_bytes())?
.map_or(Ok(None), |pdu_id| self.pdu_count(&pdu_id).map(Some))
.map(|pdu_id| self.pdu_count(&pdu_id))
.transpose()
}
#[tracing::instrument(skip(self))]
@ -993,7 +1012,7 @@ impl Rooms {
pub fn get_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>> {
self.eventid_pduid
.get(event_id.as_bytes())?
.map_or_else::<Result<_>, _, _>(
.map_or_else(
|| self.eventid_outlierpdu.get(event_id.as_bytes()),
|pduid| {
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
@ -1026,14 +1045,12 @@ impl Rooms {
) -> Result<Option<CanonicalJsonObject>> {
self.eventid_pduid
.get(event_id.as_bytes())?
.map_or_else::<Result<_>, _, _>(
|| Ok(None),
|pduid| {
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
Error::bad_database("Invalid pduid in eventid_pduid.")
})?))
},
)?
.map(|pduid| {
self.pduid_pdu
.get(&pduid)?
.ok_or_else(|| Error::bad_database("Invalid pduid in eventid_pduid."))
})
.transpose()?
.map(|pdu| {
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
})
@ -1043,9 +1060,7 @@ impl Rooms {
/// Returns the pdu's id.
#[tracing::instrument(skip(self))]
pub fn get_pdu_id(&self, event_id: &EventId) -> Result<Option<Vec<u8>>> {
self.eventid_pduid
.get(event_id.as_bytes())?
.map_or(Ok(None), |pdu_id| Ok(Some(pdu_id)))
self.eventid_pduid.get(event_id.as_bytes())
}
/// Returns the pdu.
@ -1055,14 +1070,12 @@ impl Rooms {
pub fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
self.eventid_pduid
.get(event_id.as_bytes())?
.map_or_else::<Result<_>, _, _>(
|| Ok(None),
|pduid| {
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
Error::bad_database("Invalid pduid in eventid_pduid.")
})?))
},
)?
.map(|pduid| {
self.pduid_pdu
.get(&pduid)?
.ok_or_else(|| Error::bad_database("Invalid pduid in eventid_pduid."))
})
.transpose()?
.map(|pdu| {
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
})
@ -1081,11 +1094,8 @@ impl Rooms {
if let Some(pdu) = self
.eventid_pduid
.get(event_id.as_bytes())?
.map_or_else::<Result<_>, _, _>(
|| {
let r = self.eventid_outlierpdu.get(event_id.as_bytes());
r
},
.map_or_else(
|| self.eventid_outlierpdu.get(event_id.as_bytes()),
|pduid| {
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
Error::bad_database("Invalid pduid in eventid_pduid.")
@ -1325,11 +1335,11 @@ impl Rooms {
drop(insert_lock);
// See if the event matches any known pushers
let power_levels: PowerLevelsEventContent = db
let power_levels: RoomPowerLevelsEventContent = db
.rooms
.room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")?
.map(|ev| {
serde_json::from_value(ev.content.clone())
serde_json::from_str(ev.content.get())
.map_err(|_| Error::bad_database("invalid m.room.power_levels event"))
})
.transpose()?
@ -1348,9 +1358,9 @@ impl Rooms {
let rules_for_user = db
.account_data
.get::<push_rules::PushRulesEvent>(None, user, EventType::PushRules)?
.map(|ev| ev.content.global)
.unwrap_or_else(|| push::Ruleset::server_default(user));
.get(None, user, EventType::PushRules)?
.map(|ev: PushRulesEvent| ev.content.global)
.unwrap_or_else(|| Ruleset::server_default(user));
let mut highlight = false;
let mut notify = false;
@ -1404,30 +1414,21 @@ impl Rooms {
}
EventType::RoomMember => {
if let Some(state_key) = &pdu.state_key {
#[derive(Deserialize)]
struct ExtractMembership {
membership: MembershipState,
}
// if the state_key fails
let target_user_id = UserId::try_from(state_key.clone())
.expect("This state_key was previously validated");
let membership = serde_json::from_value::<member::MembershipState>(
pdu.content
.get("membership")
.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Invalid member event content",
))?
.clone(),
)
.map_err(|_| {
Error::BadRequest(
ErrorKind::InvalidParam,
"Invalid membership state content.",
)
})?;
let content = serde_json::from_str::<ExtractMembership>(pdu.content.get())
.map_err(|_| Error::bad_database("Invalid content in pdu."))?;
let invite_state = match membership {
member::MembershipState::Invite => {
let invite_state = match content.membership {
MembershipState::Invite => {
let state = self.calculate_invite_state(pdu)?;
Some(state)
}
_ => None,
@ -1438,7 +1439,7 @@ impl Rooms {
self.update_membership(
&pdu.room_id,
&target_user_id,
membership,
content.membership,
&pdu.sender,
invite_state,
db,
@ -1447,7 +1448,16 @@ impl Rooms {
}
}
EventType::RoomMessage => {
if let Some(body) = pdu.content.get("body").and_then(|b| b.as_str()) {
#[derive(Deserialize)]
struct ExtractBody<'a> {
#[serde(borrow)]
body: Option<Cow<'a, str>>,
}
let content = serde_json::from_str::<ExtractBody<'_>>(pdu.content.get())
.map_err(|_| Error::bad_database("Invalid content in pdu."))?;
if let Some(body) = content.body {
let mut batch = body
.split_terminator(|c: char| !c.is_alphanumeric())
.filter(|s| !s.is_empty())
@ -1475,11 +1485,11 @@ impl Rooms {
{
let mut lines = body.lines();
let command_line = lines.next().expect("each string has at least one line");
let body = lines.collect::<Vec<_>>();
let body: Vec<_> = lines.collect();
let mut parts = command_line.split_whitespace().skip(1);
if let Some(command) = parts.next() {
let args = parts.collect::<Vec<_>>();
let args: Vec<_> = parts.collect();
match command {
"register_appservice" => {
@ -1498,18 +1508,16 @@ impl Rooms {
}
Err(e) => {
db.admin.send(AdminCommand::SendMessage(
message::MessageEventContent::text_plain(
format!(
"Could not parse appservice config: {}",
e
),
),
RoomMessageEventContent::text_plain(format!(
"Could not parse appservice config: {}",
e
)),
));
}
}
} else {
db.admin.send(AdminCommand::SendMessage(
message::MessageEventContent::text_plain(
RoomMessageEventContent::text_plain(
"Expected code block in command body.",
),
));
@ -1542,12 +1550,10 @@ impl Rooms {
.count();
let elapsed = start.elapsed();
db.admin.send(AdminCommand::SendMessage(
message::MessageEventContent::text_plain(
format!(
"Loaded auth chain with length {} in {:?}",
count, elapsed
),
),
RoomMessageEventContent::text_plain(format!(
"Loaded auth chain with length {} in {:?}",
count, elapsed
)),
));
}
}
@ -1580,14 +1586,17 @@ impl Rooms {
) {
Ok(pdu) => {
db.admin.send(AdminCommand::SendMessage(
message::MessageEventContent::text_plain(
format!("EventId: {:?}\n{:#?}", event_id, pdu),
RoomMessageEventContent::text_plain(
format!(
"EventId: {:?}\n{:#?}",
event_id, pdu
),
),
));
}
Err(e) => {
db.admin.send(AdminCommand::SendMessage(
message::MessageEventContent::text_plain(
RoomMessageEventContent::text_plain(
format!("EventId: {:?}\nCould not parse event: {}", event_id, e),
),
));
@ -1596,18 +1605,16 @@ impl Rooms {
}
Err(e) => {
db.admin.send(AdminCommand::SendMessage(
message::MessageEventContent::text_plain(
format!(
"Invalid json in command body: {}",
e
),
),
RoomMessageEventContent::text_plain(format!(
"Invalid json in command body: {}",
e
)),
));
}
}
} else {
db.admin.send(AdminCommand::SendMessage(
message::MessageEventContent::text_plain(
RoomMessageEventContent::text_plain(
"Expected code block in command body.",
),
));
@ -1629,7 +1636,7 @@ impl Rooms {
serde_json::to_string_pretty(&json)
.expect("canonical json is valid json");
db.admin.send(AdminCommand::SendMessage(
message::MessageEventContent::text_html(
RoomMessageEventContent::text_html(
format!("{}\n```json\n{}\n```",
if outlier {
"PDU is outlier"
@ -1643,7 +1650,7 @@ impl Rooms {
}
None => {
db.admin.send(AdminCommand::SendMessage(
message::MessageEventContent::text_plain(
RoomMessageEventContent::text_plain(
"PDU not found.",
),
));
@ -1651,14 +1658,14 @@ impl Rooms {
}
} else {
db.admin.send(AdminCommand::SendMessage(
message::MessageEventContent::text_plain(
RoomMessageEventContent::text_plain(
"Event ID could not be parsed.",
),
));
}
} else {
db.admin.send(AdminCommand::SendMessage(
message::MessageEventContent::text_plain(
RoomMessageEventContent::text_plain(
"Usage: get_pdu <eventid>",
),
));
@ -1666,7 +1673,7 @@ impl Rooms {
}
_ => {
db.admin.send(AdminCommand::SendMessage(
message::MessageEventContent::text_plain(format!(
RoomMessageEventContent::text_plain(format!(
"Unrecognized command: {}",
command
)),
@ -1759,16 +1766,16 @@ impl Rooms {
let (statediffnew, statediffremoved) =
if let Some(parent_stateinfo) = states_parents.last() {
let statediffnew = state_ids_compressed
let statediffnew: HashSet<_> = state_ids_compressed
.difference(&parent_stateinfo.1)
.cloned()
.collect::<HashSet<_>>();
.copied()
.collect();
let statediffremoved = parent_stateinfo
let statediffremoved: HashSet<_> = parent_stateinfo
.1
.difference(&state_ids_compressed)
.cloned()
.collect::<HashSet<_>>();
.copied()
.collect();
(statediffnew, statediffremoved)
} else {
@ -1958,16 +1965,13 @@ impl Rooms {
let create_event = self.room_state_get(room_id, &EventType::RoomCreate, "")?;
let create_event_content = create_event
let create_event_content: Option<RoomCreateEventContent> = create_event
.as_ref()
.map(|create_event| {
serde_json::from_value::<Raw<CreateEventContent>>(create_event.content.clone())
.expect("Raw::from_value always works.")
.deserialize()
.map_err(|e| {
warn!("Invalid create event: {}", e);
Error::bad_database("Invalid create event in db.")
})
serde_json::from_str(create_event.content.get()).map_err(|e| {
warn!("Invalid create event: {}", e);
Error::bad_database("Invalid create event in db.")
})
})
.transpose()?;
@ -2000,7 +2004,10 @@ impl Rooms {
let mut unsigned = unsigned.unwrap_or_default();
if let Some(state_key) = &state_key {
if let Some(prev_pdu) = self.room_state_get(room_id, &event_type, state_key)? {
unsigned.insert("prev_content".to_owned(), prev_pdu.content.clone());
unsigned.insert(
"prev_content".to_owned(),
serde_json::from_str(prev_pdu.content.get()).expect("string is valid json"),
);
unsigned.insert(
"prev_sender".to_owned(),
serde_json::to_value(&prev_pdu.sender).expect("UserId::to_value always works"),
@ -2025,11 +2032,15 @@ impl Rooms {
.map(|(_, pdu)| pdu.event_id.clone())
.collect(),
redacts,
unsigned,
hashes: ruma::events::pdu::EventHash {
unsigned: if unsigned.is_empty() {
None
} else {
Some(to_raw_value(&unsigned).expect("to_raw_value always works"))
},
hashes: EventHash {
sha256: "aaa".to_owned(),
},
signatures: BTreeMap::new(),
signatures: None,
};
let auth_check = state_res::auth_check(
@ -2205,7 +2216,7 @@ impl Rooms {
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id {
pdu.unsigned.remove("transaction_id");
pdu.remove_transaction_id()?;
}
Ok((pdu_id, pdu))
}))
@ -2242,7 +2253,7 @@ impl Rooms {
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id {
pdu.unsigned.remove("transaction_id");
pdu.remove_transaction_id()?;
}
Ok((pdu_id, pdu))
}))
@ -2279,7 +2290,7 @@ impl Rooms {
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id {
pdu.unsigned.remove("transaction_id");
pdu.remove_transaction_id()?;
}
Ok((pdu_id, pdu))
}))
@ -2309,7 +2320,7 @@ impl Rooms {
&self,
room_id: &RoomId,
user_id: &UserId,
membership: member::MembershipState,
membership: MembershipState,
sender: &UserId,
last_state: Option<Vec<Raw<AnyStrippedStateEvent>>>,
db: &Database,
@ -2338,7 +2349,7 @@ impl Rooms {
roomuser_id.extend_from_slice(user_id.as_bytes());
match &membership {
member::MembershipState::Join => {
MembershipState::Join => {
// Check if the user never joined this room
if !self.once_joined(user_id, room_id)? {
// Add the user ID to the join list then
@ -2347,23 +2358,16 @@ impl Rooms {
// Check if the room has a predecessor
if let Some(predecessor) = self
.room_state_get(room_id, &EventType::RoomCreate, "")?
.and_then(|create| {
serde_json::from_value::<
Raw<ruma::events::room::create::CreateEventContent>,
>(create.content.clone())
.expect("Raw::from_value always works")
.deserialize()
.ok()
})
.and_then(|content| content.predecessor)
.and_then(|create| serde_json::from_str(create.content.get()).ok())
.and_then(|content: RoomCreateEventContent| content.predecessor)
{
// Copy user settings from predecessor to the current room:
// - Push rules
//
// TODO: finish this once push rules are implemented.
//
// let mut push_rules_event_content = account_data
// .get::<ruma::events::push_rules::PushRulesEvent>(
// let mut push_rules_event_content: PushRulesEvent = account_data
// .get(
// None,
// user_id,
// EventType::PushRules,
@ -2383,13 +2387,11 @@ impl Rooms {
// .ok();
// Copy old tags to new room
if let Some(tag_event) =
db.account_data.get::<ruma::events::tag::TagEvent>(
Some(&predecessor.room_id),
user_id,
EventType::Tag,
)?
{
if let Some(tag_event) = db.account_data.get::<TagEvent>(
Some(&predecessor.room_id),
user_id,
EventType::Tag,
)? {
db.account_data
.update(
Some(room_id),
@ -2403,11 +2405,8 @@ impl Rooms {
// Copy direct chat flag
if let Some(mut direct_event) =
db.account_data.get::<ruma::events::direct::DirectEvent>(
None,
user_id,
EventType::Direct,
)?
db.account_data
.get::<DirectEvent>(None, user_id, EventType::Direct)?
{
let mut room_ids_updated = false;
@ -2442,11 +2441,11 @@ impl Rooms {
self.userroomid_leftstate.remove(&userroom_id)?;
self.roomuserid_leftcount.remove(&roomuser_id)?;
}
member::MembershipState::Invite => {
MembershipState::Invite => {
// We want to know if the sender is ignored by the receiver
let is_ignored = db
.account_data
.get::<ignored_user_list::IgnoredUserListEvent>(
.get::<IgnoredUserListEvent>(
None, // Ignored users are in global account data
user_id, // Receiver
EventType::IgnoredUserList,
@ -2475,7 +2474,7 @@ impl Rooms {
self.userroomid_leftstate.remove(&userroom_id)?;
self.roomuserid_leftcount.remove(&roomuser_id)?;
}
member::MembershipState::Leave | member::MembershipState::Ban => {
MembershipState::Leave | MembershipState::Ban => {
if update_joined_count
&& self
.room_members(room_id)
@ -2700,26 +2699,23 @@ impl Rooms {
);
let state_lock = mutex_state.lock().await;
let mut event = serde_json::from_value::<Raw<member::MemberEventContent>>(
let mut event: RoomMemberEventContent = serde_json::from_str(
self.room_state_get(room_id, &EventType::RoomMember, &user_id.to_string())?
.ok_or(Error::BadRequest(
ErrorKind::BadState,
"Cannot leave a room you are not a member of.",
))?
.content
.clone(),
.get(),
)
.expect("from_value::<Raw<..>> can never fail")
.deserialize()
.map_err(|_| Error::bad_database("Invalid member event in database."))?;
event.membership = member::MembershipState::Leave;
event.membership = MembershipState::Leave;
self.build_and_append_pdu(
PduBuilder {
event_type: EventType::RoomMember,
content: serde_json::to_value(event)
.expect("event is valid, we just created it"),
content: to_raw_value(&event).expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(user_id.to_string()),
redacts: None,
@ -2753,16 +2749,14 @@ impl Rooms {
"User is not invited.",
))?;
let servers = invite_state
let servers: HashSet<_> = invite_state
.iter()
.filter_map(|event| {
serde_json::from_str::<serde_json::Value>(&event.json().to_string()).ok()
})
.filter_map(|event| event.get("sender").cloned())
.filter_map(|event| serde_json::from_str(event.json().get()).ok())
.filter_map(|event: serde_json::Value| event.get("sender").cloned())
.filter_map(|sender| sender.as_str().map(|s| s.to_owned()))
.filter_map(|sender| UserId::try_from(sender).ok())
.map(|user| user.server_name().to_owned())
.collect::<HashSet<_>>();
.collect();
for remote_server in servers {
let make_leave_response = db
@ -2793,10 +2787,9 @@ impl Rooms {
};
let mut leave_event_stub =
serde_json::from_str::<CanonicalJsonObject>(make_leave_response.event.json().get())
.map_err(|_| {
Error::BadServerResponse("Invalid make_leave event json received from server.")
})?;
serde_json::from_str::<CanonicalJsonObject>(make_leave_response.event.get()).map_err(
|_| Error::BadServerResponse("Invalid make_leave event json received from server."),
)?;
// TODO: Is origin needed?
leave_event_stub.insert(
@ -2847,7 +2840,7 @@ impl Rooms {
federation::membership::create_leave_event::v2::Request {
room_id,
event_id: &event_id,
pdu: PduEvent::convert_to_outgoing_federation_event(leave_event.clone()),
pdu: &PduEvent::convert_to_outgoing_federation_event(leave_event.clone()),
},
)
.await?;
@ -2912,14 +2905,13 @@ impl Rooms {
pub fn id_from_alias(&self, alias: &RoomAliasId) -> Result<Option<RoomId>> {
self.alias_roomid
.get(alias.alias().as_bytes())?
.map_or(Ok(None), |bytes| {
Ok(Some(
RoomId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
Error::bad_database("Room ID in alias_roomid is invalid unicode.")
})?)
.map_err(|_| Error::bad_database("Room ID in alias_roomid is invalid."))?,
))
.map(|bytes| {
RoomId::try_from(utils::string_from_bytes(&bytes).map_err(|_| {
Error::bad_database("Room ID in alias_roomid is invalid unicode.")
})?)
.map_err(|_| Error::bad_database("Room ID in alias_roomid is invalid."))
})
.transpose()
}
#[tracing::instrument(skip(self))]
@ -2979,11 +2971,11 @@ impl Rooms {
.to_vec();
let prefix_clone = prefix.clone();
let words = search_string
let words: Vec<_> = search_string
.split_terminator(|c: char| !c.is_alphanumeric())
.filter(|s| !s.is_empty())
.map(str::to_lowercase)
.collect::<Vec<_>>();
.collect();
let iterators = words.clone().into_iter().map(move |word| {
let mut prefix2 = prefix.clone();
@ -2996,12 +2988,7 @@ impl Rooms {
self.tokenids
.iter_from(&last_possible_id, true) // Newest pdus first
.take_while(move |(k, _)| k.starts_with(&prefix2))
.map(|(key, _)| {
let pdu_id = key[key.len() - size_of::<u64>()..].to_vec();
Ok::<_, Error>(pdu_id)
})
.filter_map(|r| r.ok())
.map(|(key, _)| key[key.len() - size_of::<u64>()..].to_vec())
});
Ok((
@ -3233,11 +3220,11 @@ impl Rooms {
self.roomuserid_leftcount
.get(&key)?
.map_or(Ok(None), |bytes| {
Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| {
Error::bad_database("Invalid leftcount in db.")
})?))
.map(|bytes| {
utils::u64_from_bytes(&bytes)
.map_err(|_| Error::bad_database("Invalid leftcount in db."))
})
.transpose()
}
/// Returns an iterator over all rooms this user joined.

View file

@ -162,11 +162,12 @@ impl RoomEdus {
Ok(self
.roomuserid_lastprivatereadupdate
.get(&key)?
.map_or(Ok::<_, Error>(None), |bytes| {
Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| {
.map(|bytes| {
utils::u64_from_bytes(&bytes).map_err(|_| {
Error::bad_database("Count in roomuserid_lastprivatereadupdate is invalid.")
})?))
})?
})
})
.transpose()?
.unwrap_or(0))
}
@ -286,11 +287,12 @@ impl RoomEdus {
Ok(self
.roomid_lasttypingupdate
.get(room_id.as_bytes())?
.map_or(Ok::<_, Error>(None), |bytes| {
Ok(Some(utils::u64_from_bytes(&bytes).map_err(|_| {
.map(|bytes| {
utils::u64_from_bytes(&bytes).map_err(|_| {
Error::bad_database("Count in roomid_lastroomactiveupdate is invalid.")
})?))
})?
})
})
.transpose()?
.unwrap_or(0))
}
@ -331,7 +333,7 @@ impl RoomEdus {
&self,
user_id: &UserId,
room_id: &RoomId,
presence: ruma::events::presence::PresenceEvent,
presence: PresenceEvent,
globals: &super::super::globals::Globals,
) -> Result<()> {
// TODO: Remove old entry? Or maybe just wipe completely from time to time?
@ -399,7 +401,7 @@ impl RoomEdus {
self.presenceid_presence
.get(&presence_id)?
.map(|value| {
let mut presence = serde_json::from_slice::<PresenceEvent>(&value)
let mut presence: PresenceEvent = serde_json::from_slice(&value)
.map_err(|_| Error::bad_database("Invalid presence event in db."))?;
let current_timestamp: UInt = utils::millis_since_unix_epoch()
.try_into()
@ -521,7 +523,7 @@ impl RoomEdus {
)
.map_err(|_| Error::bad_database("Invalid UserId in presenceid_presence."))?;
let mut presence = serde_json::from_slice::<PresenceEvent>(&value)
let mut presence: PresenceEvent = serde_json::from_slice(&value)
.map_err(|_| Error::bad_database("Invalid presence event in db."))?;
let current_timestamp: UInt = utils::millis_since_unix_epoch()

View file

@ -27,7 +27,7 @@ use ruma::{
OutgoingRequest,
},
device_id,
events::{push_rules, AnySyncEphemeralRoomEvent, EventType},
events::{push_rules::PushRulesEvent, AnySyncEphemeralRoomEvent, EventType},
push,
receipt::ReceiptType,
uint, MilliSecondsSinceUnixEpoch, ServerName, UInt, UserId,
@ -165,13 +165,13 @@ impl Sending {
}
// Find events that have been added since starting the last request
let new_events = guard.sending.servernameevent_data
let new_events: Vec<_> = guard.sending.servernameevent_data
.scan_prefix(prefix.clone())
.filter_map(|(k, v)| {
Self::parse_servercurrentevent(&k, v).ok().map(|ev| (ev, k))
})
.take(30)
.collect::<Vec<_>>();
.collect::<>();
// TODO: find edus
@ -344,8 +344,8 @@ impl Sending {
continue;
}
let event =
serde_json::from_str::<AnySyncEphemeralRoomEvent>(read_receipt.json().get())
let event: AnySyncEphemeralRoomEvent =
serde_json::from_str(read_receipt.json().get())
.map_err(|_| Error::bad_database("Invalid edu event in read_receipts."))?;
let federation_event = match event {
AnySyncEphemeralRoomEvent::Receipt(r) => {
@ -398,7 +398,7 @@ impl Sending {
let edu = Edu::DeviceListUpdate(DeviceListUpdateContent {
user_id,
device_id: device_id!("dummy"),
device_display_name: "Dummy".to_owned(),
device_display_name: Some("Dummy".to_owned()),
stream_id: uint!(1),
prev_id: Vec::new(),
deleted: None,
@ -485,7 +485,7 @@ impl Sending {
kind: OutgoingKind,
events: Vec<SendingEventType>,
db: Arc<RwLock<Database>>,
) -> std::result::Result<OutgoingKind, (OutgoingKind, Error)> {
) -> Result<OutgoingKind, (OutgoingKind, Error)> {
let db = db.read().await;
match &kind {
@ -573,8 +573,14 @@ impl Sending {
for pdu in pdus {
// Redacted events are not notification targets (we don't send push for them)
if pdu.unsigned.get("redacted_because").is_some() {
continue;
if let Some(unsigned) = &pdu.unsigned {
if let Ok(unsigned) =
serde_json::from_str::<serde_json::Value>(unsigned.get())
{
if unsigned.get("redacted_because").is_some() {
continue;
}
}
}
let userid =
@ -606,9 +612,9 @@ impl Sending {
let rules_for_user = db
.account_data
.get::<push_rules::PushRulesEvent>(None, &userid, EventType::PushRules)
.get(None, &userid, EventType::PushRules)
.unwrap_or_default()
.map(|ev| ev.content.global)
.map(|ev: PushRulesEvent| ev.content.global)
.unwrap_or_else(|| push::Ruleset::server_default(&userid));
let unread: UInt = db

View file

@ -5,7 +5,8 @@ use ruma::{
api::client::{
error::ErrorKind,
r0::uiaa::{
IncomingAuthData, IncomingPassword, IncomingUserIdentifier::MatrixId, UiaaInfo,
AuthType, IncomingAuthData, IncomingPassword, IncomingUserIdentifier::MatrixId,
UiaaInfo,
},
},
signatures::CanonicalJsonValue,
@ -99,10 +100,10 @@ impl Uiaa {
}
// Password was correct! Let's add it to `completed`
uiaainfo.completed.push("m.login.password".to_owned());
uiaainfo.completed.push(AuthType::Password);
}
IncomingAuthData::Dummy(_) => {
uiaainfo.completed.push("m.login.dummy".to_owned());
uiaainfo.completed.push(AuthType::Dummy);
}
k => error!("type not supported: {:?}", k),
}
@ -174,16 +175,14 @@ impl Uiaa {
self.userdevicesessionid_uiaarequest
.get(&userdevicesessionid)?
.map_or(Ok(None), |bytes| {
Ok::<_, Error>(Some(
serde_json::from_str::<CanonicalJsonValue>(
&utils::string_from_bytes(&bytes).map_err(|_| {
Error::bad_database("Invalid uiaa request bytes in db.")
})?,
)
.map_err(|_| Error::bad_database("Invalid uiaa request in db."))?,
))
.map(|bytes| {
serde_json::from_str::<CanonicalJsonValue>(
&utils::string_from_bytes(&bytes)
.map_err(|_| Error::bad_database("Invalid uiaa request bytes in db."))?,
)
.map_err(|_| Error::bad_database("Invalid uiaa request in db."))
})
.transpose()
}
fn update_uiaa_session(
@ -224,7 +223,7 @@ impl Uiaa {
userdevicesessionid.push(0xff);
userdevicesessionid.extend_from_slice(session.as_bytes());
let uiaainfo = serde_json::from_slice::<UiaaInfo>(
serde_json::from_slice(
&self
.userdevicesessionid_uiaainfo
.get(&userdevicesessionid)?
@ -233,8 +232,6 @@ impl Uiaa {
"UIAA session does not exist.",
))?,
)
.map_err(|_| Error::bad_database("UiaaInfo in userdeviceid_uiaainfo is invalid."))?;
Ok(uiaainfo)
.map_err(|_| Error::bad_database("UiaaInfo in userdeviceid_uiaainfo is invalid."))
}
}

View file

@ -620,10 +620,11 @@ impl Users {
key.push(0xff);
key.extend_from_slice(key_id.as_bytes());
let mut cross_signing_key =
serde_json::from_slice::<serde_json::Value>(&self.keyid_key.get(&key)?.ok_or(
Error::BadRequest(ErrorKind::InvalidParam, "Tried to sign nonexistent key."),
)?)
let mut cross_signing_key: serde_json::Value =
serde_json::from_slice(&self.keyid_key.get(&key)?.ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Tried to sign nonexistent key.",
))?)
.map_err(|_| Error::bad_database("key in keyid_key is invalid."))?;
let signatures = cross_signing_key