APIs for links and references in the ATmosphere
at main 524 lines 21 kB view raw
1use hickory_resolver::{ResolveError, TokioResolver}; 2use std::collections::{HashSet, VecDeque}; 3use std::path::Path; 4use std::sync::Arc; 5/// for now we're gonna just keep doing more cache 6/// 7/// plc.director x foyer, ttl kept with data, refresh deferred to background on fetch 8/// 9/// things we need: 10/// 11/// 1. handle -> DID resolution: getRecord must accept a handle for `repo` param 12/// 2. DID -> PDS resolution: so we know where to getRecord 13/// 3. DID -> handle resolution: for bidirectional handle validation and in case we want to offer this 14use std::time::Duration; 15use tokio::sync::Mutex; 16use tokio_util::sync::CancellationToken; 17 18use crate::error::IdentityError; 19use atrium_api::{ 20 did_doc::DidDocument, 21 types::string::{Did, Handle}, 22}; 23use atrium_common::resolver::Resolver; 24use atrium_identity::{ 25 did::{CommonDidResolver, CommonDidResolverConfig, DEFAULT_PLC_DIRECTORY_URL}, 26 handle::{AtprotoHandleResolver, AtprotoHandleResolverConfig, DnsTxtResolver}, 27}; 28use atrium_oauth::DefaultHttpClient; // it's probably not worth bringing all of atrium_oauth for this but 29use foyer::{DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder}; 30use serde::{Deserialize, Serialize}; 31use time::UtcDateTime; 32 33/// once we have something resolved, don't re-resolve until after this period 34const MIN_TTL: Duration = Duration::from_secs(4 * 3600); // probably shoudl have a max ttl 35const MIN_NOT_FOUND_TTL: Duration = Duration::from_secs(60); 36 37#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] 38enum IdentityKey { 39 Handle(Handle), 40 Did(Did), 41} 42 43#[derive(Debug, Serialize, Deserialize)] 44struct IdentityVal(UtcDateTime, IdentityData); 45 46#[derive(Debug, Serialize, Deserialize)] 47enum IdentityData { 48 NotFound, 49 Did(Did), 50 Doc(PartialMiniDoc), 51} 52 53/// partial representation of a com.bad-example.identity mini atproto doc 54/// 55/// partial because the handle is not verified 56#[derive(Debug, Clone, Serialize, Deserialize)] 57pub struct PartialMiniDoc { 58 /// an atproto handle (**unverified**) 59 /// 60 /// the first valid atproto handle from the did doc's aka 61 pub unverified_handle: Handle, 62 /// the did's atproto pds url (TODO: type this?) 63 /// 64 /// note: atrium *does* actually parse it into a URI, it just doesn't return 65 /// that for some reason 66 pub pds: String, 67 /// for now we're just pulling this straight from the did doc 68 /// 69 /// would be nice to type and validate it 70 /// 71 /// this is the publicKeyMultibase from the did doc. 72 /// legacy key encoding not supported. 73 /// `id`, `type`, and `controller` must be checked, but aren't stored. 74 pub signing_key: String, 75} 76 77impl TryFrom<DidDocument> for PartialMiniDoc { 78 type Error = String; 79 fn try_from(did_doc: DidDocument) -> Result<Self, Self::Error> { 80 // must use the first valid handle 81 let mut unverified_handle = None; 82 let Some(ref doc_akas) = did_doc.also_known_as else { 83 return Err("did doc missing `also_known_as`".to_string()); 84 }; 85 for aka in doc_akas { 86 let Some(maybe_handle) = aka.strip_prefix("at://") else { 87 continue; 88 }; 89 let Ok(valid_handle) = Handle::new(maybe_handle.to_string()) else { 90 continue; 91 }; 92 unverified_handle = Some(valid_handle); 93 break; 94 } 95 let Some(unverified_handle) = unverified_handle else { 96 return Err("no valid atproto handles in `also_known_as`".to_string()); 97 }; 98 99 // atrium seems to get service endpoint getters 100 let Some(pds) = did_doc.get_pds_endpoint() else { 101 return Err("no valid pds service found".to_string()); 102 }; 103 104 // TODO can't use atrium's get_signing_key() becuase it fails to check type and controller 105 // so if we check those and reject it, we might miss a later valid key in the array 106 // (todo is to fix atrium) 107 // actually: atrium might be flexible for legacy reps. for now we're rejecting legacy rep. 108 109 // must use the first valid signing key 110 let mut signing_key = None; 111 let Some(verification_methods) = did_doc.verification_method else { 112 return Err("no verification methods found".to_string()); 113 }; 114 for method in verification_methods { 115 if method.id != format!("{}#atproto", did_doc.id) { 116 continue; 117 } 118 if method.r#type != "Multikey" { 119 continue; 120 } 121 if method.controller != did_doc.id { 122 continue; 123 } 124 let Some(key) = method.public_key_multibase else { 125 continue; 126 }; 127 signing_key = Some(key); 128 break; 129 } 130 let Some(signing_key) = signing_key else { 131 return Err("no valid atproto signing key found in verification methods".to_string()); 132 }; 133 134 Ok(PartialMiniDoc { 135 unverified_handle, 136 pds, 137 signing_key, 138 }) 139 } 140} 141 142/// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz) 143/// 144/// the hashset allows testing for presense of items in the queue. 145/// this has absolutely no support for multiple queue consumers. 146#[derive(Debug, Default)] 147struct RefreshQueue { 148 queue: VecDeque<IdentityKey>, 149 items: HashSet<IdentityKey>, 150} 151 152#[derive(Clone)] 153pub struct Identity { 154 handle_resolver: Arc<AtprotoHandleResolver<HickoryDnsTxtResolver, DefaultHttpClient>>, 155 did_resolver: Arc<CommonDidResolver<DefaultHttpClient>>, 156 cache: HybridCache<IdentityKey, IdentityVal>, 157 /// multi-producer *single consumer* queue 158 refresh_queue: Arc<Mutex<RefreshQueue>>, 159 /// just a lock to ensure only one refresher (queue consumer) is running (to be improved with a better refresher) 160 refresher: Arc<Mutex<()>>, 161} 162 163impl Identity { 164 pub async fn new(cache_dir: impl AsRef<Path>) -> Result<Self, IdentityError> { 165 let http_client = Arc::new(DefaultHttpClient::default()); 166 let handle_resolver = AtprotoHandleResolver::new(AtprotoHandleResolverConfig { 167 dns_txt_resolver: HickoryDnsTxtResolver::new().unwrap(), 168 http_client: http_client.clone(), 169 }); 170 let did_resolver = CommonDidResolver::new(CommonDidResolverConfig { 171 plc_directory_url: DEFAULT_PLC_DIRECTORY_URL.to_string(), 172 http_client: http_client.clone(), 173 }); 174 175 let cache = HybridCacheBuilder::new() 176 .with_name("identity") 177 .memory(16 * 2_usize.pow(20)) 178 .with_weighter(|k, v| std::mem::size_of_val(k) + std::mem::size_of_val(v)) 179 .storage(Engine::small()) 180 .with_device_options( 181 DirectFsDeviceOptions::new(cache_dir) 182 .with_capacity(2_usize.pow(30)) // TODO: configurable (1GB to have something) 183 .with_file_size(2_usize.pow(20)), // note: this does limit the max cached item size, warning jumbo records 184 ) 185 .build() 186 .await?; 187 188 Ok(Self { 189 handle_resolver: Arc::new(handle_resolver), 190 did_resolver: Arc::new(did_resolver), 191 cache, 192 refresh_queue: Default::default(), 193 refresher: Default::default(), 194 }) 195 } 196 197 /// Resolve (and verify!) an atproto handle to a DID 198 /// 199 /// The result can be stale 200 /// 201 /// `None` if the handle can't be found or verification fails 202 pub async fn handle_to_did(&self, handle: Handle) -> Result<Option<Did>, IdentityError> { 203 let Some(did) = self.handle_to_unverified_did(&handle).await? else { 204 return Ok(None); 205 }; 206 let Some(doc) = self.did_to_partial_mini_doc(&did).await? else { 207 return Ok(None); 208 }; 209 if doc.unverified_handle != handle { 210 return Ok(None); 211 } 212 Ok(Some(did)) 213 } 214 215 /// Resolve a DID to a pds url 216 /// 217 /// This *also* incidentally resolves and verifies the handle, which might 218 /// make it slower than expected 219 pub async fn did_to_pds(&self, did: Did) -> Result<Option<String>, IdentityError> { 220 let Some(mini_doc) = self.did_to_partial_mini_doc(&did).await? else { 221 return Ok(None); 222 }; 223 Ok(Some(mini_doc.pds)) 224 } 225 226 /// Resolve (and cache but **not verify**) a handle to a DID 227 async fn handle_to_unverified_did( 228 &self, 229 handle: &Handle, 230 ) -> Result<Option<Did>, IdentityError> { 231 let key = IdentityKey::Handle(handle.clone()); 232 let entry = self 233 .cache 234 .fetch(key.clone(), { 235 let handle = handle.clone(); 236 let resolver = self.handle_resolver.clone(); 237 || async move { 238 match resolver.resolve(&handle).await { 239 Ok(did) => Ok(IdentityVal(UtcDateTime::now(), IdentityData::Did(did))), 240 Err(atrium_identity::Error::NotFound) => { 241 Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound)) 242 } 243 Err(other) => Err(foyer::Error::Other(Box::new( 244 IdentityError::ResolutionFailed(other), 245 ))), 246 } 247 } 248 }) 249 .await?; 250 251 let now = UtcDateTime::now(); 252 let IdentityVal(last_fetch, data) = entry.value(); 253 match data { 254 IdentityData::Doc(_) => { 255 log::error!("identity value mixup: got a doc from a handle key (should be a did)"); 256 Err(IdentityError::IdentityValTypeMixup(handle.to_string())) 257 } 258 IdentityData::NotFound => { 259 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 260 self.queue_refresh(key).await; 261 } 262 Ok(None) 263 } 264 IdentityData::Did(did) => { 265 if (now - *last_fetch) >= MIN_TTL { 266 self.queue_refresh(key).await; 267 } 268 Ok(Some(did.clone())) 269 } 270 } 271 } 272 273 /// Fetch (and cache) a partial mini doc from a did 274 pub async fn did_to_partial_mini_doc( 275 &self, 276 did: &Did, 277 ) -> Result<Option<PartialMiniDoc>, IdentityError> { 278 let key = IdentityKey::Did(did.clone()); 279 let entry = self 280 .cache 281 .fetch(key.clone(), { 282 let did = did.clone(); 283 let resolver = self.did_resolver.clone(); 284 || async move { 285 match resolver.resolve(&did).await { 286 Ok(did_doc) => { 287 // TODO: fix in atrium: should verify id is did 288 if did_doc.id != did.to_string() { 289 return Err(foyer::Error::other(Box::new( 290 IdentityError::BadDidDoc( 291 "did doc's id did not match did".to_string(), 292 ), 293 ))); 294 } 295 let mini_doc = did_doc.try_into().map_err(|e| { 296 foyer::Error::Other(Box::new(IdentityError::BadDidDoc(e))) 297 })?; 298 Ok(IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc))) 299 } 300 Err(atrium_identity::Error::NotFound) => { 301 Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound)) 302 } 303 Err(other) => Err(foyer::Error::Other(Box::new( 304 IdentityError::ResolutionFailed(other), 305 ))), 306 } 307 } 308 }) 309 .await?; 310 311 let now = UtcDateTime::now(); 312 let IdentityVal(last_fetch, data) = entry.value(); 313 match data { 314 IdentityData::Did(_) => { 315 log::error!("identity value mixup: got a did from a did key (should be a doc)"); 316 Err(IdentityError::IdentityValTypeMixup(did.to_string())) 317 } 318 IdentityData::NotFound => { 319 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 320 self.queue_refresh(key).await; 321 } 322 Ok(None) 323 } 324 IdentityData::Doc(mini_did) => { 325 if (now - *last_fetch) >= MIN_TTL { 326 self.queue_refresh(key).await; 327 } 328 Ok(Some(mini_did.clone())) 329 } 330 } 331 } 332 333 /// put a refresh task on the queue 334 /// 335 /// this can be safely called from multiple concurrent tasks 336 async fn queue_refresh(&self, key: IdentityKey) { 337 // todo: max queue size 338 let mut q = self.refresh_queue.lock().await; 339 if !q.items.contains(&key) { 340 q.items.insert(key.clone()); 341 q.queue.push_back(key); 342 } 343 } 344 345 /// find out what's next in the queue. concurrent consumers are not allowed. 346 /// 347 /// intent is to leave the item in the queue while refreshing, so that a 348 /// producer will not re-add it if it's in progress. there's definitely 349 /// better ways to do this, but this is ~simple for as far as a single 350 /// consumer can take us. 351 /// 352 /// we could take it from the queue but leave it in the set and remove from 353 /// set later, but splitting them apart feels more bug-prone. 354 async fn peek_refresh(&self) -> Option<IdentityKey> { 355 let q = self.refresh_queue.lock().await; 356 q.queue.front().cloned() 357 } 358 359 /// call to clear the latest key from the refresh queue. concurrent consumers not allowed. 360 /// 361 /// must provide the last peeked refresh queue item as a small safety check 362 async fn complete_refresh(&self, key: &IdentityKey) -> Result<(), IdentityError> { 363 let mut q = self.refresh_queue.lock().await; 364 365 let Some(queue_key) = q.queue.pop_front() else { 366 // gone from queue + since we're in an error condition, make sure it's not stuck in items 367 // (not toctou because we have the lock) 368 // bolder here than below and removing from items because if the queue is *empty*, then we 369 // know it hasn't been re-added since losing sync. 370 if q.items.remove(key) { 371 log::error!("identity refresh: queue de-sync: not in "); 372 } else { 373 log::warn!( 374 "identity refresh: tried to complete with wrong key. are multiple queue consumers running?" 375 ); 376 } 377 return Err(IdentityError::RefreshQueueKeyError("no key in queue")); 378 }; 379 380 if queue_key != *key { 381 // extra weird case here, what's the most defensive behaviour? 382 // we have two keys: ours should have been first but isn't. this shouldn't happen, so let's 383 // just leave items alone for it. risks unbounded growth but we're in a bad place already. 384 // the other key is the one we just popped. we didn't want it, so maybe we should put it 385 // back, BUT if we somehow ended up with concurrent consumers, we have bigger problems. take 386 // responsibility for taking it instead: remove it from items as well, and just drop it. 387 // 388 // hope that whoever calls us takes this error seriously. 389 if q.items.remove(&queue_key) { 390 log::warn!( 391 "identity refresh: queue de-sync + dropping a bystander key without refreshing it!" 392 ); 393 } else { 394 // you thought things couldn't get weirder? (i mean hopefully they can't) 395 log::error!("identity refresh: queue de-sync + bystander key also de-sync!?"); 396 } 397 return Err(IdentityError::RefreshQueueKeyError( 398 "wrong key at front of queue", 399 )); 400 } 401 402 if q.items.remove(key) { 403 Ok(()) 404 } else { 405 log::error!("identity refresh: queue de-sync: key not in items"); 406 Err(IdentityError::RefreshQueueKeyError("key not in items")) 407 } 408 } 409 410 /// run the refresh queue consumer 411 pub async fn run_refresher(&self, shutdown: CancellationToken) -> Result<(), IdentityError> { 412 let _guard = self 413 .refresher 414 .try_lock() 415 .expect("there to only be one refresher running"); 416 loop { 417 if shutdown.is_cancelled() { 418 log::info!("identity refresher: exiting for shutdown: closing cache..."); 419 if let Err(e) = self.cache.close().await { 420 log::error!("cache close errored: {e}"); 421 } else { 422 log::info!("identity cache closed.") 423 } 424 return Ok(()); 425 } 426 let Some(task_key) = self.peek_refresh().await else { 427 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; 428 continue; 429 }; 430 match task_key { 431 IdentityKey::Handle(ref handle) => { 432 log::trace!("refreshing handle {handle:?}"); 433 match self.handle_resolver.resolve(handle).await { 434 Ok(did) => { 435 self.cache.insert( 436 task_key.clone(), 437 IdentityVal(UtcDateTime::now(), IdentityData::Did(did)), 438 ); 439 } 440 Err(atrium_identity::Error::NotFound) => { 441 self.cache.insert( 442 task_key.clone(), 443 IdentityVal(UtcDateTime::now(), IdentityData::NotFound), 444 ); 445 } 446 Err(err) => { 447 log::warn!( 448 "failed to refresh handle: {err:?}. leaving stale (should we eventually do something?)" 449 ); 450 } 451 } 452 self.complete_refresh(&task_key).await?; // failures are bugs, so break loop 453 } 454 IdentityKey::Did(ref did) => { 455 log::trace!("refreshing did doc: {did:?}"); 456 457 match self.did_resolver.resolve(did).await { 458 Ok(did_doc) => { 459 // TODO: fix in atrium: should verify id is did 460 if did_doc.id != did.to_string() { 461 log::warn!( 462 "refreshed did doc failed: wrong did doc id. dropping refresh." 463 ); 464 continue; 465 } 466 let mini_doc = match did_doc.try_into() { 467 Ok(md) => md, 468 Err(e) => { 469 log::warn!( 470 "converting mini doc failed: {e:?}. dropping refresh." 471 ); 472 continue; 473 } 474 }; 475 self.cache.insert( 476 task_key.clone(), 477 IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)), 478 ); 479 } 480 Err(atrium_identity::Error::NotFound) => { 481 self.cache.insert( 482 task_key.clone(), 483 IdentityVal(UtcDateTime::now(), IdentityData::NotFound), 484 ); 485 } 486 Err(err) => { 487 log::warn!( 488 "failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)" 489 ); 490 } 491 } 492 493 self.complete_refresh(&task_key).await?; // failures are bugs, so break loop 494 } 495 } 496 } 497 } 498} 499 500pub struct HickoryDnsTxtResolver(TokioResolver); 501 502impl HickoryDnsTxtResolver { 503 fn new() -> Result<Self, ResolveError> { 504 Ok(Self(TokioResolver::builder_tokio()?.build())) 505 } 506} 507 508impl DnsTxtResolver for HickoryDnsTxtResolver { 509 async fn resolve( 510 &self, 511 query: &str, 512 ) -> core::result::Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> { 513 match self.0.txt_lookup(query).await { 514 Ok(r) => { 515 metrics::counter!("whoami_resolve_dns_txt", "success" => "true").increment(1); 516 Ok(r.iter().map(|r| r.to_string()).collect()) 517 } 518 Err(e) => { 519 metrics::counter!("whoami_resolve_dns_txt", "success" => "false").increment(1); 520 Err(e.into()) 521 } 522 } 523 } 524}