improvement: fetch signing keys in parallel when joining a room
This commit is contained in:
parent
5049d0e01b
commit
8b40e0a85f
4 changed files with 105 additions and 60 deletions
|
@ -38,7 +38,7 @@ use std::{
|
|||
net::{IpAddr, SocketAddr},
|
||||
pin::Pin,
|
||||
result::Result as StdResult,
|
||||
sync::Arc,
|
||||
sync::{Arc, RwLock},
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
|
||||
|
@ -543,7 +543,7 @@ pub async fn send_transaction_message_route<'a>(
|
|||
|
||||
let mut resolved_map = BTreeMap::new();
|
||||
|
||||
let mut pub_key_map = BTreeMap::new();
|
||||
let pub_key_map = RwLock::new(BTreeMap::new());
|
||||
|
||||
// This is all the auth_events that have been recursively fetched so they don't have to be
|
||||
// deserialized over and over again.
|
||||
|
@ -569,7 +569,7 @@ pub async fn send_transaction_message_route<'a>(
|
|||
value,
|
||||
true,
|
||||
&db,
|
||||
&mut pub_key_map,
|
||||
&pub_key_map,
|
||||
&mut auth_cache,
|
||||
)
|
||||
.await
|
||||
|
@ -622,7 +622,7 @@ fn handle_incoming_pdu<'a>(
|
|||
value: BTreeMap<String, CanonicalJsonValue>,
|
||||
is_timeline_event: bool,
|
||||
db: &'a Database,
|
||||
pub_key_map: &'a mut BTreeMap<String, BTreeMap<String, String>>,
|
||||
pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>,
|
||||
auth_cache: &'a mut EventMap<Arc<PduEvent>>,
|
||||
) -> AsyncRecursiveResult<'a, Arc<PduEvent>> {
|
||||
Box::pin(async move {
|
||||
|
@ -658,7 +658,9 @@ fn handle_incoming_pdu<'a>(
|
|||
|
||||
// We go through all the signatures we see on the value and fetch the corresponding signing
|
||||
// keys
|
||||
fetch_required_signing_keys(&value, pub_key_map, db).await.map_err(|e| e.to_string())?;
|
||||
fetch_required_signing_keys(&value, &pub_key_map, db)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
// 2. Check signatures, otherwise drop
|
||||
// 3. check content hash, redact if doesn't match
|
||||
|
@ -676,7 +678,11 @@ fn handle_incoming_pdu<'a>(
|
|||
|
||||
let room_version = create_event_content.room_version;
|
||||
|
||||
let mut val = match ruma::signatures::verify_event(&pub_key_map, &value, &room_version) {
|
||||
let mut val = match ruma::signatures::verify_event(
|
||||
&*pub_key_map.read().map_err(|_| "RwLock is poisoned.")?,
|
||||
&value,
|
||||
&room_version,
|
||||
) {
|
||||
Err(e) => {
|
||||
// Drop
|
||||
error!("{:?}: {}", value, e);
|
||||
|
@ -1106,7 +1112,7 @@ pub(crate) async fn fetch_and_handle_events(
|
|||
db: &Database,
|
||||
origin: &ServerName,
|
||||
events: &[EventId],
|
||||
pub_key_map: &mut BTreeMap<String, BTreeMap<String, String>>,
|
||||
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>,
|
||||
auth_cache: &mut EventMap<Arc<PduEvent>>,
|
||||
) -> Result<Vec<Arc<PduEvent>>> {
|
||||
let mut pdus = vec![];
|
||||
|
@ -1256,6 +1262,7 @@ pub(crate) async fn fetch_signing_keys(
|
|||
}
|
||||
}
|
||||
|
||||
warn!("Failed to find public key for server: {}", origin);
|
||||
Err(Error::BadServerResponse(
|
||||
"Failed to find public key for server",
|
||||
))
|
||||
|
@ -1486,7 +1493,7 @@ pub fn get_room_state_ids_route<'a>(
|
|||
put("/_matrix/federation/v2/invite/<_>/<_>", data = "<body>")
|
||||
)]
|
||||
#[tracing::instrument(skip(db, body))]
|
||||
pub fn create_invite_route<'a>(
|
||||
pub async fn create_invite_route<'a>(
|
||||
db: State<'a, Database>,
|
||||
body: Ruma<create_invite::v2::Request>,
|
||||
) -> ConduitResult<create_invite::v2::Response> {
|
||||
|
@ -1510,6 +1517,20 @@ pub fn create_invite_route<'a>(
|
|||
)
|
||||
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Failed to sign event."))?;
|
||||
|
||||
// Generate event id
|
||||
let event_id = EventId::try_from(&*format!(
|
||||
"${}",
|
||||
ruma::signatures::reference_hash(&signed_event, &body.room_version)
|
||||
.expect("ruma can calculate reference hashes")
|
||||
))
|
||||
.expect("ruma's reference hashes are valid event ids");
|
||||
|
||||
// Add event_id back
|
||||
signed_event.insert(
|
||||
"event_id".to_owned(),
|
||||
to_canonical_value(&event_id).expect("EventId is a valid CanonicalJsonValue"),
|
||||
);
|
||||
|
||||
let sender = serde_json::from_value(
|
||||
serde_json::to_value(
|
||||
signed_event
|
||||
|
@ -1543,24 +1564,26 @@ pub fn create_invite_route<'a>(
|
|||
.map_err(|_| Error::BadRequest(ErrorKind::InvalidParam, "Invalid invite event bytes."))?;
|
||||
|
||||
event.insert("event_id".to_owned(), "$dummy".into());
|
||||
invite_state.push(
|
||||
serde_json::from_value::<PduEvent>(event.into())
|
||||
.map_err(|e| {
|
||||
warn!("Invalid invite event: {}", e);
|
||||
Error::BadRequest(ErrorKind::InvalidParam, "Invalid invite event.")
|
||||
})?
|
||||
.to_stripped_state_event(),
|
||||
);
|
||||
|
||||
db.rooms.update_membership(
|
||||
&body.room_id,
|
||||
&invited_user,
|
||||
MembershipState::Invite,
|
||||
&sender,
|
||||
Some(invite_state),
|
||||
&db.account_data,
|
||||
&db.globals,
|
||||
)?;
|
||||
let pdu = serde_json::from_value::<PduEvent>(event.into()).map_err(|e| {
|
||||
warn!("Invalid invite event: {}", e);
|
||||
Error::BadRequest(ErrorKind::InvalidParam, "Invalid invite event.")
|
||||
})?;
|
||||
|
||||
invite_state.push(pdu.to_stripped_state_event());
|
||||
|
||||
// If the room already exists, the remote server will notify us about the join via /send
|
||||
if !db.rooms.exists(&pdu.room_id)? {
|
||||
db.rooms.update_membership(
|
||||
&body.room_id,
|
||||
&invited_user,
|
||||
MembershipState::Invite,
|
||||
&sender,
|
||||
Some(invite_state),
|
||||
&db.account_data,
|
||||
&db.globals,
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(create_invite::v2::Response {
|
||||
event: PduEvent::convert_to_outgoing_federation_event(signed_event),
|
||||
|
@ -1604,7 +1627,7 @@ pub fn get_profile_information_route<'a>(
|
|||
|
||||
pub async fn fetch_required_signing_keys(
|
||||
event: &BTreeMap<String, CanonicalJsonValue>,
|
||||
pub_key_map: &mut BTreeMap<String, BTreeMap<String, String>>,
|
||||
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>,
|
||||
db: &Database,
|
||||
) -> Result<()> {
|
||||
// We go through all the signatures we see on the value and fetch the corresponding signing
|
||||
|
@ -1642,14 +1665,17 @@ pub async fn fetch_required_signing_keys(
|
|||
.await
|
||||
{
|
||||
Ok(keys) => keys,
|
||||
Err(_) => {
|
||||
Err(e) => {
|
||||
return Err(Error::BadServerResponse(
|
||||
"Signature verification failed: Could not fetch signing key.",
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
pub_key_map.insert(signature_server.clone(), keys);
|
||||
pub_key_map
|
||||
.write()
|
||||
.map_err(|_| Error::bad_database("RwLock is poisoned."))?
|
||||
.insert(signature_server.clone(), keys);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue