at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[db] add resync_buffer keyspace for buffering commits during backfill

ptr.pet 5996ab14 eacdd58d

verified
+24 -2
+21 -2
src/db/keys.rs
··· 1 use jacquard_common::types::string::Did; 2 3 - use crate::db::types::TrimmedDid; 4 5 /// separator used for composite keys 6 pub const SEP: u8 = b'|'; ··· 53 54 pub const COUNT_COLLECTION_PREFIX: &[u8] = &[b'r', SEP]; 55 56 - // key format: r\x00{DID}\x00{collection} (DID trimmed) 57 pub fn count_collection_key(did: &Did, collection: &str) -> Vec<u8> { 58 let repo = TrimmedDid::from(did); 59 let mut key = ··· 64 key.extend_from_slice(collection.as_bytes()); 65 key 66 }
··· 1 use jacquard_common::types::string::Did; 2 3 + use crate::db::types::{DbTid, TrimmedDid}; 4 5 /// separator used for composite keys 6 pub const SEP: u8 = b'|'; ··· 53 54 pub const COUNT_COLLECTION_PREFIX: &[u8] = &[b'r', SEP]; 55 56 + // key format: r|{DID}|{collection} (DID trimmed) 57 pub fn count_collection_key(did: &Did, collection: &str) -> Vec<u8> { 58 let repo = TrimmedDid::from(did); 59 let mut key = ··· 64 key.extend_from_slice(collection.as_bytes()); 65 key 66 } 67 + 68 + // key format: {DID}|{rev} 69 + pub fn resync_buffer_key(did: &Did, rev: DbTid) -> Vec<u8> { 70 + let repo = TrimmedDid::from(did); 71 + let mut key = Vec::with_capacity(repo.len() + 1 + 8); 72 + repo.write_to_vec(&mut key); 73 + key.push(SEP); 74 + key.extend_from_slice(&rev.as_u64().to_be_bytes()); 75 + key 76 + } 77 + 78 + // prefix format: {DID}| (DID trimmed) 79 + pub fn resync_buffer_prefix(did: &Did) -> Vec<u8> { 80 + let repo = TrimmedDid::from(did); 81 + let mut prefix = Vec::with_capacity(repo.len() + 1); 82 + repo.write_to_vec(&mut prefix); 83 + prefix.push(SEP); 84 + prefix 85 + }
+3
src/db/mod.rs
··· 33 pub cursors: Keyspace, 34 pub pending: Keyspace, 35 pub resync: Keyspace, 36 pub events: Keyspace, 37 pub counts: Keyspace, 38 pub event_tx: broadcast::Sender<BroadcastEvent>, ··· 74 let cursors = open_ks("cursors", opts().expect_point_read_hits(true))?; 75 let pending = open_ks("pending", opts())?; 76 let resync = open_ks("resync", opts())?; 77 let events = open_ks("events", opts())?; 78 let counts = open_ks("counts", opts().expect_point_read_hits(true))?; 79 ··· 124 cursors, 125 pending, 126 resync, 127 events, 128 counts, 129 event_tx,
··· 33 pub cursors: Keyspace, 34 pub pending: Keyspace, 35 pub resync: Keyspace, 36 + pub resync_buffer: Keyspace, 37 pub events: Keyspace, 38 pub counts: Keyspace, 39 pub event_tx: broadcast::Sender<BroadcastEvent>, ··· 75 let cursors = open_ks("cursors", opts().expect_point_read_hits(true))?; 76 let pending = open_ks("pending", opts())?; 77 let resync = open_ks("resync", opts())?; 78 + let resync_buffer = open_ks("resync_buffer", opts())?; 79 let events = open_ks("events", opts())?; 80 let counts = open_ks("counts", opts().expect_point_read_hits(true))?; 81 ··· 126 cursors, 127 pending, 128 resync, 129 + resync_buffer, 130 events, 131 counts, 132 event_tx,