this repo has no description
1mod common;
2
3use cid::Cid;
4use common::*;
5use futures::{SinkExt, stream::StreamExt};
6use iroh_car::CarReader;
7use reqwest::StatusCode;
8use serde::{Deserialize, Serialize};
9use serde_json::{Value, json};
10use std::io::Cursor;
11use tokio_tungstenite::{connect_async, tungstenite};
12
13#[derive(Debug, Deserialize, Serialize)]
14struct FrameHeader {
15 op: i64,
16 t: String,
17}
18
19#[derive(Debug, Deserialize)]
20struct CommitFrame {
21 seq: i64,
22 #[serde(default)]
23 rebase: bool,
24 #[serde(rename = "tooBig", default)]
25 too_big: bool,
26 repo: String,
27 commit: Cid,
28 rev: String,
29 since: Option<String>,
30 #[serde(with = "serde_bytes")]
31 blocks: Vec<u8>,
32 ops: Vec<RepoOp>,
33 #[serde(default)]
34 blobs: Vec<Cid>,
35 time: String,
36 #[serde(rename = "prevData")]
37 prev_data: Option<Cid>,
38}
39
40#[derive(Debug, Deserialize)]
41struct RepoOp {
42 action: String,
43 path: String,
44 cid: Option<Cid>,
45 prev: Option<Cid>,
46}
47
/// Returns the encoded length of the first complete CBOR data item in `bytes`.
///
/// A firehose frame is two CBOR items written back to back (header, then
/// payload), so the returned offset is where the payload starts. The item is
/// only measured, not decoded, and despite the name it is not required to be a
/// map.
///
/// # Errors
///
/// * `"Unexpected end"` when the item (or anything nested in it) runs past the
///   end of `bytes`, including a string whose declared length exceeds the
///   remaining input.
/// * `"Invalid additional info: N"` for indefinite-length items and for
///   reserved or break codes, none of which are accepted here.
fn find_cbor_map_end(bytes: &[u8]) -> Result<usize, String> {
    // Steps over `count` bytes and returns them, refusing to move past the end
    // of the input. Every cursor advance goes through here, so `pos` can never
    // exceed `bytes.len()`.
    fn take<'a>(bytes: &'a [u8], pos: &mut usize, count: usize) -> Result<&'a [u8], String> {
        let end = (*pos)
            .checked_add(count)
            .filter(|&end| end <= bytes.len())
            .ok_or_else(|| String::from("Unexpected end"))?;
        let chunk = &bytes[*pos..end];
        *pos = end;
        Ok(chunk)
    }

    // Decodes the argument of an initial byte: either the low five bits
    // themselves (0..=23) or a big-endian integer of 1, 2, 4 or 8 bytes.
    fn read_uint(bytes: &[u8], pos: &mut usize, additional: u8) -> Result<u64, String> {
        let width = match additional {
            0..=23 => return Ok(u64::from(additional)),
            24 => 1,
            25 => 2,
            26 => 4,
            27 => 8,
            _ => return Err(format!("Invalid additional info: {}", additional)),
        };
        let raw = take(bytes, pos, width)?;
        Ok(raw
            .iter()
            .fold(0u64, |acc, &byte| (acc << 8) | u64::from(byte)))
    }

    // Moves `pos` past one complete data item, recursing into containers.
    fn skip_value(bytes: &[u8], pos: &mut usize) -> Result<(), String> {
        let initial = take(bytes, pos, 1)?[0];
        let major = initial >> 5;
        let additional = initial & 0x1f;

        match major {
            // Unsigned / negative integers: the argument is the whole item.
            0 | 1 => read_uint(bytes, pos, additional).map(|_| ()),
            // Byte / text strings: the argument is the payload length.
            2 | 3 => {
                let len = read_uint(bytes, pos, additional)?;
                // Compare in u64 space so an absurd declared length cannot be
                // truncated by the cast below.
                let remaining = (bytes.len() - *pos) as u64;
                if len > remaining {
                    return Err("Unexpected end".into());
                }
                take(bytes, pos, len as usize).map(|_| ())
            }
            // Arrays: the argument is the element count.
            4 => {
                let len = read_uint(bytes, pos, additional)?;
                for _ in 0..len {
                    skip_value(bytes, pos)?;
                }
                Ok(())
            }
            // Maps: the argument is the number of key/value pairs.
            5 => {
                let len = read_uint(bytes, pos, additional)?;
                for _ in 0..len {
                    skip_value(bytes, pos)?;
                    skip_value(bytes, pos)?;
                }
                Ok(())
            }
            // Tags: a tag number followed by the tagged item.
            6 => {
                read_uint(bytes, pos, additional)?;
                skip_value(bytes, pos)
            }
            // Simple values and floats: 24..=27 carry 1/2/4/8 trailing bytes
            // (extended simple value, f16, f32, f64) that must be stepped over,
            // which are the same widths `read_uint` consumes.
            7 => match additional {
                0..=23 => Ok(()),
                24..=27 => read_uint(bytes, pos, additional).map(|_| ()),
                _ => Err(format!("Invalid additional info: {}", additional)),
            },
            _ => Err(format!("Unknown major type: {}", major)),
        }
    }

    let mut pos = 0;
    skip_value(bytes, &mut pos)?;
    Ok(pos)
}
150
151fn parse_frame(bytes: &[u8]) -> Result<(FrameHeader, CommitFrame), String> {
152 let header_len = find_cbor_map_end(bytes)?;
153 let header: FrameHeader = serde_ipld_dagcbor::from_slice(&bytes[..header_len])
154 .map_err(|e| format!("Failed to parse header: {:?}", e))?;
155
156 if header.t != "#commit" {
157 return Err(format!("Not a commit frame: {}", header.t));
158 }
159
160 let remaining = &bytes[header_len..];
161 let frame: CommitFrame = serde_ipld_dagcbor::from_slice(remaining)
162 .map_err(|e| format!("Failed to parse commit frame: {:?}", e))?;
163
164 Ok((header, frame))
165}
166
/// Reports whether `s` is a syntactically valid AT Protocol TID.
///
/// A TID is exactly 13 characters drawn from the base32-sortable alphabet
/// `234567abcdefghijklmnopqrstuvwxyz`. The first character is limited to the
/// first half of that alphabet (`2`..=`7`, `a`..=`j`) because the top bit of
/// the encoded 64-bit value is always zero.
fn is_valid_tid(s: &str) -> bool {
    const ALPHABET: &[u8] = b"234567abcdefghijklmnopqrstuvwxyz";
    let raw = s.as_bytes();
    // The length check runs first, so indexing `raw[0]` below cannot panic.
    raw.len() == 13
        && ALPHABET[..16].contains(&raw[0])
        && raw.iter().all(|byte| ALPHABET.contains(byte))
}
170
/// Reports whether `s` has the shape `YYYY-MM-DDTHH:MM:SS.mmmZ`.
///
/// Every digit position must hold an ASCII digit and every separator must
/// match exactly, so the timestamp carries millisecond precision and a single
/// trailing `Z`. Only the shape is checked: field ranges (month 13, hour 25,
/// ...) are not validated.
fn is_valid_time_format(s: &str) -> bool {
    // `d` marks a digit position; every other byte must match literally.
    const TEMPLATE: &[u8] = b"dddd-dd-ddTdd:dd:dd.dddZ";
    let raw = s.as_bytes();
    raw.len() == TEMPLATE.len()
        && raw
            .iter()
            .zip(TEMPLATE)
            .all(|(&actual, &expected)| match expected {
                b'd' => actual.is_ascii_digit(),
                literal => actual == literal,
            })
}
192
193#[tokio::test]
194async fn test_firehose_frame_structure() {
195 let client = client();
196 let (token, did) = create_account_and_login(&client).await;
197
198 let url = format!(
199 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos",
200 app_port()
201 );
202 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
203
204 let post_text = "Testing firehose validation!";
205 let post_payload = json!({
206 "repo": did,
207 "collection": "app.bsky.feed.post",
208 "record": {
209 "$type": "app.bsky.feed.post",
210 "text": post_text,
211 "createdAt": chrono::Utc::now().to_rfc3339(),
212 }
213 });
214 let res = client
215 .post(format!(
216 "{}/xrpc/com.atproto.repo.createRecord",
217 base_url().await
218 ))
219 .bearer_auth(&token)
220 .json(&post_payload)
221 .send()
222 .await
223 .expect("Failed to create post");
224 assert_eq!(res.status(), StatusCode::OK);
225
226 let mut frame_opt: Option<(FrameHeader, CommitFrame)> = None;
227 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
228 loop {
229 let msg = ws_stream.next().await.unwrap().unwrap();
230 let raw_bytes = match msg {
231 tungstenite::Message::Binary(bin) => bin,
232 _ => continue,
233 };
234 if let Ok((h, f)) = parse_frame(&raw_bytes) {
235 if f.repo == did {
236 frame_opt = Some((h, f));
237 break;
238 }
239 }
240 }
241 })
242 .await;
243 assert!(timeout.is_ok(), "Timed out waiting for event for our DID");
244 let (header, frame) = frame_opt.expect("No matching frame found");
245
246 println!("\n=== Frame Structure Validation ===\n");
247
248 println!("Header:");
249 println!(" op: {} (expected: 1)", header.op);
250 println!(" t: {} (expected: #commit)", header.t);
251 assert_eq!(header.op, 1, "Header op should be 1");
252 assert_eq!(header.t, "#commit", "Header t should be #commit");
253
254 println!("\nCommitFrame fields:");
255 println!(" seq: {}", frame.seq);
256 println!(" rebase: {}", frame.rebase);
257 println!(" tooBig: {}", frame.too_big);
258 println!(" repo: {}", frame.repo);
259 println!(" commit: {}", frame.commit);
260 println!(
261 " rev: {} (valid TID: {})",
262 frame.rev,
263 is_valid_tid(&frame.rev)
264 );
265 println!(" since: {:?}", frame.since);
266 println!(" blocks length: {} bytes", frame.blocks.len());
267 println!(" ops count: {}", frame.ops.len());
268 println!(" blobs count: {}", frame.blobs.len());
269 println!(
270 " time: {} (valid format: {})",
271 frame.time,
272 is_valid_time_format(&frame.time)
273 );
274 println!(
275 " prevData: {:?} (IMPORTANT - should have value for updates)",
276 frame.prev_data
277 );
278
279 assert_eq!(frame.repo, did, "Frame repo should match DID");
280 assert!(
281 is_valid_tid(&frame.rev),
282 "Rev should be valid TID format, got: {}",
283 frame.rev
284 );
285 assert!(!frame.blocks.is_empty(), "Blocks should not be empty");
286 assert!(
287 is_valid_time_format(&frame.time),
288 "Time should be ISO 8601 with milliseconds and Z suffix"
289 );
290
291 println!("\nOps validation:");
292 for (i, op) in frame.ops.iter().enumerate() {
293 println!(" Op {}:", i);
294 println!(" action: {}", op.action);
295 println!(" path: {}", op.path);
296 println!(" cid: {:?}", op.cid);
297 println!(
298 " prev: {:?} (should be Some for updates/deletes)",
299 op.prev
300 );
301
302 assert!(
303 ["create", "update", "delete"].contains(&op.action.as_str()),
304 "Invalid action: {}",
305 op.action
306 );
307 assert!(
308 op.path.contains('/'),
309 "Path should contain collection/rkey: {}",
310 op.path
311 );
312
313 if op.action == "create" {
314 assert!(op.cid.is_some(), "Create op should have cid");
315 }
316 }
317
318 println!("\nCAR validation:");
319 let mut car_reader = CarReader::new(Cursor::new(&frame.blocks)).await.unwrap();
320 let car_header = car_reader.header().clone();
321 println!(" CAR roots: {:?}", car_header.roots());
322
323 assert!(
324 !car_header.roots().is_empty(),
325 "CAR should have at least one root"
326 );
327 assert_eq!(
328 car_header.roots()[0],
329 frame.commit,
330 "First CAR root should be commit CID"
331 );
332
333 let mut block_cids: Vec<Cid> = Vec::new();
334 while let Ok(Some((cid, _))) = car_reader.next_block().await {
335 block_cids.push(cid);
336 }
337 println!(" CAR blocks: {} total", block_cids.len());
338 for cid in &block_cids {
339 println!(" - {}", cid);
340 }
341
342 assert!(
343 block_cids.contains(&frame.commit),
344 "CAR should contain commit block"
345 );
346
347 for op in &frame.ops {
348 if let Some(ref cid) = op.cid {
349 assert!(
350 block_cids.contains(cid),
351 "CAR should contain op's record block: {}",
352 cid
353 );
354 }
355 }
356
357 println!("\n=== Validation Complete ===\n");
358
359 ws_stream.send(tungstenite::Message::Close(None)).await.ok();
360}
361
362#[tokio::test]
363async fn test_firehose_update_has_prev_field() {
364 let client = client();
365 let (token, did) = create_account_and_login(&client).await;
366
367 let profile_payload = json!({
368 "repo": did,
369 "collection": "app.bsky.actor.profile",
370 "rkey": "self",
371 "record": {
372 "$type": "app.bsky.actor.profile",
373 "displayName": "Test User v1",
374 }
375 });
376 let res = client
377 .post(format!(
378 "{}/xrpc/com.atproto.repo.putRecord",
379 base_url().await
380 ))
381 .bearer_auth(&token)
382 .json(&profile_payload)
383 .send()
384 .await
385 .expect("Failed to create profile");
386 assert_eq!(res.status(), StatusCode::OK);
387 let first_profile: Value = res.json().await.unwrap();
388 let first_cid = first_profile["cid"].as_str().unwrap();
389
390 let url = format!(
391 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos",
392 app_port()
393 );
394 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
395
396 let update_payload = json!({
397 "repo": did,
398 "collection": "app.bsky.actor.profile",
399 "rkey": "self",
400 "record": {
401 "$type": "app.bsky.actor.profile",
402 "displayName": "Test User v2",
403 }
404 });
405 let res = client
406 .post(format!(
407 "{}/xrpc/com.atproto.repo.putRecord",
408 base_url().await
409 ))
410 .bearer_auth(&token)
411 .json(&update_payload)
412 .send()
413 .await
414 .expect("Failed to update profile");
415 assert_eq!(res.status(), StatusCode::OK);
416
417 let mut frame_opt: Option<CommitFrame> = None;
418 let timeout = tokio::time::timeout(std::time::Duration::from_secs(15), async {
419 loop {
420 let msg = match ws_stream.next().await {
421 Some(Ok(m)) => m,
422 _ => continue,
423 };
424 let raw_bytes = match msg {
425 tungstenite::Message::Binary(bin) => bin,
426 _ => continue,
427 };
428 if let Ok((_, f)) = parse_frame(&raw_bytes) {
429 if f.repo == did {
430 frame_opt = Some(f);
431 break;
432 }
433 }
434 }
435 })
436 .await;
437 assert!(timeout.is_ok(), "Timed out waiting for update commit");
438 let frame = frame_opt.expect("No matching frame found");
439
440 println!("\n=== Update Operation Validation ===\n");
441 println!("First profile CID: {}", first_cid);
442 println!("Frame prevData: {:?}", frame.prev_data);
443
444 for op in &frame.ops {
445 println!(
446 "Op: action={}, path={}, cid={:?}, prev={:?}",
447 op.action, op.path, op.cid, op.prev
448 );
449
450 if op.action == "update" && op.path.contains("app.bsky.actor.profile") {
451 assert!(
452 op.prev.is_some(),
453 "Update operation should have 'prev' field with old CID! Got: {:?}",
454 op.prev
455 );
456 println!(" ✓ Update op has prev field: {:?}", op.prev);
457 }
458 }
459
460 println!("\n=== Validation Complete ===\n");
461
462 ws_stream.send(tungstenite::Message::Close(None)).await.ok();
463}
464
465#[tokio::test]
466async fn test_firehose_commit_has_prev_data() {
467 let client = client();
468 let (token, did) = create_account_and_login(&client).await;
469
470 let url = format!(
471 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos",
472 app_port()
473 );
474 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
475
476 let post_payload = json!({
477 "repo": did,
478 "collection": "app.bsky.feed.post",
479 "record": {
480 "$type": "app.bsky.feed.post",
481 "text": "First post",
482 "createdAt": chrono::Utc::now().to_rfc3339(),
483 }
484 });
485 client
486 .post(format!(
487 "{}/xrpc/com.atproto.repo.createRecord",
488 base_url().await
489 ))
490 .bearer_auth(&token)
491 .json(&post_payload)
492 .send()
493 .await
494 .expect("Failed to create first post");
495
496 let mut first_frame_opt: Option<CommitFrame> = None;
497 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
498 loop {
499 let msg = ws_stream.next().await.unwrap().unwrap();
500 let raw_bytes = match msg {
501 tungstenite::Message::Binary(bin) => bin,
502 _ => continue,
503 };
504 if let Ok((_, f)) = parse_frame(&raw_bytes) {
505 if f.repo == did {
506 first_frame_opt = Some(f);
507 break;
508 }
509 }
510 }
511 })
512 .await;
513 assert!(timeout.is_ok(), "Timed out waiting for first commit");
514 let first_frame = first_frame_opt.expect("No first frame found");
515
516 println!("\n=== First Commit ===");
517 println!(
518 " prevData: {:?} (first commit may be None)",
519 first_frame.prev_data
520 );
521 println!(
522 " since: {:?} (first commit should be None)",
523 first_frame.since
524 );
525
526 let post_payload2 = json!({
527 "repo": did,
528 "collection": "app.bsky.feed.post",
529 "record": {
530 "$type": "app.bsky.feed.post",
531 "text": "Second post",
532 "createdAt": chrono::Utc::now().to_rfc3339(),
533 }
534 });
535 client
536 .post(format!(
537 "{}/xrpc/com.atproto.repo.createRecord",
538 base_url().await
539 ))
540 .bearer_auth(&token)
541 .json(&post_payload2)
542 .send()
543 .await
544 .expect("Failed to create second post");
545
546 let mut second_frame_opt: Option<CommitFrame> = None;
547 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
548 loop {
549 let msg = ws_stream.next().await.unwrap().unwrap();
550 let raw_bytes = match msg {
551 tungstenite::Message::Binary(bin) => bin,
552 _ => continue,
553 };
554 if let Ok((_, f)) = parse_frame(&raw_bytes) {
555 if f.repo == did {
556 second_frame_opt = Some(f);
557 break;
558 }
559 }
560 }
561 })
562 .await;
563 assert!(timeout.is_ok(), "Timed out waiting for second commit");
564 let second_frame = second_frame_opt.expect("No second frame found");
565
566 println!("\n=== Second Commit ===");
567 println!(
568 " prevData: {:?} (should have value - MST root CID)",
569 second_frame.prev_data
570 );
571 println!(
572 " since: {:?} (should have value - previous rev)",
573 second_frame.since
574 );
575
576 assert!(
577 second_frame.since.is_some(),
578 "Second commit should have 'since' field pointing to first commit rev"
579 );
580
581 println!("\n=== Validation Complete ===\n");
582
583 ws_stream.send(tungstenite::Message::Close(None)).await.ok();
584}
585
586#[tokio::test]
587async fn test_compare_raw_cbor_encoding() {
588 let client = client();
589 let (token, did) = create_account_and_login(&client).await;
590
591 let url = format!(
592 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos",
593 app_port()
594 );
595 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
596
597 let post_payload = json!({
598 "repo": did,
599 "collection": "app.bsky.feed.post",
600 "record": {
601 "$type": "app.bsky.feed.post",
602 "text": "CBOR encoding test",
603 "createdAt": chrono::Utc::now().to_rfc3339(),
604 }
605 });
606 client
607 .post(format!(
608 "{}/xrpc/com.atproto.repo.createRecord",
609 base_url().await
610 ))
611 .bearer_auth(&token)
612 .json(&post_payload)
613 .send()
614 .await
615 .expect("Failed to create post");
616
617 let mut raw_bytes_opt: Option<Vec<u8>> = None;
618 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
619 loop {
620 let msg = ws_stream.next().await.unwrap().unwrap();
621 let raw = match msg {
622 tungstenite::Message::Binary(bin) => bin,
623 _ => continue,
624 };
625 if let Ok((_, f)) = parse_frame(&raw) {
626 if f.repo == did {
627 raw_bytes_opt = Some(raw.to_vec());
628 break;
629 }
630 }
631 }
632 })
633 .await;
634 assert!(timeout.is_ok(), "Timed out waiting for event for our DID");
635 let raw_bytes = raw_bytes_opt.expect("No matching frame found");
636
637 println!("\n=== Raw CBOR Analysis ===\n");
638 println!("Total frame size: {} bytes", raw_bytes.len());
639
640 fn bytes_to_hex(bytes: &[u8]) -> String {
641 bytes
642 .iter()
643 .map(|b| format!("{:02x}", b))
644 .collect::<Vec<_>>()
645 .join("")
646 }
647
648 println!(
649 "First 64 bytes (hex): {}",
650 bytes_to_hex(&raw_bytes[..64.min(raw_bytes.len())])
651 );
652
653 let header_end = find_cbor_map_end(&raw_bytes).expect("Failed to find header end");
654
655 println!("\nHeader section: {} bytes", header_end);
656 println!("Header hex: {}", bytes_to_hex(&raw_bytes[..header_end]));
657
658 println!("\nPayload section: {} bytes", raw_bytes.len() - header_end);
659
660 println!("\n=== Analysis Complete ===\n");
661
662 ws_stream.send(tungstenite::Message::Close(None)).await.ok();
663}