Step 5 in /send just fetches state from incoming server
This commit is contained in:
parent
9e83d2b2d5
commit
0ee239c9d7
4 changed files with 186 additions and 187 deletions
|
@ -1,4 +1,4 @@
|
|||
use crate::{client_server, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma};
|
||||
use crate::{client_server, pdu, utils, ConduitResult, Database, Error, PduEvent, Result, Ruma};
|
||||
use get_profile_information::v1::ProfileField;
|
||||
use http::header::{HeaderValue, AUTHORIZATION, HOST};
|
||||
use log::{error, info, warn};
|
||||
|
@ -11,13 +11,15 @@ use ruma::{
|
|||
get_server_keys, get_server_version::v1 as get_server_version, ServerSigningKeys,
|
||||
VerifyKey,
|
||||
},
|
||||
event::{get_missing_events, get_room_state, get_room_state_ids},
|
||||
event::{get_event, get_missing_events, get_room_state, get_room_state_ids},
|
||||
query::get_profile_information,
|
||||
transactions::send_transaction_message,
|
||||
},
|
||||
OutgoingRequest,
|
||||
},
|
||||
directory::{IncomingFilter, IncomingRoomNetwork},
|
||||
serde::Raw,
|
||||
signatures::{CanonicalJsonObject, PublicKeyMap},
|
||||
EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId,
|
||||
};
|
||||
use state_res::{Event, StateMap};
|
||||
|
@ -578,32 +580,13 @@ pub async fn send_transaction_message_route<'a>(
|
|||
let mut pub_key_map = BTreeMap::new();
|
||||
pub_key_map.insert("domain".to_string(), pub_key_set);
|
||||
|
||||
let value =
|
||||
match ruma::signatures::verify_event(&pub_key_map, &value, &RoomVersionId::Version6) {
|
||||
Ok(ver) => {
|
||||
if let ruma::signatures::Verified::Signatures = ver {
|
||||
match ruma::signatures::redact(&value, &RoomVersionId::Version6) {
|
||||
Ok(obj) => obj,
|
||||
Err(_) => {
|
||||
resolved_map
|
||||
.insert(event_id, Err("Room is unknown to this server".into()));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
value
|
||||
}
|
||||
}
|
||||
Err(_e) => {
|
||||
resolved_map.insert(event_id, Err("Room is unknown to this server".into()));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let pdu = serde_json::from_value::<PduEvent>(
|
||||
serde_json::to_value(&value).expect("CanonicalJsonObj is a valid JsonValue"),
|
||||
)
|
||||
.expect("all ruma pdus are conduit pdus");
|
||||
let pdu = match signature_and_hash_check(&pub_key_map, value) {
|
||||
Ok(pdu) => pdu,
|
||||
Err(e) => {
|
||||
resolved_map.insert(event_id, Err(e));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// If we have no idea about this room skip the PDU
|
||||
if !db.rooms.exists(&pdu.room_id)? {
|
||||
|
@ -619,7 +602,10 @@ pub async fn send_transaction_message_route<'a>(
|
|||
.map(|id| db.rooms.get_pdu(id).expect("todo").map(Arc::new))
|
||||
.flatten();
|
||||
|
||||
// 4.
|
||||
// 4. Passes authorization rules based on the event's auth events, otherwise it is rejected.
|
||||
// TODO: To me this sounds more like the auth_events should be get the pdu.auth_events not
|
||||
// the auth events that would be correct for this pdu. Put another way we should use the auth events
|
||||
// the pdu claims are its auth events
|
||||
let auth_events = db.rooms.get_auth_events(
|
||||
&pdu.room_id,
|
||||
&pdu.kind,
|
||||
|
@ -627,6 +613,12 @@ pub async fn send_transaction_message_route<'a>(
|
|||
pdu.state_key.as_deref(),
|
||||
pdu.content.clone(),
|
||||
)?;
|
||||
|
||||
let mut event_map: state_res::EventMap<Arc<PduEvent>> = auth_events
|
||||
.iter()
|
||||
.map(|(k, v)| (v.event_id().clone(), Arc::new(v.clone())))
|
||||
.collect();
|
||||
|
||||
if !state_res::event_auth::auth_check(
|
||||
&RoomVersionId::Version6,
|
||||
&event,
|
||||
|
@ -635,7 +627,7 @@ pub async fn send_transaction_message_route<'a>(
|
|||
.into_iter()
|
||||
.map(|(k, v)| (k, Arc::new(v)))
|
||||
.collect(),
|
||||
None,
|
||||
None, // TODO: third party invite
|
||||
)
|
||||
.map_err(|_e| Error::Conflict("Auth check failed"))?
|
||||
{
|
||||
|
@ -646,66 +638,38 @@ pub async fn send_transaction_message_route<'a>(
|
|||
continue;
|
||||
}
|
||||
|
||||
let mut previous_states: Vec<StateMap<Arc<PduEvent>>> = vec![];
|
||||
for id in &pdu.prev_events {
|
||||
if let Some(id) = db.rooms.get_pdu_id(id)? {
|
||||
let state_hash = db
|
||||
.rooms
|
||||
.pdu_state_hash(&id)?
|
||||
.expect("found pdu with no statehash");
|
||||
let state = db
|
||||
.rooms
|
||||
.state_full(&pdu.room_id, &state_hash)?
|
||||
let server_name = body.body.origin.clone();
|
||||
let (state_at_event, incoming_auth_events): (StateMap<Arc<PduEvent>>, _) = match db
|
||||
.sending
|
||||
.send_federation_request(
|
||||
&db.globals,
|
||||
server_name.clone(),
|
||||
get_room_state_ids::v1::Request {
|
||||
room_id: pdu.room_id(),
|
||||
event_id: pdu.event_id(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => {
|
||||
let state = fetch_events(&db, server_name.clone(), &pub_key_map, &res.pdu_ids)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|((et, sk), ev)| ((et, Some(sk)), Arc::new(ev)))
|
||||
.map(|pdu| ((pdu.kind.clone(), pdu.state_key.clone()), Arc::new(pdu)))
|
||||
.collect();
|
||||
previous_states.push(state);
|
||||
} else {
|
||||
// fetch the state
|
||||
match db
|
||||
.sending
|
||||
.send_federation_request(
|
||||
&db.globals,
|
||||
body.body.origin,
|
||||
get_room_state_ids::v1::Request {
|
||||
room_id: &pdu.room_id,
|
||||
event_id: id,
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => todo!(),
|
||||
Err(e) => panic!(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 5. Passes authorization rules based on the state at the event, otherwise it is rejected.
|
||||
let state_at_event = if previous_states.is_empty() {
|
||||
// State is empty
|
||||
Default::default()
|
||||
} else if previous_states.len() == 1 {
|
||||
previous_states[0].clone()
|
||||
} else {
|
||||
match state_res::StateResolution::resolve(
|
||||
&pdu.room_id,
|
||||
&RoomVersionId::Version6,
|
||||
&previous_states
|
||||
.into_iter()
|
||||
.map(|map| {
|
||||
map.into_iter()
|
||||
.map(|(k, v)| (k, v.event_id.clone()))
|
||||
.collect::<StateMap<_>>()
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
None,
|
||||
&db.rooms,
|
||||
) {
|
||||
Ok(res) => res
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, Arc::new(db.rooms.get_pdu(&v).unwrap().unwrap())))
|
||||
.collect(),
|
||||
Err(e) => panic!("{:?}", e),
|
||||
(
|
||||
state,
|
||||
fetch_events(&db, server_name.clone(), &pub_key_map, &res.auth_chain_ids)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
Err(_) => {
|
||||
resolved_map.insert(
|
||||
event.event_id().clone(),
|
||||
Err("Fetching state for event failed".into()),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -713,8 +677,8 @@ pub async fn send_transaction_message_route<'a>(
|
|||
&RoomVersionId::Version6,
|
||||
&event,
|
||||
previous.clone(),
|
||||
state_at_event,
|
||||
None,
|
||||
state_at_event.clone(), // TODO: could this be &state avoid .clone
|
||||
None, // TODO: third party invite
|
||||
)
|
||||
.map_err(|_e| Error::Conflict("Auth check failed"))?
|
||||
{
|
||||
|
@ -747,22 +711,7 @@ pub async fn send_transaction_message_route<'a>(
|
|||
|
||||
fork_states.push(state);
|
||||
} else {
|
||||
// This is probably an error??
|
||||
match db
|
||||
.sending
|
||||
.send_federation_request(
|
||||
&db.globals,
|
||||
body.body.origin,
|
||||
get_room_state_ids::v1::Request {
|
||||
room_id: &pdu.room_id,
|
||||
event_id: id,
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => todo!(),
|
||||
Err(e) => panic!(e),
|
||||
}
|
||||
todo!("we don't know of a pdu that is part of our known forks OOPS")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -773,6 +722,18 @@ pub async fn send_transaction_message_route<'a>(
|
|||
} else if fork_states.len() == 1 {
|
||||
fork_states[0].clone()
|
||||
} else {
|
||||
// Add as much as we can to the `event_map` (less DB hits)
|
||||
event_map.extend(
|
||||
incoming_auth_events
|
||||
.into_iter()
|
||||
.map(|pdu| (pdu.event_id().clone(), Arc::new(pdu))),
|
||||
);
|
||||
event_map.extend(
|
||||
state_at_event
|
||||
.into_iter()
|
||||
.map(|(_, pdu)| (pdu.event_id().clone(), pdu)),
|
||||
);
|
||||
|
||||
match state_res::StateResolution::resolve(
|
||||
&pdu.room_id,
|
||||
&RoomVersionId::Version6,
|
||||
|
@ -784,7 +745,7 @@ pub async fn send_transaction_message_route<'a>(
|
|||
.collect::<StateMap<_>>()
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
None,
|
||||
&mut event_map,
|
||||
&db.rooms,
|
||||
) {
|
||||
Ok(res) => res
|
||||
|
@ -819,8 +780,74 @@ pub async fn send_transaction_message_route<'a>(
|
|||
Ok(send_transaction_message::v1::Response { pdus: resolved_map }.into())
|
||||
}
|
||||
|
||||
fn signature_and_hash_check(
|
||||
pub_key_map: &ruma::signatures::PublicKeyMap,
|
||||
value: CanonicalJsonObject,
|
||||
) -> std::result::Result<PduEvent, String> {
|
||||
let val = match ruma::signatures::verify_event(pub_key_map, &value, &RoomVersionId::Version6) {
|
||||
Ok(ver) => {
|
||||
if let ruma::signatures::Verified::Signatures = ver {
|
||||
match ruma::signatures::redact(&value, &RoomVersionId::Version6) {
|
||||
Ok(obj) => obj,
|
||||
Err(_) => return Err("Redaction failed".into()),
|
||||
}
|
||||
} else {
|
||||
value
|
||||
}
|
||||
}
|
||||
Err(_e) => return Err("Signature verification failed".into()),
|
||||
};
|
||||
|
||||
serde_json::from_value::<PduEvent>(
|
||||
serde_json::to_value(val).expect("CanonicalJsonObj is a valid JsonValue"),
|
||||
)
|
||||
.map_err(|_| "Deserialization failed for JSON value".into())
|
||||
}
|
||||
|
||||
/// TODO: this needs to add events to the DB in a way that does not
|
||||
/// effect the state of the room
|
||||
async fn fetch_events(
|
||||
db: &Database,
|
||||
origin: Box<ServerName>,
|
||||
key_map: &PublicKeyMap,
|
||||
events: &[EventId],
|
||||
) -> Result<Vec<PduEvent>> {
|
||||
let mut pdus = vec![];
|
||||
for id in events {
|
||||
match db.rooms.get_pdu(id)? {
|
||||
Some(pdu) => pdus.push(pdu),
|
||||
None => match db
|
||||
.sending
|
||||
.send_federation_request(
|
||||
&db.globals,
|
||||
origin.clone(),
|
||||
get_event::v1::Request { event_id: id },
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => {
|
||||
let (_, value) = crate::pdu::process_incoming_pdu(&res.pdu);
|
||||
match signature_and_hash_check(key_map, value) {
|
||||
Ok(pdu) => {
|
||||
// TODO: add to our DB somehow?
|
||||
pdus.push(pdu);
|
||||
}
|
||||
Err(e) => {
|
||||
// TODO: I would assume we just keep going
|
||||
error!("{:?}", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => return Err(Error::BadServerResponse("Failed to fetch event")),
|
||||
},
|
||||
}
|
||||
}
|
||||
Ok(pdus)
|
||||
}
|
||||
|
||||
fn forward_extremity_ids(db: &Database, room_id: &RoomId) -> Result<Vec<EventId>> {
|
||||
todo!()
|
||||
db.rooms.get_pdu_leaves(room_id)
|
||||
}
|
||||
|
||||
fn append_state(db: &Database, pdu: &PduEvent) -> Result<()> {
|
||||
|
@ -854,20 +881,15 @@ fn append_state_soft(db: &Database, pdu: &PduEvent) -> Result<()> {
|
|||
pdu_id.push(0xff);
|
||||
pdu_id.extend_from_slice(&count.to_be_bytes());
|
||||
|
||||
db.rooms.append_to_state(&pdu_id, pdu, &db.globals)?;
|
||||
db.rooms.append_pdu(
|
||||
pdu,
|
||||
&utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"),
|
||||
count,
|
||||
pdu_id.clone().into(),
|
||||
&db.globals,
|
||||
&db.account_data,
|
||||
&db.admin,
|
||||
)?;
|
||||
|
||||
for appservice in db.appservice.iter_all().filter_map(|r| r.ok()) {
|
||||
db.sending.send_pdu_appservice(&appservice.0, &pdu_id)?;
|
||||
}
|
||||
// db.rooms.append_pdu(
|
||||
// pdu,
|
||||
// &utils::to_canonical_object(pdu).expect("Pdu is valid canonical object"),
|
||||
// count,
|
||||
// pdu_id.clone().into(),
|
||||
// &db.globals,
|
||||
// &db.account_data,
|
||||
// &db.admin,
|
||||
// )?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue