···9};
10use crate::utils::at_uri_is_by;
11use deadpool_postgres::{Object, Pool, Transaction};
12-use did_resolver::Resolver;
13use foldhash::quality::RandomState;
14use futures::StreamExt;
15use ipld_core::cid::Cid;
0016use metrics::counter;
17use parakeet_db::types::{ActorStatus, ActorSyncState};
18use parakeet_index::AggregateType;
···20use redis::AsyncCommands;
21use std::collections::HashMap;
22use std::hash::BuildHasher;
23-use std::sync::Arc;
24use tokio::sync::mpsc::{channel, Sender};
25use tokio::sync::watch::Receiver as WatchReceiver;
26use tracing::instrument;
···37#[derive(Clone)]
38struct RelayIndexerState {
39 idxc_tx: Sender<parakeet_index::AggregateDeltaReq>,
40- resolver: Arc<Resolver>,
41 do_backfill: bool,
42 do_handle_res: bool,
43 req_backfill: bool,
···57 pool: Pool,
58 redis: MultiplexedConnection,
59 idxc_tx: Sender<parakeet_index::AggregateDeltaReq>,
60- resolver: Arc<Resolver>,
61 firehose: FirehoseConsumer,
62 resume: sled::Db,
63 opts: RelayIndexerOpts,
···253 did: &str,
254 expected_handle: Option<String>,
255) -> eyre::Result<Option<String>> {
256- // Resolve the did doc
257- let Some(did_doc) = state.resolver.resolve_did(did).await? else {
258- eyre::bail!("missing did doc");
259 };
260261- // if there's no handles in aka or the expected is none, set to none in DB.
262- if did_doc.also_known_as.as_ref().is_none_or(|v| v.is_empty()) || expected_handle.is_none() {
263- return Ok(None);
264- }
265-266- let expected = expected_handle.unwrap();
267-268- // check if the handle from the event is in the did doc
269- let expected_in_doc = did_doc.also_known_as.is_some_and(|v| {
270- v.iter()
271- .filter_map(|v| v.strip_prefix("at://"))
272- .any(|v| v == expected)
273- });
274-275- // if it isn't, set to invalid.
276- if !expected_in_doc {
277- tracing::warn!("Handle not in DID doc");
278- return Ok(None);
279- }
280-281- // in theory, we can use com.atproto.identity.resolveHandle against a PDS, but that seems
282- // like a way to end up with really sus handles.
283- let Some(handle_did) = state.resolver.resolve_handle(&expected).await? else {
284- return Ok(None);
285- };
286287 // finally, check if the event did matches the handle, if not, set invalid, otherwise set the handle.
288- if handle_did != did {
289- Ok(None)
290 } else {
291- Ok(Some(expected))
292 }
293}
294
···9};
10use crate::utils::at_uri_is_by;
11use deadpool_postgres::{Object, Pool, Transaction};
012use foldhash::quality::RandomState;
13use futures::StreamExt;
14use ipld_core::cid::Cid;
15+use jacquard_common::types::string::Handle;
16+use jacquard_identity::JacquardResolver;
17use metrics::counter;
18use parakeet_db::types::{ActorStatus, ActorSyncState};
19use parakeet_index::AggregateType;
···21use redis::AsyncCommands;
22use std::collections::HashMap;
23use std::hash::BuildHasher;
024use tokio::sync::mpsc::{channel, Sender};
25use tokio::sync::watch::Receiver as WatchReceiver;
26use tracing::instrument;
···37#[derive(Clone)]
38struct RelayIndexerState {
39 idxc_tx: Sender<parakeet_index::AggregateDeltaReq>,
40+ resolver: JacquardResolver,
41 do_backfill: bool,
42 do_handle_res: bool,
43 req_backfill: bool,
···57 pool: Pool,
58 redis: MultiplexedConnection,
59 idxc_tx: Sender<parakeet_index::AggregateDeltaReq>,
60+ resolver: JacquardResolver,
61 firehose: FirehoseConsumer,
62 resume: sled::Db,
63 opts: RelayIndexerOpts,
···253 did: &str,
254 expected_handle: Option<String>,
255) -> eyre::Result<Option<String>> {
256+ let Some(handle) = expected_handle else {
257+ return Ok(None);
0258 };
259260+ // this verifies that the handle and the did it resolves to match.
261+ let (handle_did, _did_doc, _warnings) = state
262+ .resolver
263+ .resolve_handle_and_doc(&Handle::raw(&handle))
264+ .await?;
00000000000000000000265266 // finally, check if the event did matches the handle, if not, set invalid, otherwise set the handle.
267+ if did == &*handle_did {
268+ return Ok(Some(handle));
269 } else {
270+ return Ok(None);
271 }
272}
273