at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[ingest] add account event types and stream support

ptr.pet befe77f8 24decad9

verified
+96 -57
+9
src/api/stream.rs
··· 104 104 cid, 105 105 }), 106 106 identity: None, 107 + account: None, 107 108 } 108 109 } 109 110 StoredEvent::Identity(identity) => MarshallableEvt { ··· 111 112 event_type: "identity".into(), 112 113 record: None, 113 114 identity: Some(identity), 115 + account: None, 116 + }, 117 + StoredEvent::Account(account) => MarshallableEvt { 118 + id, 119 + event_type: "account".into(), 120 + record: None, 121 + identity: None, 122 + account: Some(account), 114 123 }, 115 124 }; 116 125
+14 -1
src/ops.rs
··· 1 1 use crate::db::{keys, Db}; 2 - use crate::types::{BroadcastEvent, IdentityEvt, MarshallableEvt, StoredEvent}; 2 + use crate::types::{AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, StoredEvent}; 3 3 use jacquard::api::com_atproto::sync::subscribe_repos::Commit; 4 4 use jacquard::cowstr::ToCowStr; 5 5 use jacquard_repo::car::reader::parse_car_bytes; ··· 19 19 event_type: "identity".into(), 20 20 record: None, 21 21 identity: Some(evt), 22 + account: None, 23 + }; 24 + let _ = db.event_tx.send(BroadcastEvent::Ephemeral(marshallable)); 25 + } 26 + 27 + pub fn emit_account_event(db: &Db, evt: AccountEvt) { 28 + let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 29 + let marshallable = MarshallableEvt { 30 + id: event_id, 31 + event_type: "account".into(), 32 + record: None, 33 + identity: None, 34 + account: Some(evt), 22 35 }; 23 36 let _ = db.event_tx.send(BroadcastEvent::Ephemeral(marshallable)); 24 37 }
+13 -3
src/types.rs
··· 77 77 pub record: Option<RecordEvt>, 78 78 #[serde(skip_serializing_if = "Option::is_none")] 79 79 pub identity: Option<IdentityEvt>, 80 + #[serde(skip_serializing_if = "Option::is_none")] 81 + pub account: Option<AccountEvt>, 80 82 } 81 83 82 84 #[derive(Clone, Debug)] ··· 102 104 #[derive(Debug, Serialize, Deserialize, Clone)] 103 105 pub struct IdentityEvt { 104 106 pub did: SmolStr, 105 - pub handle: SmolStr, 106 - pub is_active: bool, 107 - pub status: SmolStr, 107 + #[serde(skip_serializing_if = "Option::is_none")] 108 + pub handle: Option<SmolStr>, 109 + } 110 + 111 + #[derive(Debug, Serialize, Deserialize, Clone)] 112 + pub struct AccountEvt { 113 + pub did: SmolStr, 114 + pub active: bool, 115 + #[serde(skip_serializing_if = "Option::is_none")] 116 + pub status: Option<SmolStr>, 108 117 } 109 118 110 119 #[derive(Debug, Serialize, Deserialize, Clone)] ··· 119 128 cid: Option<SmolStr>, 120 129 }, 121 130 Identity(IdentityEvt), 131 + Account(AccountEvt), 122 132 }
+60 -53
tests/authenticated_stream_test.nu
··· 49 49 50 50 def activate-account [pds_url: string, jwt: string] { 51 51 print "activating account..." 52 - http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.server.activateAccount" {} 52 + curl -X POST -H "Content-Type: application/json" -H $"Authorization: Bearer ($jwt)" $"($pds_url)/xrpc/com.atproto.server.activateAccount" 53 53 } 54 54 55 55 def resolve-pds [did: string] { ··· 72 72 73 73 let pds_url = resolve-pds $did 74 74 75 - let port = 3003 75 + let port = 3005 76 76 let url = $"http://localhost:($port)" 77 77 let ws_url = $"ws://localhost:($port)/stream" 78 78 let db_path = (mktemp -d -t hydrant_auth_test.XXXXXX) ··· 106 106 print "warning: failed to add repo (might already be tracked), continuing..." 107 107 } 108 108 109 - # wait for connection stability and potential backfill start 110 - sleep 2sec 111 - 112 109 # 5. perform actions 113 110 let collection = "app.bsky.feed.post" 114 111 let timestamp = (date now | format date "%Y-%m-%dT%H:%M:%SZ") ··· 146 143 147 144 print "--- action: deactivate ---" 148 145 deactivate-account $pds_url $jwt 149 - sleep 2sec 146 + 147 + sleep 1sec 148 + 149 + # we might need to re-auth if session was killed by deactivation 150 + print "re-authenticating..." 151 + let session = authenticate $pds_url $did $password 152 + let jwt = $session.accessJwt 153 + 154 + sleep 1sec 150 155 151 156 print "--- action: activate ---" 152 157 activate-account $pds_url $jwt 153 - sleep 2sec 154 - 158 + 155 159 # 6. verify 156 - sleep 2sec 160 + sleep 3sec 157 161 print "stopping listener..." 158 - try { kill $stream_pid } 162 + try { kill -9 $stream_pid } 159 163 160 164 if ($output_file | path exists) { 161 165 let content = (open $output_file | str trim) ··· 164 168 } else { 165 169 # parse json lines 166 170 let events = ($content | lines | each { |it| $it | from json }) 171 + let display_events = ($events | each { |e| 172 + let value = if $e.type == "record" { $e | get -o record } else if $e.type == "account" { $e | get -o account } else { $e | get -o identity } 173 + $e | select id type | insert value $value 174 + }) 167 175 print $"captured ($events | length) events" 168 - 169 - # hydrant stream events seem to be type: "record" or "identity" 170 - # structure: { id: ..., type: "record", record: { action: ..., collection: ..., rkey: ... } } 171 - # structure: { id: ..., type: "identity", identity: { did: ..., status: ..., is_active: ... } } 172 - 173 - let record_events = ($events | where type == "record" and record.collection == $collection and record.rkey == $rkey) 174 - let identity_events = ($events | where type == "identity" and identity.did == $did) 175 - 176 - let creates = ($record_events | where record.action == "create") 177 - let updates = ($record_events | where record.action == "update") 178 - let deletes = ($record_events | where record.action == "delete") 176 + $display_events | table -e | print 179 177 180 - let deactivations = ($identity_events | where identity.status == "deactivated") 181 - let reactivations = ($identity_events | where identity.status == "active" and identity.is_active == true) 182 - 183 - print $"found creates: ($creates | length)" 184 - print $"found updates: ($updates | length)" 185 - print $"found deletes: ($deletes | length)" 186 - print $"found deactivations: ($deactivations | length)" 187 - print $"found reactivations: ($reactivations | length)" 178 + # filter live events for the relevant entities 179 + let relevant_events = ($events | where { |it| 180 + if $it.type == "record" { 181 + if ($it.record | get -o live) == false { 182 + return false 183 + } 184 + } 185 + true 186 + }) 187 + 188 + let checks = [ 189 + { |e| $e.type == "account" and $e.account.active == true }, 190 + { |e| $e.type == "record" and $e.record.action == "create" }, 191 + { |e| $e.type == "record" and $e.record.action == "update" }, 192 + { |e| $e.type == "record" and $e.record.action == "delete" }, 193 + { |e| $e.type == "account" and $e.account.active == false }, 194 + { |e| $e.type == "account" and $e.account.active == true }, 195 + { |e| $e.type == "identity" and $e.identity.did == $did } 196 + ] 188 197 189 - if ($record_events | length) != 3 { 190 - print "test failed: expected exactly 3 record events" 191 - print "captured events:" 192 - print ($events | table -e) 193 - } else if ($deactivations | length) == 0 { 194 - print "test failed: expected at least one deactivation event" 195 - print "captured identity events:" 196 - print ($identity_events | table -e) 197 - } else if ($reactivations | length) == 0 { 198 - print "test failed: expected at least one reactivation (active) event" 199 - print "captured identity events:" 200 - print ($identity_events | table -e) 198 + if ($relevant_events | length) != ($checks | length) { 199 + print $"verification failed: expected ($checks | length) events, got ($relevant_events | length)" 200 + $test_passed = false 201 201 } else { 202 - let first = ($record_events | get 0) 203 - let second = ($record_events | get 1) 204 - let third = ($record_events | get 2) 205 202 206 - if ($first.record.action == "create") and ($second.record.action == "update") and ($third.record.action == "delete") { 207 - print "test passed: all record operations captured in correct order, and identity events captured" 208 - $test_passed = true 209 - } else { 210 - print "test failed: record events out of order or incorrect" 211 - print "captured events:" 212 - print ($events | table -e) 213 - } 203 + mut failed = false 204 + for i in 0..(($relevant_events | length) - 1) { 205 + let event = ($relevant_events | get $i) 206 + let check = ($checks | get $i) 207 + if not (do $check $event) { 208 + print $"verification failed at event #($i + 1)" 209 + print $"event: ($event)" 210 + $failed = true 211 + break 212 + } 213 + } 214 + 215 + if not $failed { 216 + print "test success!" 217 + $test_passed = true 218 + } else { 219 + $test_passed = false 220 + } 214 221 } 215 222 } 216 223 } else { ··· 223 230 224 231 # cleanup 225 232 print "cleaning up..." 226 - try { kill $instance.pid } 233 + try { kill -9 $instance.pid } 227 234 228 235 if $test_passed { 229 236 exit 0