feat: recurse relationships
This commit is contained in:
		
							parent
							
								
									7b19618136
								
							
						
					
					
						commit
						b46000fadc
					
				
					 4 changed files with 138 additions and 140 deletions
				
			
		|  | @ -3,7 +3,7 @@ use ruma::api::client::relations::{ | |||
|     get_relating_events_with_rel_type_and_event_type, | ||||
| }; | ||||
| 
 | ||||
| use crate::{service::rooms::timeline::PduCount, services, Result, Ruma}; | ||||
| use crate::{services, Result, Ruma}; | ||||
| 
 | ||||
| /// # `GET /_matrix/client/r0/rooms/{roomId}/relations/{eventId}/{relType}/{eventType}`
 | ||||
| pub async fn get_relating_events_with_rel_type_and_event_type_route( | ||||
|  | @ -11,27 +11,6 @@ pub async fn get_relating_events_with_rel_type_and_event_type_route( | |||
| ) -> Result<get_relating_events_with_rel_type_and_event_type::v1::Response> { | ||||
|     let sender_user = body.sender_user.as_ref().expect("user is authenticated"); | ||||
| 
 | ||||
|     let from = match body.from.clone() { | ||||
|         Some(from) => PduCount::try_from_string(&from)?, | ||||
|         None => match ruma::api::Direction::Backward { | ||||
|             // TODO: fix ruma so `body.dir` exists
 | ||||
|             ruma::api::Direction::Forward => PduCount::min(), | ||||
|             ruma::api::Direction::Backward => PduCount::max(), | ||||
|         }, | ||||
|     }; | ||||
| 
 | ||||
|     let to = body | ||||
|         .to | ||||
|         .as_ref() | ||||
|         .and_then(|t| PduCount::try_from_string(t).ok()); | ||||
| 
 | ||||
|     // Use limit or else 10, with maximum 100
 | ||||
|     let limit = body | ||||
|         .limit | ||||
|         .and_then(|u| u32::try_from(u).ok()) | ||||
|         .map_or(10_usize, |u| u as usize) | ||||
|         .min(100); | ||||
| 
 | ||||
|     let res = services() | ||||
|         .rooms | ||||
|         .pdu_metadata | ||||
|  | @ -41,9 +20,11 @@ pub async fn get_relating_events_with_rel_type_and_event_type_route( | |||
|             &body.event_id, | ||||
|             Some(body.event_type.clone()), | ||||
|             Some(body.rel_type.clone()), | ||||
|             from, | ||||
|             to, | ||||
|             limit, | ||||
|             body.from.clone(), | ||||
|             body.to.clone(), | ||||
|             body.limit, | ||||
|             body.recurse, | ||||
|             &body.dir, | ||||
|         )?; | ||||
| 
 | ||||
|     Ok( | ||||
|  | @ -51,6 +32,7 @@ pub async fn get_relating_events_with_rel_type_and_event_type_route( | |||
|             chunk: res.chunk, | ||||
|             next_batch: res.next_batch, | ||||
|             prev_batch: res.prev_batch, | ||||
|             recursion_depth: res.recursion_depth, | ||||
|         }, | ||||
|     ) | ||||
| } | ||||
|  | @ -61,27 +43,6 @@ pub async fn get_relating_events_with_rel_type_route( | |||
| ) -> Result<get_relating_events_with_rel_type::v1::Response> { | ||||
|     let sender_user = body.sender_user.as_ref().expect("user is authenticated"); | ||||
| 
 | ||||
|     let from = match body.from.clone() { | ||||
|         Some(from) => PduCount::try_from_string(&from)?, | ||||
|         None => match ruma::api::Direction::Backward { | ||||
|             // TODO: fix ruma so `body.dir` exists
 | ||||
|             ruma::api::Direction::Forward => PduCount::min(), | ||||
|             ruma::api::Direction::Backward => PduCount::max(), | ||||
|         }, | ||||
|     }; | ||||
| 
 | ||||
|     let to = body | ||||
|         .to | ||||
|         .as_ref() | ||||
|         .and_then(|t| PduCount::try_from_string(t).ok()); | ||||
| 
 | ||||
|     // Use limit or else 10, with maximum 100
 | ||||
|     let limit = body | ||||
|         .limit | ||||
|         .and_then(|u| u32::try_from(u).ok()) | ||||
|         .map_or(10_usize, |u| u as usize) | ||||
|         .min(100); | ||||
| 
 | ||||
|     let res = services() | ||||
|         .rooms | ||||
|         .pdu_metadata | ||||
|  | @ -91,15 +52,18 @@ pub async fn get_relating_events_with_rel_type_route( | |||
|             &body.event_id, | ||||
|             None, | ||||
|             Some(body.rel_type.clone()), | ||||
|             from, | ||||
|             to, | ||||
|             limit, | ||||
|             body.from.clone(), | ||||
|             body.to.clone(), | ||||
|             body.limit, | ||||
|             body.recurse, | ||||
|             &body.dir, | ||||
|         )?; | ||||
| 
 | ||||
|     Ok(get_relating_events_with_rel_type::v1::Response { | ||||
|         chunk: res.chunk, | ||||
|         next_batch: res.next_batch, | ||||
|         prev_batch: res.prev_batch, | ||||
|         recursion_depth: res.recursion_depth, | ||||
|     }) | ||||
| } | ||||
| 
 | ||||
|  | @ -109,27 +73,6 @@ pub async fn get_relating_events_route( | |||
| ) -> Result<get_relating_events::v1::Response> { | ||||
|     let sender_user = body.sender_user.as_ref().expect("user is authenticated"); | ||||
| 
 | ||||
|     let from = match body.from.clone() { | ||||
|         Some(from) => PduCount::try_from_string(&from)?, | ||||
|         None => match ruma::api::Direction::Backward { | ||||
|             // TODO: fix ruma so `body.dir` exists
 | ||||
|             ruma::api::Direction::Forward => PduCount::min(), | ||||
|             ruma::api::Direction::Backward => PduCount::max(), | ||||
|         }, | ||||
|     }; | ||||
| 
 | ||||
|     let to = body | ||||
|         .to | ||||
|         .as_ref() | ||||
|         .and_then(|t| PduCount::try_from_string(t).ok()); | ||||
| 
 | ||||
|     // Use limit or else 10, with maximum 100
 | ||||
|     let limit = body | ||||
|         .limit | ||||
|         .and_then(|u| u32::try_from(u).ok()) | ||||
|         .map_or(10_usize, |u| u as usize) | ||||
|         .min(100); | ||||
| 
 | ||||
|     services() | ||||
|         .rooms | ||||
|         .pdu_metadata | ||||
|  | @ -139,8 +82,10 @@ pub async fn get_relating_events_route( | |||
|             &body.event_id, | ||||
|             None, | ||||
|             None, | ||||
|             from, | ||||
|             to, | ||||
|             limit, | ||||
|             body.from.clone(), | ||||
|             body.to.clone(), | ||||
|             body.limit, | ||||
|             body.recurse, | ||||
|             &body.dir, | ||||
|         ) | ||||
| } | ||||
|  |  | |||
|  | @ -3,9 +3,9 @@ use std::sync::Arc; | |||
| 
 | ||||
| pub use data::Data; | ||||
| use ruma::{ | ||||
|     api::client::relations::get_relating_events, | ||||
|     api::{client::relations::get_relating_events, Direction}, | ||||
|     events::{relation::RelationType, TimelineEventType}, | ||||
|     EventId, RoomId, UserId, | ||||
|     EventId, RoomId, UInt, UserId, | ||||
| }; | ||||
| use serde::Deserialize; | ||||
| 
 | ||||
|  | @ -48,37 +48,57 @@ impl Service { | |||
|         target: &EventId, | ||||
|         filter_event_type: Option<TimelineEventType>, | ||||
|         filter_rel_type: Option<RelationType>, | ||||
|         from: PduCount, | ||||
|         to: Option<PduCount>, | ||||
|         limit: usize, | ||||
|         from: Option<String>, | ||||
|         to: Option<String>, | ||||
|         limit: Option<UInt>, | ||||
|         recurse: bool, | ||||
|         dir: &Direction, | ||||
|     ) -> Result<get_relating_events::v1::Response> { | ||||
|         let from = match from { | ||||
|             Some(from) => PduCount::try_from_string(&from)?, | ||||
|             None => match dir { | ||||
|                 Direction::Forward => PduCount::min(), | ||||
|                 Direction::Backward => PduCount::max(), | ||||
|             }, | ||||
|         }; | ||||
| 
 | ||||
|         let to = to.as_ref().and_then(|t| PduCount::try_from_string(t).ok()); | ||||
| 
 | ||||
|         // Use limit or else 10, with maximum 100
 | ||||
|         let limit = limit | ||||
|             .and_then(|u| u32::try_from(u).ok()) | ||||
|             .map_or(10_usize, |u| u as usize) | ||||
|             .min(100); | ||||
| 
 | ||||
|         let next_token; | ||||
| 
 | ||||
|         //TODO: Fix ruma: match body.dir {
 | ||||
|         match ruma::api::Direction::Backward { | ||||
|             ruma::api::Direction::Forward => { | ||||
|                 let events_after: Vec<_> = services() | ||||
|                     .rooms | ||||
|                     .pdu_metadata | ||||
|                     .relations_until(sender_user, room_id, target, from)? // TODO: should be relations_after
 | ||||
|                     .filter(|r| { | ||||
|                         r.as_ref().map_or(true, |(_, pdu)| { | ||||
|                             filter_event_type.as_ref().map_or(true, |t| &pdu.kind == t) | ||||
|                                 && if let Ok(content) = | ||||
|                                     serde_json::from_str::<ExtractRelatesToEventId>( | ||||
|                                         pdu.content.get(), | ||||
|                                     ) | ||||
|                                 { | ||||
|                                     filter_rel_type | ||||
|                                         .as_ref() | ||||
|                                         .map_or(true, |r| &content.relates_to.rel_type == r) | ||||
|                                 } else { | ||||
|                                     false | ||||
|                                 } | ||||
|                         }) | ||||
|         // Spec (v1.10) recommends depth of at least 3
 | ||||
|         let depth: u8 = if recurse { 3 } else { 1 }; | ||||
| 
 | ||||
|         match dir { | ||||
|             Direction::Forward => { | ||||
|                 let relations_until = &services().rooms.pdu_metadata.relations_until( | ||||
|                     sender_user, | ||||
|                     room_id, | ||||
|                     target, | ||||
|                     from, | ||||
|                     depth, | ||||
|                 )?; | ||||
|                 let events_after: Vec<_> = relations_until // TODO: should be relations_after
 | ||||
|                     .iter() | ||||
|                     .filter(|(_, pdu)| { | ||||
|                         filter_event_type.as_ref().map_or(true, |t| &pdu.kind == t) | ||||
|                             && if let Ok(content) = | ||||
|                                 serde_json::from_str::<ExtractRelatesToEventId>(pdu.content.get()) | ||||
|                             { | ||||
|                                 filter_rel_type | ||||
|                                     .as_ref() | ||||
|                                     .map_or(true, |r| &content.relates_to.rel_type == r) | ||||
|                             } else { | ||||
|                                 false | ||||
|                             } | ||||
|                     }) | ||||
|                     .take(limit) | ||||
|                     .filter_map(|r| r.ok()) // Filter out buggy events
 | ||||
|                     .filter(|(_, pdu)| { | ||||
|                         services() | ||||
|                             .rooms | ||||
|  | @ -86,7 +106,7 @@ impl Service { | |||
|                             .user_can_see_event(sender_user, room_id, &pdu.event_id) | ||||
|                             .unwrap_or(false) | ||||
|                     }) | ||||
|                     .take_while(|&(k, _)| Some(k) != to) // Stop at `to`
 | ||||
|                     .take_while(|(k, _)| Some(k) != to.as_ref()) // Stop at `to`
 | ||||
|                     .collect(); | ||||
| 
 | ||||
|                 next_token = events_after.last().map(|(count, _)| count).copied(); | ||||
|  | @ -101,31 +121,32 @@ impl Service { | |||
|                     chunk: events_after, | ||||
|                     next_batch: next_token.map(|t| t.stringify()), | ||||
|                     prev_batch: Some(from.stringify()), | ||||
|                     recursion_depth: if recurse { Some(depth.into()) } else { None }, | ||||
|                 }) | ||||
|             } | ||||
|             ruma::api::Direction::Backward => { | ||||
|                 let events_before: Vec<_> = services() | ||||
|                     .rooms | ||||
|                     .pdu_metadata | ||||
|                     .relations_until(sender_user, room_id, target, from)? | ||||
|                     .filter(|r| { | ||||
|                         r.as_ref().map_or(true, |(_, pdu)| { | ||||
|                             filter_event_type.as_ref().map_or(true, |t| &pdu.kind == t) | ||||
|                                 && if let Ok(content) = | ||||
|                                     serde_json::from_str::<ExtractRelatesToEventId>( | ||||
|                                         pdu.content.get(), | ||||
|                                     ) | ||||
|                                 { | ||||
|                                     filter_rel_type | ||||
|                                         .as_ref() | ||||
|                                         .map_or(true, |r| &content.relates_to.rel_type == r) | ||||
|                                 } else { | ||||
|                                     false | ||||
|                                 } | ||||
|                         }) | ||||
|             Direction::Backward => { | ||||
|                 let relations_until = &services().rooms.pdu_metadata.relations_until( | ||||
|                     sender_user, | ||||
|                     room_id, | ||||
|                     target, | ||||
|                     from, | ||||
|                     depth, | ||||
|                 )?; | ||||
|                 let events_before: Vec<_> = relations_until | ||||
|                     .iter() | ||||
|                     .filter(|(_, pdu)| { | ||||
|                         filter_event_type.as_ref().map_or(true, |t| &pdu.kind == t) | ||||
|                             && if let Ok(content) = | ||||
|                                 serde_json::from_str::<ExtractRelatesToEventId>(pdu.content.get()) | ||||
|                             { | ||||
|                                 filter_rel_type | ||||
|                                     .as_ref() | ||||
|                                     .map_or(true, |r| &content.relates_to.rel_type == r) | ||||
|                             } else { | ||||
|                                 false | ||||
|                             } | ||||
|                     }) | ||||
|                     .take(limit) | ||||
|                     .filter_map(|r| r.ok()) // Filter out buggy events
 | ||||
|                     .filter(|(_, pdu)| { | ||||
|                         services() | ||||
|                             .rooms | ||||
|  | @ -133,7 +154,7 @@ impl Service { | |||
|                             .user_can_see_event(sender_user, room_id, &pdu.event_id) | ||||
|                             .unwrap_or(false) | ||||
|                     }) | ||||
|                     .take_while(|&(k, _)| Some(k) != to) // Stop at `to`
 | ||||
|                     .take_while(|&(k, _)| Some(k) != to.as_ref()) // Stop at `to`
 | ||||
|                     .collect(); | ||||
| 
 | ||||
|                 next_token = events_before.last().map(|(count, _)| count).copied(); | ||||
|  | @ -147,6 +168,7 @@ impl Service { | |||
|                     chunk: events_before, | ||||
|                     next_batch: next_token.map(|t| t.stringify()), | ||||
|                     prev_batch: Some(from.stringify()), | ||||
|                     recursion_depth: if recurse { Some(depth.into()) } else { None }, | ||||
|                 }) | ||||
|             } | ||||
|         } | ||||
|  | @ -158,14 +180,44 @@ impl Service { | |||
|         room_id: &'a RoomId, | ||||
|         target: &'a EventId, | ||||
|         until: PduCount, | ||||
|     ) -> Result<impl Iterator<Item = Result<(PduCount, PduEvent)>> + 'a> { | ||||
|         max_depth: u8, | ||||
|     ) -> Result<Vec<(PduCount, PduEvent)>> { | ||||
|         let room_id = services().rooms.short.get_or_create_shortroomid(room_id)?; | ||||
|         let target = match services().rooms.timeline.get_pdu_count(target)? { | ||||
|             Some(PduCount::Normal(c)) => c, | ||||
|             // TODO: Support backfilled relations
 | ||||
|             _ => 0, // This will result in an empty iterator
 | ||||
|         }; | ||||
|         self.db.relations_until(user_id, room_id, target, until) | ||||
| 
 | ||||
|         self.db | ||||
|             .relations_until(user_id, room_id, target, until) | ||||
|             .map(|mut relations| { | ||||
|                 let mut pdus: Vec<_> = (*relations).into_iter().filter_map(Result::ok).collect(); | ||||
|                 let mut stack: Vec<_> = | ||||
|                     pdus.clone().iter().map(|pdu| (pdu.to_owned(), 1)).collect(); | ||||
| 
 | ||||
|                 while let Some(stack_pdu) = stack.pop() { | ||||
|                     let target = match stack_pdu.0 .0 { | ||||
|                         PduCount::Normal(c) => c, | ||||
|                         // TODO: Support backfilled relations
 | ||||
|                         PduCount::Backfilled(_) => 0, // This will result in an empty iterator
 | ||||
|                     }; | ||||
| 
 | ||||
|                     if let Ok(relations) = self.db.relations_until(user_id, room_id, target, until) | ||||
|                     { | ||||
|                         for relation in relations.flatten() { | ||||
|                             if stack_pdu.1 < max_depth { | ||||
|                                 stack.push((relation.clone(), stack_pdu.1 + 1)); | ||||
|                             } | ||||
| 
 | ||||
|                             pdus.push(relation); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
| 
 | ||||
|                 pdus.sort_by(|a, b| a.0.cmp(&b.0)); | ||||
|                 pdus | ||||
|             }) | ||||
|     } | ||||
| 
 | ||||
|     #[tracing::instrument(skip(self, room_id, event_ids))] | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Matthias Ahouansou
						Matthias Ahouansou