tangled
alpha
login
or
join now
ptr.pet
/
hydrant
24
fork
atom
at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
24
fork
atom
overview
issues
6
pulls
pipelines
[ingest] accept since with empty string
ptr.pet
8 hours ago
9f8e65a0
8990a2ff
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+34
-4
3 changed files
expand all
collapse all
unified
split
src
crawler
mod.rs
ingest
stream.rs
worker.rs
+1
-1
src/crawler/mod.rs
···
598
598
return Ok(next_wake);
599
599
}
600
600
601
601
-
info!(count = ready.len(), "retrying pending repos");
601
601
+
debug!(count = ready.len(), "retrying pending repos");
602
602
603
603
let handle = tokio::runtime::Handle::current();
604
604
let filter = self.state.filter.load();
+31
src/ingest/stream.rs
···
250
250
pub repo: Did<'a>,
251
251
pub rev: Tid,
252
252
pub seq: i64,
253
253
+
#[serde(deserialize_with = "deserialize_tid_or_empty")]
253
254
pub since: Option<Tid>,
254
255
pub time: Datetime,
255
256
pub too_big: bool,
···
505
506
use serde::Deserialize;
506
507
use serde_ipld_dagcbor::de::Deserializer;
507
508
509
509
+
// some relays send `""` for `since` when there is no previous revision instead of null
510
510
+
fn deserialize_tid_or_empty<'de, D>(deserializer: D) -> Result<Option<Tid>, D::Error>
511
511
+
where
512
512
+
D: serde::Deserializer<'de>,
513
513
+
{
514
514
+
let s = <Option<String>>::deserialize(deserializer)?;
515
515
+
match s.as_deref() {
516
516
+
None => Ok(None),
517
517
+
Some("") => {
518
518
+
tracing::warn!("received since with empty string instead of null");
519
519
+
Ok(None)
520
520
+
}
521
521
+
Some(s) => s.parse::<Tid>().map(Some).map_err(serde::de::Error::custom),
522
522
+
}
523
523
+
}
524
524
+
508
525
#[derive(Debug, Deserialize)]
509
526
struct EventHeader {
510
527
op: i64,
···
559
576
let bytes = data_encoding::BASE64.decode(FRAME).unwrap();
560
577
let msg = decode_frame(&bytes).unwrap();
561
578
assert!(matches!(msg, SubscribeReposMessage::Account(_)));
579
579
+
}
580
580
+
581
581
+
/// regression: some relays send `since: ""` (empty string) instead of null/absent for the initial commit.
582
582
+
/// this should decode cleanly with `since = None`.
583
583
+
/// TODO: is this behaviour we should reject?
584
584
+
#[test]
585
585
+
fn test_decode_commit_empty_since() {
586
586
+
const FRAME: &[u8] = b"omF0ZyNjb21taXRib3ABq2NvcHOAY3Jldm0zbWdkeWNmdWIyeTIyY3NlcRsAAAACZ8sHJmRyZXBveCBkaWQ6cGxjOmpxM3p2cmI1ZXdnMnFndXA3M3Fzb3V6ZWR0aW1leBsyMDI2LTAzLTA1VDE1OjQ3OjU0LjcxNjMyOFplYmxvYnOAZXNpbmNlYGZibG9ja3NZAUk6omVyb290c4HYKlglAAFxEiAlaO/rjabPL4/e2QlxkoxzCCwv69hE4P3Vdxpv7f6uEWd2ZXJzaW9uAeABAXESICVo7+uNps8vj97ZCXGSjHMILC/r2ETg/dV3Gm/t/q4RpmNkaWR4IGRpZDpwbGM6anEzenZyYjVld2cycWd1cDczcXNvdXplY3Jldm0zbWdkeWNmdWIyeTIyY3NpZ1hAwKfrZtwwbN7dW0uSbviOs65NWQRvlS9Qc7oRtiorybMTEYxKGJaFK2kHIMEWIJqumb4751En2aJEpsilWlaQOWRkYXRh2CpYJQABcRIgnf7+Yd126j3K5QI4gLCDedV63yBILW/b4nWSifZHZ3tkcHJldvZndmVyc2lvbgMrAXESIJ3+/mHdduo9yuUCOICwg3nVet8gSC1v2+J1kon2R2d7omFlgGFs9mZjb21taXTYKlglAAFxEiAlaO/rjabPL4/e2QlxkoxzCCwv69hE4P3Vdxpv7f6uEWZyZWJhc2X0ZnRvb0JpZ/Q=";
587
587
+
let bytes = data_encoding::BASE64.decode(FRAME).unwrap();
588
588
+
let msg = decode_frame(&bytes).unwrap();
589
589
+
let SubscribeReposMessage::Commit(c) = msg else {
590
590
+
panic!("expected Commit");
591
591
+
};
592
592
+
assert!(c.since.is_none(), "since should be None for empty string");
562
593
}
563
594
}
+2
-3
src/ingest/worker.rs
···
576
576
.unwrap_or(false)
577
577
});
578
578
if !touches_signal {
579
579
-
debug!(did = %did, "dropping commit, no signal-matching ops");
579
579
+
trace!(did = %did, "dropping commit, no signal-matching ops");
580
580
return Ok(RepoProcessResult::Syncing(None));
581
581
}
582
582
-
debug!(did = %did, "commit touches a signal, queuing backfill");
583
582
}
584
583
585
584
debug!(did = %did, "discovered new account from firehose, queueing backfill");
···
611
610
let mut repo_state = crate::db::deser_repo_state(&state_bytes)?.into_static();
612
611
613
612
if !repo_state.tracked && repo_state.status != RepoStatus::Backfilling {
614
614
-
debug!(did = %did, "ignoring active status as it is explicitly untracked");
613
613
+
trace!(did = %did, "ignoring active status as it is explicitly untracked");
615
614
return Ok(RepoProcessResult::Syncing(None));
616
615
}
617
616