this repo has no description
1mod common; 2 3use cid::Cid; 4use common::*; 5use futures::{SinkExt, stream::StreamExt}; 6use iroh_car::CarReader; 7use reqwest::StatusCode; 8use serde::{Deserialize, Serialize}; 9use serde_json::{Value, json}; 10use std::io::Cursor; 11use tokio_tungstenite::{connect_async, tungstenite}; 12 13#[derive(Debug, Deserialize, Serialize)] 14struct FrameHeader { 15 op: i64, 16 t: String, 17} 18 19#[derive(Debug, Deserialize)] 20struct CommitFrame { 21 seq: i64, 22 #[serde(default)] 23 rebase: bool, 24 #[serde(rename = "tooBig", default)] 25 too_big: bool, 26 repo: String, 27 commit: Cid, 28 rev: String, 29 since: Option<String>, 30 #[serde(with = "serde_bytes")] 31 blocks: Vec<u8>, 32 ops: Vec<RepoOp>, 33 #[serde(default)] 34 blobs: Vec<Cid>, 35 time: String, 36 #[serde(rename = "prevData")] 37 prev_data: Option<Cid>, 38} 39 40#[derive(Debug, Deserialize)] 41struct RepoOp { 42 action: String, 43 path: String, 44 cid: Option<Cid>, 45 prev: Option<Cid>, 46} 47 48fn find_cbor_map_end(bytes: &[u8]) -> Result<usize, String> { 49 let mut pos = 0; 50 51 fn read_uint(bytes: &[u8], pos: &mut usize, additional: u8) -> Result<u64, String> { 52 match additional { 53 0..=23 => Ok(additional as u64), 54 24 => { 55 if *pos >= bytes.len() { 56 return Err("Unexpected end".into()); 57 } 58 let val = bytes[*pos] as u64; 59 *pos += 1; 60 Ok(val) 61 } 62 25 => { 63 if *pos + 2 > bytes.len() { 64 return Err("Unexpected end".into()); 65 } 66 let val = u16::from_be_bytes([bytes[*pos], bytes[*pos + 1]]) as u64; 67 *pos += 2; 68 Ok(val) 69 } 70 26 => { 71 if *pos + 4 > bytes.len() { 72 return Err("Unexpected end".into()); 73 } 74 let val = u32::from_be_bytes([ 75 bytes[*pos], 76 bytes[*pos + 1], 77 bytes[*pos + 2], 78 bytes[*pos + 3], 79 ]) as u64; 80 *pos += 4; 81 Ok(val) 82 } 83 27 => { 84 if *pos + 8 > bytes.len() { 85 return Err("Unexpected end".into()); 86 } 87 let val = u64::from_be_bytes([ 88 bytes[*pos], 89 bytes[*pos + 1], 90 bytes[*pos + 2], 91 bytes[*pos + 3], 92 bytes[*pos + 4], 93 bytes[*pos + 5], 94 bytes[*pos + 6], 95 bytes[*pos + 7], 96 ]); 97 *pos += 8; 98 Ok(val) 99 } 100 _ => Err(format!("Invalid additional info: {}", additional)), 101 } 102 } 103 104 fn skip_value(bytes: &[u8], pos: &mut usize) -> Result<(), String> { 105 if *pos >= bytes.len() { 106 return Err("Unexpected end".into()); 107 } 108 let initial = bytes[*pos]; 109 *pos += 1; 110 let major = initial >> 5; 111 let additional = initial & 0x1f; 112 113 match major { 114 0 | 1 => { 115 read_uint(bytes, pos, additional)?; 116 Ok(()) 117 } 118 2 | 3 => { 119 let len = read_uint(bytes, pos, additional)? as usize; 120 *pos += len; 121 Ok(()) 122 } 123 4 => { 124 let len = read_uint(bytes, pos, additional)?; 125 for _ in 0..len { 126 skip_value(bytes, pos)?; 127 } 128 Ok(()) 129 } 130 5 => { 131 let len = read_uint(bytes, pos, additional)?; 132 for _ in 0..len { 133 skip_value(bytes, pos)?; 134 skip_value(bytes, pos)?; 135 } 136 Ok(()) 137 } 138 6 => { 139 read_uint(bytes, pos, additional)?; 140 skip_value(bytes, pos) 141 } 142 7 => Ok(()), 143 _ => Err(format!("Unknown major type: {}", major)), 144 } 145 } 146 147 skip_value(bytes, &mut pos)?; 148 Ok(pos) 149} 150 151fn parse_frame(bytes: &[u8]) -> Result<(FrameHeader, CommitFrame), String> { 152 let header_len = find_cbor_map_end(bytes)?; 153 let header: FrameHeader = serde_ipld_dagcbor::from_slice(&bytes[..header_len]) 154 .map_err(|e| format!("Failed to parse header: {:?}", e))?; 155 156 if header.t != "#commit" { 157 return Err(format!("Not a commit frame: {}", header.t)); 158 } 159 160 let remaining = &bytes[header_len..]; 161 let frame: CommitFrame = serde_ipld_dagcbor::from_slice(remaining) 162 .map_err(|e| format!("Failed to parse commit frame: {:?}", e))?; 163 164 Ok((header, frame)) 165} 166 167fn is_valid_tid(s: &str) -> bool { 168 s.len() == 13 && s.chars().all(|c| c.is_alphanumeric()) 169} 170 171fn is_valid_time_format(s: &str) -> bool { 172 if !s.ends_with('Z') { 173 return false; 174 } 175 let parts: Vec<&str> = s.split('T').collect(); 176 if parts.len() != 2 { 177 return false; 178 } 179 let time_part = parts[1].trim_end_matches('Z'); 180 let time_parts: Vec<&str> = time_part.split(':').collect(); 181 if time_parts.len() != 3 { 182 return false; 183 } 184 let seconds_part = time_parts[2]; 185 if let Some(dot_pos) = seconds_part.find('.') { 186 let millis = &seconds_part[dot_pos + 1..]; 187 millis.len() == 3 188 } else { 189 false 190 } 191} 192 193#[tokio::test] 194async fn test_firehose_frame_structure() { 195 let client = client(); 196 let (token, did) = create_account_and_login(&client).await; 197 198 let url = format!( 199 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos", 200 app_port() 201 ); 202 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 203 204 let post_text = "Testing firehose validation!"; 205 let post_payload = json!({ 206 "repo": did, 207 "collection": "app.bsky.feed.post", 208 "record": { 209 "$type": "app.bsky.feed.post", 210 "text": post_text, 211 "createdAt": chrono::Utc::now().to_rfc3339(), 212 } 213 }); 214 let res = client 215 .post(format!( 216 "{}/xrpc/com.atproto.repo.createRecord", 217 base_url().await 218 )) 219 .bearer_auth(&token) 220 .json(&post_payload) 221 .send() 222 .await 223 .expect("Failed to create post"); 224 assert_eq!(res.status(), StatusCode::OK); 225 226 let mut frame_opt: Option<(FrameHeader, CommitFrame)> = None; 227 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 228 loop { 229 let msg = ws_stream.next().await.unwrap().unwrap(); 230 let raw_bytes = match msg { 231 tungstenite::Message::Binary(bin) => bin, 232 _ => continue, 233 }; 234 if let Ok((h, f)) = parse_frame(&raw_bytes) { 235 if f.repo == did { 236 frame_opt = Some((h, f)); 237 break; 238 } 239 } 240 } 241 }) 242 .await; 243 assert!(timeout.is_ok(), "Timed out waiting for event for our DID"); 244 let (header, frame) = frame_opt.expect("No matching frame found"); 245 246 println!("\n=== Frame Structure Validation ===\n"); 247 248 println!("Header:"); 249 println!(" op: {} (expected: 1)", header.op); 250 println!(" t: {} (expected: #commit)", header.t); 251 assert_eq!(header.op, 1, "Header op should be 1"); 252 assert_eq!(header.t, "#commit", "Header t should be #commit"); 253 254 println!("\nCommitFrame fields:"); 255 println!(" seq: {}", frame.seq); 256 println!(" rebase: {}", frame.rebase); 257 println!(" tooBig: {}", frame.too_big); 258 println!(" repo: {}", frame.repo); 259 println!(" commit: {}", frame.commit); 260 println!( 261 " rev: {} (valid TID: {})", 262 frame.rev, 263 is_valid_tid(&frame.rev) 264 ); 265 println!(" since: {:?}", frame.since); 266 println!(" blocks length: {} bytes", frame.blocks.len()); 267 println!(" ops count: {}", frame.ops.len()); 268 println!(" blobs count: {}", frame.blobs.len()); 269 println!( 270 " time: {} (valid format: {})", 271 frame.time, 272 is_valid_time_format(&frame.time) 273 ); 274 println!( 275 " prevData: {:?} (IMPORTANT - should have value for updates)", 276 frame.prev_data 277 ); 278 279 assert_eq!(frame.repo, did, "Frame repo should match DID"); 280 assert!( 281 is_valid_tid(&frame.rev), 282 "Rev should be valid TID format, got: {}", 283 frame.rev 284 ); 285 assert!(!frame.blocks.is_empty(), "Blocks should not be empty"); 286 assert!( 287 is_valid_time_format(&frame.time), 288 "Time should be ISO 8601 with milliseconds and Z suffix" 289 ); 290 291 println!("\nOps validation:"); 292 for (i, op) in frame.ops.iter().enumerate() { 293 println!(" Op {}:", i); 294 println!(" action: {}", op.action); 295 println!(" path: {}", op.path); 296 println!(" cid: {:?}", op.cid); 297 println!( 298 " prev: {:?} (should be Some for updates/deletes)", 299 op.prev 300 ); 301 302 assert!( 303 ["create", "update", "delete"].contains(&op.action.as_str()), 304 "Invalid action: {}", 305 op.action 306 ); 307 assert!( 308 op.path.contains('/'), 309 "Path should contain collection/rkey: {}", 310 op.path 311 ); 312 313 if op.action == "create" { 314 assert!(op.cid.is_some(), "Create op should have cid"); 315 } 316 } 317 318 println!("\nCAR validation:"); 319 let mut car_reader = CarReader::new(Cursor::new(&frame.blocks)).await.unwrap(); 320 let car_header = car_reader.header().clone(); 321 println!(" CAR roots: {:?}", car_header.roots()); 322 323 assert!( 324 !car_header.roots().is_empty(), 325 "CAR should have at least one root" 326 ); 327 assert_eq!( 328 car_header.roots()[0], 329 frame.commit, 330 "First CAR root should be commit CID" 331 ); 332 333 let mut block_cids: Vec<Cid> = Vec::new(); 334 while let Ok(Some((cid, _))) = car_reader.next_block().await { 335 block_cids.push(cid); 336 } 337 println!(" CAR blocks: {} total", block_cids.len()); 338 for cid in &block_cids { 339 println!(" - {}", cid); 340 } 341 342 assert!( 343 block_cids.contains(&frame.commit), 344 "CAR should contain commit block" 345 ); 346 347 for op in &frame.ops { 348 if let Some(ref cid) = op.cid { 349 assert!( 350 block_cids.contains(cid), 351 "CAR should contain op's record block: {}", 352 cid 353 ); 354 } 355 } 356 357 println!("\n=== Validation Complete ===\n"); 358 359 ws_stream.send(tungstenite::Message::Close(None)).await.ok(); 360} 361 362#[tokio::test] 363async fn test_firehose_update_has_prev_field() { 364 let client = client(); 365 let (token, did) = create_account_and_login(&client).await; 366 367 let url = format!( 368 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos", 369 app_port() 370 ); 371 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 372 373 let profile_payload = json!({ 374 "repo": did, 375 "collection": "app.bsky.actor.profile", 376 "rkey": "self", 377 "record": { 378 "$type": "app.bsky.actor.profile", 379 "displayName": "Test User v1", 380 } 381 }); 382 let res = client 383 .post(format!( 384 "{}/xrpc/com.atproto.repo.putRecord", 385 base_url().await 386 )) 387 .bearer_auth(&token) 388 .json(&profile_payload) 389 .send() 390 .await 391 .expect("Failed to create profile"); 392 assert_eq!(res.status(), StatusCode::OK); 393 let first_profile: Value = res.json().await.unwrap(); 394 let first_cid = first_profile["cid"].as_str().unwrap(); 395 396 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 397 loop { 398 let msg = ws_stream.next().await.unwrap().unwrap(); 399 let raw_bytes = match msg { 400 tungstenite::Message::Binary(bin) => bin, 401 _ => continue, 402 }; 403 if let Ok((_, f)) = parse_frame(&raw_bytes) { 404 if f.repo == did { 405 break; 406 } 407 } 408 } 409 }) 410 .await; 411 assert!(timeout.is_ok(), "Timed out waiting for first commit"); 412 413 let update_payload = json!({ 414 "repo": did, 415 "collection": "app.bsky.actor.profile", 416 "rkey": "self", 417 "record": { 418 "$type": "app.bsky.actor.profile", 419 "displayName": "Test User v2", 420 } 421 }); 422 let res = client 423 .post(format!( 424 "{}/xrpc/com.atproto.repo.putRecord", 425 base_url().await 426 )) 427 .bearer_auth(&token) 428 .json(&update_payload) 429 .send() 430 .await 431 .expect("Failed to update profile"); 432 assert_eq!(res.status(), StatusCode::OK); 433 434 let mut frame_opt: Option<CommitFrame> = None; 435 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 436 loop { 437 let msg = ws_stream.next().await.unwrap().unwrap(); 438 let raw_bytes = match msg { 439 tungstenite::Message::Binary(bin) => bin, 440 _ => continue, 441 }; 442 if let Ok((_, f)) = parse_frame(&raw_bytes) { 443 if f.repo == did { 444 frame_opt = Some(f); 445 break; 446 } 447 } 448 } 449 }) 450 .await; 451 assert!(timeout.is_ok(), "Timed out waiting for update commit"); 452 let frame = frame_opt.expect("No matching frame found"); 453 454 println!("\n=== Update Operation Validation ===\n"); 455 println!("First profile CID: {}", first_cid); 456 println!("Frame prevData: {:?}", frame.prev_data); 457 458 for op in &frame.ops { 459 println!( 460 "Op: action={}, path={}, cid={:?}, prev={:?}", 461 op.action, op.path, op.cid, op.prev 462 ); 463 464 if op.action == "update" && op.path.contains("app.bsky.actor.profile") { 465 assert!( 466 op.prev.is_some(), 467 "Update operation should have 'prev' field with old CID! Got: {:?}", 468 op.prev 469 ); 470 println!(" ✓ Update op has prev field: {:?}", op.prev); 471 } 472 } 473 474 println!("\n=== Validation Complete ===\n"); 475 476 ws_stream.send(tungstenite::Message::Close(None)).await.ok(); 477} 478 479#[tokio::test] 480async fn test_firehose_commit_has_prev_data() { 481 let client = client(); 482 let (token, did) = create_account_and_login(&client).await; 483 484 let url = format!( 485 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos", 486 app_port() 487 ); 488 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 489 490 let post_payload = json!({ 491 "repo": did, 492 "collection": "app.bsky.feed.post", 493 "record": { 494 "$type": "app.bsky.feed.post", 495 "text": "First post", 496 "createdAt": chrono::Utc::now().to_rfc3339(), 497 } 498 }); 499 client 500 .post(format!( 501 "{}/xrpc/com.atproto.repo.createRecord", 502 base_url().await 503 )) 504 .bearer_auth(&token) 505 .json(&post_payload) 506 .send() 507 .await 508 .expect("Failed to create first post"); 509 510 let mut first_frame_opt: Option<CommitFrame> = None; 511 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 512 loop { 513 let msg = ws_stream.next().await.unwrap().unwrap(); 514 let raw_bytes = match msg { 515 tungstenite::Message::Binary(bin) => bin, 516 _ => continue, 517 }; 518 if let Ok((_, f)) = parse_frame(&raw_bytes) { 519 if f.repo == did { 520 first_frame_opt = Some(f); 521 break; 522 } 523 } 524 } 525 }) 526 .await; 527 assert!(timeout.is_ok(), "Timed out waiting for first commit"); 528 let first_frame = first_frame_opt.expect("No first frame found"); 529 530 println!("\n=== First Commit ==="); 531 println!( 532 " prevData: {:?} (first commit may be None)", 533 first_frame.prev_data 534 ); 535 println!( 536 " since: {:?} (first commit should be None)", 537 first_frame.since 538 ); 539 540 let post_payload2 = json!({ 541 "repo": did, 542 "collection": "app.bsky.feed.post", 543 "record": { 544 "$type": "app.bsky.feed.post", 545 "text": "Second post", 546 "createdAt": chrono::Utc::now().to_rfc3339(), 547 } 548 }); 549 client 550 .post(format!( 551 "{}/xrpc/com.atproto.repo.createRecord", 552 base_url().await 553 )) 554 .bearer_auth(&token) 555 .json(&post_payload2) 556 .send() 557 .await 558 .expect("Failed to create second post"); 559 560 let mut second_frame_opt: Option<CommitFrame> = None; 561 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 562 loop { 563 let msg = ws_stream.next().await.unwrap().unwrap(); 564 let raw_bytes = match msg { 565 tungstenite::Message::Binary(bin) => bin, 566 _ => continue, 567 }; 568 if let Ok((_, f)) = parse_frame(&raw_bytes) { 569 if f.repo == did { 570 second_frame_opt = Some(f); 571 break; 572 } 573 } 574 } 575 }) 576 .await; 577 assert!(timeout.is_ok(), "Timed out waiting for second commit"); 578 let second_frame = second_frame_opt.expect("No second frame found"); 579 580 println!("\n=== Second Commit ==="); 581 println!( 582 " prevData: {:?} (should have value - MST root CID)", 583 second_frame.prev_data 584 ); 585 println!( 586 " since: {:?} (should have value - previous rev)", 587 second_frame.since 588 ); 589 590 assert!( 591 second_frame.since.is_some(), 592 "Second commit should have 'since' field pointing to first commit rev" 593 ); 594 595 println!("\n=== Validation Complete ===\n"); 596 597 ws_stream.send(tungstenite::Message::Close(None)).await.ok(); 598} 599 600#[tokio::test] 601async fn test_compare_raw_cbor_encoding() { 602 let client = client(); 603 let (token, did) = create_account_and_login(&client).await; 604 605 let url = format!( 606 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos", 607 app_port() 608 ); 609 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 610 611 let post_payload = json!({ 612 "repo": did, 613 "collection": "app.bsky.feed.post", 614 "record": { 615 "$type": "app.bsky.feed.post", 616 "text": "CBOR encoding test", 617 "createdAt": chrono::Utc::now().to_rfc3339(), 618 } 619 }); 620 client 621 .post(format!( 622 "{}/xrpc/com.atproto.repo.createRecord", 623 base_url().await 624 )) 625 .bearer_auth(&token) 626 .json(&post_payload) 627 .send() 628 .await 629 .expect("Failed to create post"); 630 631 let mut raw_bytes_opt: Option<Vec<u8>> = None; 632 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 633 loop { 634 let msg = ws_stream.next().await.unwrap().unwrap(); 635 let raw = match msg { 636 tungstenite::Message::Binary(bin) => bin, 637 _ => continue, 638 }; 639 if let Ok((_, f)) = parse_frame(&raw) { 640 if f.repo == did { 641 raw_bytes_opt = Some(raw.to_vec()); 642 break; 643 } 644 } 645 } 646 }) 647 .await; 648 assert!(timeout.is_ok(), "Timed out waiting for event for our DID"); 649 let raw_bytes = raw_bytes_opt.expect("No matching frame found"); 650 651 println!("\n=== Raw CBOR Analysis ===\n"); 652 println!("Total frame size: {} bytes", raw_bytes.len()); 653 654 fn bytes_to_hex(bytes: &[u8]) -> String { 655 bytes 656 .iter() 657 .map(|b| format!("{:02x}", b)) 658 .collect::<Vec<_>>() 659 .join("") 660 } 661 662 println!( 663 "First 64 bytes (hex): {}", 664 bytes_to_hex(&raw_bytes[..64.min(raw_bytes.len())]) 665 ); 666 667 let header_end = find_cbor_map_end(&raw_bytes).expect("Failed to find header end"); 668 669 println!("\nHeader section: {} bytes", header_end); 670 println!("Header hex: {}", bytes_to_hex(&raw_bytes[..header_end])); 671 672 println!("\nPayload section: {} bytes", raw_bytes.len() - header_end); 673 674 println!("\n=== Analysis Complete ===\n"); 675 676 ws_stream.send(tungstenite::Message::Close(None)).await.ok(); 677}