tangled
alpha
login
or
join now
baileytownsend.dev
/
stitch_counter
13
fork
atom
tangled.org trending bluesky account
13
fork
atom
overview
issues
1
pulls
pipelines
Should be moved to bsky agent meow
baileytownsend.dev
6 months ago
82b7f259
6e15add6
+204
-145
6 changed files
expand all
collapse all
unified
split
Cargo.lock
Cargo.toml
bot
Cargo.toml
src
main.rs
logic
Cargo.toml
src
lib.rs
+42
Cargo.lock
···
377
377
"dotenv",
378
378
"env_logger",
379
379
"log",
380
380
+
"logic",
380
381
"rocketman",
381
382
"serde",
382
383
"serde_json",
383
384
"sqlx",
384
385
"tokio",
386
386
+
]
387
387
+
388
388
+
[[package]]
389
389
+
name = "bsky-sdk"
390
390
+
version = "0.1.21"
391
391
+
source = "registry+https://github.com/rust-lang/crates.io-index"
392
392
+
checksum = "fac3fd5ca998d3bdb1debdd421a16a94931f61e0d805a0208907ec3b5f2cffea"
393
393
+
dependencies = [
394
394
+
"anyhow",
395
395
+
"atrium-api",
396
396
+
"atrium-xrpc-client",
397
397
+
"chrono",
398
398
+
"psl",
399
399
+
"regex",
400
400
+
"serde",
401
401
+
"serde_json",
402
402
+
"thiserror 1.0.69",
403
403
+
"trait-variant",
404
404
+
"unicode-segmentation",
385
405
]
386
406
387
407
[[package]]
···
1640
1660
"atrium-identity",
1641
1661
"atrium-oauth",
1642
1662
"atrium-xrpc-client",
1663
1663
+
"bsky-sdk",
1643
1664
"clap",
1644
1665
"flume",
1645
1666
"getrandom 0.2.16",
···
2088
2109
]
2089
2110
2090
2111
[[package]]
2112
2112
+
name = "psl"
2113
2113
+
version = "2.1.141"
2114
2114
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2115
2115
+
checksum = "98c10a4dce9ad24c1fad826cffc79a624cf626bfaddb466e969368a53d877b30"
2116
2116
+
dependencies = [
2117
2117
+
"psl-types",
2118
2118
+
]
2119
2119
+
2120
2120
+
[[package]]
2121
2121
+
name = "psl-types"
2122
2122
+
version = "2.0.11"
2123
2123
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2124
2124
+
checksum = "33cb294fe86a74cbcf50d4445b37da762029549ebeea341421c7c70370f86cac"
2125
2125
+
2126
2126
+
[[package]]
2091
2127
name = "quote"
2092
2128
version = "1.0.40"
2093
2129
source = "registry+https://github.com/rust-lang/crates.io-index"
···
3231
3267
version = "0.1.3"
3232
3268
source = "registry+https://github.com/rust-lang/crates.io-index"
3233
3269
checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0"
3270
3270
+
3271
3271
+
[[package]]
3272
3272
+
name = "unicode-segmentation"
3273
3273
+
version = "1.12.0"
3274
3274
+
source = "registry+https://github.com/rust-lang/crates.io-index"
3275
3275
+
checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493"
3234
3276
3235
3277
[[package]]
3236
3278
name = "unsigned-varint"
+2
-1
Cargo.toml
···
10
10
atrium-oauth = "0.1.4"
11
11
atrium-xrpc = "0.12.3"
12
12
atrium-xrpc-client = "0.5.14"
13
13
+
bsky-sdk ="0.1.21"
13
14
clap = { version = "4.0", features = ["derive"] }
14
15
log = "0.4.27"
15
16
logic = { path = "logic" }
···
19
20
wasm-bindgen = { version = "0.2.100", features = ["default", "serde_json"] }
20
21
web-sys = "0.3.77"
21
22
gloo = "0.11.0"
22
22
-
gloo-utils = "0.2.0"
23
23
+
gloo-utils = "0.2.0"
+1
bot/Cargo.toml
···
16
16
async-trait = "0.1.83"
17
17
serde.workspace = true
18
18
serde_json = "1.0.132"
19
19
+
logic.workspace = true
19
20
+10
-1
bot/src/main.rs
···
7
7
use std::collections::HashMap;
8
8
use std::sync::{Arc, Mutex};
9
9
use std::time::Duration;
10
10
+
use logic::BotApi;
10
11
11
12
#[tokio::main]
12
13
async fn main() -> anyhow::Result<()> {
···
52
53
.filter(|v| *v > 0)
53
54
.unwrap_or(24);
54
55
56
56
+
let bot_username = std::env::var("BOT_USERNAME").expect("BOT_USERNAME must be set");
57
57
+
let bot_password = std::env::var("BOT_PASSWORD").expect("BOT_PASSWORD must be set");
58
58
+
let bot_pds_url = std::env::var("BOT_PDS_URL").expect("BOT_PDS_URL must be set");
59
59
+
60
60
+
let bot_api = BotApi::new_logged_in(bot_username, bot_password, bot_pds_url).await?;
61
61
+
62
62
+
55
63
// Ingestor for the star collection
56
64
let mut ingestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
57
65
ingestors.insert(
58
66
atproto_api::sh::tangled::feed::Star::NSID.to_string(),
59
59
-
Box::new(StarIngestor { pool: pool.clone(), timeframe_hours, star_threshold, post_window_hours }),
67
67
+
Box::new(StarIngestor { pool: pool.clone(), bot: Arc::new(bot_api), timeframe_hours, star_threshold, post_window_hours }),
60
68
);
61
69
let ingestors = Arc::new(ingestors);
62
70
···
78
86
79
87
struct StarIngestor {
80
88
pool: SqlitePool,
89
89
+
bot: Arc<BotApi>,
81
90
timeframe_hours: i64,
82
91
star_threshold: i64,
83
92
post_window_hours: i64,
+1
logic/Cargo.toml
···
10
10
atrium-identity.workspace = true
11
11
atrium-oauth.workspace = true
12
12
atrium-xrpc-client.workspace = true
13
13
+
bsky-sdk.workspace = true
13
14
serde.workspace = true
14
15
reqwest = { version = "0.12", default-features = false, features = ["gzip", "json"] }
15
16
thiserror = "2.0.12"
+148
-143
logic/src/lib.rs
···
29
29
use std::sync::Arc;
30
30
#[cfg(not(target_arch = "wasm32"))]
31
31
use std::sync::Mutex;
32
32
+
use bsky_sdk::BskyAgent;
32
33
use thiserror::Error;
33
34
#[cfg(not(target_arch = "wasm32"))]
34
35
use tungstenite::Message;
···
57
58
58
59
#[derive(Clone)]
59
60
pub struct BotApi {
60
60
-
agent: Arc<Agent<CredentialSession<MemoryStore<(), AtpSession>, ReqwestClient>>>,
61
61
+
agent: Arc<BskyAgent>,
61
62
handle_resolver: Arc<AtprotoHandleResolver<ApiDNSTxtResolver, DefaultHttpClient>>,
62
63
did_resolver: Arc<CommonDidResolver<DefaultHttpClient>>,
63
64
authenticated: bool,
64
65
}
65
66
66
66
-
fn get_new_session() -> CredentialSession<MemoryStore<(), AtpSession>, ReqwestClient> {
67
67
+
fn get_new_session(pds_url: String) -> CredentialSession<MemoryStore<(), AtpSession>, ReqwestClient> {
67
68
CredentialSession::new(
68
68
-
ReqwestClient::new("https://bsky.social"),
69
69
+
ReqwestClient::new(pds_url.as_str()),
69
70
MemorySessionStore::default(),
70
71
)
71
72
}
···
86
87
87
88
impl BotApi {
88
89
/// Creates a new StatusphereApi to make unauthenticated calls to atproto repos
89
89
-
pub fn new() -> Self {
90
90
-
let session = get_new_session();
91
91
-
let agent = Agent::new(session);
92
92
-
let http_client = Arc::new(DefaultHttpClient::default());
93
93
-
let handle_resolver = AtprotoHandleResolver::new(AtprotoHandleResolverConfig {
94
94
-
dns_txt_resolver: ApiDNSTxtResolver::default(),
95
95
-
http_client: Arc::new(DefaultHttpClient::default()),
96
96
-
});
97
97
-
let did_resolver = CommonDidResolver::new(CommonDidResolverConfig {
98
98
-
plc_directory_url: DEFAULT_PLC_DIRECTORY_URL.to_string(),
99
99
-
http_client,
100
100
-
});
101
101
-
Self {
102
102
-
agent: Arc::new(agent),
103
103
-
handle_resolver: Arc::new(handle_resolver),
104
104
-
did_resolver: Arc::new(did_resolver),
105
105
-
authenticated: false,
106
106
-
}
107
107
-
}
90
90
+
// pub fn new() -> Self {
91
91
+
// let session = get_new_session();
92
92
+
// let agent = Agent::new(session);
93
93
+
// let http_client = Arc::new(DefaultHttpClient::default());
94
94
+
// let handle_resolver = AtprotoHandleResolver::new(AtprotoHandleResolverConfig {
95
95
+
// dns_txt_resolver: ApiDNSTxtResolver::default(),
96
96
+
// http_client: Arc::new(DefaultHttpClient::default()),
97
97
+
// });
98
98
+
// let did_resolver = CommonDidResolver::new(CommonDidResolverConfig {
99
99
+
// plc_directory_url: DEFAULT_PLC_DIRECTORY_URL.to_string(),
100
100
+
// http_client,
101
101
+
// });
102
102
+
// Self {
103
103
+
// agent: Arc::new(agent),
104
104
+
// handle_resolver: Arc::new(handle_resolver),
105
105
+
// did_resolver: Arc::new(did_resolver),
106
106
+
// authenticated: false,
107
107
+
// }
108
108
+
// }
108
109
109
110
/// Creates a new StatusphereAPi to make authenticated requests to your atproto repo
110
110
-
pub async fn new_logged_in(handle: String, password: String) -> Result<Self, Error> {
111
111
-
let session = get_new_session();
112
112
-
if let Err(error) = session.login(&handle, &password).await {
113
113
-
return Err(Error::LoginError(error.to_string()));
114
114
-
}
115
115
-
let agent = Agent::new(session);
111
111
+
pub async fn new_logged_in(handle: String, password: String, pds_url: String) -> Result<Self, Error> {
112
112
+
// let session = get_new_session(pds_url);
113
113
+
// if let Err(error) = session.login(&handle, &password).await {
114
114
+
// return Err(Error::LoginError(error.to_string()));
115
115
+
// }
116
116
+
117
117
+
let agent = BskyAgent::builder().build().await.expect("Failed to build agent");
118
118
+
agent.configure_endpoint(pds_url);
119
119
+
// let agent = Agent::new(session);
120
120
+
agent.login(handle, password).await.expect("Failed to login");
116
121
let handle_resolver = AtprotoHandleResolver::new(AtprotoHandleResolverConfig {
117
122
dns_txt_resolver: ApiDNSTxtResolver::default(),
118
123
http_client: Arc::new(DefaultHttpClient::default()),
···
293
298
handler: F,
294
299
}
295
300
296
296
-
#[cfg(not(target_arch = "wasm32"))]
297
297
-
impl StatusSphereListener {
298
298
-
pub fn new<F>(handler: F) -> Self
299
299
-
where
300
300
-
F: StatusphereIngesterTrait + Send + Sync + 'static,
301
301
-
{
302
302
-
// init the builder
303
303
-
let opts = JetstreamOptions::builder()
304
304
-
// your EXACT nsids
305
305
-
.wanted_collections(vec![
306
306
-
atproto_api::xyz::statusphere::Status::NSID.to_string(),
307
307
-
])
308
308
-
.build();
309
309
-
// create the jetstream connector
310
310
-
let jetstream = JetstreamConnection::new(opts);
311
311
-
312
312
-
// tracks the last message we've processed
313
313
-
let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
314
314
-
315
315
-
// get channels
316
316
-
let msg_rx = jetstream.get_msg_rx();
317
317
-
let reconnect_tx = jetstream.get_reconnect_tx();
318
318
-
319
319
-
// create your ingestors
320
320
-
let mut injestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
321
321
-
322
322
-
injestors.insert(
323
323
-
atproto_api::xyz::statusphere::Status::NSID.to_string(),
324
324
-
Box::new(StatusphereIngester {
325
325
-
status_sphere_agent: BotApi::new(),
326
326
-
handler,
327
327
-
}),
328
328
-
);
329
329
-
Self {
330
330
-
cursor,
331
331
-
ingestors: Arc::new(injestors),
332
332
-
jetstream_connection: jetstream,
333
333
-
msg_rx,
334
334
-
reconnect_tx,
335
335
-
}
336
336
-
}
337
337
-
338
338
-
/// Start listening to the jetstream for new statusphere updates
339
339
-
pub async fn listen(&self) -> Result<(), Error> {
340
340
-
let msg_rx = self.msg_rx.clone();
341
341
-
let ingestors = Arc::clone(&self.ingestors);
342
342
-
let reconnect_tx = self.reconnect_tx.clone();
343
343
-
let cursor = self.cursor.clone();
344
344
-
345
345
-
tokio::spawn(async move {
346
346
-
while let Ok(message) = msg_rx.recv_async().await {
347
347
-
if let Err(e) = handler::handle_message(
348
348
-
message,
349
349
-
&ingestors,
350
350
-
reconnect_tx.clone(),
351
351
-
cursor.clone(),
352
352
-
)
353
353
-
.await
354
354
-
{
355
355
-
//Just error internally since this is a template
356
356
-
eprintln!("Error processing message: {}", e);
357
357
-
};
358
358
-
}
359
359
-
});
360
360
-
361
361
-
// connect to jetstream
362
362
-
// retries internally, but may fail if there is an extreme error.
363
363
-
self.jetstream_connection
364
364
-
.connect(self.cursor.clone())
365
365
-
.await
366
366
-
.map_err(|e| Error::GeneralError(e.to_string()))
367
367
-
}
368
368
-
}
369
369
-
370
370
-
/// Shows a Jestream Listener for desktop or server side
371
371
-
#[cfg(not(target_arch = "wasm32"))]
372
372
-
#[async_trait]
373
373
-
impl<F: StatusphereIngesterTrait + Send + Sync> LexiconIngestor for StatusphereIngester<F> {
374
374
-
async fn ingest(
375
375
-
&self,
376
376
-
message: rocketman::types::event::Event<serde_json::Value>,
377
377
-
) -> anyhow::Result<()> {
378
378
-
if let Some(commit) = &message.commit {
379
379
-
let handle = self
380
380
-
.status_sphere_agent
381
381
-
.get_handle(message.did.clone())
382
382
-
.await?;
383
383
-
384
384
-
match commit.operation {
385
385
-
Operation::Create | Operation::Update => {
386
386
-
if let Some(record) = &commit.record {
387
387
-
let status_at_proto_record = serde_json::from_value::<
388
388
-
atproto_api::xyz::statusphere::status::RecordData,
389
389
-
>(record.clone())?;
390
390
-
391
391
-
if let Some(ref _cid) = commit.cid {
392
392
-
self.handler.ingest(HydratedStatus {
393
393
-
handle: Some(handle),
394
394
-
did: message.did,
395
395
-
status: status_at_proto_record.clone(),
396
396
-
})?;
397
397
-
}
398
398
-
Ok::<(), anyhow::Error>(())
399
399
-
} else {
400
400
-
Ok::<(), anyhow::Error>(())
401
401
-
}
402
402
-
}
403
403
-
Operation::Delete => Ok::<(), anyhow::Error>(()),
404
404
-
}
405
405
-
} else {
406
406
-
Ok(())
407
407
-
}
408
408
-
}
409
409
-
}
301
301
+
// #[cfg(not(target_arch = "wasm32"))]
302
302
+
// impl StatusSphereListener {
303
303
+
// pub fn new<F>(handler: F) -> Self
304
304
+
// where
305
305
+
// F: StatusphereIngesterTrait + Send + Sync + 'static,
306
306
+
// {
307
307
+
// // init the builder
308
308
+
// let opts = JetstreamOptions::builder()
309
309
+
// // your EXACT nsids
310
310
+
// .wanted_collections(vec![
311
311
+
// atproto_api::xyz::statusphere::Status::NSID.to_string(),
312
312
+
// ])
313
313
+
// .build();
314
314
+
// // create the jetstream connector
315
315
+
// let jetstream = JetstreamConnection::new(opts);
316
316
+
//
317
317
+
// // tracks the last message we've processed
318
318
+
// let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
319
319
+
//
320
320
+
// // get channels
321
321
+
// let msg_rx = jetstream.get_msg_rx();
322
322
+
// let reconnect_tx = jetstream.get_reconnect_tx();
323
323
+
//
324
324
+
// // create your ingestors
325
325
+
// let mut injestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
326
326
+
//
327
327
+
// injestors.insert(
328
328
+
// atproto_api::xyz::statusphere::Status::NSID.to_string(),
329
329
+
// Box::new(StatusphereIngester {
330
330
+
// status_sphere_agent: BotApi::new(),
331
331
+
// handler,
332
332
+
// }),
333
333
+
// );
334
334
+
// Self {
335
335
+
// cursor,
336
336
+
// ingestors: Arc::new(injestors),
337
337
+
// jetstream_connection: jetstream,
338
338
+
// msg_rx,
339
339
+
// reconnect_tx,
340
340
+
// }
341
341
+
// }
342
342
+
//
343
343
+
// /// Start listening to the jetstream for new statusphere updates
344
344
+
// pub async fn listen(&self) -> Result<(), Error> {
345
345
+
// let msg_rx = self.msg_rx.clone();
346
346
+
// let ingestors = Arc::clone(&self.ingestors);
347
347
+
// let reconnect_tx = self.reconnect_tx.clone();
348
348
+
// let cursor = self.cursor.clone();
349
349
+
//
350
350
+
// tokio::spawn(async move {
351
351
+
// while let Ok(message) = msg_rx.recv_async().await {
352
352
+
// if let Err(e) = handler::handle_message(
353
353
+
// message,
354
354
+
// &ingestors,
355
355
+
// reconnect_tx.clone(),
356
356
+
// cursor.clone(),
357
357
+
// )
358
358
+
// .await
359
359
+
// {
360
360
+
// //Just error internally since this is a template
361
361
+
// eprintln!("Error processing message: {}", e);
362
362
+
// };
363
363
+
// }
364
364
+
// });
365
365
+
//
366
366
+
// // connect to jetstream
367
367
+
// // retries internally, but may fail if there is an extreme error.
368
368
+
// self.jetstream_connection
369
369
+
// .connect(self.cursor.clone())
370
370
+
// .await
371
371
+
// .map_err(|e| Error::GeneralError(e.to_string()))
372
372
+
// }
373
373
+
// }
374
374
+
//
375
375
+
// /// Shows a Jestream Listener for desktop or server side
376
376
+
// #[cfg(not(target_arch = "wasm32"))]
377
377
+
// #[async_trait]
378
378
+
// impl<F: StatusphereIngesterTrait + Send + Sync> LexiconIngestor for StatusphereIngester<F> {
379
379
+
// async fn ingest(
380
380
+
// &self,
381
381
+
// message: rocketman::types::event::Event<serde_json::Value>,
382
382
+
// ) -> anyhow::Result<()> {
383
383
+
// if let Some(commit) = &message.commit {
384
384
+
// let handle = self
385
385
+
// .status_sphere_agent
386
386
+
// .get_handle(message.did.clone())
387
387
+
// .await?;
388
388
+
//
389
389
+
// match commit.operation {
390
390
+
// Operation::Create | Operation::Update => {
391
391
+
// if let Some(record) = &commit.record {
392
392
+
// let status_at_proto_record = serde_json::from_value::<
393
393
+
// atproto_api::xyz::statusphere::status::RecordData,
394
394
+
// >(record.clone())?;
395
395
+
//
396
396
+
// if let Some(ref _cid) = commit.cid {
397
397
+
// self.handler.ingest(HydratedStatus {
398
398
+
// handle: Some(handle),
399
399
+
// did: message.did,
400
400
+
// status: status_at_proto_record.clone(),
401
401
+
// })?;
402
402
+
// }
403
403
+
// Ok::<(), anyhow::Error>(())
404
404
+
// } else {
405
405
+
// Ok::<(), anyhow::Error>(())
406
406
+
// }
407
407
+
// }
408
408
+
// Operation::Delete => Ok::<(), anyhow::Error>(()),
409
409
+
// }
410
410
+
// } else {
411
411
+
// Ok(())
412
412
+
// }
413
413
+
// }
414
414
+
// }
410
415
411
416
#[cfg(target_arch = "wasm32")]
412
417
/// Just a simple jetstream listener with a lot less features than Rocketman but just to show it can happen and sharing code
413
413
-
pub fn listen<F>(on_message_handler: F) -> Result<(), Error>
418
418
+
pub fn listen<F>(on_messa_handler: F) -> Result<(), Error>
414
419
where
415
420
F: StatusphereIngesterTrait + 'static,
416
421
{