···78 };
7980 // and make sure all keys at this time are captured from the back
81- page.ops
82- .iter()
83- .rev()
84- .skip(1) // we alredy added the very last one
85- .map(|s| serde_json::from_str::<Op>(s).inspect_err(|e|
86- log::warn!("deduplication failed op parsing ({s:?}: {e}), bailing for downstream to deal with.")))
87- .take_while(|opr| opr.as_ref().map(|op| op.created_at == last_at).unwrap_or(false))
88- .for_each(|opr| {
89- let op = &opr.expect("any Errs were filtered by take_while");
90- me.keys_at.push(op.into());
91- });
9293 Some(me)
94 }
···143 assert_eq!(last_at, self.last_at, "time moved backwards on a page");
144 }
145 // and make sure all keys at this time are captured from the back
00000146 page.ops
147 .iter()
148 .rev()
···78 };
7980 // and make sure all keys at this time are captured from the back
81+ me.capture_nth_last_at(page, last_at);
00000000008283 Some(me)
84 }
···133 assert_eq!(last_at, self.last_at, "time moved backwards on a page");
134 }
135 // and make sure all keys at this time are captured from the back
136+ self.capture_nth_last_at(page, last_at);
137+ }
138+139+ /// walk backwards from 2nd last and collect keys until created_at changes
140+ fn capture_nth_last_at(&mut self, page: &mut ExportPage, last_at: Dt) {
141 page.ops
142 .iter()
143 .rev()