AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[crawler] add diagnostic to see possibly hanging blocking tasks?

ptr.pet 80f5c529 74f9249b

verified
+85 -42
+85 -42
src/crawler/mod.rs
··· 410 410 count: usize, 411 411 } 412 412 413 + const BLOCKING_TASK_TIMEOUT: Duration = Duration::from_secs(30); 414 + 413 415 let parse_result = { 414 416 let repos = db.repos.clone(); 415 417 let filter_ks = db.filter.clone(); 416 418 let crawler_ks = db.crawler.clone(); 417 - tokio::task::spawn_blocking(move || -> miette::Result<Option<ParseResult>> { 418 - let output = match serde_json::from_slice::<ListReposOutput>(&bytes) { 419 - Ok(out) => out.into_static(), 420 - Err(e) => { 421 - error!(err = %e, "failed to parse listRepos response"); 419 + 420 + // this wont actually cancel the task since spawn_blocking isnt cancel safe 421 + // but at least we'll see whats going on? 422 + tokio::time::timeout( 423 + BLOCKING_TASK_TIMEOUT, 424 + tokio::task::spawn_blocking(move || -> miette::Result<Option<ParseResult>> { 425 + let output = match serde_json::from_slice::<ListReposOutput>(&bytes) { 426 + Ok(out) => out.into_static(), 427 + Err(e) => { 428 + error!(err = %e, "failed to parse listRepos response"); 429 + return Ok(None); 430 + } 431 + }; 432 + 433 + if output.repos.is_empty() { 422 434 return Ok(None); 423 435 } 424 - }; 425 436 426 - if output.repos.is_empty() { 427 - return Ok(None); 428 - } 437 + let count = output.repos.len(); 438 + let next_cursor = output.cursor.map(|c| c.as_str().into()); 439 + let mut unknown = Vec::new(); 440 + for repo in output.repos { 441 + let excl_key = crate::db::filter::exclude_key(repo.did.as_str())?; 442 + if filter_ks.contains_key(&excl_key).into_diagnostic()? { 443 + continue; 444 + } 429 445 430 - let count = output.repos.len(); 431 - let next_cursor = output.cursor.map(|c| c.as_str().into()); 432 - let mut unknown = Vec::new(); 433 - for repo in output.repos { 434 - let excl_key = crate::db::filter::exclude_key(repo.did.as_str())?; 435 - if filter_ks.contains_key(&excl_key).into_diagnostic()? 
{ 436 - continue; 437 - } 446 + // already in retry queue — let the retry thread handle it 447 + let retry_key = keys::crawler_retry_key(&repo.did); 448 + if crawler_ks.contains_key(&retry_key).into_diagnostic()? { 449 + continue; 450 + } 438 451 439 - // already in retry queue — let the retry thread handle it 440 - let retry_key = keys::crawler_retry_key(&repo.did); 441 - if crawler_ks.contains_key(&retry_key).into_diagnostic()? { 442 - continue; 452 + let did_key = keys::repo_key(&repo.did); 453 + if !repos.contains_key(&did_key).into_diagnostic()? { 454 + unknown.push(repo.did.into_static()); 455 + } 443 456 } 444 457 445 - let did_key = keys::repo_key(&repo.did); 446 - if !repos.contains_key(&did_key).into_diagnostic()? { 447 - unknown.push(repo.did.into_static()); 448 - } 449 - } 450 - 451 - Ok(Some(ParseResult { 452 - unknown_dids: unknown, 453 - cursor: next_cursor, 454 - count, 455 - })) 456 - }) 458 + Ok(Some(ParseResult { 459 + unknown_dids: unknown, 460 + cursor: next_cursor, 461 + count, 462 + })) 463 + }), 464 + ) 457 465 .await 458 - .into_diagnostic()?? 459 - }; 466 + } 467 + .into_diagnostic()? 
468 + .map_err(|_| { 469 + error!( 470 + "spawn_blocking task for parsing listRepos timed out after {}", 471 + BLOCKING_TASK_TIMEOUT.as_secs() 472 + ); 473 + miette::miette!("spawn_blocking task for parsing listRepos timed out") 474 + })?; 460 475 461 - let Some(ParseResult { 476 + let Ok(Some(ParseResult { 462 477 unknown_dids, 463 478 cursor: next_cursor, 464 479 count, 465 - }) = parse_result 480 + })) = parse_result 466 481 else { 467 482 info!("finished enumeration (or empty page)"); 468 483 tokio::time::sleep(Duration::from_secs(3600)).await; ··· 505 520 .wrap_err("cant serialize cursor")?, 506 521 ); 507 522 508 - tokio::task::spawn_blocking(move || batch.commit().into_diagnostic()) 509 - .await 510 - .into_diagnostic()??; 523 + tokio::time::timeout( 524 + BLOCKING_TASK_TIMEOUT, 525 + tokio::task::spawn_blocking(move || batch.commit().into_diagnostic()), 526 + ) 527 + .await 528 + .into_diagnostic()? 529 + .map_err(|_| { 530 + error!( 531 + "spawn_blocking task for batch commit timed out after {}", 532 + BLOCKING_TASK_TIMEOUT.as_secs() 533 + ); 534 + miette::miette!("spawn_blocking task for batch commit timed out") 535 + })? 536 + .inspect_err(|e| { 537 + error!(err = ?e, "batch commit failed"); 538 + }) 539 + .ok(); 511 540 512 541 crawler.account_new_repos(to_queue.len()).await; 513 542 ··· 779 808 set.spawn(self.check_repo_signals(filter, did).instrument(span)); 780 809 } 781 810 782 - while let Some(res) = set.join_next().await { 783 - let (did, result) = res.into_diagnostic()?; 811 + while let Some(res) = tokio::time::timeout(Duration::from_secs(60), set.join_next()) 812 + .await 813 + .into_diagnostic() 814 + .map_err(|_| { 815 + error!("signal check task timed out after 60s"); 816 + miette::miette!("signal check task timed out") 817 + })? 
818 + { 819 + let (did, result) = match res { 820 + Ok(inner) => inner, 821 + Err(e) => { 822 + error!(err = ?e, "signal check task failed or panicked"); 823 + continue; 824 + } 825 + }; 826 + 784 827 match result { 785 828 CrawlCheckResult::Signal => { 786 829 batch.remove(&db.crawler, keys::crawler_retry_key(&did));