this repo has no description
1mod common;
2
3use common::*;
4use cid::Cid;
5use futures::{stream::StreamExt, SinkExt};
6use iroh_car::CarReader;
7use reqwest::StatusCode;
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value};
10use std::io::Cursor;
11use tokio_tungstenite::{connect_async, tungstenite};
12
13#[derive(Debug, Deserialize, Serialize)]
14struct FrameHeader {
15 op: i64,
16 t: String,
17}
18
19#[derive(Debug, Deserialize)]
20struct CommitFrame {
21 seq: i64,
22 #[serde(default)]
23 rebase: bool,
24 #[serde(rename = "tooBig", default)]
25 too_big: bool,
26 repo: String,
27 commit: Cid,
28 rev: String,
29 since: Option<String>,
30 #[serde(with = "serde_bytes")]
31 blocks: Vec<u8>,
32 ops: Vec<RepoOp>,
33 #[serde(default)]
34 blobs: Vec<Cid>,
35 time: String,
36 #[serde(rename = "prevData")]
37 prev_data: Option<Cid>,
38}
39
40#[derive(Debug, Deserialize)]
41struct RepoOp {
42 action: String,
43 path: String,
44 cid: Option<Cid>,
45 prev: Option<Cid>,
46}
47
/// Returns the byte length of the first complete CBOR data item in `bytes`.
///
/// Firehose frames are two concatenated DAG-CBOR values (header, then body),
/// so the returned offset is where the body starts. Despite the name, any
/// single definite-length item is measured, not only maps. The result never
/// exceeds `bytes.len()`.
///
/// # Errors
///
/// Returns a description when the input is empty or truncated, or when an item
/// uses additional info 28-31 (reserved values or indefinite lengths, which
/// this scanner does not support).
fn find_cbor_map_end(bytes: &[u8]) -> Result<usize, String> {
    // Decodes the argument that follows an initial byte: values 0..=23 are
    // inline, while 24..=27 are followed by a 1/2/4/8-byte big-endian integer.
    fn read_uint(bytes: &[u8], pos: &mut usize, additional: u8) -> Result<u64, String> {
        let width = match additional {
            0..=23 => return Ok(u64::from(additional)),
            24 => 1,
            25 => 2,
            26 => 4,
            27 => 8,
            _ => return Err(format!("Invalid additional info: {}", additional)),
        };
        if bytes.len().saturating_sub(*pos) < width {
            return Err("Unexpected end".into());
        }
        let value = bytes[*pos..*pos + width]
            .iter()
            .fold(0u64, |acc, &b| (acc << 8) | u64::from(b));
        *pos += width;
        Ok(value)
    }

    // Advances `pos` past exactly one data item, recursing into containers and tags.
    fn skip_value(bytes: &[u8], pos: &mut usize) -> Result<(), String> {
        let initial = match bytes.get(*pos) {
            Some(&b) => b,
            None => return Err("Unexpected end".into()),
        };
        *pos += 1;
        let major = initial >> 5;
        let additional = initial & 0x1f;

        match major {
            // Unsigned / negative integers: the whole value is the argument.
            0 | 1 => read_uint(bytes, pos, additional).map(|_| ()),
            // Byte / text strings: the argument is the payload length. Check it
            // against what is left so the returned offset cannot overrun `bytes`
            // (callers slice with it).
            2 | 3 => {
                let len = read_uint(bytes, pos, additional)?;
                let remaining = bytes.len().saturating_sub(*pos) as u64;
                if len > remaining {
                    return Err("Unexpected end".into());
                }
                *pos += len as usize;
                Ok(())
            }
            // Array: `count` nested items.
            4 => {
                let count = read_uint(bytes, pos, additional)?;
                for _ in 0..count {
                    skip_value(bytes, pos)?;
                }
                Ok(())
            }
            // Map: `count` key/value pairs.
            5 => {
                let count = read_uint(bytes, pos, additional)?;
                for _ in 0..count {
                    skip_value(bytes, pos)?;
                    skip_value(bytes, pos)?;
                }
                Ok(())
            }
            // Tag: tag number, then the tagged item.
            6 => {
                read_uint(bytes, pos, additional)?;
                skip_value(bytes, pos)
            }
            // Simple values and floats: additional 24..=27 carry 1/2/4/8 trailing
            // bytes, the same widths `read_uint` consumes.
            7 => read_uint(bytes, pos, additional).map(|_| ()),
            _ => Err(format!("Unknown major type: {}", major)),
        }
    }

    let mut pos = 0;
    skip_value(bytes, &mut pos)?;
    Ok(pos)
}
121
122fn parse_frame(bytes: &[u8]) -> Result<(FrameHeader, CommitFrame), String> {
123 let header_len = find_cbor_map_end(bytes)?;
124 let header: FrameHeader = serde_ipld_dagcbor::from_slice(&bytes[..header_len])
125 .map_err(|e| format!("Failed to parse header: {:?}", e))?;
126
127 if header.t != "#commit" {
128 return Err(format!("Not a commit frame: {}", header.t));
129 }
130
131 let remaining = &bytes[header_len..];
132 let frame: CommitFrame = serde_ipld_dagcbor::from_slice(remaining)
133 .map_err(|e| format!("Failed to parse commit frame: {:?}", e))?;
134
135 Ok((header, frame))
136}
137
/// Reports whether `s` is a syntactically valid atproto TID.
///
/// A TID is exactly 13 characters from the base32-sortable alphabet
/// `234567abcdefghijklmnopqrstuvwxyz`. The first character is limited to
/// `234567abcdefghij` because the top bit of the encoded 64-bit value must be
/// zero. Uppercase letters, `0`, `1`, `8`, `9` and any non-ASCII text are
/// rejected.
fn is_valid_tid(s: &str) -> bool {
    const ALPHABET: &[u8] = b"234567abcdefghijklmnopqrstuvwxyz";
    const FIRST_CHAR: &[u8] = b"234567abcdefghij";

    let bytes = s.as_bytes();
    // The length check runs first, so indexing `bytes[0]` cannot panic.
    bytes.len() == 13
        && FIRST_CHAR.contains(&bytes[0])
        && bytes[1..].iter().all(|b| ALPHABET.contains(b))
}
141
/// Reports whether `s` has the exact layout `YYYY-MM-DDTHH:MM:SS.mmmZ`: a UTC
/// ISO 8601 timestamp with millisecond precision and a single `Z` suffix.
///
/// Only the layout is checked (digit counts and separators). Calendar ranges
/// such as month 13 or hour 25 are not rejected.
fn is_valid_time_format(s: &str) -> bool {
    // True when `part` is exactly `len` ASCII digits.
    fn is_digits(part: &str, len: usize) -> bool {
        part.len() == len && part.bytes().all(|b| b.is_ascii_digit())
    }

    // True when `part` splits on `sep` into digit fields of exactly `widths`.
    fn fields_match(part: &str, sep: char, widths: &[usize]) -> bool {
        let fields: Vec<&str> = part.split(sep).collect();
        fields.len() == widths.len()
            && fields.iter().zip(widths).all(|(f, &w)| is_digits(f, w))
    }

    let parsed = s
        .strip_suffix('Z')
        .and_then(|rest| rest.split_once('T'))
        .and_then(|(date, time)| {
            time.split_once('.')
                .map(|(clock, millis)| (date, clock, millis))
        });

    match parsed {
        Some((date, clock, millis)) => {
            fields_match(date, '-', &[4, 2, 2])
                && fields_match(clock, ':', &[2, 2, 2])
                && is_digits(millis, 3)
        }
        None => false,
    }
}
163
164#[tokio::test]
165async fn test_firehose_frame_structure() {
166 let client = client();
167 let (token, did) = create_account_and_login(&client).await;
168
169 let url = format!(
170 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos",
171 app_port()
172 );
173 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
174
175 let post_text = "Testing firehose validation!";
176 let post_payload = json!({
177 "repo": did,
178 "collection": "app.bsky.feed.post",
179 "record": {
180 "$type": "app.bsky.feed.post",
181 "text": post_text,
182 "createdAt": chrono::Utc::now().to_rfc3339(),
183 }
184 });
185 let res = client
186 .post(format!(
187 "{}/xrpc/com.atproto.repo.createRecord",
188 base_url().await
189 ))
190 .bearer_auth(&token)
191 .json(&post_payload)
192 .send()
193 .await
194 .expect("Failed to create post");
195 assert_eq!(res.status(), StatusCode::OK);
196
197 let mut frame_opt: Option<(FrameHeader, CommitFrame)> = None;
198 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
199 loop {
200 let msg = ws_stream.next().await.unwrap().unwrap();
201 let raw_bytes = match msg {
202 tungstenite::Message::Binary(bin) => bin,
203 _ => continue,
204 };
205 if let Ok((h, f)) = parse_frame(&raw_bytes) {
206 if f.repo == did {
207 frame_opt = Some((h, f));
208 break;
209 }
210 }
211 }
212 })
213 .await;
214 assert!(timeout.is_ok(), "Timed out waiting for event for our DID");
215 let (header, frame) = frame_opt.expect("No matching frame found");
216
217 println!("\n=== Frame Structure Validation ===\n");
218
219 println!("Header:");
220 println!(" op: {} (expected: 1)", header.op);
221 println!(" t: {} (expected: #commit)", header.t);
222 assert_eq!(header.op, 1, "Header op should be 1");
223 assert_eq!(header.t, "#commit", "Header t should be #commit");
224
225 println!("\nCommitFrame fields:");
226 println!(" seq: {}", frame.seq);
227 println!(" rebase: {}", frame.rebase);
228 println!(" tooBig: {}", frame.too_big);
229 println!(" repo: {}", frame.repo);
230 println!(" commit: {}", frame.commit);
231 println!(" rev: {} (valid TID: {})", frame.rev, is_valid_tid(&frame.rev));
232 println!(" since: {:?}", frame.since);
233 println!(" blocks length: {} bytes", frame.blocks.len());
234 println!(" ops count: {}", frame.ops.len());
235 println!(" blobs count: {}", frame.blobs.len());
236 println!(" time: {} (valid format: {})", frame.time, is_valid_time_format(&frame.time));
237 println!(" prevData: {:?} (IMPORTANT - should have value for updates)", frame.prev_data);
238
239 assert_eq!(frame.repo, did, "Frame repo should match DID");
240 assert!(is_valid_tid(&frame.rev), "Rev should be valid TID format, got: {}", frame.rev);
241 assert!(!frame.blocks.is_empty(), "Blocks should not be empty");
242 assert!(is_valid_time_format(&frame.time), "Time should be ISO 8601 with milliseconds and Z suffix");
243
244 println!("\nOps validation:");
245 for (i, op) in frame.ops.iter().enumerate() {
246 println!(" Op {}:", i);
247 println!(" action: {}", op.action);
248 println!(" path: {}", op.path);
249 println!(" cid: {:?}", op.cid);
250 println!(" prev: {:?} (should be Some for updates/deletes)", op.prev);
251
252 assert!(
253 ["create", "update", "delete"].contains(&op.action.as_str()),
254 "Invalid action: {}", op.action
255 );
256 assert!(op.path.contains('/'), "Path should contain collection/rkey: {}", op.path);
257
258 if op.action == "create" {
259 assert!(op.cid.is_some(), "Create op should have cid");
260 }
261 }
262
263 println!("\nCAR validation:");
264 let mut car_reader = CarReader::new(Cursor::new(&frame.blocks)).await.unwrap();
265 let car_header = car_reader.header().clone();
266 println!(" CAR roots: {:?}", car_header.roots());
267
268 assert!(
269 !car_header.roots().is_empty(),
270 "CAR should have at least one root"
271 );
272 assert_eq!(
273 car_header.roots()[0], frame.commit,
274 "First CAR root should be commit CID"
275 );
276
277 let mut block_cids: Vec<Cid> = Vec::new();
278 while let Ok(Some((cid, _))) = car_reader.next_block().await {
279 block_cids.push(cid);
280 }
281 println!(" CAR blocks: {} total", block_cids.len());
282 for cid in &block_cids {
283 println!(" - {}", cid);
284 }
285
286 assert!(
287 block_cids.contains(&frame.commit),
288 "CAR should contain commit block"
289 );
290
291 for op in &frame.ops {
292 if let Some(ref cid) = op.cid {
293 assert!(
294 block_cids.contains(cid),
295 "CAR should contain op's record block: {}", cid
296 );
297 }
298 }
299
300 println!("\n=== Validation Complete ===\n");
301
302 ws_stream
303 .send(tungstenite::Message::Close(None))
304 .await
305 .ok();
306}
307
308#[tokio::test]
309async fn test_firehose_update_has_prev_field() {
310 let client = client();
311 let (token, did) = create_account_and_login(&client).await;
312
313 let url = format!(
314 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos",
315 app_port()
316 );
317 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
318
319 let profile_payload = json!({
320 "repo": did,
321 "collection": "app.bsky.actor.profile",
322 "rkey": "self",
323 "record": {
324 "$type": "app.bsky.actor.profile",
325 "displayName": "Test User v1",
326 }
327 });
328 let res = client
329 .post(format!(
330 "{}/xrpc/com.atproto.repo.putRecord",
331 base_url().await
332 ))
333 .bearer_auth(&token)
334 .json(&profile_payload)
335 .send()
336 .await
337 .expect("Failed to create profile");
338 assert_eq!(res.status(), StatusCode::OK);
339 let first_profile: Value = res.json().await.unwrap();
340 let first_cid = first_profile["cid"].as_str().unwrap();
341
342 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
343 loop {
344 let msg = ws_stream.next().await.unwrap().unwrap();
345 let raw_bytes = match msg {
346 tungstenite::Message::Binary(bin) => bin,
347 _ => continue,
348 };
349 if let Ok((_, f)) = parse_frame(&raw_bytes) {
350 if f.repo == did {
351 break;
352 }
353 }
354 }
355 })
356 .await;
357 assert!(timeout.is_ok(), "Timed out waiting for first commit");
358
359 let update_payload = json!({
360 "repo": did,
361 "collection": "app.bsky.actor.profile",
362 "rkey": "self",
363 "record": {
364 "$type": "app.bsky.actor.profile",
365 "displayName": "Test User v2",
366 }
367 });
368 let res = client
369 .post(format!(
370 "{}/xrpc/com.atproto.repo.putRecord",
371 base_url().await
372 ))
373 .bearer_auth(&token)
374 .json(&update_payload)
375 .send()
376 .await
377 .expect("Failed to update profile");
378 assert_eq!(res.status(), StatusCode::OK);
379
380 let mut frame_opt: Option<CommitFrame> = None;
381 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
382 loop {
383 let msg = ws_stream.next().await.unwrap().unwrap();
384 let raw_bytes = match msg {
385 tungstenite::Message::Binary(bin) => bin,
386 _ => continue,
387 };
388 if let Ok((_, f)) = parse_frame(&raw_bytes) {
389 if f.repo == did {
390 frame_opt = Some(f);
391 break;
392 }
393 }
394 }
395 })
396 .await;
397 assert!(timeout.is_ok(), "Timed out waiting for update commit");
398 let frame = frame_opt.expect("No matching frame found");
399
400 println!("\n=== Update Operation Validation ===\n");
401 println!("First profile CID: {}", first_cid);
402 println!("Frame prevData: {:?}", frame.prev_data);
403
404 for op in &frame.ops {
405 println!("Op: action={}, path={}, cid={:?}, prev={:?}",
406 op.action, op.path, op.cid, op.prev);
407
408 if op.action == "update" && op.path.contains("app.bsky.actor.profile") {
409 assert!(
410 op.prev.is_some(),
411 "Update operation should have 'prev' field with old CID! Got: {:?}",
412 op.prev
413 );
414 println!(" ✓ Update op has prev field: {:?}", op.prev);
415 }
416 }
417
418 println!("\n=== Validation Complete ===\n");
419
420 ws_stream
421 .send(tungstenite::Message::Close(None))
422 .await
423 .ok();
424}
425
426#[tokio::test]
427async fn test_firehose_commit_has_prev_data() {
428 let client = client();
429 let (token, did) = create_account_and_login(&client).await;
430
431 let url = format!(
432 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos",
433 app_port()
434 );
435 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
436
437 let post_payload = json!({
438 "repo": did,
439 "collection": "app.bsky.feed.post",
440 "record": {
441 "$type": "app.bsky.feed.post",
442 "text": "First post",
443 "createdAt": chrono::Utc::now().to_rfc3339(),
444 }
445 });
446 client
447 .post(format!(
448 "{}/xrpc/com.atproto.repo.createRecord",
449 base_url().await
450 ))
451 .bearer_auth(&token)
452 .json(&post_payload)
453 .send()
454 .await
455 .expect("Failed to create first post");
456
457 let mut first_frame_opt: Option<CommitFrame> = None;
458 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
459 loop {
460 let msg = ws_stream.next().await.unwrap().unwrap();
461 let raw_bytes = match msg {
462 tungstenite::Message::Binary(bin) => bin,
463 _ => continue,
464 };
465 if let Ok((_, f)) = parse_frame(&raw_bytes) {
466 if f.repo == did {
467 first_frame_opt = Some(f);
468 break;
469 }
470 }
471 }
472 })
473 .await;
474 assert!(timeout.is_ok(), "Timed out waiting for first commit");
475 let first_frame = first_frame_opt.expect("No first frame found");
476
477 println!("\n=== First Commit ===");
478 println!(" prevData: {:?} (first commit may be None)", first_frame.prev_data);
479 println!(" since: {:?} (first commit should be None)", first_frame.since);
480
481 let post_payload2 = json!({
482 "repo": did,
483 "collection": "app.bsky.feed.post",
484 "record": {
485 "$type": "app.bsky.feed.post",
486 "text": "Second post",
487 "createdAt": chrono::Utc::now().to_rfc3339(),
488 }
489 });
490 client
491 .post(format!(
492 "{}/xrpc/com.atproto.repo.createRecord",
493 base_url().await
494 ))
495 .bearer_auth(&token)
496 .json(&post_payload2)
497 .send()
498 .await
499 .expect("Failed to create second post");
500
501 let mut second_frame_opt: Option<CommitFrame> = None;
502 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
503 loop {
504 let msg = ws_stream.next().await.unwrap().unwrap();
505 let raw_bytes = match msg {
506 tungstenite::Message::Binary(bin) => bin,
507 _ => continue,
508 };
509 if let Ok((_, f)) = parse_frame(&raw_bytes) {
510 if f.repo == did {
511 second_frame_opt = Some(f);
512 break;
513 }
514 }
515 }
516 })
517 .await;
518 assert!(timeout.is_ok(), "Timed out waiting for second commit");
519 let second_frame = second_frame_opt.expect("No second frame found");
520
521 println!("\n=== Second Commit ===");
522 println!(" prevData: {:?} (should have value - MST root CID)", second_frame.prev_data);
523 println!(" since: {:?} (should have value - previous rev)", second_frame.since);
524
525 assert!(
526 second_frame.since.is_some(),
527 "Second commit should have 'since' field pointing to first commit rev"
528 );
529
530 println!("\n=== Validation Complete ===\n");
531
532 ws_stream
533 .send(tungstenite::Message::Close(None))
534 .await
535 .ok();
536}
537
538#[tokio::test]
539async fn test_compare_raw_cbor_encoding() {
540 let client = client();
541 let (token, did) = create_account_and_login(&client).await;
542
543 let url = format!(
544 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos",
545 app_port()
546 );
547 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
548
549 let post_payload = json!({
550 "repo": did,
551 "collection": "app.bsky.feed.post",
552 "record": {
553 "$type": "app.bsky.feed.post",
554 "text": "CBOR encoding test",
555 "createdAt": chrono::Utc::now().to_rfc3339(),
556 }
557 });
558 client
559 .post(format!(
560 "{}/xrpc/com.atproto.repo.createRecord",
561 base_url().await
562 ))
563 .bearer_auth(&token)
564 .json(&post_payload)
565 .send()
566 .await
567 .expect("Failed to create post");
568
569 let mut raw_bytes_opt: Option<Vec<u8>> = None;
570 let timeout = tokio::time::timeout(std::time::Duration::from_secs(5), async {
571 loop {
572 let msg = ws_stream.next().await.unwrap().unwrap();
573 let raw = match msg {
574 tungstenite::Message::Binary(bin) => bin,
575 _ => continue,
576 };
577 if let Ok((_, f)) = parse_frame(&raw) {
578 if f.repo == did {
579 raw_bytes_opt = Some(raw.to_vec());
580 break;
581 }
582 }
583 }
584 })
585 .await;
586 assert!(timeout.is_ok(), "Timed out waiting for event for our DID");
587 let raw_bytes = raw_bytes_opt.expect("No matching frame found");
588
589 println!("\n=== Raw CBOR Analysis ===\n");
590 println!("Total frame size: {} bytes", raw_bytes.len());
591
592 fn bytes_to_hex(bytes: &[u8]) -> String {
593 bytes.iter().map(|b| format!("{:02x}", b)).collect::<Vec<_>>().join("")
594 }
595
596 println!("First 64 bytes (hex): {}", bytes_to_hex(&raw_bytes[..64.min(raw_bytes.len())]));
597
598 let header_end = find_cbor_map_end(&raw_bytes).expect("Failed to find header end");
599
600 println!("\nHeader section: {} bytes", header_end);
601 println!("Header hex: {}", bytes_to_hex(&raw_bytes[..header_end]));
602
603 println!("\nPayload section: {} bytes", raw_bytes.len() - header_end);
604
605 println!("\n=== Analysis Complete ===\n");
606
607 ws_stream
608 .send(tungstenite::Message::Close(None))
609 .await
610 .ok();
611}