37 errors left

Timo Kösters 2022-10-05 20:33:55 +02:00 committed by Nyaaori
parent 44fe6d1554
commit 33a2b2b772
GPG key ID: E7819C3ED4D1F82E
19 changed files with 764 additions and 482 deletions
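
This commit is part of a refactor that splits a monolithic rooms implementation into per-concern services reached through a global services() accessor, which is why call sites below change from self.foo(...) to services().rooms.<service>.foo(...). A minimal sketch of the accessor shape, with illustrative names only (the real definitions differ and live elsewhere in the codebase):

use std::sync::OnceLock;

struct Timeline;    // append_pdu, get_pdu, ...
struct StateCache;  // membership bookkeeping, room_servers, ...

struct Rooms {
    timeline: Timeline,
    state_cache: StateCache,
    // state, state_accessor, alias, short, pdu_metadata, ...
}

struct Services {
    rooms: Rooms,
    // globals, pusher, sending, appservice, ...
}

static SERVICES: OnceLock<Services> = OnceLock::new();

// A free function, so call sites no longer need a &self borrow of one giant struct.
fn services() -> &'static Services {
    SERVICES.get().expect("services() called before initialization")
}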


@@ -1,6 +1,7 @@
 mod data;
 use std::borrow::Cow;
-use std::sync::Arc;
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
 use std::{iter, collections::HashSet};
 use std::fmt::Debug;
@@ -22,6 +23,8 @@ use super::state_compressor::CompressedStateEvent;
 pub struct Service {
 db: Arc<dyn Data>,
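+// In-memory cache of the most recent timeline count for each room.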
+pub(super) lasttimelinecount_cache: Mutex<HashMap<Box<RoomId>, u64>>,
 }
 impl Service {
@@ -73,7 +76,7 @@ impl Service {
 &self,
 event_id: &EventId,
 ) -> Result<Option<CanonicalJsonObject>> {
-self.db.get_non_outlier_pdu(event_id)
+self.db.get_non_outlier_pdu_json(event_id)
 }
 /// Returns the pdu's id.
@@ -129,9 +132,10 @@ impl Service {
 &self,
 pdu: &PduEvent,
 mut pdu_json: CanonicalJsonObject,
-leaves: impl IntoIterator<Item = &'a EventId> + Debug,
+leaves: Vec<Box<EventId>>,
+state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
 ) -> Result<Vec<u8>> {
-let shortroomid = self.get_shortroomid(&pdu.room_id)?.expect("room exists");
+let shortroomid = services().rooms.short.get_shortroomid(&pdu.room_id)?.expect("room exists");
 // Make unsigned fields correct. This is not properly documented in the spec, but state
 // events need to have previous content in the unsigned field, so clients can easily
@@ -141,8 +145,8 @@ impl Service {
 .entry("unsigned".to_owned())
 .or_insert_with(|| CanonicalJsonValue::Object(Default::default()))
 {
-if let Some(shortstatehash) = self.pdu_shortstatehash(&pdu.event_id).unwrap() {
-if let Some(prev_state) = self
+if let Some(shortstatehash) = services().rooms.state_accessor.pdu_shortstatehash(&pdu.event_id).unwrap() {
+if let Some(prev_state) = services().rooms.state_accessor
 .state_get(shortstatehash, &pdu.kind.to_string().into(), state_key)
 .unwrap()
 {
@@ -161,8 +165,8 @@ impl Service {
 }
 // We must keep track of all events that have been referenced.
-self.mark_as_referenced(&pdu.room_id, &pdu.prev_events)?;
-self.replace_pdu_leaves(&pdu.room_id, leaves)?;
+services().rooms.pdu_metadata.mark_as_referenced(&pdu.room_id, &pdu.prev_events)?;
+services().rooms.state.set_forward_extremities(&pdu.room_id, leaves, state_lock)?;
 let mutex_insert = Arc::clone(
 services().globals
@@ -177,37 +181,23 @@ impl Service {
 let count1 = services().globals.next_count()?;
 // Mark as read first so the sending client doesn't get a notification even if appending
 // fails
-self.edus
+services().rooms.edus.read_receipt
 .private_read_set(&pdu.room_id, &pdu.sender, count1)?;
-self.reset_notification_counts(&pdu.sender, &pdu.room_id)?;
+services().rooms.user.reset_notification_counts(&pdu.sender, &pdu.room_id)?;
 let count2 = services().globals.next_count()?;
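 // pdu_id layout: big-endian shortroomid followed by the big-endian count, so ids sort chronologically within a room.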
 let mut pdu_id = shortroomid.to_be_bytes().to_vec();
 pdu_id.extend_from_slice(&count2.to_be_bytes());
 // There's a brief moment of time here where the count is updated but the pdu does not
 // exist. This could theoretically lead to dropped pdus, but it's extremely rare
 //
 // Update: We fixed this using insert_lock
-self.pduid_pdu.insert(
-&pdu_id,
-&serde_json::to_vec(&pdu_json).expect("CanonicalJsonObject is always a valid String"),
-)?;
-self.lasttimelinecount_cache
-.lock()
-.unwrap()
-.insert(pdu.room_id.clone(), count2);
-self.eventid_pduid
-.insert(pdu.event_id.as_bytes(), &pdu_id)?;
-self.eventid_outlierpdu.remove(pdu.event_id.as_bytes())?;
+// Insert pdu
+self.db.append_pdu(&pdu_id, pdu, &pdu_json, count2)?;
 drop(insert_lock);
 // See if the event matches any known pushers
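 // The "" state_key below selects the room's singleton m.room.power_levels state event.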
+let power_levels: RoomPowerLevelsEventContent = services()
+.rooms
+.state_accessor
 .room_state_get(&pdu.room_id, &StateEventType::RoomPowerLevels, "")?
 .map(|ev| {
 serde_json::from_str(ev.content.get())
@@ -221,9 +211,9 @@ impl Service {
 let mut notifies = Vec::new();
 let mut highlights = Vec::new();
-for user in self.get_our_real_users(&pdu.room_id)?.iter() {
+for user in services().rooms.state_cache.get_our_real_users(&pdu.room_id)?.into_iter() {
 // Don't notify the user of their own events
-if user == &pdu.sender {
+if &user == &pdu.sender {
 continue;
 }
@@ -231,17 +221,19 @@ impl Service {
 .account_data
 .get(
 None,
-user,
+&user,
 GlobalAccountDataEventType::PushRules.to_string().into(),
 )?
+.map(|event| serde_json::from_str::<PushRulesEvent>(event.get())
+.map_err(|_| Error::bad_database("Invalid push rules event in db."))).transpose()?
 .map(|ev: PushRulesEvent| ev.content.global)
-.unwrap_or_else(|| Ruleset::server_default(user));
+.unwrap_or_else(|| Ruleset::server_default(&user));
 let mut highlight = false;
 let mut notify = false;
 for action in services().pusher.get_actions(
-user,
+&user,
 &rules_for_user,
 &power_levels,
 &sync_pdu,
@@ -258,27 +250,20 @@ impl Service {
 };
 }
-let mut userroom_id = user.as_bytes().to_vec();
-userroom_id.push(0xff);
-userroom_id.extend_from_slice(pdu.room_id.as_bytes());
 if notify {
-notifies.push(userroom_id.clone());
+notifies.push(user);
 }
 if highlight {
-highlights.push(userroom_id);
+highlights.push(user);
 }
-for senderkey in services().pusher.get_pusher_senderkeys(user) {
+for senderkey in services().pusher.get_pusher_senderkeys(&user) {
 services().sending.send_push_pdu(&*pdu_id, senderkey)?;
 }
 }
-self.userroomid_notificationcount
-.increment_batch(&mut notifies.into_iter())?;
-self.userroomid_highlightcount
-.increment_batch(&mut highlights.into_iter())?;
+self.db.increment_notification_counts(&pdu.room_id, notifies, highlights);
 match pdu.kind {
 RoomEventType::RoomRedaction => {
@@ -302,7 +287,7 @@ impl Service {
 let invite_state = match content.membership {
 MembershipState::Invite => {
-let state = self.calculate_invite_state(pdu)?;
+let state = services().rooms.state.calculate_invite_state(pdu)?;
 Some(state)
 }
 _ => None,
@@ -310,7 +295,7 @@ impl Service {
 // Update our membership info; we do this here in case a user is invited
 // and immediately leaves, as we need the DB to record the invite event for auth
-self.update_membership(
+services().rooms.state_cache.update_membership(
 &pdu.room_id,
 &target_user_id,
 content.membership,
@@ -322,18 +307,17 @@ impl Service {
 }
 RoomEventType::RoomMessage => {
 #[derive(Deserialize)]
-struct ExtractBody<'a> {
-#[serde(borrow)]
-body: Option<Cow<'a, str>>,
+struct ExtractBody {
+body: Option<String>,
 }
-let content = serde_json::from_str::<ExtractBody<'_>>(pdu.content.get())
+let content = serde_json::from_str::<ExtractBody>(pdu.content.get())
 .map_err(|_| Error::bad_database("Invalid content in pdu."))?;
 if let Some(body) = content.body {
-services().rooms.search.index_pdu(shortroomid, pdu_id, body)?;
+services().rooms.search.index_pdu(shortroomid, &pdu_id, body)?;
-let admin_room = self.alias.resolve_local_alias(
+let admin_room = services().rooms.alias.resolve_local_alias(
 <&RoomAliasId>::try_from(
 format!("#admins:{}", services().globals.server_name()).as_str(),
 )
@@ -357,7 +341,7 @@ impl Service {
 }
 for appservice in services().appservice.all()? {
-if self.appservice_in_room(&pdu.room_id, &appservice)? {
+if services().rooms.state_cache.appservice_in_room(&pdu.room_id, &appservice)? {
 services().sending.send_pdu_appservice(&appservice.0, &pdu_id)?;
 continue;
 }
@@ -418,7 +402,7 @@ impl Service {
 .map_or(false, |state_key| users.is_match(state_key))
 };
 let matching_aliases = |aliases: &Regex| {
-self.room_aliases(&pdu.room_id)
+services().rooms.alias.local_aliases_for_room(&pdu.room_id)
 .filter_map(|r| r.ok())
 .any(|room_alias| aliases.is_match(room_alias.as_str()))
 };
@@ -461,6 +445,7 @@ impl Service {
 let create_event = services()
 .rooms
+.state_accessor
 .room_state_get(room_id, &StateEventType::RoomCreate, "")?;
 let create_event_content: Option<RoomCreateEventContent> = create_event
@@ -483,12 +468,12 @@ impl Service {
 RoomVersion::new(&room_version_id).expect("room version is supported");
 let auth_events =
-self.get_auth_events(room_id, &event_type, sender, state_key.as_deref(), &content)?;
+services().rooms.state.get_auth_events(room_id, &event_type, sender, state_key.as_deref(), &content)?;
 // Our depth is the maximum depth of prev_events + 1
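 // (e.g. prev_events with depths 4 and 7 give this event depth 8; no prev_events gives depth 1)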
 let depth = prev_events
 .iter()
-.filter_map(|event_id| Some(services().rooms.get_pdu(event_id).ok()??.depth))
+.filter_map(|event_id| Some(services().rooms.timeline.get_pdu(event_id).ok()??.depth))
 .max()
 .unwrap_or_else(|| uint!(0))
 + uint!(1);
@@ -497,7 +482,7 @@ impl Service {
 if let Some(state_key) = &state_key {
 if let Some(prev_pdu) =
-self.room_state_get(room_id, &event_type.to_string().into(), state_key)?
+services().rooms.state_accessor.room_state_get(room_id, &event_type.to_string().into(), state_key)?
 {
 unsigned.insert(
 "prev_content".to_owned(),
@@ -604,7 +589,7 @@ impl Service {
 );
 // Generate short event id
-let _shorteventid = self.get_or_create_shorteventid(&pdu.event_id)?;
+let _shorteventid = services().rooms.short.get_or_create_shorteventid(&pdu.event_id)?;
 Ok((pdu, pdu_json))
 }
@@ -623,22 +608,23 @@ impl Service {
 // We append to state before appending the pdu, so we don't have a moment in time with the
 // pdu without its state. This is okay because append_pdu can't fail.
-let statehashid = self.append_to_state(&pdu)?;
+let statehashid = services().rooms.state.append_to_state(&pdu)?;
 let pdu_id = self.append_pdu(
 &pdu,
 pdu_json,
 // Since this PDU references all pdu_leaves we can update the leaves
 // of the room
-iter::once(&*pdu.event_id),
+vec![(*pdu.event_id).to_owned()],
+state_lock,
 )?;
 // We set the room state after inserting the pdu, so that we never have a moment in time
 // where events in the current room state do not exist
-self.set_room_state(room_id, statehashid)?;
+services().rooms.state.set_room_state(room_id, statehashid, state_lock)?;
 let mut servers: HashSet<Box<ServerName>> =
-self.room_servers(room_id).filter_map(|r| r.ok()).collect();
+services().rooms.state_cache.room_servers(room_id).filter_map(|r| r.ok()).collect();
 // In case we are kicking or banning a user, we need to inform their server of the change
 if pdu.kind == RoomEventType::RoomMember {
@@ -666,27 +652,27 @@ impl Service {
 &self,
 pdu: &PduEvent,
 pdu_json: CanonicalJsonObject,
-new_room_leaves: impl IntoIterator<Item = &'a EventId> + Clone + Debug,
+new_room_leaves: Vec<Box<EventId>>,
 state_ids_compressed: HashSet<CompressedStateEvent>,
 soft_fail: bool,
-_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
+state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
 ) -> Result<Option<Vec<u8>>> {
 // We append to state before appending the pdu, so we don't have a moment in time with the
 // pdu without its state. This is okay because append_pdu can't fail.
-services().rooms.set_event_state(
+services().rooms.state.set_event_state(
 &pdu.event_id,
 &pdu.room_id,
 state_ids_compressed,
 )?;
 if soft_fail {
-services().rooms
+services().rooms.pdu_metadata
 .mark_as_referenced(&pdu.room_id, &pdu.prev_events)?;
-services().rooms.replace_pdu_leaves(&pdu.room_id, new_room_leaves)?;
+services().rooms.state.set_forward_extremities(&pdu.room_id, new_room_leaves, state_lock)?;
 return Ok(None);
 }
-let pdu_id = services().rooms.append_pdu(pdu, pdu_json, new_room_leaves)?;
+let pdu_id = services().rooms.timeline.append_pdu(pdu, pdu_json, new_room_leaves, state_lock)?;
 Ok(Some(pdu_id))
 }
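
The state_lock: &MutexGuard<'_, ()> parameter threaded through these functions is a capability-style argument: by demanding a reference to a guard that is already held, the callee forces every caller to take the room's state mutex first, without ever re-locking it itself. A minimal sketch of the pattern with hypothetical names, not the codebase's actual types:

use std::sync::{Mutex, MutexGuard};

struct RoomState;

impl RoomState {
    // Requiring &MutexGuard proves the caller holds the room's state lock,
    // so there is no double-lock and no forgotten-lock code path.
    fn set_room_state(&self, _state_lock: &MutexGuard<'_, ()>) {
        // ... mutate the room's current state here, safely under the lock ...
    }
}

fn example(state: &RoomState, room_mutex: &Mutex<()>) {
    let guard = room_mutex.lock().unwrap();
    state.set_room_state(&guard);
    // guard drops here; the whole update happened while the lock was held
}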