this repo has no description
1mod common;
2
3use cid::Cid;
4use common::*;
5use futures::{SinkExt, stream::StreamExt};
6use iroh_car::CarReader;
7use reqwest::StatusCode;
8use serde::{Deserialize, Serialize};
9use serde_json::{Value, json};
10use std::io::Cursor;
11use tokio_tungstenite::{connect_async, tungstenite};
12
/// Header item that precedes the payload in every firehose frame.
///
/// `parse_frame` decodes this from the first CBOR item of a frame and uses `t` to decide
/// whether the payload is a commit.
#[derive(Debug, Deserialize, Serialize)]
struct FrameHeader {
    /// Operation code; the tests in this file expect `1` on commit frames.
    op: i64,
    /// Message type tag; `"#commit"` is the only value `parse_frame` accepts.
    t: String,
}
18
/// Payload of a `#commit` firehose frame, decoded from the CBOR item that follows the header.
#[derive(Debug, Deserialize)]
struct CommitFrame {
    seq: i64,
    /// A missing key deserializes as `false`.
    #[serde(default)]
    rebase: bool,
    /// Wire name is `tooBig`; a missing key deserializes as `false`.
    #[serde(rename = "tooBig", default)]
    too_big: bool,
    /// The tests compare this against the DID of the account that performed the write.
    repo: String,
    /// The tests expect this to equal the first root of the CAR stored in `blocks`.
    commit: Cid,
    /// Checked with `is_valid_tid` in the tests.
    rev: String,
    /// The tests expect a value here from the second commit of a repo onwards.
    since: Option<String>,
    /// Raw byte string (decoded via `serde_bytes`); the tests read it as a CAR file.
    #[serde(with = "serde_bytes")]
    blocks: Vec<u8>,
    ops: Vec<RepoOp>,
    /// A missing key deserializes as an empty list.
    #[serde(default)]
    blobs: Vec<Cid>,
    /// Checked with `is_valid_time_format` in the tests.
    time: String,
    /// Wire name is `prevData`.
    #[serde(rename = "prevData")]
    prev_data: Option<Cid>,
}
39
/// One entry of a commit frame's `ops` list.
#[derive(Debug, Deserialize)]
struct RepoOp {
    /// The tests accept `"create"`, `"update"` or `"delete"`.
    action: String,
    /// The tests require a `/` separator (`collection/rkey`).
    path: String,
    /// The tests require a value for `"create"` ops.
    cid: Option<Cid>,
    /// The tests require a value for profile `"update"` ops.
    prev: Option<Cid>,
}
47
/// Returns the length in bytes of the first CBOR data item at the start of `bytes`.
///
/// `parse_frame` uses the result to split a firehose frame (a header item immediately followed
/// by a payload item) without decoding either one. Despite the name, any well-formed
/// definite-length item is accepted, not only maps.
///
/// # Errors
///
/// Returns `Err` when the item is truncated (including a string or float whose payload would
/// run past the end of the buffer) or when it uses an additional-info value in `28..=31`,
/// which covers reserved values and indefinite-length encodings.
fn find_cbor_map_end(bytes: &[u8]) -> Result<usize, String> {
    /// Moves `pos` forward by `count` bytes, refusing to step past the end of `bytes`.
    fn advance(bytes: &[u8], pos: &mut usize, count: u64) -> Result<(), String> {
        // `*pos <= bytes.len()` holds everywhere in this walker, so this cannot underflow.
        let remaining = (bytes.len() - *pos) as u64;
        if count > remaining {
            return Err("Unexpected end".into());
        }
        // Lossless cast: `count <= remaining`, and `remaining` started life as a `usize`.
        *pos += count as usize;
        Ok(())
    }

    /// Decodes the argument that follows an initial byte with the given additional info.
    fn read_uint(bytes: &[u8], pos: &mut usize, additional: u8) -> Result<u64, String> {
        let width: u64 = match additional {
            // Small values are stored directly in the initial byte.
            0..=23 => return Ok(u64::from(additional)),
            24 => 1,
            25 => 2,
            26 => 4,
            27 => 8,
            _ => return Err(format!("Invalid additional info: {}", additional)),
        };
        let start = *pos;
        advance(bytes, pos, width)?;
        // Big-endian accumulate over the 1, 2, 4 or 8 argument bytes.
        Ok(bytes[start..*pos]
            .iter()
            .fold(0u64, |acc, &byte| (acc << 8) | u64::from(byte)))
    }

    /// Steps `pos` over exactly one data item, recursing into containers and tags.
    fn skip_value(bytes: &[u8], pos: &mut usize) -> Result<(), String> {
        if *pos >= bytes.len() {
            return Err("Unexpected end".into());
        }
        let initial = bytes[*pos];
        *pos += 1;
        let major = initial >> 5;
        let additional = initial & 0x1f;

        match major {
            // Unsigned / negative integers: the argument is the whole item.
            0 | 1 => read_uint(bytes, pos, additional).map(|_| ()),
            // Byte / text strings: the argument is the payload length, which must fit.
            2 | 3 => {
                let len = read_uint(bytes, pos, additional)?;
                advance(bytes, pos, len)
            }
            // Arrays: `len` nested items. A bogus huge `len` fails fast once input runs out.
            4 => {
                let len = read_uint(bytes, pos, additional)?;
                for _ in 0..len {
                    skip_value(bytes, pos)?;
                }
                Ok(())
            }
            // Maps: `len` key/value pairs.
            5 => {
                let len = read_uint(bytes, pos, additional)?;
                for _ in 0..len {
                    skip_value(bytes, pos)?;
                    skip_value(bytes, pos)?;
                }
                Ok(())
            }
            // Tags: a tag number followed by the tagged item.
            6 => {
                read_uint(bytes, pos, additional)?;
                skip_value(bytes, pos)
            }
            // Simple values and floats: additional info 24..=27 is followed by 1, 2, 4 or 8
            // payload bytes, the same widths `read_uint` consumes, so reuse it to skip them.
            7 => read_uint(bytes, pos, additional).map(|_| ()),
            _ => Err(format!("Unknown major type: {}", major)),
        }
    }

    let mut pos = 0;
    skip_value(bytes, &mut pos)?;
    Ok(pos)
}
150
151fn parse_frame(bytes: &[u8]) -> Result<(FrameHeader, CommitFrame), String> {
152 let header_len = find_cbor_map_end(bytes)?;
153 let header: FrameHeader = serde_ipld_dagcbor::from_slice(&bytes[..header_len])
154 .map_err(|e| format!("Failed to parse header: {:?}", e))?;
155
156 if header.t != "#commit" {
157 return Err(format!("Not a commit frame: {}", header.t));
158 }
159
160 let remaining = &bytes[header_len..];
161 let frame: CommitFrame = serde_ipld_dagcbor::from_slice(remaining)
162 .map_err(|e| format!("Failed to parse commit frame: {:?}", e))?;
163
164 Ok((header, frame))
165}
166
/// Returns `true` when `s` has the syntax of an AT Protocol TID.
///
/// A TID is exactly 13 characters drawn from the base32-sortable alphabet
/// `234567abcdefghijklmnopqrstuvwxyz`, and its first character must come from the first half
/// of that alphabet (`234567abcdefghij`) because the top bit of the encoded value is zero.
/// Uppercase letters, the digits `0`, `1`, `8`, `9` and any non-ASCII text are rejected.
fn is_valid_tid(s: &str) -> bool {
    const ALPHABET: &[u8] = b"234567abcdefghijklmnopqrstuvwxyz";
    const TID_LEN: usize = 13;

    let bytes = s.as_bytes();
    // The length check runs first, so indexing `bytes[0]` below cannot panic.
    bytes.len() == TID_LEN
        && ALPHABET[..16].contains(&bytes[0])
        && bytes.iter().all(|byte| ALPHABET.contains(byte))
}
170
/// Returns `true` when `s` is a UTC timestamp shaped exactly like `YYYY-MM-DDTHH:MM:SS.mmmZ`.
///
/// Only the shape is checked: every digit position must hold an ASCII digit and every
/// separator must match, with exactly three fractional digits and a single trailing `Z`.
/// Field ranges (for example month `13`) are not validated.
fn is_valid_time_format(s: &str) -> bool {
    // `d` marks a digit position; every other byte must match literally.
    const SHAPE: &[u8] = b"dddd-dd-ddTdd:dd:dd.dddZ";

    let bytes = s.as_bytes();
    bytes.len() == SHAPE.len()
        && bytes
            .iter()
            .zip(SHAPE)
            .all(|(&actual, &expected)| match expected {
                b'd' => actual.is_ascii_digit(),
                literal => actual == literal,
            })
}
192
193#[tokio::test]
194async fn test_firehose_frame_structure() {
195 let client = client();
196 let (token, did) = create_account_and_login(&client).await;
197
198 let url = format!(
199 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos",
200 app_port()
201 );
202 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
203
204 let post_text = "Testing firehose validation!";
205 let post_payload = json!({
206 "repo": did,
207 "collection": "app.bsky.feed.post",
208 "record": {
209 "$type": "app.bsky.feed.post",
210 "text": post_text,
211 "createdAt": chrono::Utc::now().to_rfc3339(),
212 }
213 });
214 let res = client
215 .post(format!(
216 "{}/xrpc/com.atproto.repo.createRecord",
217 base_url().await
218 ))
219 .bearer_auth(&token)
220 .json(&post_payload)
221 .send()
222 .await
223 .expect("Failed to create post");
224 assert_eq!(res.status(), StatusCode::OK);
225
226 let mut frame_opt: Option<(FrameHeader, CommitFrame)> = None;
227 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
228 loop {
229 let msg = ws_stream.next().await.unwrap().unwrap();
230 let raw_bytes = match msg {
231 tungstenite::Message::Binary(bin) => bin,
232 _ => continue,
233 };
234 if let Ok((h, f)) = parse_frame(&raw_bytes) {
235 if f.repo == did {
236 frame_opt = Some((h, f));
237 break;
238 }
239 }
240 }
241 })
242 .await;
243 assert!(timeout.is_ok(), "Timed out waiting for event for our DID");
244 let (header, frame) = frame_opt.expect("No matching frame found");
245
246 println!("\n=== Frame Structure Validation ===\n");
247
248 println!("Header:");
249 println!(" op: {} (expected: 1)", header.op);
250 println!(" t: {} (expected: #commit)", header.t);
251 assert_eq!(header.op, 1, "Header op should be 1");
252 assert_eq!(header.t, "#commit", "Header t should be #commit");
253
254 println!("\nCommitFrame fields:");
255 println!(" seq: {}", frame.seq);
256 println!(" rebase: {}", frame.rebase);
257 println!(" tooBig: {}", frame.too_big);
258 println!(" repo: {}", frame.repo);
259 println!(" commit: {}", frame.commit);
260 println!(
261 " rev: {} (valid TID: {})",
262 frame.rev,
263 is_valid_tid(&frame.rev)
264 );
265 println!(" since: {:?}", frame.since);
266 println!(" blocks length: {} bytes", frame.blocks.len());
267 println!(" ops count: {}", frame.ops.len());
268 println!(" blobs count: {}", frame.blobs.len());
269 println!(
270 " time: {} (valid format: {})",
271 frame.time,
272 is_valid_time_format(&frame.time)
273 );
274 println!(
275 " prevData: {:?} (IMPORTANT - should have value for updates)",
276 frame.prev_data
277 );
278
279 assert_eq!(frame.repo, did, "Frame repo should match DID");
280 assert!(
281 is_valid_tid(&frame.rev),
282 "Rev should be valid TID format, got: {}",
283 frame.rev
284 );
285 assert!(!frame.blocks.is_empty(), "Blocks should not be empty");
286 assert!(
287 is_valid_time_format(&frame.time),
288 "Time should be ISO 8601 with milliseconds and Z suffix"
289 );
290
291 println!("\nOps validation:");
292 for (i, op) in frame.ops.iter().enumerate() {
293 println!(" Op {}:", i);
294 println!(" action: {}", op.action);
295 println!(" path: {}", op.path);
296 println!(" cid: {:?}", op.cid);
297 println!(
298 " prev: {:?} (should be Some for updates/deletes)",
299 op.prev
300 );
301
302 assert!(
303 ["create", "update", "delete"].contains(&op.action.as_str()),
304 "Invalid action: {}",
305 op.action
306 );
307 assert!(
308 op.path.contains('/'),
309 "Path should contain collection/rkey: {}",
310 op.path
311 );
312
313 if op.action == "create" {
314 assert!(op.cid.is_some(), "Create op should have cid");
315 }
316 }
317
318 println!("\nCAR validation:");
319 let mut car_reader = CarReader::new(Cursor::new(&frame.blocks)).await.unwrap();
320 let car_header = car_reader.header().clone();
321 println!(" CAR roots: {:?}", car_header.roots());
322
323 assert!(
324 !car_header.roots().is_empty(),
325 "CAR should have at least one root"
326 );
327 assert_eq!(
328 car_header.roots()[0],
329 frame.commit,
330 "First CAR root should be commit CID"
331 );
332
333 let mut block_cids: Vec<Cid> = Vec::new();
334 while let Ok(Some((cid, _))) = car_reader.next_block().await {
335 block_cids.push(cid);
336 }
337 println!(" CAR blocks: {} total", block_cids.len());
338 for cid in &block_cids {
339 println!(" - {}", cid);
340 }
341
342 assert!(
343 block_cids.contains(&frame.commit),
344 "CAR should contain commit block"
345 );
346
347 for op in &frame.ops {
348 if let Some(ref cid) = op.cid {
349 assert!(
350 block_cids.contains(cid),
351 "CAR should contain op's record block: {}",
352 cid
353 );
354 }
355 }
356
357 println!("\n=== Validation Complete ===\n");
358
359 ws_stream.send(tungstenite::Message::Close(None)).await.ok();
360}
361
362#[tokio::test]
363async fn test_firehose_update_has_prev_field() {
364 let client = client();
365 let (token, did) = create_account_and_login(&client).await;
366
367 let url = format!(
368 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos",
369 app_port()
370 );
371 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
372
373 let profile_payload = json!({
374 "repo": did,
375 "collection": "app.bsky.actor.profile",
376 "rkey": "self",
377 "record": {
378 "$type": "app.bsky.actor.profile",
379 "displayName": "Test User v1",
380 }
381 });
382 let res = client
383 .post(format!(
384 "{}/xrpc/com.atproto.repo.putRecord",
385 base_url().await
386 ))
387 .bearer_auth(&token)
388 .json(&profile_payload)
389 .send()
390 .await
391 .expect("Failed to create profile");
392 assert_eq!(res.status(), StatusCode::OK);
393 let first_profile: Value = res.json().await.unwrap();
394 let first_cid = first_profile["cid"].as_str().unwrap();
395
396 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
397 loop {
398 let msg = ws_stream.next().await.unwrap().unwrap();
399 let raw_bytes = match msg {
400 tungstenite::Message::Binary(bin) => bin,
401 _ => continue,
402 };
403 if let Ok((_, f)) = parse_frame(&raw_bytes) {
404 if f.repo == did {
405 break;
406 }
407 }
408 }
409 })
410 .await;
411 assert!(timeout.is_ok(), "Timed out waiting for first commit");
412
413 let update_payload = json!({
414 "repo": did,
415 "collection": "app.bsky.actor.profile",
416 "rkey": "self",
417 "record": {
418 "$type": "app.bsky.actor.profile",
419 "displayName": "Test User v2",
420 }
421 });
422 let res = client
423 .post(format!(
424 "{}/xrpc/com.atproto.repo.putRecord",
425 base_url().await
426 ))
427 .bearer_auth(&token)
428 .json(&update_payload)
429 .send()
430 .await
431 .expect("Failed to update profile");
432 assert_eq!(res.status(), StatusCode::OK);
433
434 let mut frame_opt: Option<CommitFrame> = None;
435 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
436 loop {
437 let msg = ws_stream.next().await.unwrap().unwrap();
438 let raw_bytes = match msg {
439 tungstenite::Message::Binary(bin) => bin,
440 _ => continue,
441 };
442 if let Ok((_, f)) = parse_frame(&raw_bytes) {
443 if f.repo == did {
444 frame_opt = Some(f);
445 break;
446 }
447 }
448 }
449 })
450 .await;
451 assert!(timeout.is_ok(), "Timed out waiting for update commit");
452 let frame = frame_opt.expect("No matching frame found");
453
454 println!("\n=== Update Operation Validation ===\n");
455 println!("First profile CID: {}", first_cid);
456 println!("Frame prevData: {:?}", frame.prev_data);
457
458 for op in &frame.ops {
459 println!(
460 "Op: action={}, path={}, cid={:?}, prev={:?}",
461 op.action, op.path, op.cid, op.prev
462 );
463
464 if op.action == "update" && op.path.contains("app.bsky.actor.profile") {
465 assert!(
466 op.prev.is_some(),
467 "Update operation should have 'prev' field with old CID! Got: {:?}",
468 op.prev
469 );
470 println!(" ✓ Update op has prev field: {:?}", op.prev);
471 }
472 }
473
474 println!("\n=== Validation Complete ===\n");
475
476 ws_stream.send(tungstenite::Message::Close(None)).await.ok();
477}
478
479#[tokio::test]
480async fn test_firehose_commit_has_prev_data() {
481 let client = client();
482 let (token, did) = create_account_and_login(&client).await;
483
484 let url = format!(
485 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos",
486 app_port()
487 );
488 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
489
490 let post_payload = json!({
491 "repo": did,
492 "collection": "app.bsky.feed.post",
493 "record": {
494 "$type": "app.bsky.feed.post",
495 "text": "First post",
496 "createdAt": chrono::Utc::now().to_rfc3339(),
497 }
498 });
499 client
500 .post(format!(
501 "{}/xrpc/com.atproto.repo.createRecord",
502 base_url().await
503 ))
504 .bearer_auth(&token)
505 .json(&post_payload)
506 .send()
507 .await
508 .expect("Failed to create first post");
509
510 let mut first_frame_opt: Option<CommitFrame> = None;
511 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
512 loop {
513 let msg = ws_stream.next().await.unwrap().unwrap();
514 let raw_bytes = match msg {
515 tungstenite::Message::Binary(bin) => bin,
516 _ => continue,
517 };
518 if let Ok((_, f)) = parse_frame(&raw_bytes) {
519 if f.repo == did {
520 first_frame_opt = Some(f);
521 break;
522 }
523 }
524 }
525 })
526 .await;
527 assert!(timeout.is_ok(), "Timed out waiting for first commit");
528 let first_frame = first_frame_opt.expect("No first frame found");
529
530 println!("\n=== First Commit ===");
531 println!(
532 " prevData: {:?} (first commit may be None)",
533 first_frame.prev_data
534 );
535 println!(
536 " since: {:?} (first commit should be None)",
537 first_frame.since
538 );
539
540 let post_payload2 = json!({
541 "repo": did,
542 "collection": "app.bsky.feed.post",
543 "record": {
544 "$type": "app.bsky.feed.post",
545 "text": "Second post",
546 "createdAt": chrono::Utc::now().to_rfc3339(),
547 }
548 });
549 client
550 .post(format!(
551 "{}/xrpc/com.atproto.repo.createRecord",
552 base_url().await
553 ))
554 .bearer_auth(&token)
555 .json(&post_payload2)
556 .send()
557 .await
558 .expect("Failed to create second post");
559
560 let mut second_frame_opt: Option<CommitFrame> = None;
561 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
562 loop {
563 let msg = ws_stream.next().await.unwrap().unwrap();
564 let raw_bytes = match msg {
565 tungstenite::Message::Binary(bin) => bin,
566 _ => continue,
567 };
568 if let Ok((_, f)) = parse_frame(&raw_bytes) {
569 if f.repo == did {
570 second_frame_opt = Some(f);
571 break;
572 }
573 }
574 }
575 })
576 .await;
577 assert!(timeout.is_ok(), "Timed out waiting for second commit");
578 let second_frame = second_frame_opt.expect("No second frame found");
579
580 println!("\n=== Second Commit ===");
581 println!(
582 " prevData: {:?} (should have value - MST root CID)",
583 second_frame.prev_data
584 );
585 println!(
586 " since: {:?} (should have value - previous rev)",
587 second_frame.since
588 );
589
590 assert!(
591 second_frame.since.is_some(),
592 "Second commit should have 'since' field pointing to first commit rev"
593 );
594
595 println!("\n=== Validation Complete ===\n");
596
597 ws_stream.send(tungstenite::Message::Close(None)).await.ok();
598}
599
600#[tokio::test]
601async fn test_compare_raw_cbor_encoding() {
602 let client = client();
603 let (token, did) = create_account_and_login(&client).await;
604
605 let url = format!(
606 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos",
607 app_port()
608 );
609 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
610
611 let post_payload = json!({
612 "repo": did,
613 "collection": "app.bsky.feed.post",
614 "record": {
615 "$type": "app.bsky.feed.post",
616 "text": "CBOR encoding test",
617 "createdAt": chrono::Utc::now().to_rfc3339(),
618 }
619 });
620 client
621 .post(format!(
622 "{}/xrpc/com.atproto.repo.createRecord",
623 base_url().await
624 ))
625 .bearer_auth(&token)
626 .json(&post_payload)
627 .send()
628 .await
629 .expect("Failed to create post");
630
631 let mut raw_bytes_opt: Option<Vec<u8>> = None;
632 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
633 loop {
634 let msg = ws_stream.next().await.unwrap().unwrap();
635 let raw = match msg {
636 tungstenite::Message::Binary(bin) => bin,
637 _ => continue,
638 };
639 if let Ok((_, f)) = parse_frame(&raw) {
640 if f.repo == did {
641 raw_bytes_opt = Some(raw.to_vec());
642 break;
643 }
644 }
645 }
646 })
647 .await;
648 assert!(timeout.is_ok(), "Timed out waiting for event for our DID");
649 let raw_bytes = raw_bytes_opt.expect("No matching frame found");
650
651 println!("\n=== Raw CBOR Analysis ===\n");
652 println!("Total frame size: {} bytes", raw_bytes.len());
653
654 fn bytes_to_hex(bytes: &[u8]) -> String {
655 bytes
656 .iter()
657 .map(|b| format!("{:02x}", b))
658 .collect::<Vec<_>>()
659 .join("")
660 }
661
662 println!(
663 "First 64 bytes (hex): {}",
664 bytes_to_hex(&raw_bytes[..64.min(raw_bytes.len())])
665 );
666
667 let header_end = find_cbor_map_end(&raw_bytes).expect("Failed to find header end");
668
669 println!("\nHeader section: {} bytes", header_end);
670 println!("Header hex: {}", bytes_to_hex(&raw_bytes[..header_end]));
671
672 println!("\nPayload section: {} bytes", raw_bytes.len() - header_end);
673
674 println!("\n=== Analysis Complete ===\n");
675
676 ws_stream.send(tungstenite::Message::Close(None)).await.ok();
677}