this repo has no description
1mod common; 2 3use cid::Cid; 4use common::*; 5use futures::{SinkExt, stream::StreamExt}; 6use iroh_car::CarReader; 7use reqwest::StatusCode; 8use serde::{Deserialize, Serialize}; 9use serde_json::{Value, json}; 10use std::io::Cursor; 11use tokio_tungstenite::{connect_async, tungstenite}; 12 13#[derive(Debug, Deserialize, Serialize)] 14struct FrameHeader { 15 op: i64, 16 t: String, 17} 18 19#[derive(Debug, Deserialize)] 20struct CommitFrame { 21 seq: i64, 22 #[serde(default)] 23 rebase: bool, 24 #[serde(rename = "tooBig", default)] 25 too_big: bool, 26 repo: String, 27 commit: Cid, 28 rev: String, 29 since: Option<String>, 30 #[serde(with = "serde_bytes")] 31 blocks: Vec<u8>, 32 ops: Vec<RepoOp>, 33 #[serde(default)] 34 blobs: Vec<Cid>, 35 time: String, 36 #[serde(rename = "prevData")] 37 prev_data: Option<Cid>, 38} 39 40#[derive(Debug, Deserialize)] 41struct RepoOp { 42 action: String, 43 path: String, 44 cid: Option<Cid>, 45 prev: Option<Cid>, 46} 47 48fn find_cbor_map_end(bytes: &[u8]) -> Result<usize, String> { 49 let mut pos = 0; 50 51 fn read_uint(bytes: &[u8], pos: &mut usize, additional: u8) -> Result<u64, String> { 52 match additional { 53 0..=23 => Ok(additional as u64), 54 24 => { 55 if *pos >= bytes.len() { 56 return Err("Unexpected end".into()); 57 } 58 let val = bytes[*pos] as u64; 59 *pos += 1; 60 Ok(val) 61 } 62 25 => { 63 if *pos + 2 > bytes.len() { 64 return Err("Unexpected end".into()); 65 } 66 let val = u16::from_be_bytes([bytes[*pos], bytes[*pos + 1]]) as u64; 67 *pos += 2; 68 Ok(val) 69 } 70 26 => { 71 if *pos + 4 > bytes.len() { 72 return Err("Unexpected end".into()); 73 } 74 let val = u32::from_be_bytes([ 75 bytes[*pos], 76 bytes[*pos + 1], 77 bytes[*pos + 2], 78 bytes[*pos + 3], 79 ]) as u64; 80 *pos += 4; 81 Ok(val) 82 } 83 27 => { 84 if *pos + 8 > bytes.len() { 85 return Err("Unexpected end".into()); 86 } 87 let val = u64::from_be_bytes([ 88 bytes[*pos], 89 bytes[*pos + 1], 90 bytes[*pos + 2], 91 bytes[*pos + 3], 92 bytes[*pos + 4], 93 bytes[*pos + 5], 94 bytes[*pos + 6], 95 bytes[*pos + 7], 96 ]); 97 *pos += 8; 98 Ok(val) 99 } 100 _ => Err(format!("Invalid additional info: {}", additional)), 101 } 102 } 103 104 fn skip_value(bytes: &[u8], pos: &mut usize) -> Result<(), String> { 105 if *pos >= bytes.len() { 106 return Err("Unexpected end".into()); 107 } 108 let initial = bytes[*pos]; 109 *pos += 1; 110 let major = initial >> 5; 111 let additional = initial & 0x1f; 112 113 match major { 114 0 | 1 => { 115 read_uint(bytes, pos, additional)?; 116 Ok(()) 117 } 118 2 | 3 => { 119 let len = read_uint(bytes, pos, additional)? as usize; 120 *pos += len; 121 Ok(()) 122 } 123 4 => { 124 let len = read_uint(bytes, pos, additional)?; 125 for _ in 0..len { 126 skip_value(bytes, pos)?; 127 } 128 Ok(()) 129 } 130 5 => { 131 let len = read_uint(bytes, pos, additional)?; 132 for _ in 0..len { 133 skip_value(bytes, pos)?; 134 skip_value(bytes, pos)?; 135 } 136 Ok(()) 137 } 138 6 => { 139 read_uint(bytes, pos, additional)?; 140 skip_value(bytes, pos) 141 } 142 7 => Ok(()), 143 _ => Err(format!("Unknown major type: {}", major)), 144 } 145 } 146 147 skip_value(bytes, &mut pos)?; 148 Ok(pos) 149} 150 151fn parse_frame(bytes: &[u8]) -> Result<(FrameHeader, CommitFrame), String> { 152 let header_len = find_cbor_map_end(bytes)?; 153 let header: FrameHeader = serde_ipld_dagcbor::from_slice(&bytes[..header_len]) 154 .map_err(|e| format!("Failed to parse header: {:?}", e))?; 155 156 if header.t != "#commit" { 157 return Err(format!("Not a commit frame: {}", header.t)); 158 } 159 160 let remaining = &bytes[header_len..]; 161 let frame: CommitFrame = serde_ipld_dagcbor::from_slice(remaining) 162 .map_err(|e| format!("Failed to parse commit frame: {:?}", e))?; 163 164 Ok((header, frame)) 165} 166 167fn is_valid_tid(s: &str) -> bool { 168 s.len() == 13 && s.chars().all(|c| c.is_alphanumeric()) 169} 170 171fn is_valid_time_format(s: &str) -> bool { 172 if !s.ends_with('Z') { 173 return false; 174 } 175 let parts: Vec<&str> = s.split('T').collect(); 176 if parts.len() != 2 { 177 return false; 178 } 179 let time_part = parts[1].trim_end_matches('Z'); 180 let time_parts: Vec<&str> = time_part.split(':').collect(); 181 if time_parts.len() != 3 { 182 return false; 183 } 184 let seconds_part = time_parts[2]; 185 if let Some(dot_pos) = seconds_part.find('.') { 186 let millis = &seconds_part[dot_pos + 1..]; 187 millis.len() == 3 188 } else { 189 false 190 } 191} 192 193#[tokio::test] 194async fn test_firehose_frame_structure() { 195 let client = client(); 196 let (token, did) = create_account_and_login(&client).await; 197 198 let url = format!( 199 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos", 200 app_port() 201 ); 202 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 203 204 let post_text = "Testing firehose validation!"; 205 let post_payload = json!({ 206 "repo": did, 207 "collection": "app.bsky.feed.post", 208 "record": { 209 "$type": "app.bsky.feed.post", 210 "text": post_text, 211 "createdAt": chrono::Utc::now().to_rfc3339(), 212 } 213 }); 214 let res = client 215 .post(format!( 216 "{}/xrpc/com.atproto.repo.createRecord", 217 base_url().await 218 )) 219 .bearer_auth(&token) 220 .json(&post_payload) 221 .send() 222 .await 223 .expect("Failed to create post"); 224 assert_eq!(res.status(), StatusCode::OK); 225 226 let mut frame_opt: Option<(FrameHeader, CommitFrame)> = None; 227 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 228 loop { 229 let msg = ws_stream.next().await.unwrap().unwrap(); 230 let raw_bytes = match msg { 231 tungstenite::Message::Binary(bin) => bin, 232 _ => continue, 233 }; 234 if let Ok((h, f)) = parse_frame(&raw_bytes) { 235 if f.repo == did { 236 frame_opt = Some((h, f)); 237 break; 238 } 239 } 240 } 241 }) 242 .await; 243 assert!(timeout.is_ok(), "Timed out waiting for event for our DID"); 244 let (header, frame) = frame_opt.expect("No matching frame found"); 245 246 println!("\n=== Frame Structure Validation ===\n"); 247 248 println!("Header:"); 249 println!(" op: {} (expected: 1)", header.op); 250 println!(" t: {} (expected: #commit)", header.t); 251 assert_eq!(header.op, 1, "Header op should be 1"); 252 assert_eq!(header.t, "#commit", "Header t should be #commit"); 253 254 println!("\nCommitFrame fields:"); 255 println!(" seq: {}", frame.seq); 256 println!(" rebase: {}", frame.rebase); 257 println!(" tooBig: {}", frame.too_big); 258 println!(" repo: {}", frame.repo); 259 println!(" commit: {}", frame.commit); 260 println!( 261 " rev: {} (valid TID: {})", 262 frame.rev, 263 is_valid_tid(&frame.rev) 264 ); 265 println!(" since: {:?}", frame.since); 266 println!(" blocks length: {} bytes", frame.blocks.len()); 267 println!(" ops count: {}", frame.ops.len()); 268 println!(" blobs count: {}", frame.blobs.len()); 269 println!( 270 " time: {} (valid format: {})", 271 frame.time, 272 is_valid_time_format(&frame.time) 273 ); 274 println!( 275 " prevData: {:?} (IMPORTANT - should have value for updates)", 276 frame.prev_data 277 ); 278 279 assert_eq!(frame.repo, did, "Frame repo should match DID"); 280 assert!( 281 is_valid_tid(&frame.rev), 282 "Rev should be valid TID format, got: {}", 283 frame.rev 284 ); 285 assert!(!frame.blocks.is_empty(), "Blocks should not be empty"); 286 assert!( 287 is_valid_time_format(&frame.time), 288 "Time should be ISO 8601 with milliseconds and Z suffix" 289 ); 290 291 println!("\nOps validation:"); 292 for (i, op) in frame.ops.iter().enumerate() { 293 println!(" Op {}:", i); 294 println!(" action: {}", op.action); 295 println!(" path: {}", op.path); 296 println!(" cid: {:?}", op.cid); 297 println!( 298 " prev: {:?} (should be Some for updates/deletes)", 299 op.prev 300 ); 301 302 assert!( 303 ["create", "update", "delete"].contains(&op.action.as_str()), 304 "Invalid action: {}", 305 op.action 306 ); 307 assert!( 308 op.path.contains('/'), 309 "Path should contain collection/rkey: {}", 310 op.path 311 ); 312 313 if op.action == "create" { 314 assert!(op.cid.is_some(), "Create op should have cid"); 315 } 316 } 317 318 println!("\nCAR validation:"); 319 let mut car_reader = CarReader::new(Cursor::new(&frame.blocks)).await.unwrap(); 320 let car_header = car_reader.header().clone(); 321 println!(" CAR roots: {:?}", car_header.roots()); 322 323 assert!( 324 !car_header.roots().is_empty(), 325 "CAR should have at least one root" 326 ); 327 assert_eq!( 328 car_header.roots()[0], 329 frame.commit, 330 "First CAR root should be commit CID" 331 ); 332 333 let mut block_cids: Vec<Cid> = Vec::new(); 334 while let Ok(Some((cid, _))) = car_reader.next_block().await { 335 block_cids.push(cid); 336 } 337 println!(" CAR blocks: {} total", block_cids.len()); 338 for cid in &block_cids { 339 println!(" - {}", cid); 340 } 341 342 assert!( 343 block_cids.contains(&frame.commit), 344 "CAR should contain commit block" 345 ); 346 347 for op in &frame.ops { 348 if let Some(ref cid) = op.cid { 349 assert!( 350 block_cids.contains(cid), 351 "CAR should contain op's record block: {}", 352 cid 353 ); 354 } 355 } 356 357 println!("\n=== Validation Complete ===\n"); 358 359 ws_stream.send(tungstenite::Message::Close(None)).await.ok(); 360} 361 362#[tokio::test] 363async fn test_firehose_update_has_prev_field() { 364 let client = client(); 365 let (token, did) = create_account_and_login(&client).await; 366 367 let profile_payload = json!({ 368 "repo": did, 369 "collection": "app.bsky.actor.profile", 370 "rkey": "self", 371 "record": { 372 "$type": "app.bsky.actor.profile", 373 "displayName": "Test User v1", 374 } 375 }); 376 let res = client 377 .post(format!( 378 "{}/xrpc/com.atproto.repo.putRecord", 379 base_url().await 380 )) 381 .bearer_auth(&token) 382 .json(&profile_payload) 383 .send() 384 .await 385 .expect("Failed to create profile"); 386 assert_eq!(res.status(), StatusCode::OK); 387 let first_profile: Value = res.json().await.unwrap(); 388 let first_cid = first_profile["cid"].as_str().unwrap(); 389 390 let url = format!( 391 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos", 392 app_port() 393 ); 394 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 395 396 let update_payload = json!({ 397 "repo": did, 398 "collection": "app.bsky.actor.profile", 399 "rkey": "self", 400 "record": { 401 "$type": "app.bsky.actor.profile", 402 "displayName": "Test User v2", 403 } 404 }); 405 let res = client 406 .post(format!( 407 "{}/xrpc/com.atproto.repo.putRecord", 408 base_url().await 409 )) 410 .bearer_auth(&token) 411 .json(&update_payload) 412 .send() 413 .await 414 .expect("Failed to update profile"); 415 assert_eq!(res.status(), StatusCode::OK); 416 417 let mut frame_opt: Option<CommitFrame> = None; 418 let timeout = tokio::time::timeout(std::time::Duration::from_secs(15), async { 419 loop { 420 let msg = match ws_stream.next().await { 421 Some(Ok(m)) => m, 422 _ => continue, 423 }; 424 let raw_bytes = match msg { 425 tungstenite::Message::Binary(bin) => bin, 426 _ => continue, 427 }; 428 if let Ok((_, f)) = parse_frame(&raw_bytes) { 429 if f.repo == did { 430 frame_opt = Some(f); 431 break; 432 } 433 } 434 } 435 }) 436 .await; 437 assert!(timeout.is_ok(), "Timed out waiting for update commit"); 438 let frame = frame_opt.expect("No matching frame found"); 439 440 println!("\n=== Update Operation Validation ===\n"); 441 println!("First profile CID: {}", first_cid); 442 println!("Frame prevData: {:?}", frame.prev_data); 443 444 for op in &frame.ops { 445 println!( 446 "Op: action={}, path={}, cid={:?}, prev={:?}", 447 op.action, op.path, op.cid, op.prev 448 ); 449 450 if op.action == "update" && op.path.contains("app.bsky.actor.profile") { 451 assert!( 452 op.prev.is_some(), 453 "Update operation should have 'prev' field with old CID! Got: {:?}", 454 op.prev 455 ); 456 println!(" ✓ Update op has prev field: {:?}", op.prev); 457 } 458 } 459 460 println!("\n=== Validation Complete ===\n"); 461 462 ws_stream.send(tungstenite::Message::Close(None)).await.ok(); 463} 464 465#[tokio::test] 466async fn test_firehose_commit_has_prev_data() { 467 let client = client(); 468 let (token, did) = create_account_and_login(&client).await; 469 470 let url = format!( 471 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos", 472 app_port() 473 ); 474 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 475 476 let post_payload = json!({ 477 "repo": did, 478 "collection": "app.bsky.feed.post", 479 "record": { 480 "$type": "app.bsky.feed.post", 481 "text": "First post", 482 "createdAt": chrono::Utc::now().to_rfc3339(), 483 } 484 }); 485 client 486 .post(format!( 487 "{}/xrpc/com.atproto.repo.createRecord", 488 base_url().await 489 )) 490 .bearer_auth(&token) 491 .json(&post_payload) 492 .send() 493 .await 494 .expect("Failed to create first post"); 495 496 let mut first_frame_opt: Option<CommitFrame> = None; 497 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 498 loop { 499 let msg = ws_stream.next().await.unwrap().unwrap(); 500 let raw_bytes = match msg { 501 tungstenite::Message::Binary(bin) => bin, 502 _ => continue, 503 }; 504 if let Ok((_, f)) = parse_frame(&raw_bytes) { 505 if f.repo == did { 506 first_frame_opt = Some(f); 507 break; 508 } 509 } 510 } 511 }) 512 .await; 513 assert!(timeout.is_ok(), "Timed out waiting for first commit"); 514 let first_frame = first_frame_opt.expect("No first frame found"); 515 516 println!("\n=== First Commit ==="); 517 println!( 518 " prevData: {:?} (first commit may be None)", 519 first_frame.prev_data 520 ); 521 println!( 522 " since: {:?} (first commit should be None)", 523 first_frame.since 524 ); 525 526 let post_payload2 = json!({ 527 "repo": did, 528 "collection": "app.bsky.feed.post", 529 "record": { 530 "$type": "app.bsky.feed.post", 531 "text": "Second post", 532 "createdAt": chrono::Utc::now().to_rfc3339(), 533 } 534 }); 535 client 536 .post(format!( 537 "{}/xrpc/com.atproto.repo.createRecord", 538 base_url().await 539 )) 540 .bearer_auth(&token) 541 .json(&post_payload2) 542 .send() 543 .await 544 .expect("Failed to create second post"); 545 546 let mut second_frame_opt: Option<CommitFrame> = None; 547 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 548 loop { 549 let msg = ws_stream.next().await.unwrap().unwrap(); 550 let raw_bytes = match msg { 551 tungstenite::Message::Binary(bin) => bin, 552 _ => continue, 553 }; 554 if let Ok((_, f)) = parse_frame(&raw_bytes) { 555 if f.repo == did { 556 second_frame_opt = Some(f); 557 break; 558 } 559 } 560 } 561 }) 562 .await; 563 assert!(timeout.is_ok(), "Timed out waiting for second commit"); 564 let second_frame = second_frame_opt.expect("No second frame found"); 565 566 println!("\n=== Second Commit ==="); 567 println!( 568 " prevData: {:?} (should have value - MST root CID)", 569 second_frame.prev_data 570 ); 571 println!( 572 " since: {:?} (should have value - previous rev)", 573 second_frame.since 574 ); 575 576 assert!( 577 second_frame.since.is_some(), 578 "Second commit should have 'since' field pointing to first commit rev" 579 ); 580 581 println!("\n=== Validation Complete ===\n"); 582 583 ws_stream.send(tungstenite::Message::Close(None)).await.ok(); 584} 585 586#[tokio::test] 587async fn test_compare_raw_cbor_encoding() { 588 let client = client(); 589 let (token, did) = create_account_and_login(&client).await; 590 591 let url = format!( 592 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos", 593 app_port() 594 ); 595 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 596 597 let post_payload = json!({ 598 "repo": did, 599 "collection": "app.bsky.feed.post", 600 "record": { 601 "$type": "app.bsky.feed.post", 602 "text": "CBOR encoding test", 603 "createdAt": chrono::Utc::now().to_rfc3339(), 604 } 605 }); 606 client 607 .post(format!( 608 "{}/xrpc/com.atproto.repo.createRecord", 609 base_url().await 610 )) 611 .bearer_auth(&token) 612 .json(&post_payload) 613 .send() 614 .await 615 .expect("Failed to create post"); 616 617 let mut raw_bytes_opt: Option<Vec<u8>> = None; 618 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 619 loop { 620 let msg = ws_stream.next().await.unwrap().unwrap(); 621 let raw = match msg { 622 tungstenite::Message::Binary(bin) => bin, 623 _ => continue, 624 }; 625 if let Ok((_, f)) = parse_frame(&raw) { 626 if f.repo == did { 627 raw_bytes_opt = Some(raw.to_vec()); 628 break; 629 } 630 } 631 } 632 }) 633 .await; 634 assert!(timeout.is_ok(), "Timed out waiting for event for our DID"); 635 let raw_bytes = raw_bytes_opt.expect("No matching frame found"); 636 637 println!("\n=== Raw CBOR Analysis ===\n"); 638 println!("Total frame size: {} bytes", raw_bytes.len()); 639 640 fn bytes_to_hex(bytes: &[u8]) -> String { 641 bytes 642 .iter() 643 .map(|b| format!("{:02x}", b)) 644 .collect::<Vec<_>>() 645 .join("") 646 } 647 648 println!( 649 "First 64 bytes (hex): {}", 650 bytes_to_hex(&raw_bytes[..64.min(raw_bytes.len())]) 651 ); 652 653 let header_end = find_cbor_map_end(&raw_bytes).expect("Failed to find header end"); 654 655 println!("\nHeader section: {} bytes", header_end); 656 println!("Header hex: {}", bytes_to_hex(&raw_bytes[..header_end])); 657 658 println!("\nPayload section: {} bytes", raw_bytes.len() - header_end); 659 660 println!("\n=== Analysis Complete ===\n"); 661 662 ws_stream.send(tungstenite::Message::Close(None)).await.ok(); 663}