this repo has no description
1mod common;
2use common::*;
3
4use bspds::sync::frame::{Frame, FrameData};
5use cid::Cid;
6use futures::{stream::StreamExt, SinkExt};
7use iroh_car::CarReader;
8use reqwest::StatusCode;
9use serde_json::{json, Value};
10use std::io::Cursor;
11use std::str::FromStr;
12use tokio_tungstenite::{connect_async, tungstenite};
13
14#[tokio::test]
15async fn test_firehose_subscription() {
16 let client = client();
17 let (token, did) = create_account_and_login(&client).await;
18
19 let url = format!(
20 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos",
21 app_port()
22 );
23 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
24
25 let post_text = "Hello from the firehose test!";
26 let post_payload = json!({
27 "repo": did,
28 "collection": "app.bsky.feed.post",
29 "record": {
30 "$type": "app.bsky.feed.post",
31 "text": post_text,
32 "createdAt": chrono::Utc::now().to_rfc3339(),
33 }
34 });
35 let res = client
36 .post(format!(
37 "{}/xrpc/com.atproto.repo.createRecord",
38 base_url().await
39 ))
40 .bearer_auth(token)
41 .json(&post_payload)
42 .send()
43 .await
44 .expect("Failed to create post");
45 assert_eq!(res.status(), StatusCode::OK);
46
47 let msg = ws_stream.next().await.unwrap().unwrap();
48
49 let frame: Frame = match msg {
50 tungstenite::Message::Binary(bin) => {
51 serde_ipld_dagcbor::from_slice(&bin).expect("Failed to deserialize frame")
52 }
53 _ => panic!("Expected binary message"),
54 };
55
56 let FrameData::Commit(commit) = frame.data;
57 assert_eq!(commit.repo, did);
58 assert_eq!(commit.ops.len(), 1);
59 assert!(!commit.blocks.is_empty());
60
61 let op = &commit.ops[0];
62 let record_cid = Cid::from_str(&op.cid.clone().unwrap()).unwrap();
63
64 let mut car_reader = CarReader::new(Cursor::new(&commit.blocks)).await.unwrap();
65 let mut record_block: Option<Vec<u8>> = None;
66 while let Ok(Some((cid, block))) = car_reader.next_block().await {
67 if cid == record_cid {
68 record_block = Some(block);
69 break;
70 }
71 }
72 let record_block = record_block.expect("Record block not found in CAR");
73
74 let record: Value = serde_ipld_dagcbor::from_slice(&record_block).unwrap();
75 assert_eq!(record["text"], post_text);
76
77 ws_stream
78 .send(tungstenite::Message::Close(None))
79 .await
80 .ok();
81}