Highly ambitious ATProtocol AppView service and sdks

only log jetsteam deletes for relevant slices

+72 -25
+72 -25
api/src/jetstream.rs
··· 337 337 did: &str, 338 338 commit: atproto_jetstream::JetstreamEventDelete, 339 339 ) -> Result<()> { 340 - // First check if this DID is an actor in any of our slices 340 + let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); 341 + 342 + // Get slices that track this collection 343 + let slice_collections = self.slice_collections.read().await; 344 + let slice_domains = self.slice_domains.read().await; 341 345 let actor_cache = self.actor_cache.read().await; 342 - let is_tracked_actor = actor_cache.keys().any(|(cached_did, _)| cached_did == did); 343 - 344 - if !is_tracked_actor { 345 - // DID is not an actor in any slice, skip deletion 346 + 347 + let mut relevant_slices: Vec<String> = Vec::new(); 348 + 349 + for (slice_uri, collections) in slice_collections.iter() { 350 + if !collections.contains(&commit.collection) { 351 + continue; 352 + } 353 + 354 + // Get the domain for this slice 355 + let domain = match slice_domains.get(slice_uri) { 356 + Some(d) => d, 357 + None => continue, 358 + }; 359 + 360 + // Check if this is a primary collection (starts with slice domain) 361 + let is_primary_collection = commit.collection.starts_with(domain); 362 + 363 + if is_primary_collection { 364 + // Primary collection - always process deletes 365 + relevant_slices.push(slice_uri.clone()); 366 + } else { 367 + // External collection - only process if DID is an actor in this slice 368 + let cache_key = (did.to_string(), slice_uri.clone()); 369 + if actor_cache.get(&cache_key).copied().unwrap_or(false) { 370 + relevant_slices.push(slice_uri.clone()); 371 + } 372 + } 373 + } 374 + 375 + if relevant_slices.is_empty() { 376 + // No relevant slices found, skip deletion 346 377 return Ok(()); 347 378 } 348 - 349 - // DID is an actor in our system, delete the record globally 350 - let uri = format!("at://{}/{}/{}", did, commit.collection, commit.rkey); 351 - 379 + 380 + // Delete the record and log only for relevant slices 352 381 match self.database.delete_record_by_uri(&uri, None).await { 353 382 Ok(rows_affected) => { 354 383 if rows_affected > 0 { 355 - info!("✓ Deleted record globally: {} ({} rows)", uri, rows_affected); 384 + info!("✓ Deleted record: {} ({} rows) for {} slice(s)", uri, rows_affected, relevant_slices.len()); 356 385 let message = format!("Record deleted from {}", commit.collection); 357 - Logger::global().log_jetstream(LogLevel::Info, &message, Some(serde_json::json!({ 358 - "operation": "delete", 359 - "collection": commit.collection, 360 - "did": did, 361 - "uri": uri, 362 - "rows_affected": rows_affected 363 - }))); 386 + 387 + // Log to each relevant slice 388 + for slice_uri in relevant_slices { 389 + Logger::global().log_jetstream_with_slice( 390 + LogLevel::Info, 391 + &message, 392 + Some(serde_json::json!({ 393 + "operation": "delete", 394 + "collection": commit.collection, 395 + "did": did, 396 + "uri": uri, 397 + "rows_affected": rows_affected 398 + })), 399 + Some(&slice_uri) 400 + ); 401 + } 364 402 } 365 403 } 366 404 Err(e) => { 367 405 let message = "Failed to delete record"; 368 406 error!("{}: {}", message, e); 369 - Logger::global().log_jetstream(LogLevel::Error, message, Some(serde_json::json!({ 370 - "operation": "delete", 371 - "collection": commit.collection, 372 - "did": did, 373 - "uri": uri, 374 - "error": e.to_string() 375 - }))); 407 + 408 + // Log error to each relevant slice 409 + for slice_uri in relevant_slices { 410 + Logger::global().log_jetstream_with_slice( 411 + LogLevel::Error, 412 + message, 413 + Some(serde_json::json!({ 414 + "operation": "delete", 415 + "collection": commit.collection, 416 + "did": did, 417 + "uri": uri, 418 + "error": e.to_string() 419 + })), 420 + Some(&slice_uri) 421 + ); 422 + } 376 423 } 377 424 } 378 - 425 + 379 426 Ok(()) 380 427 } 381 428