tangled
alpha
login
or
join now
oscillatory.net
/
eyewall
0
fork
atom
this repo has no description
0
fork
atom
overview
issues
pulls
pipelines
WIP post event parsing for replies and quotes
emptydiagram
10 months ago
1a51952c
04ca6118
+93
-13
3 changed files
expand all
collapse all
unified
split
indexer
src
ingestion.rs
main.rs
storage
mod.rs
+84
-11
indexer/src/ingestion.rs
···
1
1
use std::{collections::HashMap, sync::{Arc, Mutex}};
2
2
3
3
-
use anyhow::Result;
3
3
+
use anyhow::{bail, Result};
4
4
use async_trait::async_trait;
5
5
-
use log::{error, info};
6
6
-
use rocketman::{connection::JetstreamConnection, endpoints::JetstreamEndpoints, handler, ingestion::LexiconIngestor, options::JetstreamOptions, types::event::Event};
5
5
+
use log::{debug, error, info};
6
6
+
use rocketman::{connection::JetstreamConnection, endpoints::JetstreamEndpoints, handler, ingestion::LexiconIngestor, options::JetstreamOptions, types::event::{Event, Kind}};
7
7
use serde_json::Value;
8
8
use tokio::{select, sync::Notify, time::{self, Duration}};
9
9
10
10
+
use crate::storage;
11
11
+
10
12
static BSKY_POST_NSID: &'static str = "app.bsky.feed.post";
13
13
+
static BSKY_EMBED_RECORD_NSID: &'static str = "app.bsky.embed.record";
11
14
12
15
struct PostIngestor;
13
16
14
17
#[async_trait]
15
18
impl LexiconIngestor for PostIngestor {
16
19
async fn ingest(&self, message: Event<Value>) -> Result<()> {
17
17
-
info!("{:?}", message);
20
20
+
if message.commit.is_none() {
21
21
+
return Ok(());
22
22
+
}
23
23
+
let commit = message.commit.as_ref().unwrap();
24
24
+
if commit.record.is_none() {
25
25
+
return Ok(());
26
26
+
}
27
27
+
let record = commit.record.as_ref().unwrap();
28
28
+
29
29
+
match record {
30
30
+
Value::Object(map) => {
31
31
+
let reply = map.get("reply");
32
32
+
let has_reply = reply.is_some();
33
33
+
34
34
+
let mut embed_record = None;
35
35
+
if let Some(Value::Object(embed_map)) = map.get("embed") {
36
36
+
if let Some(Value::String(ty)) = embed_map.get("$type") {
37
37
+
if ty == BSKY_EMBED_RECORD_NSID {
38
38
+
embed_record = embed_map.get("record");
39
39
+
}
40
40
+
}
41
41
+
}
42
42
+
let has_quote = embed_record.is_none();
43
43
+
44
44
+
if !has_reply && !has_quote {
45
45
+
return Ok(());
46
46
+
}
47
47
+
48
48
+
debug!("{:?}", message);
49
49
+
50
50
+
let post_id = storage::BskyPostId::new(message.did, message.commit.unwrap().rkey);
51
51
+
52
52
+
// if let Some(Value::Object(embed_record)) = embed_map.get("record") {
53
53
+
// if let Some(uri) = embed_record.get("uri") {
54
54
+
// }
55
55
+
// }
56
56
+
57
57
+
},
58
58
+
_ => {
59
59
+
return Ok(());
60
60
+
}
61
61
+
}
62
62
+
63
63
+
// Event { did: "did:plc:i5o7ybb4yg45zhnhigrzeiqn", time_us: Some(1746977528899856), kind: Commit, commit: Some(Commit {
64
64
+
// rev: "3lovrnkjkf32t", operation: Create, collection: "app.bsky.feed.post", rkey: "3lovrnkbesk2z",
65
65
+
// record: Some(Object {
66
66
+
// "$type": String("app.bsky.feed.post"), "createdAt": String("2025-05-11T15:32:08.458Z"), "langs": Array [String("en")],
67
67
+
// "reply": Object {
68
68
+
// "parent": Object {
69
69
+
// "cid": String("bafyreifhgwuhg25cdh7r4xg54c7zyt3miq37caeixkfdwlpsxbsucdedey"), "uri": String("at://did:plc:qsmmhv4u2ygx2thvepti77zc/app.bsky.feed.post/3lovq4eaoxc2y")
70
70
+
// },
71
71
+
// "root": Object {
72
72
+
// "cid": String("bafyreifhgwuhg25cdh7r4xg54c7zyt3miq37caeixkfdwlpsxbsucdedey"), "uri": String("at://did:plc:qsmmhv4u2ygx2thvepti77zc/app.bsky.feed.post/3lovq4eaoxc2y")
73
73
+
// }
74
74
+
// }, "text": String("Military security is a concept of the past for Trump. Anything and everything is for sale.")}), cid: Some("bafyreicf5n2zehmldcvn6twlmoadq6awsdbofhizq2saxstdik5d4yvckm") }),
75
75
+
// identity: None }
76
76
+
77
77
+
78
78
+
/*
79
79
+
*
80
80
+
* Event {
81
81
+
* did: "did:plc:qinfhaxhsd3kwga35ldgc4zu", time_us: Some(1746985087246643), kind: Commit,
82
82
+
* commit: Some(Commit {
83
83
+
* rev: "3lovyosmtu32v", operation: Create, collection: "app.bsky.feed.post", rkey: "3lovyosgpvc2t",
84
84
+
* record: Some(Object {
85
85
+
* "$type": String("app.bsky.feed.post"), "createdAt": String("2025-05-11T17:38:06.770Z"),
86
86
+
* "embed": Object {
87
87
+
* "$type": String("app.bsky.embed.record"),
88
88
+
* "record": Object {
89
89
+
* "cid": String("bafyreifpwkwnmcrkmvjhp6kbpzmxe3cpua4dcjwcfpsqz4bkxjiypj2fn4"), "uri": String("at://did:plc:ib6pplehueaytqk3q7kpllwh/app.bsky.feed.post/3lovugh5g4s2s")
90
90
+
* }
91
91
+
* }, "langs": Array [String("en")], "text": String("I am old enough to remember when Jimmie Carter gave up his beloved peanut farm voluntarily even before anyone complained, simply to not risk any accidental infraction of the Emoluments clause")}), cid: Some("bafyreih52kyfdiagt3lzrl5ekg5ulc632624hws4xanndhsbzne7yoxfhm") }), identity: None }
92
92
+
*/
93
93
+
94
94
+
18
95
Ok(())
19
96
}
20
97
}
···
41
118
}
42
119
43
120
44
44
-
pub async fn consume() -> Result<()> {
121
121
+
pub async fn consume(cursor_val: Option<u64>) -> Result<()> {
45
122
let opts = JetstreamOptions::builder()
46
123
.wanted_collections(vec![BSKY_POST_NSID.to_string()])
47
124
.ws_url(JetstreamEndpoints::Public(rocketman::endpoints::JetstreamEndpointLocations::UsEast, 2))
···
55
132
Box::new(PostIngestor)
56
133
);
57
134
58
58
-
let cursor = Arc::new(Mutex::new(None));
135
135
+
let cursor = Arc::new(Mutex::new(cursor_val));
59
136
60
137
let shutdown = Arc::new(Notify::new());
61
138
let persist_handle = {
···
72
149
let handle_result = handler::handle_message(msg, &ingestors, reconnect_tx.clone(), c_cursor.clone()).await;
73
150
if let Err(e) = handle_result{
74
151
error!("Error processing message: {}", e);
75
75
-
};
152
152
+
}
76
153
}
77
154
anyhow::Ok(())
78
155
});
79
79
-
80
80
-
info!("About to connect");
81
156
82
157
if let Err(e) = js_conn.connect(cursor.clone()).await {
83
158
error!("Error connecting: {}", e);
84
159
std::process::exit(1);
85
160
}
86
86
-
87
87
-
info!("After connect");
88
161
89
162
select! {
90
163
_ = tokio::signal::ctrl_c() => info!("ctrl-c"),
+2
-1
indexer/src/main.rs
···
1
1
use eyewall_indexer;
2
2
+
static CURSOR_VAL_US: Option<u64> = Some(1746075600_000000);
2
3
3
4
#[tokio::main]
4
5
async fn main() -> anyhow::Result<()> {
5
6
println!("Hello, world!");
6
7
env_logger::init();
7
7
-
eyewall_indexer::ingestion::consume().await?;
8
8
+
eyewall_indexer::ingestion::consume(CURSOR_VAL_US).await?;
8
9
Ok(())
9
10
}
+7
-1
indexer/src/storage/mod.rs
···
1
1
use std::{collections::HashMap, hash::Hash};
2
2
3
3
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
4
4
-
struct BskyPostId {
4
4
+
pub struct BskyPostId {
5
5
did: String,
6
6
rkey: String,
7
7
+
}
8
8
+
9
9
+
impl BskyPostId {
10
10
+
pub fn new(did: String, rkey: String) -> BskyPostId {
11
11
+
BskyPostId { did: did, rkey: rkey }
12
12
+
}
7
13
}
8
14
9
15
impl BskyPostId {