Merge branch 'next' into 'room-v11'

# Conflicts:
#   src/service/rooms/timeline/mod.rs
#   src/utils/error.rs
This commit is contained in:
Val Lorentz 2024-04-11 17:34:42 +00:00
commit b5e21f761b
109 changed files with 4239 additions and 3708 deletions

View file

@ -2,16 +2,12 @@ mod data;
use std::{
cmp::Ordering,
collections::{BTreeMap, HashMap},
};
use std::{
collections::HashSet,
sync::{Arc, Mutex, RwLock},
collections::{BTreeMap, HashMap, HashSet},
sync::Arc,
};
pub use data::Data;
use regex::Regex;
use ruma::{
api::{client::error::ErrorKind, federation},
canonical_json::to_canonical_value,
@ -25,19 +21,21 @@ use ruma::{
},
push::{Action, Ruleset, Tweak},
serde::Base64,
state_res,
state_res::{Event, RoomVersion},
state_res::{self, Event, RoomVersion},
uint, user_id, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
OwnedServerName, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId,
OwnedServerName, RoomId, RoomVersionId, ServerName, UserId,
};
use serde::Deserialize;
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use tokio::sync::MutexGuard;
use tokio::sync::{Mutex, MutexGuard, RwLock};
use tracing::{error, info, warn};
use crate::{
api::server_server,
service::pdu::{EventHash, PduBuilder},
service::{
appservice::NamespaceRegex,
pdu::{EventHash, PduBuilder},
},
services, utils, Error, PduEvent, Result,
};
@ -58,8 +56,8 @@ impl PduCount {
}
pub fn try_from_string(token: &str) -> Result<Self> {
if token.starts_with('-') {
token[1..].parse().map(PduCount::Backfilled)
if let Some(stripped) = token.strip_prefix('-') {
stripped.parse().map(PduCount::Backfilled)
} else {
token.parse().map(PduCount::Normal)
}
@ -90,18 +88,6 @@ impl Ord for PduCount {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn comparisons() {
assert!(PduCount::Normal(1) < PduCount::Normal(2));
assert!(PduCount::Backfilled(2) < PduCount::Backfilled(1));
assert!(PduCount::Normal(1) > PduCount::Backfilled(1));
assert!(PduCount::Backfilled(1) < PduCount::Normal(1));
}
}
pub struct Service {
pub db: &'static dyn Data,
@ -112,7 +98,7 @@ pub struct Service {
impl Service {
#[tracing::instrument(skip(self))]
pub fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<Option<Arc<PduEvent>>> {
self.all_pdus(&user_id!("@doesntmatter:conduit.rs"), &room_id)?
self.all_pdus(user_id!("@doesntmatter:conduit.rs"), room_id)?
.next()
.map(|o| o.map(|(_, p)| Arc::new(p)))
.transpose()
@ -213,7 +199,7 @@ impl Service {
///
/// Returns pdu id
#[tracing::instrument(skip(self, pdu, pdu_json, leaves))]
pub fn append_pdu<'a>(
pub async fn append_pdu<'a>(
&self,
pdu: &PduEvent,
mut pdu_json: CanonicalJsonObject,
@ -275,11 +261,11 @@ impl Service {
.globals
.roomid_mutex_insert
.write()
.unwrap()
.await
.entry(pdu.room_id.clone())
.or_default(),
);
let insert_lock = mutex_insert.lock().unwrap();
let insert_lock = mutex_insert.lock().await;
let count1 = services().globals.next_count()?;
// Mark as read first so the sending client doesn't get a notification even if appending
@ -320,12 +306,25 @@ impl Service {
let mut notifies = Vec::new();
let mut highlights = Vec::new();
for user in services()
let mut push_target = services()
.rooms
.state_cache
.get_our_real_users(&pdu.room_id)?
.iter()
{
.get_our_real_users(&pdu.room_id)?;
if pdu.kind == TimelineEventType::RoomMember {
if let Some(state_key) = &pdu.state_key {
let target_user_id = UserId::parse(state_key.clone())
.expect("This state_key was previously validated");
if !push_target.contains(&target_user_id) {
let mut target = push_target.as_ref().clone();
target.insert(target_user_id);
push_target = Arc::new(target);
}
}
}
for user in push_target.iter() {
// Don't notify the user of their own events
if user == &pdu.sender {
continue;
@ -418,7 +417,7 @@ impl Service {
.spaces
.roomid_spacechunk_cache
.lock()
.unwrap()
.await
.remove(&pdu.room_id);
}
}
@ -471,26 +470,22 @@ impl Service {
.search
.index_pdu(shortroomid, &pdu_id, &body)?;
let admin_room = services().rooms.alias.resolve_local_alias(
<&RoomAliasId>::try_from(
format!("#admins:{}", services().globals.server_name()).as_str(),
)
.expect("#admins:server_name is a valid room alias"),
)?;
let server_user = format!("@conduit:{}", services().globals.server_name());
let to_conduit = body.starts_with(&format!("{server_user}: "))
|| body.starts_with(&format!("{server_user} "))
|| body == format!("{server_user}:")
|| body == format!("{server_user}");
|| body == server_user;
// This will evaluate to false if the emergency password is set up so that
// the administrator can execute commands as conduit
let from_conduit = pdu.sender == server_user
&& services().globals.emergency_password().is_none();
if to_conduit && !from_conduit && admin_room.as_ref() == Some(&pdu.room_id) {
services().admin.process_message(body);
if let Some(admin_room) = services().admin.get_admin_room()? {
if to_conduit && !from_conduit && admin_room == pdu.room_id {
services().admin.process_message(body);
}
}
}
}
@ -553,15 +548,15 @@ impl Service {
}
}
for appservice in services().appservice.all()? {
for appservice in services().appservice.read().await.values() {
if services()
.rooms
.state_cache
.appservice_in_room(&pdu.room_id, &appservice)?
.appservice_in_room(&pdu.room_id, appservice)?
{
services()
.sending
.send_pdu_appservice(appservice.0, pdu_id.clone())?;
.send_pdu_appservice(appservice.registration.id.clone(), pdu_id.clone())?;
continue;
}
@ -573,73 +568,41 @@ impl Service {
.as_ref()
.and_then(|state_key| UserId::parse(state_key.as_str()).ok())
{
if let Some(appservice_uid) = appservice
.1
.get("sender_localpart")
.and_then(|string| string.as_str())
.and_then(|string| {
UserId::parse_with_server_name(string, services().globals.server_name())
.ok()
})
{
if state_key_uid == &appservice_uid {
services()
.sending
.send_pdu_appservice(appservice.0, pdu_id.clone())?;
continue;
}
let appservice_uid = appservice.registration.sender_localpart.as_str();
if state_key_uid == appservice_uid {
services().sending.send_pdu_appservice(
appservice.registration.id.clone(),
pdu_id.clone(),
)?;
continue;
}
}
}
if let Some(namespaces) = appservice.1.get("namespaces") {
let users = namespaces
.get("users")
.and_then(|users| users.as_sequence())
.map_or_else(Vec::new, |users| {
users
.iter()
.filter_map(|users| Regex::new(users.get("regex")?.as_str()?).ok())
.collect::<Vec<_>>()
});
let aliases = namespaces
.get("aliases")
.and_then(|aliases| aliases.as_sequence())
.map_or_else(Vec::new, |aliases| {
aliases
.iter()
.filter_map(|aliases| Regex::new(aliases.get("regex")?.as_str()?).ok())
.collect::<Vec<_>>()
});
let rooms = namespaces
.get("rooms")
.and_then(|rooms| rooms.as_sequence());
let matching_users = |users: &NamespaceRegex| {
appservice.users.is_match(pdu.sender.as_str())
|| pdu.kind == TimelineEventType::RoomMember
&& pdu
.state_key
.as_ref()
.map_or(false, |state_key| users.is_match(state_key))
};
let matching_aliases = |aliases: &NamespaceRegex| {
services()
.rooms
.alias
.local_aliases_for_room(&pdu.room_id)
.filter_map(|r| r.ok())
.any(|room_alias| aliases.is_match(room_alias.as_str()))
};
let matching_users = |users: &Regex| {
users.is_match(pdu.sender.as_str())
|| pdu.kind == TimelineEventType::RoomMember
&& pdu
.state_key
.as_ref()
.map_or(false, |state_key| users.is_match(state_key))
};
let matching_aliases = |aliases: &Regex| {
services()
.rooms
.alias
.local_aliases_for_room(&pdu.room_id)
.filter_map(|r| r.ok())
.any(|room_alias| aliases.is_match(room_alias.as_str()))
};
if aliases.iter().any(matching_aliases)
|| rooms.map_or(false, |rooms| rooms.contains(&pdu.room_id.as_str().into()))
|| users.iter().any(matching_users)
{
services()
.sending
.send_pdu_appservice(appservice.0, pdu_id.clone())?;
}
if matching_aliases(&appservice.aliases)
|| appservice.rooms.is_match(pdu.room_id.as_str())
|| matching_users(&appservice.users)
{
services()
.sending
.send_pdu_appservice(appservice.registration.id.clone(), pdu_id.clone())?;
}
}
@ -829,7 +792,7 @@ impl Service {
/// Creates a new persisted data unit and adds it to a room. This function takes a
/// roomid_mutex_state, meaning that only this function is able to mutate the room state.
#[tracing::instrument(skip(self, state_lock))]
pub fn build_and_append_pdu(
pub async fn build_and_append_pdu(
&self,
pdu_builder: PduBuilder,
sender: &UserId,
@ -839,89 +802,85 @@ impl Service {
let (pdu, pdu_json) =
self.create_hash_and_sign_event(pdu_builder, sender, room_id, state_lock)?;
let admin_room = services().rooms.alias.resolve_local_alias(
<&RoomAliasId>::try_from(
format!("#admins:{}", services().globals.server_name()).as_str(),
)
.expect("#admins:server_name is a valid room alias"),
)?;
if admin_room.filter(|v| v == room_id).is_some() {
match pdu.event_type() {
TimelineEventType::RoomEncryption => {
warn!("Encryption is not allowed in the admins room");
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Encryption is not allowed in the admins room.",
));
if let Some(admin_room) = services().admin.get_admin_room()? {
if admin_room == room_id {
match pdu.event_type() {
TimelineEventType::RoomEncryption => {
warn!("Encryption is not allowed in the admins room");
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Encryption is not allowed in the admins room.",
));
}
TimelineEventType::RoomMember => {
#[derive(Deserialize)]
struct ExtractMembership {
membership: MembershipState,
}
let target = pdu
.state_key()
.filter(|v| v.starts_with('@'))
.unwrap_or(sender.as_str());
let server_name = services().globals.server_name();
let server_user = format!("@conduit:{}", server_name);
let content = serde_json::from_str::<ExtractMembership>(pdu.content.get())
.map_err(|_| Error::bad_database("Invalid content in pdu."))?;
if content.membership == MembershipState::Leave {
if target == server_user {
warn!("Conduit user cannot leave from admins room");
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Conduit user cannot leave from admins room.",
));
}
let count = services()
.rooms
.state_cache
.room_members(room_id)
.filter_map(|m| m.ok())
.filter(|m| m.server_name() == server_name)
.filter(|m| m != target)
.count();
if count < 2 {
warn!("Last admin cannot leave from admins room");
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Last admin cannot leave from admins room.",
));
}
}
if content.membership == MembershipState::Ban && pdu.state_key().is_some() {
if target == server_user {
warn!("Conduit user cannot be banned in admins room");
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Conduit user cannot be banned in admins room.",
));
}
let count = services()
.rooms
.state_cache
.room_members(room_id)
.filter_map(|m| m.ok())
.filter(|m| m.server_name() == server_name)
.filter(|m| m != target)
.count();
if count < 2 {
warn!("Last admin cannot be banned in admins room");
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Last admin cannot be banned in admins room.",
));
}
}
}
_ => {}
}
TimelineEventType::RoomMember => {
#[derive(Deserialize)]
struct ExtractMembership {
membership: MembershipState,
}
let target = pdu
.state_key()
.filter(|v| v.starts_with("@"))
.unwrap_or(sender.as_str());
let server_name = services().globals.server_name();
let server_user = format!("@conduit:{}", server_name);
let content = serde_json::from_str::<ExtractMembership>(pdu.content.get())
.map_err(|_| Error::bad_database("Invalid content in pdu."))?;
if content.membership == MembershipState::Leave {
if target == &server_user {
warn!("Conduit user cannot leave from admins room");
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Conduit user cannot leave from admins room.",
));
}
let count = services()
.rooms
.state_cache
.room_members(room_id)
.filter_map(|m| m.ok())
.filter(|m| m.server_name() == server_name)
.filter(|m| m != target)
.count();
if count < 2 {
warn!("Last admin cannot leave from admins room");
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Last admin cannot leave from admins room.",
));
}
}
if content.membership == MembershipState::Ban && pdu.state_key().is_some() {
if target == &server_user {
warn!("Conduit user cannot be banned in admins room");
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Conduit user cannot be banned in admins room.",
));
}
let count = services()
.rooms
.state_cache
.room_members(room_id)
.filter_map(|m| m.ok())
.filter(|m| m.server_name() == server_name)
.filter(|m| m != target)
.count();
if count < 2 {
warn!("Last admin cannot be banned in admins room");
return Err(Error::BadRequest(
ErrorKind::Forbidden,
"Last admin cannot be banned in admins room.",
));
}
}
}
_ => {}
}
}
@ -929,14 +888,16 @@ impl Service {
// pdu without it's state. This is okay because append_pdu can't fail.
let statehashid = services().rooms.state.append_to_state(&pdu)?;
let pdu_id = self.append_pdu(
&pdu,
pdu_json,
// Since this PDU references all pdu_leaves we can update the leaves
// of the room
vec![(*pdu.event_id).to_owned()],
state_lock,
)?;
let pdu_id = self
.append_pdu(
&pdu,
pdu_json,
// Since this PDU references all pdu_leaves we can update the leaves
// of the room
vec![(*pdu.event_id).to_owned()],
state_lock,
)
.await?;
// We set the room state after inserting the pdu, so that we never have a moment in time
// where events in the current room state do not exist
@ -974,7 +935,7 @@ impl Service {
/// Append the incoming event setting the state snapshot to the state from the
/// server that sent the event.
#[tracing::instrument(skip_all)]
pub fn append_incoming_pdu<'a>(
pub async fn append_incoming_pdu<'a>(
&self,
pdu: &PduEvent,
pdu_json: CanonicalJsonObject,
@ -1004,11 +965,11 @@ impl Service {
return Ok(None);
}
let pdu_id =
services()
.rooms
.timeline
.append_pdu(pdu, pdu_json, new_room_leaves, state_lock)?;
let pdu_id = services()
.rooms
.timeline
.append_pdu(pdu, pdu_json, new_room_leaves, state_lock)
.await?;
Ok(Some(pdu_id))
}
@ -1072,7 +1033,7 @@ impl Service {
#[tracing::instrument(skip(self, room_id))]
pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Result<()> {
let first_pdu = self
.all_pdus(&user_id!("@doesntmatter:conduit.rs"), &room_id)?
.all_pdus(user_id!("@doesntmatter:conduit.rs"), room_id)?
.next()
.expect("Room is not empty")?;
@ -1084,7 +1045,7 @@ impl Service {
let power_levels: RoomPowerLevelsEventContent = services()
.rooms
.state_accessor
.room_state_get(&room_id, &StateEventType::RoomPowerLevels, "")?
.room_state_get(room_id, &StateEventType::RoomPowerLevels, "")?
.map(|ev| {
serde_json::from_str(ev.content.get())
.map_err(|_| Error::bad_database("invalid m.room.power_levels event"))
@ -1115,11 +1076,9 @@ impl Service {
.await;
match response {
Ok(response) => {
let mut pub_key_map = RwLock::new(BTreeMap::new());
let pub_key_map = RwLock::new(BTreeMap::new());
for pdu in response.pdus {
if let Err(e) = self
.backfill_pdu(backfill_server, pdu, &mut pub_key_map)
.await
if let Err(e) = self.backfill_pdu(backfill_server, pdu, &pub_key_map).await
{
warn!("Failed to add backfilled pdu: {e}");
}
@ -1151,7 +1110,7 @@ impl Service {
.globals
.roomid_mutex_federation
.write()
.unwrap()
.await
.entry(room_id.to_owned())
.or_default(),
);
@ -1166,7 +1125,7 @@ impl Service {
services()
.rooms
.event_handler
.handle_incoming_pdu(origin, &event_id, &room_id, value, false, &pub_key_map)
.handle_incoming_pdu(origin, &event_id, &room_id, value, false, pub_key_map)
.await?;
let value = self.get_pdu_json(&event_id)?.expect("We just created it");
@ -1183,11 +1142,11 @@ impl Service {
.globals
.roomid_mutex_insert
.write()
.unwrap()
.await
.entry(room_id.clone())
.or_default(),
);
let insert_lock = mutex_insert.lock().unwrap();
let insert_lock = mutex_insert.lock().await;
let count = services().globals.next_count()?;
let mut pdu_id = shortroomid.to_be_bytes().to_vec();
@ -1199,24 +1158,21 @@ impl Service {
drop(insert_lock);
match pdu.kind {
TimelineEventType::RoomMessage => {
#[derive(Deserialize)]
struct ExtractBody {
body: Option<String>,
}
let content = serde_json::from_str::<ExtractBody>(pdu.content.get())
.map_err(|_| Error::bad_database("Invalid content in pdu."))?;
if let Some(body) = content.body {
services()
.rooms
.search
.index_pdu(shortroomid, &pdu_id, &body)?;
}
if pdu.kind == TimelineEventType::RoomMessage {
#[derive(Deserialize)]
struct ExtractBody {
body: Option<String>,
}
let content = serde_json::from_str::<ExtractBody>(pdu.content.get())
.map_err(|_| Error::bad_database("Invalid content in pdu."))?;
if let Some(body) = content.body {
services()
.rooms
.search
.index_pdu(shortroomid, &pdu_id, &body)?;
}
_ => {}
}
drop(mutex_lock);
@ -1224,3 +1180,16 @@ impl Service {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn comparisons() {
assert!(PduCount::Normal(1) < PduCount::Normal(2));
assert!(PduCount::Backfilled(2) < PduCount::Backfilled(1));
assert!(PduCount::Normal(1) > PduCount::Backfilled(1));
assert!(PduCount::Backfilled(1) < PduCount::Normal(1));
}
}