// APIs for links and references in the ATmosphere
1use hickory_resolver::{ResolveError, TokioResolver};
2use std::collections::{HashSet, VecDeque};
3use std::path::Path;
4use std::sync::Arc;
5/// for now we're gonna just keep doing more cache
6///
7/// plc.director x foyer, ttl kept with data, refresh deferred to background on fetch
8///
9/// things we need:
10///
11/// 1. handle -> DID resolution: getRecord must accept a handle for `repo` param
12/// 2. DID -> PDS resolution: so we know where to getRecord
13/// 3. DID -> handle resolution: for bidirectional handle validation and in case we want to offer this
14use std::time::Duration;
15use tokio::sync::Mutex;
16use tokio_util::sync::CancellationToken;
17
18use crate::error::IdentityError;
19use atrium_api::{
20 did_doc::DidDocument,
21 types::string::{Did, Handle},
22};
23use atrium_common::resolver::Resolver;
24use atrium_identity::{
25 did::{CommonDidResolver, CommonDidResolverConfig, DEFAULT_PLC_DIRECTORY_URL},
26 handle::{AtprotoHandleResolver, AtprotoHandleResolverConfig, DnsTxtResolver},
27};
28use atrium_oauth::DefaultHttpClient; // it's probably not worth bringing all of atrium_oauth for this but
29use foyer::{DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder};
30use serde::{Deserialize, Serialize};
31use time::UtcDateTime;
32
/// How long a successful resolution is served from cache before a background
/// re-resolve is queued for it.
// TODO: probably should have a max ttl as well
const MIN_TTL: Duration = Duration::from_secs(4 * 60 * 60);
/// How long a not-found result is served from cache before a background
/// re-resolve is queued for it.
const MIN_NOT_FOUND_TTL: Duration = Duration::from_secs(60);
36
/// Key for the identity cache and for the refresh queue.
///
/// An identity is looked up either by handle or by DID.
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
enum IdentityKey {
    /// keys a handle -> DID resolution
    Handle(Handle),
    /// keys a DID -> partial mini doc resolution
    Did(Did),
}
42
/// Cached identity value: the time the data was fetched, followed by the data.
///
/// The timestamp is compared against the minimum TTLs on read to decide
/// whether a background refresh should be queued.
#[derive(Debug, Serialize, Deserialize)]
struct IdentityVal(UtcDateTime, IdentityData);
45
/// The payload of a cached identity lookup.
#[derive(Debug, Serialize, Deserialize)]
enum IdentityData {
    /// the resolver reported that the identity does not exist
    NotFound,
    /// result of a handle lookup: the (unverified) DID it resolved to
    Did(Did),
    /// result of a DID lookup: the mini doc extracted from its did doc
    Doc(PartialMiniDoc),
}
52
/// partial representation of a com.bad-example.identity mini atproto doc
///
/// partial because the handle is not verified: it is taken from the did doc
/// as-is, without resolving it back to the DID
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartialMiniDoc {
    /// an atproto handle (**unverified**)
    ///
    /// the first valid atproto handle from the did doc's aka
    pub unverified_handle: Handle,
    /// the did's atproto pds url (TODO: type this?)
    ///
    /// note: atrium *does* actually parse it into a URI, it just doesn't return
    /// that for some reason
    pub pds: String,
    /// for now we're just pulling this straight from the did doc
    ///
    /// would be nice to type and validate it
    ///
    /// this is the publicKeyMultibase from the did doc.
    /// legacy key encoding not supported.
    /// `id`, `type`, and `controller` must be checked, but aren't stored.
    pub signing_key: String,
}
76
77impl TryFrom<DidDocument> for PartialMiniDoc {
78 type Error = String;
79 fn try_from(did_doc: DidDocument) -> Result<Self, Self::Error> {
80 // must use the first valid handle
81 let mut unverified_handle = None;
82 let Some(ref doc_akas) = did_doc.also_known_as else {
83 return Err("did doc missing `also_known_as`".to_string());
84 };
85 for aka in doc_akas {
86 let Some(maybe_handle) = aka.strip_prefix("at://") else {
87 continue;
88 };
89 let Ok(valid_handle) = Handle::new(maybe_handle.to_string()) else {
90 continue;
91 };
92 unverified_handle = Some(valid_handle);
93 break;
94 }
95 let Some(unverified_handle) = unverified_handle else {
96 return Err("no valid atproto handles in `also_known_as`".to_string());
97 };
98
99 // atrium seems to get service endpoint getters
100 let Some(pds) = did_doc.get_pds_endpoint() else {
101 return Err("no valid pds service found".to_string());
102 };
103
104 // TODO can't use atrium's get_signing_key() becuase it fails to check type and controller
105 // so if we check those and reject it, we might miss a later valid key in the array
106 // (todo is to fix atrium)
107 // actually: atrium might be flexible for legacy reps. for now we're rejecting legacy rep.
108
109 // must use the first valid signing key
110 let mut signing_key = None;
111 let Some(verification_methods) = did_doc.verification_method else {
112 return Err("no verification methods found".to_string());
113 };
114 for method in verification_methods {
115 if method.id != format!("{}#atproto", did_doc.id) {
116 continue;
117 }
118 if method.r#type != "Multikey" {
119 continue;
120 }
121 if method.controller != did_doc.id {
122 continue;
123 }
124 let Some(key) = method.public_key_multibase else {
125 continue;
126 };
127 signing_key = Some(key);
128 break;
129 }
130 let Some(signing_key) = signing_key else {
131 return Err("no valid atproto signing key found in verification methods".to_string());
132 };
133
134 Ok(PartialMiniDoc {
135 unverified_handle,
136 pds,
137 signing_key,
138 })
139 }
140}
141
/// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz)
///
/// the hashset allows testing for presence of items in the queue.
/// this has absolutely no support for multiple queue consumers.
#[derive(Debug, Default)]
struct RefreshQueue {
    /// keys waiting to be refreshed, in the order they were queued
    queue: VecDeque<IdentityKey>,
    /// the same keys as `queue`, kept for membership tests
    items: HashSet<IdentityKey>,
}
151
/// Caching resolver for atproto identities (handles, DIDs, did docs).
///
/// Cloning is cheap-ish: the resolvers, queue, and refresher lock are shared
/// behind `Arc`s.
#[derive(Clone)]
pub struct Identity {
    /// resolves handles to DIDs
    handle_resolver: Arc<AtprotoHandleResolver<HickoryDnsTxtResolver, DefaultHttpClient>>,
    /// resolves DIDs to did docs
    did_resolver: Arc<CommonDidResolver<DefaultHttpClient>>,
    /// resolved identities, stored with the time they were fetched
    cache: HybridCache<IdentityKey, IdentityVal>,
    /// multi-producer *single consumer* queue
    refresh_queue: Arc<Mutex<RefreshQueue>>,
    /// just a lock to ensure only one refresher (queue consumer) is running (to be improved with a better refresher)
    refresher: Arc<Mutex<()>>,
}
162
163impl Identity {
164 pub async fn new(cache_dir: impl AsRef<Path>) -> Result<Self, IdentityError> {
165 let http_client = Arc::new(DefaultHttpClient::default());
166 let handle_resolver = AtprotoHandleResolver::new(AtprotoHandleResolverConfig {
167 dns_txt_resolver: HickoryDnsTxtResolver::new().unwrap(),
168 http_client: http_client.clone(),
169 });
170 let did_resolver = CommonDidResolver::new(CommonDidResolverConfig {
171 plc_directory_url: DEFAULT_PLC_DIRECTORY_URL.to_string(),
172 http_client: http_client.clone(),
173 });
174
175 let cache = HybridCacheBuilder::new()
176 .with_name("identity")
177 .memory(16 * 2_usize.pow(20))
178 .with_weighter(|k, v| std::mem::size_of_val(k) + std::mem::size_of_val(v))
179 .storage(Engine::small())
180 .with_device_options(
181 DirectFsDeviceOptions::new(cache_dir)
182 .with_capacity(2_usize.pow(30)) // TODO: configurable (1GB to have something)
183 .with_file_size(2_usize.pow(20)), // note: this does limit the max cached item size, warning jumbo records
184 )
185 .build()
186 .await?;
187
188 Ok(Self {
189 handle_resolver: Arc::new(handle_resolver),
190 did_resolver: Arc::new(did_resolver),
191 cache,
192 refresh_queue: Default::default(),
193 refresher: Default::default(),
194 })
195 }
196
197 /// Resolve (and verify!) an atproto handle to a DID
198 ///
199 /// The result can be stale
200 ///
201 /// `None` if the handle can't be found or verification fails
202 pub async fn handle_to_did(&self, handle: Handle) -> Result<Option<Did>, IdentityError> {
203 let Some(did) = self.handle_to_unverified_did(&handle).await? else {
204 return Ok(None);
205 };
206 let Some(doc) = self.did_to_partial_mini_doc(&did).await? else {
207 return Ok(None);
208 };
209 if doc.unverified_handle != handle {
210 return Ok(None);
211 }
212 Ok(Some(did))
213 }
214
215 /// Resolve a DID to a pds url
216 ///
217 /// This *also* incidentally resolves and verifies the handle, which might
218 /// make it slower than expected
219 pub async fn did_to_pds(&self, did: Did) -> Result<Option<String>, IdentityError> {
220 let Some(mini_doc) = self.did_to_partial_mini_doc(&did).await? else {
221 return Ok(None);
222 };
223 Ok(Some(mini_doc.pds))
224 }
225
226 /// Resolve (and cache but **not verify**) a handle to a DID
227 async fn handle_to_unverified_did(
228 &self,
229 handle: &Handle,
230 ) -> Result<Option<Did>, IdentityError> {
231 let key = IdentityKey::Handle(handle.clone());
232 let entry = self
233 .cache
234 .fetch(key.clone(), {
235 let handle = handle.clone();
236 let resolver = self.handle_resolver.clone();
237 || async move {
238 match resolver.resolve(&handle).await {
239 Ok(did) => Ok(IdentityVal(UtcDateTime::now(), IdentityData::Did(did))),
240 Err(atrium_identity::Error::NotFound) => {
241 Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound))
242 }
243 Err(other) => Err(foyer::Error::Other(Box::new(
244 IdentityError::ResolutionFailed(other),
245 ))),
246 }
247 }
248 })
249 .await?;
250
251 let now = UtcDateTime::now();
252 let IdentityVal(last_fetch, data) = entry.value();
253 match data {
254 IdentityData::Doc(_) => {
255 log::error!("identity value mixup: got a doc from a handle key (should be a did)");
256 Err(IdentityError::IdentityValTypeMixup(handle.to_string()))
257 }
258 IdentityData::NotFound => {
259 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
260 self.queue_refresh(key).await;
261 }
262 Ok(None)
263 }
264 IdentityData::Did(did) => {
265 if (now - *last_fetch) >= MIN_TTL {
266 self.queue_refresh(key).await;
267 }
268 Ok(Some(did.clone()))
269 }
270 }
271 }
272
273 /// Fetch (and cache) a partial mini doc from a did
274 pub async fn did_to_partial_mini_doc(
275 &self,
276 did: &Did,
277 ) -> Result<Option<PartialMiniDoc>, IdentityError> {
278 let key = IdentityKey::Did(did.clone());
279 let entry = self
280 .cache
281 .fetch(key.clone(), {
282 let did = did.clone();
283 let resolver = self.did_resolver.clone();
284 || async move {
285 match resolver.resolve(&did).await {
286 Ok(did_doc) => {
287 // TODO: fix in atrium: should verify id is did
288 if did_doc.id != did.to_string() {
289 return Err(foyer::Error::other(Box::new(
290 IdentityError::BadDidDoc(
291 "did doc's id did not match did".to_string(),
292 ),
293 )));
294 }
295 let mini_doc = did_doc.try_into().map_err(|e| {
296 foyer::Error::Other(Box::new(IdentityError::BadDidDoc(e)))
297 })?;
298 Ok(IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)))
299 }
300 Err(atrium_identity::Error::NotFound) => {
301 Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound))
302 }
303 Err(other) => Err(foyer::Error::Other(Box::new(
304 IdentityError::ResolutionFailed(other),
305 ))),
306 }
307 }
308 })
309 .await?;
310
311 let now = UtcDateTime::now();
312 let IdentityVal(last_fetch, data) = entry.value();
313 match data {
314 IdentityData::Did(_) => {
315 log::error!("identity value mixup: got a did from a did key (should be a doc)");
316 Err(IdentityError::IdentityValTypeMixup(did.to_string()))
317 }
318 IdentityData::NotFound => {
319 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
320 self.queue_refresh(key).await;
321 }
322 Ok(None)
323 }
324 IdentityData::Doc(mini_did) => {
325 if (now - *last_fetch) >= MIN_TTL {
326 self.queue_refresh(key).await;
327 }
328 Ok(Some(mini_did.clone()))
329 }
330 }
331 }
332
333 /// put a refresh task on the queue
334 ///
335 /// this can be safely called from multiple concurrent tasks
336 async fn queue_refresh(&self, key: IdentityKey) {
337 // todo: max queue size
338 let mut q = self.refresh_queue.lock().await;
339 if !q.items.contains(&key) {
340 q.items.insert(key.clone());
341 q.queue.push_back(key);
342 }
343 }
344
345 /// find out what's next in the queue. concurrent consumers are not allowed.
346 ///
347 /// intent is to leave the item in the queue while refreshing, so that a
348 /// producer will not re-add it if it's in progress. there's definitely
349 /// better ways to do this, but this is ~simple for as far as a single
350 /// consumer can take us.
351 ///
352 /// we could take it from the queue but leave it in the set and remove from
353 /// set later, but splitting them apart feels more bug-prone.
354 async fn peek_refresh(&self) -> Option<IdentityKey> {
355 let q = self.refresh_queue.lock().await;
356 q.queue.front().cloned()
357 }
358
359 /// call to clear the latest key from the refresh queue. concurrent consumers not allowed.
360 ///
361 /// must provide the last peeked refresh queue item as a small safety check
362 async fn complete_refresh(&self, key: &IdentityKey) -> Result<(), IdentityError> {
363 let mut q = self.refresh_queue.lock().await;
364
365 let Some(queue_key) = q.queue.pop_front() else {
366 // gone from queue + since we're in an error condition, make sure it's not stuck in items
367 // (not toctou because we have the lock)
368 // bolder here than below and removing from items because if the queue is *empty*, then we
369 // know it hasn't been re-added since losing sync.
370 if q.items.remove(key) {
371 log::error!("identity refresh: queue de-sync: not in ");
372 } else {
373 log::warn!(
374 "identity refresh: tried to complete with wrong key. are multiple queue consumers running?"
375 );
376 }
377 return Err(IdentityError::RefreshQueueKeyError("no key in queue"));
378 };
379
380 if queue_key != *key {
381 // extra weird case here, what's the most defensive behaviour?
382 // we have two keys: ours should have been first but isn't. this shouldn't happen, so let's
383 // just leave items alone for it. risks unbounded growth but we're in a bad place already.
384 // the other key is the one we just popped. we didn't want it, so maybe we should put it
385 // back, BUT if we somehow ended up with concurrent consumers, we have bigger problems. take
386 // responsibility for taking it instead: remove it from items as well, and just drop it.
387 //
388 // hope that whoever calls us takes this error seriously.
389 if q.items.remove(&queue_key) {
390 log::warn!(
391 "identity refresh: queue de-sync + dropping a bystander key without refreshing it!"
392 );
393 } else {
394 // you thought things couldn't get weirder? (i mean hopefully they can't)
395 log::error!("identity refresh: queue de-sync + bystander key also de-sync!?");
396 }
397 return Err(IdentityError::RefreshQueueKeyError(
398 "wrong key at front of queue",
399 ));
400 }
401
402 if q.items.remove(key) {
403 Ok(())
404 } else {
405 log::error!("identity refresh: queue de-sync: key not in items");
406 Err(IdentityError::RefreshQueueKeyError("key not in items"))
407 }
408 }
409
410 /// run the refresh queue consumer
411 pub async fn run_refresher(&self, shutdown: CancellationToken) -> Result<(), IdentityError> {
412 let _guard = self
413 .refresher
414 .try_lock()
415 .expect("there to only be one refresher running");
416 loop {
417 if shutdown.is_cancelled() {
418 log::info!("identity refresher: exiting for shutdown: closing cache...");
419 if let Err(e) = self.cache.close().await {
420 log::error!("cache close errored: {e}");
421 } else {
422 log::info!("identity cache closed.")
423 }
424 return Ok(());
425 }
426 let Some(task_key) = self.peek_refresh().await else {
427 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
428 continue;
429 };
430 match task_key {
431 IdentityKey::Handle(ref handle) => {
432 log::trace!("refreshing handle {handle:?}");
433 match self.handle_resolver.resolve(handle).await {
434 Ok(did) => {
435 self.cache.insert(
436 task_key.clone(),
437 IdentityVal(UtcDateTime::now(), IdentityData::Did(did)),
438 );
439 }
440 Err(atrium_identity::Error::NotFound) => {
441 self.cache.insert(
442 task_key.clone(),
443 IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
444 );
445 }
446 Err(err) => {
447 log::warn!(
448 "failed to refresh handle: {err:?}. leaving stale (should we eventually do something?)"
449 );
450 }
451 }
452 self.complete_refresh(&task_key).await?; // failures are bugs, so break loop
453 }
454 IdentityKey::Did(ref did) => {
455 log::trace!("refreshing did doc: {did:?}");
456
457 match self.did_resolver.resolve(did).await {
458 Ok(did_doc) => {
459 // TODO: fix in atrium: should verify id is did
460 if did_doc.id != did.to_string() {
461 log::warn!(
462 "refreshed did doc failed: wrong did doc id. dropping refresh."
463 );
464 continue;
465 }
466 let mini_doc = match did_doc.try_into() {
467 Ok(md) => md,
468 Err(e) => {
469 log::warn!(
470 "converting mini doc failed: {e:?}. dropping refresh."
471 );
472 continue;
473 }
474 };
475 self.cache.insert(
476 task_key.clone(),
477 IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)),
478 );
479 }
480 Err(atrium_identity::Error::NotFound) => {
481 self.cache.insert(
482 task_key.clone(),
483 IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
484 );
485 }
486 Err(err) => {
487 log::warn!(
488 "failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)"
489 );
490 }
491 }
492
493 self.complete_refresh(&task_key).await?; // failures are bugs, so break loop
494 }
495 }
496 }
497 }
498}
499
/// DNS TXT lookups for the atproto handle resolver, backed by hickory's tokio resolver
pub struct HickoryDnsTxtResolver(TokioResolver);
501
502impl HickoryDnsTxtResolver {
503 fn new() -> Result<Self, ResolveError> {
504 Ok(Self(TokioResolver::builder_tokio()?.build()))
505 }
506}
507
508impl DnsTxtResolver for HickoryDnsTxtResolver {
509 async fn resolve(
510 &self,
511 query: &str,
512 ) -> core::result::Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
513 match self.0.txt_lookup(query).await {
514 Ok(r) => {
515 metrics::counter!("whoami_resolve_dns_txt", "success" => "true").increment(1);
516 Ok(r.iter().map(|r| r.to_string()).collect())
517 }
518 Err(e) => {
519 metrics::counter!("whoami_resolve_dns_txt", "success" => "false").increment(1);
520 Err(e.into())
521 }
522 }
523 }
524}