this repo has no description
1mod common; 2 3use common::*; 4use cid::Cid; 5use futures::{stream::StreamExt, SinkExt}; 6use iroh_car::CarReader; 7use reqwest::StatusCode; 8use serde::{Deserialize, Serialize}; 9use serde_json::{json, Value}; 10use std::io::Cursor; 11use tokio_tungstenite::{connect_async, tungstenite}; 12 13#[derive(Debug, Deserialize, Serialize)] 14struct FrameHeader { 15 op: i64, 16 t: String, 17} 18 19#[derive(Debug, Deserialize)] 20struct CommitFrame { 21 seq: i64, 22 #[serde(default)] 23 rebase: bool, 24 #[serde(rename = "tooBig", default)] 25 too_big: bool, 26 repo: String, 27 commit: Cid, 28 rev: String, 29 since: Option<String>, 30 #[serde(with = "serde_bytes")] 31 blocks: Vec<u8>, 32 ops: Vec<RepoOp>, 33 #[serde(default)] 34 blobs: Vec<Cid>, 35 time: String, 36 #[serde(rename = "prevData")] 37 prev_data: Option<Cid>, 38} 39 40#[derive(Debug, Deserialize)] 41struct RepoOp { 42 action: String, 43 path: String, 44 cid: Option<Cid>, 45 prev: Option<Cid>, 46} 47 48fn find_cbor_map_end(bytes: &[u8]) -> Result<usize, String> { 49 let mut pos = 0; 50 51 fn read_uint(bytes: &[u8], pos: &mut usize, additional: u8) -> Result<u64, String> { 52 match additional { 53 0..=23 => Ok(additional as u64), 54 24 => { 55 if *pos >= bytes.len() { return Err("Unexpected end".into()); } 56 let val = bytes[*pos] as u64; 57 *pos += 1; 58 Ok(val) 59 } 60 25 => { 61 if *pos + 2 > bytes.len() { return Err("Unexpected end".into()); } 62 let val = u16::from_be_bytes([bytes[*pos], bytes[*pos + 1]]) as u64; 63 *pos += 2; 64 Ok(val) 65 } 66 26 => { 67 if *pos + 4 > bytes.len() { return Err("Unexpected end".into()); } 68 let val = u32::from_be_bytes([bytes[*pos], bytes[*pos + 1], bytes[*pos + 2], bytes[*pos + 3]]) as u64; 69 *pos += 4; 70 Ok(val) 71 } 72 27 => { 73 if *pos + 8 > bytes.len() { return Err("Unexpected end".into()); } 74 let val = u64::from_be_bytes([bytes[*pos], bytes[*pos + 1], bytes[*pos + 2], bytes[*pos + 3], bytes[*pos + 4], bytes[*pos + 5], bytes[*pos + 6], bytes[*pos + 7]]); 75 *pos += 8; 76 Ok(val) 77 } 78 _ => Err(format!("Invalid additional info: {}", additional)), 79 } 80 } 81 82 fn skip_value(bytes: &[u8], pos: &mut usize) -> Result<(), String> { 83 if *pos >= bytes.len() { return Err("Unexpected end".into()); } 84 let initial = bytes[*pos]; 85 *pos += 1; 86 let major = initial >> 5; 87 let additional = initial & 0x1f; 88 89 match major { 90 0 | 1 => { read_uint(bytes, pos, additional)?; Ok(()) } 91 2 | 3 => { 92 let len = read_uint(bytes, pos, additional)? as usize; 93 *pos += len; 94 Ok(()) 95 } 96 4 => { 97 let len = read_uint(bytes, pos, additional)?; 98 for _ in 0..len { skip_value(bytes, pos)?; } 99 Ok(()) 100 } 101 5 => { 102 let len = read_uint(bytes, pos, additional)?; 103 for _ in 0..len { 104 skip_value(bytes, pos)?; 105 skip_value(bytes, pos)?; 106 } 107 Ok(()) 108 } 109 6 => { 110 read_uint(bytes, pos, additional)?; 111 skip_value(bytes, pos) 112 } 113 7 => Ok(()), 114 _ => Err(format!("Unknown major type: {}", major)), 115 } 116 } 117 118 skip_value(bytes, &mut pos)?; 119 Ok(pos) 120} 121 122fn parse_frame(bytes: &[u8]) -> Result<(FrameHeader, CommitFrame), String> { 123 let header_len = find_cbor_map_end(bytes)?; 124 let header: FrameHeader = serde_ipld_dagcbor::from_slice(&bytes[..header_len]) 125 .map_err(|e| format!("Failed to parse header: {:?}", e))?; 126 127 if header.t != "#commit" { 128 return Err(format!("Not a commit frame: {}", header.t)); 129 } 130 131 let remaining = &bytes[header_len..]; 132 let frame: CommitFrame = serde_ipld_dagcbor::from_slice(remaining) 133 .map_err(|e| format!("Failed to parse commit frame: {:?}", e))?; 134 135 Ok((header, frame)) 136} 137 138fn is_valid_tid(s: &str) -> bool { 139 s.len() == 13 && s.chars().all(|c| c.is_alphanumeric()) 140} 141 142fn is_valid_time_format(s: &str) -> bool { 143 if !s.ends_with('Z') { 144 return false; 145 } 146 let parts: Vec<&str> = s.split('T').collect(); 147 if parts.len() != 2 { 148 return false; 149 } 150 let time_part = parts[1].trim_end_matches('Z'); 151 let time_parts: Vec<&str> = time_part.split(':').collect(); 152 if time_parts.len() != 3 { 153 return false; 154 } 155 let seconds_part = time_parts[2]; 156 if let Some(dot_pos) = seconds_part.find('.') { 157 let millis = &seconds_part[dot_pos + 1..]; 158 millis.len() == 3 159 } else { 160 false 161 } 162} 163 164#[tokio::test] 165async fn test_firehose_frame_structure() { 166 let client = client(); 167 let (token, did) = create_account_and_login(&client).await; 168 169 let url = format!( 170 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos", 171 app_port() 172 ); 173 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 174 175 let post_text = "Testing firehose validation!"; 176 let post_payload = json!({ 177 "repo": did, 178 "collection": "app.bsky.feed.post", 179 "record": { 180 "$type": "app.bsky.feed.post", 181 "text": post_text, 182 "createdAt": chrono::Utc::now().to_rfc3339(), 183 } 184 }); 185 let res = client 186 .post(format!( 187 "{}/xrpc/com.atproto.repo.createRecord", 188 base_url().await 189 )) 190 .bearer_auth(&token) 191 .json(&post_payload) 192 .send() 193 .await 194 .expect("Failed to create post"); 195 assert_eq!(res.status(), StatusCode::OK); 196 197 let mut frame_opt: Option<(FrameHeader, CommitFrame)> = None; 198 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 199 loop { 200 let msg = ws_stream.next().await.unwrap().unwrap(); 201 let raw_bytes = match msg { 202 tungstenite::Message::Binary(bin) => bin, 203 _ => continue, 204 }; 205 if let Ok((h, f)) = parse_frame(&raw_bytes) { 206 if f.repo == did { 207 frame_opt = Some((h, f)); 208 break; 209 } 210 } 211 } 212 }) 213 .await; 214 assert!(timeout.is_ok(), "Timed out waiting for event for our DID"); 215 let (header, frame) = frame_opt.expect("No matching frame found"); 216 217 println!("\n=== Frame Structure Validation ===\n"); 218 219 println!("Header:"); 220 println!(" op: {} (expected: 1)", header.op); 221 println!(" t: {} (expected: #commit)", header.t); 222 assert_eq!(header.op, 1, "Header op should be 1"); 223 assert_eq!(header.t, "#commit", "Header t should be #commit"); 224 225 println!("\nCommitFrame fields:"); 226 println!(" seq: {}", frame.seq); 227 println!(" rebase: {}", frame.rebase); 228 println!(" tooBig: {}", frame.too_big); 229 println!(" repo: {}", frame.repo); 230 println!(" commit: {}", frame.commit); 231 println!(" rev: {} (valid TID: {})", frame.rev, is_valid_tid(&frame.rev)); 232 println!(" since: {:?}", frame.since); 233 println!(" blocks length: {} bytes", frame.blocks.len()); 234 println!(" ops count: {}", frame.ops.len()); 235 println!(" blobs count: {}", frame.blobs.len()); 236 println!(" time: {} (valid format: {})", frame.time, is_valid_time_format(&frame.time)); 237 println!(" prevData: {:?} (IMPORTANT - should have value for updates)", frame.prev_data); 238 239 assert_eq!(frame.repo, did, "Frame repo should match DID"); 240 assert!(is_valid_tid(&frame.rev), "Rev should be valid TID format, got: {}", frame.rev); 241 assert!(!frame.blocks.is_empty(), "Blocks should not be empty"); 242 assert!(is_valid_time_format(&frame.time), "Time should be ISO 8601 with milliseconds and Z suffix"); 243 244 println!("\nOps validation:"); 245 for (i, op) in frame.ops.iter().enumerate() { 246 println!(" Op {}:", i); 247 println!(" action: {}", op.action); 248 println!(" path: {}", op.path); 249 println!(" cid: {:?}", op.cid); 250 println!(" prev: {:?} (should be Some for updates/deletes)", op.prev); 251 252 assert!( 253 ["create", "update", "delete"].contains(&op.action.as_str()), 254 "Invalid action: {}", op.action 255 ); 256 assert!(op.path.contains('/'), "Path should contain collection/rkey: {}", op.path); 257 258 if op.action == "create" { 259 assert!(op.cid.is_some(), "Create op should have cid"); 260 } 261 } 262 263 println!("\nCAR validation:"); 264 let mut car_reader = CarReader::new(Cursor::new(&frame.blocks)).await.unwrap(); 265 let car_header = car_reader.header().clone(); 266 println!(" CAR roots: {:?}", car_header.roots()); 267 268 assert!( 269 !car_header.roots().is_empty(), 270 "CAR should have at least one root" 271 ); 272 assert_eq!( 273 car_header.roots()[0], frame.commit, 274 "First CAR root should be commit CID" 275 ); 276 277 let mut block_cids: Vec<Cid> = Vec::new(); 278 while let Ok(Some((cid, _))) = car_reader.next_block().await { 279 block_cids.push(cid); 280 } 281 println!(" CAR blocks: {} total", block_cids.len()); 282 for cid in &block_cids { 283 println!(" - {}", cid); 284 } 285 286 assert!( 287 block_cids.contains(&frame.commit), 288 "CAR should contain commit block" 289 ); 290 291 for op in &frame.ops { 292 if let Some(ref cid) = op.cid { 293 assert!( 294 block_cids.contains(cid), 295 "CAR should contain op's record block: {}", cid 296 ); 297 } 298 } 299 300 println!("\n=== Validation Complete ===\n"); 301 302 ws_stream 303 .send(tungstenite::Message::Close(None)) 304 .await 305 .ok(); 306} 307 308#[tokio::test] 309async fn test_firehose_update_has_prev_field() { 310 let client = client(); 311 let (token, did) = create_account_and_login(&client).await; 312 313 let url = format!( 314 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos", 315 app_port() 316 ); 317 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 318 319 let profile_payload = json!({ 320 "repo": did, 321 "collection": "app.bsky.actor.profile", 322 "rkey": "self", 323 "record": { 324 "$type": "app.bsky.actor.profile", 325 "displayName": "Test User v1", 326 } 327 }); 328 let res = client 329 .post(format!( 330 "{}/xrpc/com.atproto.repo.putRecord", 331 base_url().await 332 )) 333 .bearer_auth(&token) 334 .json(&profile_payload) 335 .send() 336 .await 337 .expect("Failed to create profile"); 338 assert_eq!(res.status(), StatusCode::OK); 339 let first_profile: Value = res.json().await.unwrap(); 340 let first_cid = first_profile["cid"].as_str().unwrap(); 341 342 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 343 loop { 344 let msg = ws_stream.next().await.unwrap().unwrap(); 345 let raw_bytes = match msg { 346 tungstenite::Message::Binary(bin) => bin, 347 _ => continue, 348 }; 349 if let Ok((_, f)) = parse_frame(&raw_bytes) { 350 if f.repo == did { 351 break; 352 } 353 } 354 } 355 }) 356 .await; 357 assert!(timeout.is_ok(), "Timed out waiting for first commit"); 358 359 let update_payload = json!({ 360 "repo": did, 361 "collection": "app.bsky.actor.profile", 362 "rkey": "self", 363 "record": { 364 "$type": "app.bsky.actor.profile", 365 "displayName": "Test User v2", 366 } 367 }); 368 let res = client 369 .post(format!( 370 "{}/xrpc/com.atproto.repo.putRecord", 371 base_url().await 372 )) 373 .bearer_auth(&token) 374 .json(&update_payload) 375 .send() 376 .await 377 .expect("Failed to update profile"); 378 assert_eq!(res.status(), StatusCode::OK); 379 380 let mut frame_opt: Option<CommitFrame> = None; 381 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 382 loop { 383 let msg = ws_stream.next().await.unwrap().unwrap(); 384 let raw_bytes = match msg { 385 tungstenite::Message::Binary(bin) => bin, 386 _ => continue, 387 }; 388 if let Ok((_, f)) = parse_frame(&raw_bytes) { 389 if f.repo == did { 390 frame_opt = Some(f); 391 break; 392 } 393 } 394 } 395 }) 396 .await; 397 assert!(timeout.is_ok(), "Timed out waiting for update commit"); 398 let frame = frame_opt.expect("No matching frame found"); 399 400 println!("\n=== Update Operation Validation ===\n"); 401 println!("First profile CID: {}", first_cid); 402 println!("Frame prevData: {:?}", frame.prev_data); 403 404 for op in &frame.ops { 405 println!("Op: action={}, path={}, cid={:?}, prev={:?}", 406 op.action, op.path, op.cid, op.prev); 407 408 if op.action == "update" && op.path.contains("app.bsky.actor.profile") { 409 assert!( 410 op.prev.is_some(), 411 "Update operation should have 'prev' field with old CID! Got: {:?}", 412 op.prev 413 ); 414 println!(" ✓ Update op has prev field: {:?}", op.prev); 415 } 416 } 417 418 println!("\n=== Validation Complete ===\n"); 419 420 ws_stream 421 .send(tungstenite::Message::Close(None)) 422 .await 423 .ok(); 424} 425 426#[tokio::test] 427async fn test_firehose_commit_has_prev_data() { 428 let client = client(); 429 let (token, did) = create_account_and_login(&client).await; 430 431 let url = format!( 432 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos", 433 app_port() 434 ); 435 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 436 437 let post_payload = json!({ 438 "repo": did, 439 "collection": "app.bsky.feed.post", 440 "record": { 441 "$type": "app.bsky.feed.post", 442 "text": "First post", 443 "createdAt": chrono::Utc::now().to_rfc3339(), 444 } 445 }); 446 client 447 .post(format!( 448 "{}/xrpc/com.atproto.repo.createRecord", 449 base_url().await 450 )) 451 .bearer_auth(&token) 452 .json(&post_payload) 453 .send() 454 .await 455 .expect("Failed to create first post"); 456 457 let mut first_frame_opt: Option<CommitFrame> = None; 458 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 459 loop { 460 let msg = ws_stream.next().await.unwrap().unwrap(); 461 let raw_bytes = match msg { 462 tungstenite::Message::Binary(bin) => bin, 463 _ => continue, 464 }; 465 if let Ok((_, f)) = parse_frame(&raw_bytes) { 466 if f.repo == did { 467 first_frame_opt = Some(f); 468 break; 469 } 470 } 471 } 472 }) 473 .await; 474 assert!(timeout.is_ok(), "Timed out waiting for first commit"); 475 let first_frame = first_frame_opt.expect("No first frame found"); 476 477 println!("\n=== First Commit ==="); 478 println!(" prevData: {:?} (first commit may be None)", first_frame.prev_data); 479 println!(" since: {:?} (first commit should be None)", first_frame.since); 480 481 let post_payload2 = json!({ 482 "repo": did, 483 "collection": "app.bsky.feed.post", 484 "record": { 485 "$type": "app.bsky.feed.post", 486 "text": "Second post", 487 "createdAt": chrono::Utc::now().to_rfc3339(), 488 } 489 }); 490 client 491 .post(format!( 492 "{}/xrpc/com.atproto.repo.createRecord", 493 base_url().await 494 )) 495 .bearer_auth(&token) 496 .json(&post_payload2) 497 .send() 498 .await 499 .expect("Failed to create second post"); 500 501 let mut second_frame_opt: Option<CommitFrame> = None; 502 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 503 loop { 504 let msg = ws_stream.next().await.unwrap().unwrap(); 505 let raw_bytes = match msg { 506 tungstenite::Message::Binary(bin) => bin, 507 _ => continue, 508 }; 509 if let Ok((_, f)) = parse_frame(&raw_bytes) { 510 if f.repo == did { 511 second_frame_opt = Some(f); 512 break; 513 } 514 } 515 } 516 }) 517 .await; 518 assert!(timeout.is_ok(), "Timed out waiting for second commit"); 519 let second_frame = second_frame_opt.expect("No second frame found"); 520 521 println!("\n=== Second Commit ==="); 522 println!(" prevData: {:?} (should have value - MST root CID)", second_frame.prev_data); 523 println!(" since: {:?} (should have value - previous rev)", second_frame.since); 524 525 assert!( 526 second_frame.since.is_some(), 527 "Second commit should have 'since' field pointing to first commit rev" 528 ); 529 530 println!("\n=== Validation Complete ===\n"); 531 532 ws_stream 533 .send(tungstenite::Message::Close(None)) 534 .await 535 .ok(); 536} 537 538#[tokio::test] 539async fn test_compare_raw_cbor_encoding() { 540 let client = client(); 541 let (token, did) = create_account_and_login(&client).await; 542 543 let url = format!( 544 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos", 545 app_port() 546 ); 547 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 548 549 let post_payload = json!({ 550 "repo": did, 551 "collection": "app.bsky.feed.post", 552 "record": { 553 "$type": "app.bsky.feed.post", 554 "text": "CBOR encoding test", 555 "createdAt": chrono::Utc::now().to_rfc3339(), 556 } 557 }); 558 client 559 .post(format!( 560 "{}/xrpc/com.atproto.repo.createRecord", 561 base_url().await 562 )) 563 .bearer_auth(&token) 564 .json(&post_payload) 565 .send() 566 .await 567 .expect("Failed to create post"); 568 569 let mut raw_bytes_opt: Option<Vec<u8>> = None; 570 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async { 571 loop { 572 let msg = ws_stream.next().await.unwrap().unwrap(); 573 let raw = match msg { 574 tungstenite::Message::Binary(bin) => bin, 575 _ => continue, 576 }; 577 if let Ok((_, f)) = parse_frame(&raw) { 578 if f.repo == did { 579 raw_bytes_opt = Some(raw.to_vec()); 580 break; 581 } 582 } 583 } 584 }) 585 .await; 586 assert!(timeout.is_ok(), "Timed out waiting for event for our DID"); 587 let raw_bytes = raw_bytes_opt.expect("No matching frame found"); 588 589 println!("\n=== Raw CBOR Analysis ===\n"); 590 println!("Total frame size: {} bytes", raw_bytes.len()); 591 592 fn bytes_to_hex(bytes: &[u8]) -> String { 593 bytes.iter().map(|b| format!("{:02x}", b)).collect::<Vec<_>>().join("") 594 } 595 596 println!("First 64 bytes (hex): {}", bytes_to_hex(&raw_bytes[..64.min(raw_bytes.len())])); 597 598 let header_end = find_cbor_map_end(&raw_bytes).expect("Failed to find header end"); 599 600 println!("\nHeader section: {} bytes", header_end); 601 println!("Header hex: {}", bytes_to_hex(&raw_bytes[..header_end])); 602 603 println!("\nPayload section: {} bytes", raw_bytes.len() - header_end); 604 605 println!("\n=== Analysis Complete ===\n"); 606 607 ws_stream 608 .send(tungstenite::Message::Close(None)) 609 .await 610 .ok(); 611}