this repo has no description
1mod common; 2use common::*; 3 4use bspds::sync::frame::{Frame, FrameData}; 5use cid::Cid; 6use futures::{stream::StreamExt, SinkExt}; 7use iroh_car::CarReader; 8use reqwest::StatusCode; 9use serde_json::{json, Value}; 10use std::io::Cursor; 11use std::str::FromStr; 12use tokio_tungstenite::{connect_async, tungstenite}; 13 14#[tokio::test] 15async fn test_firehose_subscription() { 16 let client = client(); 17 let (token, did) = create_account_and_login(&client).await; 18 19 let url = format!( 20 "ws://127.0.0.1:{}/xrpc/com.atproto.sync.subscribeRepos", 21 app_port() 22 ); 23 let (mut ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); 24 25 let post_text = "Hello from the firehose test!"; 26 let post_payload = json!({ 27 "repo": did, 28 "collection": "app.bsky.feed.post", 29 "record": { 30 "$type": "app.bsky.feed.post", 31 "text": post_text, 32 "createdAt": chrono::Utc::now().to_rfc3339(), 33 } 34 }); 35 let res = client 36 .post(format!( 37 "{}/xrpc/com.atproto.repo.createRecord", 38 base_url().await 39 )) 40 .bearer_auth(token) 41 .json(&post_payload) 42 .send() 43 .await 44 .expect("Failed to create post"); 45 assert_eq!(res.status(), StatusCode::OK); 46 47 let msg = ws_stream.next().await.unwrap().unwrap(); 48 49 let frame: Frame = match msg { 50 tungstenite::Message::Binary(bin) => { 51 serde_ipld_dagcbor::from_slice(&bin).expect("Failed to deserialize frame") 52 } 53 _ => panic!("Expected binary message"), 54 }; 55 56 let FrameData::Commit(commit) = frame.data; 57 assert_eq!(commit.repo, did); 58 assert_eq!(commit.ops.len(), 1); 59 assert!(!commit.blocks.is_empty()); 60 61 let op = &commit.ops[0]; 62 let record_cid = Cid::from_str(&op.cid.clone().unwrap()).unwrap(); 63 64 let mut car_reader = CarReader::new(Cursor::new(&commit.blocks)).await.unwrap(); 65 let mut record_block: Option<Vec<u8>> = None; 66 while let Ok(Some((cid, block))) = car_reader.next_block().await { 67 if cid == record_cid { 68 record_block = Some(block); 69 break; 70 } 71 } 72 let record_block = record_block.expect("Record block not found in CAR"); 73 74 let record: Value = serde_ipld_dagcbor::from_slice(&record_block).unwrap(); 75 assert_eq!(record["text"], post_text); 76 77 ws_stream 78 .send(tungstenite::Message::Close(None)) 79 .await 80 .ok(); 81}