Merge remote-tracking branch 'refs/remotes/origin/next' into command-refactor

This commit is contained in:
Andrei Vasiliu 2022-01-21 10:19:17 +02:00
commit f244c0e2ce
26 changed files with 278 additions and 198 deletions

View file

@ -23,12 +23,12 @@ pub trait DatabaseEngine: Send + Sync {
where
Self: Sized;
fn open_tree(&self, name: &'static str) -> Result<Arc<dyn Tree>>;
fn flush(self: &Self) -> Result<()>;
fn cleanup(self: &Self) -> Result<()> {
fn flush(&self) -> Result<()>;
fn cleanup(&self) -> Result<()> {
Ok(())
}
fn memory_usage(self: &Self) -> Result<String> {
Ok("Current database engine does not support memory usage reporting.".to_string())
fn memory_usage(&self) -> Result<String> {
Ok("Current database engine does not support memory usage reporting.".to_owned())
}
}

View file

@ -4,7 +4,7 @@ use std::{future::Future, pin::Pin, sync::Arc, sync::RwLock};
pub struct Engine {
rocks: rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>,
cache_capacity_bytes: usize,
max_open_files: i32,
cache: rocksdb::Cache,
old_cfs: Vec<String>,
}
@ -16,7 +16,7 @@ pub struct RocksDbEngineTree<'a> {
write_lock: RwLock<()>,
}
fn db_options(cache_capacity_bytes: usize, rocksdb_cache: &rocksdb::Cache) -> rocksdb::Options {
fn db_options(max_open_files: i32, rocksdb_cache: &rocksdb::Cache) -> rocksdb::Options {
let mut block_based_options = rocksdb::BlockBasedOptions::default();
block_based_options.set_block_cache(rocksdb_cache);
@ -36,10 +36,10 @@ fn db_options(cache_capacity_bytes: usize, rocksdb_cache: &rocksdb::Cache) -> ro
//db_opts.set_use_direct_io_for_flush_and_compaction(true);
db_opts.create_if_missing(true);
db_opts.increase_parallelism(num_cpus::get() as i32);
db_opts.set_max_open_files(512);
db_opts.set_max_open_files(max_open_files);
db_opts.set_compression_type(rocksdb::DBCompressionType::Zstd);
db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
db_opts.optimize_level_style_compaction(cache_capacity_bytes);
db_opts.optimize_level_style_compaction(10 * 1024 * 1024);
let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1);
db_opts.set_prefix_extractor(prefix_extractor);
@ -52,7 +52,7 @@ impl DatabaseEngine for Arc<Engine> {
let cache_capacity_bytes = (config.db_cache_capacity_mb * 1024.0 * 1024.0) as usize;
let rocksdb_cache = rocksdb::Cache::new_lru_cache(cache_capacity_bytes).unwrap();
let db_opts = db_options(cache_capacity_bytes, &rocksdb_cache);
let db_opts = db_options(config.rocksdb_max_open_files, &rocksdb_cache);
let cfs = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(
&db_opts,
@ -66,14 +66,14 @@ impl DatabaseEngine for Arc<Engine> {
cfs.iter().map(|name| {
rocksdb::ColumnFamilyDescriptor::new(
name,
db_options(cache_capacity_bytes, &rocksdb_cache),
db_options(config.rocksdb_max_open_files, &rocksdb_cache),
)
}),
)?;
Ok(Arc::new(Engine {
rocks: db,
cache_capacity_bytes,
max_open_files: config.rocksdb_max_open_files,
cache: rocksdb_cache,
old_cfs: cfs,
}))
@ -84,7 +84,7 @@ impl DatabaseEngine for Arc<Engine> {
// Create if it didn't exist
let _ = self
.rocks
.create_cf(name, &db_options(self.cache_capacity_bytes, &self.cache));
.create_cf(name, &db_options(self.max_open_files, &self.cache));
}
Ok(Arc::new(RocksDbEngineTree {

View file

@ -127,7 +127,7 @@ impl Admin {
if let Ok(response) = guard._db.memory_usage() {
send_message(RoomMessageEventContent::text_plain(response), guard, &state_lock);
} else {
send_message(RoomMessageEventContent::text_plain("Failed to get database memory usage.".to_string()), guard, &state_lock);
send_message(RoomMessageEventContent::text_plain("Failed to get database memory usage.".to_owned()), guard, &state_lock);
}
}
AdminCommand::SendMessage(message) => {

View file

@ -2517,7 +2517,7 @@ impl Rooms {
let state_lock = mutex_state.lock().await;
let mut event: RoomMemberEventContent = serde_json::from_str(
self.room_state_get(room_id, &EventType::RoomMember, &user_id.to_string())?
self.room_state_get(room_id, &EventType::RoomMember, user_id.as_str())?
.ok_or(Error::BadRequest(
ErrorKind::BadState,
"Cannot leave a room you are not a member of.",
@ -3252,8 +3252,7 @@ impl Rooms {
&key[0].to_be_bytes(),
&chain
.iter()
.map(|s| s.to_be_bytes().to_vec())
.flatten()
.flat_map(|s| s.to_be_bytes().to_vec())
.collect::<Vec<u8>>(),
)?;
}
@ -3274,11 +3273,11 @@ impl Rooms {
) -> Result<bool> {
let mut key = user_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(&device_id.as_bytes());
key.extend_from_slice(device_id.as_bytes());
key.push(0xff);
key.extend_from_slice(&room_id.as_bytes());
key.extend_from_slice(room_id.as_bytes());
key.push(0xff);
key.extend_from_slice(&ll_user.as_bytes());
key.extend_from_slice(ll_user.as_bytes());
Ok(self.lazyloadedids.get(&key)?.is_some())
}
@ -3318,14 +3317,14 @@ impl Rooms {
)) {
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xff);
prefix.extend_from_slice(&device_id.as_bytes());
prefix.extend_from_slice(device_id.as_bytes());
prefix.push(0xff);
prefix.extend_from_slice(&room_id.as_bytes());
prefix.extend_from_slice(room_id.as_bytes());
prefix.push(0xff);
for ll_id in user_ids {
let mut key = prefix.clone();
key.extend_from_slice(&ll_id.as_bytes());
key.extend_from_slice(ll_id.as_bytes());
self.lazyloadedids.insert(&key, &[])?;
}
}
@ -3336,15 +3335,15 @@ impl Rooms {
#[tracing::instrument(skip(self))]
pub fn lazy_load_reset(
&self,
user_id: &Box<UserId>,
device_id: &Box<DeviceId>,
room_id: &Box<RoomId>,
user_id: &UserId,
device_id: &DeviceId,
room_id: &RoomId,
) -> Result<()> {
let mut prefix = user_id.as_bytes().to_vec();
prefix.push(0xff);
prefix.extend_from_slice(&device_id.as_bytes());
prefix.extend_from_slice(device_id.as_bytes());
prefix.push(0xff);
prefix.extend_from_slice(&room_id.as_bytes());
prefix.extend_from_slice(room_id.as_bytes());
prefix.push(0xff);
for (key, _) in self.lazyloadedids.scan_prefix(prefix) {

View file

@ -524,7 +524,7 @@ impl Sending {
.unwrap(), // TODO: handle error
appservice::event::push_events::v1::Request {
events: &pdu_jsons,
txn_id: &base64::encode_config(
txn_id: (&*base64::encode_config(
Self::calculate_hash(
&events
.iter()
@ -534,7 +534,8 @@ impl Sending {
.collect::<Vec<_>>(),
),
base64::URL_SAFE_NO_PAD,
),
))
.into(),
},
)
.await
@ -682,7 +683,7 @@ impl Sending {
pdus: &pdu_jsons,
edus: &edu_jsons,
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
transaction_id: &base64::encode_config(
transaction_id: (&*base64::encode_config(
Self::calculate_hash(
&events
.iter()
@ -692,7 +693,8 @@ impl Sending {
.collect::<Vec<_>>(),
),
base64::URL_SAFE_NO_PAD,
),
))
.into(),
},
)
.await

View file

@ -1,7 +1,7 @@
use std::sync::Arc;
use crate::Result;
use ruma::{DeviceId, UserId};
use ruma::{identifiers::TransactionId, DeviceId, UserId};
use super::abstraction::Tree;
@ -14,7 +14,7 @@ impl TransactionIds {
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
txn_id: &str,
txn_id: &TransactionId,
data: &[u8],
) -> Result<()> {
let mut key = user_id.as_bytes().to_vec();
@ -32,7 +32,7 @@ impl TransactionIds {
&self,
user_id: &UserId,
device_id: Option<&DeviceId>,
txn_id: &str,
txn_id: &TransactionId,
) -> Result<Option<Vec<u8>>> {
let mut key = user_id.as_bytes().to_vec();
key.push(0xff);

View file

@ -166,13 +166,12 @@ impl Uiaa {
user_id: &UserId,
device_id: &DeviceId,
session: &str,
) -> Result<Option<CanonicalJsonValue>> {
Ok(self
.userdevicesessionid_uiaarequest
) -> Option<CanonicalJsonValue> {
self.userdevicesessionid_uiaarequest
.read()
.unwrap()
.get(&(user_id.to_owned(), device_id.to_owned(), session.to_owned()))
.map(|j| j.to_owned()))
.map(|j| j.to_owned())
}
fn update_uiaa_session(

View file

@ -531,11 +531,11 @@ impl Users {
prefix.push(0xff);
// Master key
let master_key_map = master_key
let mut master_key_ids = master_key
.deserialize()
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid master key"))?
.keys;
let mut master_key_ids = master_key_map.values();
.keys
.into_values();
let master_key_id = master_key_ids.next().ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
@ -560,13 +560,14 @@ impl Users {
// Self-signing key
if let Some(self_signing_key) = self_signing_key {
let self_signing_key_map = self_signing_key
let mut self_signing_key_ids = self_signing_key
.deserialize()
.map_err(|_| {
Error::BadRequest(ErrorKind::InvalidParam, "Invalid self signing key")
})?
.keys;
let mut self_signing_key_ids = self_signing_key_map.values();
.keys
.into_values();
let self_signing_key_id = self_signing_key_ids.next().ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"Self signing key contained no key.",
@ -593,13 +594,14 @@ impl Users {
// User-signing key
if let Some(user_signing_key) = user_signing_key {
let user_signing_key_map = user_signing_key
let mut user_signing_key_ids = user_signing_key
.deserialize()
.map_err(|_| {
Error::BadRequest(ErrorKind::InvalidParam, "Invalid user signing key")
})?
.keys;
let mut user_signing_key_ids = user_signing_key_map.values();
.keys
.into_values();
let user_signing_key_id = user_signing_key_ids.next().ok_or(Error::BadRequest(
ErrorKind::InvalidParam,
"User signing key contained no key.",