fix: membership deserializing

This commit is contained in:
Timo Kösters 2021-04-09 21:38:16 +02:00
parent 51aa6448bc
commit 84f4ce73e5
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
9 changed files with 74 additions and 63 deletions

View file

@ -47,7 +47,7 @@ impl Sending {
let mut futures = FuturesUnordered::new();
// Retry requests we could not finish yet
let mut current_transactions = HashMap::<OutgoingKind, Vec<IVec>>::new();
let mut current_transactions = HashMap::<OutgoingKind, Vec<Vec<u8>>>::new();
for (key, outgoing_kind, pdu) in servercurrentpdus
.iter()
@ -55,7 +55,7 @@ impl Sending {
.filter_map(|(key, _)| {
Self::parse_servercurrentpdus(&key)
.ok()
.map(|(k, p)| (key, k, p))
.map(|(k, p)| (key, k, p.to_vec()))
})
{
if pdu.is_empty() {
@ -150,7 +150,7 @@ impl Sending {
.keys()
.filter_map(|r| r.ok())
.map(|k| {
k.subslice(prefix.len(), k.len() - prefix.len())
k[prefix.len()..].to_vec()
})
.take(30)
.collect::<Vec<_>>();
@ -211,7 +211,11 @@ impl Sending {
};
},
Some(event) = &mut subscriber => {
if let sled::Event::Insert { key, .. } = event {
for (_tree, key, value_opt) in &event {
if value_opt.is_none() {
continue;
}
let servernamepduid = key.clone();
let exponential_backoff = |(tries, instant): &(u32, Instant)| {
@ -265,7 +269,7 @@ impl Sending {
futures.push(
Self::handle_event(
outgoing_kind,
vec![pdu_id],
vec![pdu_id.to_vec()],
&db,
)
);
@ -310,7 +314,7 @@ impl Sending {
}
#[tracing::instrument]
fn calculate_hash(keys: &[IVec]) -> Vec<u8> {
fn calculate_hash(keys: &[Vec<u8>]) -> Vec<u8> {
// We only hash the pdu's event ids, not the whole pdu
let bytes = keys.join(&0xff);
let hash = digest::digest(&digest::SHA256, &bytes);
@ -320,7 +324,7 @@ impl Sending {
#[tracing::instrument(skip(db))]
async fn handle_event(
kind: OutgoingKind,
pdu_ids: Vec<IVec>,
pdu_ids: Vec<Vec<u8>>,
db: &Database,
) -> std::result::Result<OutgoingKind, (OutgoingKind, Error)> {
match &kind {