tangled
alpha
login
or
join now
ptr.pet
/
hydrant
18
fork
atom
at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
18
fork
atom
overview
issues
5
pulls
pipelines
[ingest] handle all account statuses
ptr.pet
1 month ago
618eae1f
3dd76b29
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+80
-1
2 changed files
expand all
collapse all
unified
split
src
buffer
processor.rs
ops.rs
+61
-1
src/buffer/processor.rs
···
103
103
};
104
104
ops::emit_identity_event(&self.state.db, evt);
105
105
}
106
106
-
SubscribeReposMessage::Account(ref account) => {
106
106
+
SubscribeReposMessage::Account(account) => {
107
107
debug!("processing buffered account for {did}");
108
108
let evt = AccountEvt {
109
109
did: did.to_smolstr(),
···
111
111
status: account.status.as_ref().map(|s| s.to_smolstr()),
112
112
};
113
113
ops::emit_account_event(&self.state.db, evt);
114
114
+
115
115
+
let state = self.state.clone();
116
116
+
let did = did.clone();
117
117
+
let account = account.clone(); // Account is 'static in BufferedMessage
118
118
+
119
119
+
tokio::task::spawn_blocking(move || -> Result<()> {
120
120
+
// handle status updates
121
121
+
if !account.active {
122
122
+
use jacquard::api::com_atproto::sync::subscribe_repos::AccountStatus;
123
123
+
if let Some(status) = &account.status {
124
124
+
match status {
125
125
+
AccountStatus::Deleted => {
126
126
+
info!("account {did} deleted, wiping data");
127
127
+
ops::delete_repo(&state.db, &did)?;
128
128
+
}
129
129
+
AccountStatus::Takendown => {
130
130
+
ops::update_repo_status(
131
131
+
&state.db,
132
132
+
&did,
133
133
+
crate::types::RepoStatus::Takendown,
134
134
+
)?;
135
135
+
}
136
136
+
AccountStatus::Suspended => {
137
137
+
ops::update_repo_status(
138
138
+
&state.db,
139
139
+
&did,
140
140
+
crate::types::RepoStatus::Suspended,
141
141
+
)?;
142
142
+
}
143
143
+
AccountStatus::Deactivated => {
144
144
+
ops::update_repo_status(
145
145
+
&state.db,
146
146
+
&did,
147
147
+
crate::types::RepoStatus::Deactivated,
148
148
+
)?;
149
149
+
}
150
150
+
AccountStatus::Throttled | AccountStatus::Desynchronized => {
151
151
+
let status_str = status.as_str().to_smolstr();
152
152
+
ops::update_repo_status(
153
153
+
&state.db,
154
154
+
&did,
155
155
+
crate::types::RepoStatus::Error(status_str),
156
156
+
)?;
157
157
+
}
158
158
+
AccountStatus::Other(s) => {
159
159
+
warn!("unknown account status for {did}: {s}");
160
160
+
}
161
161
+
}
162
162
+
} else {
163
163
+
warn!("account {did} inactive but no status provided");
164
164
+
}
165
165
+
} else {
166
166
+
// active account, clear any error/suspension states if they exist
167
167
+
// we set it to Synced because we are receiving live events for it
168
168
+
ops::update_repo_status(&state.db, &did, crate::types::RepoStatus::Synced)?;
169
169
+
}
170
170
+
Ok(())
171
171
+
})
172
172
+
.await
173
173
+
.into_diagnostic()??;
114
174
}
115
175
_ => {
116
176
warn!("unknown message type in buffer for {did}");
+19
src/ops.rs
···
87
87
Ok(())
88
88
}
89
89
90
90
+
pub fn update_repo_status(
91
91
+
db: &Db,
92
92
+
did: &jacquard::types::did::Did,
93
93
+
status: crate::types::RepoStatus,
94
94
+
) -> Result<()> {
95
95
+
debug!("updating repo status for {did} to {status:?}");
96
96
+
let (updated, mut batch) =
97
97
+
Db::update_repo_state(db.inner.batch(), &db.repos, did, |state, _val| {
98
98
+
state.status = status.clone();
99
99
+
state.last_updated_at = chrono::Utc::now().timestamp();
100
100
+
Ok((true, ()))
101
101
+
})?;
102
102
+
103
103
+
if updated.is_some() {
104
104
+
batch.commit().into_diagnostic()?;
105
105
+
}
106
106
+
Ok(())
107
107
+
}
108
108
+
90
109
pub fn apply_commit(db: &Db, commit: &Commit<'_>, live: bool) -> Result<()> {
91
110
let did = &commit.repo;
92
111
debug!("applying commit {} for {did}", &commit.commit);