···11+{
22+ "$type": "app.bsky.labeler.service",
33+ "policies": {
44+ "labelValues": [
55+ "joined-nov-a",
66+ "joined-dec-a",
77+ "joined-jan-b",
88+ "joined-feb-b",
99+ "joined-mar-b",
1010+ "joined-apr-b",
1111+ "joined-may-b",
1212+ "joined-jun-b",
1313+ "joined-jul-b",
1414+ "joined-aug-b",
1515+ "joined-sep-b",
1616+ "joined-oct-b",
1717+ "joined-nov-b",
1818+ "joined-dec-b",
1919+ "joined-jan-c",
2020+ "joined-feb-c",
2121+ "joined-mar-c",
2222+ "joined-apr-c",
2323+ "joined-may-c",
2424+ "joined-jun-c",
2525+ "joined-jul-c",
2626+ "joined-aug-c",
2727+ "joined-sep-c",
2828+ "joined-oct-c",
2929+ "joined-nov-c",
3030+ "joined-dec-c",
3131+ "joined-jan-d",
3232+ "joined-feb-d",
3333+ "joined-mar-d",
3434+ "joined-apr-d",
3535+ "joined-may-d",
3636+ "joined-jun-d",
3737+ "joined-jul-d",
3838+ "joined-aug-d",
3939+ "joined-sep-d",
4040+ "joined-oct-d",
4141+ "joined-nov-d",
4242+ "joined-dec-d"
4343+ ],
4444+ "labelValueDefinitions": [
4545+ {
4646+ "blurs": "none",
4747+ "locales": [
4848+ {
4949+ "lang": "en",
5050+ "name": "Joined Nov ’22",
5151+ "description": "This profile was created in November of 2022."
5252+ }
5353+ ],
5454+ "severity": "inform",
5555+ "adultOnly": false,
5656+ "identifier": "joined-nov-a",
5757+ "defaultSetting": "warn"
5858+ },
5959+ {
6060+ "blurs": "none",
6161+ "locales": [
6262+ {
6363+ "lang": "en",
6464+ "name": "Joined Dec ’22",
6565+ "description": "This profile was created in December of 2022."
6666+ }
6767+ ],
6868+ "severity": "inform",
6969+ "adultOnly": false,
7070+ "identifier": "joined-dec-a",
7171+ "defaultSetting": "warn"
7272+ },
7373+ {
7474+ "blurs": "none",
7575+ "locales": [
7676+ {
7777+ "lang": "en",
7878+ "name": "Joined Jan ’23",
7979+ "description": "This profile was created in January of 2023."
8080+ }
8181+ ],
8282+ "severity": "inform",
8383+ "adultOnly": false,
8484+ "identifier": "joined-jan-b",
8585+ "defaultSetting": "warn"
8686+ },
8787+ {
8888+ "blurs": "none",
8989+ "locales": [
9090+ {
9191+ "lang": "en",
9292+ "name": "Joined Feb ’23",
9393+ "description": "This profile was created in February of 2023."
9494+ }
9595+ ],
9696+ "severity": "inform",
9797+ "adultOnly": false,
9898+ "identifier": "joined-feb-b",
9999+ "defaultSetting": "warn"
100100+ },
101101+ {
102102+ "blurs": "none",
103103+ "locales": [
104104+ {
105105+ "lang": "en",
106106+ "name": "Joined Mar ’23",
107107+ "description": "This profile was created in March of 2023."
108108+ }
109109+ ],
110110+ "severity": "inform",
111111+ "adultOnly": false,
112112+ "identifier": "joined-mar-b",
113113+ "defaultSetting": "warn"
114114+ },
115115+ {
116116+ "blurs": "none",
117117+ "locales": [
118118+ {
119119+ "lang": "en",
120120+ "name": "Joined Apr ’23",
121121+ "description": "This profile was created in April of 2023."
122122+ }
123123+ ],
124124+ "severity": "inform",
125125+ "adultOnly": false,
126126+ "identifier": "joined-apr-b",
127127+ "defaultSetting": "warn"
128128+ },
129129+ {
130130+ "blurs": "none",
131131+ "locales": [
132132+ {
133133+ "lang": "en",
134134+ "name": "Joined May ’23",
135135+ "description": "This profile was created in May of 2023."
136136+ }
137137+ ],
138138+ "severity": "inform",
139139+ "adultOnly": false,
140140+ "identifier": "joined-may-b",
141141+ "defaultSetting": "warn"
142142+ },
143143+ {
144144+ "blurs": "none",
145145+ "locales": [
146146+ {
147147+ "lang": "en",
148148+ "name": "Joined Jun ’23",
149149+ "description": "This profile was created in June of 2023."
150150+ }
151151+ ],
152152+ "severity": "inform",
153153+ "adultOnly": false,
154154+ "identifier": "joined-jun-b",
155155+ "defaultSetting": "warn"
156156+ },
157157+ {
158158+ "blurs": "none",
159159+ "locales": [
160160+ {
161161+ "lang": "en",
162162+ "name": "Joined Jul ’23",
163163+ "description": "This profile was created in July of 2023."
164164+ }
165165+ ],
166166+ "severity": "inform",
167167+ "adultOnly": false,
168168+ "identifier": "joined-jul-b",
169169+ "defaultSetting": "warn"
170170+ },
171171+ {
172172+ "blurs": "none",
173173+ "locales": [
174174+ {
175175+ "lang": "en",
176176+ "name": "Joined Aug ’23",
177177+ "description": "This profile was created in August of 2023."
178178+ }
179179+ ],
180180+ "severity": "inform",
181181+ "adultOnly": false,
182182+ "identifier": "joined-aug-b",
183183+ "defaultSetting": "warn"
184184+ },
185185+ {
186186+ "blurs": "none",
187187+ "locales": [
188188+ {
189189+ "lang": "en",
190190+ "name": "Joined Sep ’23",
191191+ "description": "This profile was created in September of 2023."
192192+ }
193193+ ],
194194+ "severity": "inform",
195195+ "adultOnly": false,
196196+ "identifier": "joined-sep-b",
197197+ "defaultSetting": "warn"
198198+ },
199199+ {
200200+ "blurs": "none",
201201+ "locales": [
202202+ {
203203+ "lang": "en",
204204+ "name": "Joined Oct ’23",
205205+ "description": "This profile was created in October of 2023."
206206+ }
207207+ ],
208208+ "severity": "inform",
209209+ "adultOnly": false,
210210+ "identifier": "joined-oct-b",
211211+ "defaultSetting": "warn"
212212+ },
213213+ {
214214+ "blurs": "none",
215215+ "locales": [
216216+ {
217217+ "lang": "en",
218218+ "name": "Joined Nov ’23",
219219+ "description": "This profile was created in November of 2023."
220220+ }
221221+ ],
222222+ "severity": "inform",
223223+ "adultOnly": false,
224224+ "identifier": "joined-nov-b",
225225+ "defaultSetting": "warn"
226226+ },
227227+ {
228228+ "blurs": "none",
229229+ "locales": [
230230+ {
231231+ "lang": "en",
232232+ "name": "Joined Dec ’23",
233233+ "description": "This profile was created in December of 2023."
234234+ }
235235+ ],
236236+ "severity": "inform",
237237+ "adultOnly": false,
238238+ "identifier": "joined-dec-b",
239239+ "defaultSetting": "warn"
240240+ },
241241+ {
242242+ "blurs": "none",
243243+ "locales": [
244244+ {
245245+ "lang": "en",
246246+ "name": "Joined Jan ’24",
247247+ "description": "This profile was created in January of 2024."
248248+ }
249249+ ],
250250+ "severity": "inform",
251251+ "adultOnly": false,
252252+ "identifier": "joined-jan-c",
253253+ "defaultSetting": "warn"
254254+ },
255255+ {
256256+ "blurs": "none",
257257+ "locales": [
258258+ {
259259+ "lang": "en",
260260+ "name": "Joined Feb ’24",
261261+ "description": "This profile was created in February of 2024."
262262+ }
263263+ ],
264264+ "severity": "inform",
265265+ "adultOnly": false,
266266+ "identifier": "joined-feb-c",
267267+ "defaultSetting": "warn"
268268+ },
269269+ {
270270+ "blurs": "none",
271271+ "locales": [
272272+ {
273273+ "lang": "en",
274274+ "name": "Joined Mar ’24",
275275+ "description": "This profile was created in March of 2024."
276276+ }
277277+ ],
278278+ "severity": "inform",
279279+ "adultOnly": false,
280280+ "identifier": "joined-mar-c",
281281+ "defaultSetting": "warn"
282282+ },
283283+ {
284284+ "blurs": "none",
285285+ "locales": [
286286+ {
287287+ "lang": "en",
288288+ "name": "Joined Apr ’24",
289289+ "description": "This profile was created in April of 2024."
290290+ }
291291+ ],
292292+ "severity": "inform",
293293+ "adultOnly": false,
294294+ "identifier": "joined-apr-c",
295295+ "defaultSetting": "warn"
296296+ },
297297+ {
298298+ "blurs": "none",
299299+ "locales": [
300300+ {
301301+ "lang": "en",
302302+ "name": "Joined May ’24",
303303+ "description": "This profile was created in May of 2024."
304304+ }
305305+ ],
306306+ "severity": "inform",
307307+ "adultOnly": false,
308308+ "identifier": "joined-may-c",
309309+ "defaultSetting": "warn"
310310+ },
311311+ {
312312+ "blurs": "none",
313313+ "locales": [
314314+ {
315315+ "lang": "en",
316316+ "name": "Joined Jun ’24",
317317+ "description": "This profile was created in June of 2024."
318318+ }
319319+ ],
320320+ "severity": "inform",
321321+ "adultOnly": false,
322322+ "identifier": "joined-jun-c",
323323+ "defaultSetting": "warn"
324324+ },
325325+ {
326326+ "blurs": "none",
327327+ "locales": [
328328+ {
329329+ "lang": "en",
330330+ "name": "Joined Jul ’24",
331331+ "description": "This profile was created in July of 2024."
332332+ }
333333+ ],
334334+ "severity": "inform",
335335+ "adultOnly": false,
336336+ "identifier": "joined-jul-c",
337337+ "defaultSetting": "warn"
338338+ },
339339+ {
340340+ "blurs": "none",
341341+ "locales": [
342342+ {
343343+ "lang": "en",
344344+ "name": "Joined Aug ’24",
345345+ "description": "This profile was created in August of 2024."
346346+ }
347347+ ],
348348+ "severity": "inform",
349349+ "adultOnly": false,
350350+ "identifier": "joined-aug-c",
351351+ "defaultSetting": "warn"
352352+ },
353353+ {
354354+ "blurs": "none",
355355+ "locales": [
356356+ {
357357+ "lang": "en",
358358+ "name": "Joined Sep ’24",
359359+ "description": "This profile was created in September of 2024."
360360+ }
361361+ ],
362362+ "severity": "inform",
363363+ "adultOnly": false,
364364+ "identifier": "joined-sep-c",
365365+ "defaultSetting": "warn"
366366+ },
367367+ {
368368+ "blurs": "none",
369369+ "locales": [
370370+ {
371371+ "lang": "en",
372372+ "name": "Joined Oct ’24",
373373+ "description": "This profile was created in October of 2024."
374374+ }
375375+ ],
376376+ "severity": "inform",
377377+ "adultOnly": false,
378378+ "identifier": "joined-oct-c",
379379+ "defaultSetting": "warn"
380380+ },
381381+ {
382382+ "blurs": "none",
383383+ "locales": [
384384+ {
385385+ "lang": "en",
386386+ "name": "Joined Nov ’24",
387387+ "description": "This profile was created in November of 2024."
388388+ }
389389+ ],
390390+ "severity": "inform",
391391+ "adultOnly": false,
392392+ "identifier": "joined-nov-c",
393393+ "defaultSetting": "warn"
394394+ },
395395+ {
396396+ "blurs": "none",
397397+ "locales": [
398398+ {
399399+ "lang": "en",
400400+ "name": "Joined Dec ’24",
401401+ "description": "This profile was created in December of 2024."
402402+ }
403403+ ],
404404+ "severity": "inform",
405405+ "adultOnly": false,
406406+ "identifier": "joined-dec-c",
407407+ "defaultSetting": "warn"
408408+ },
409409+ {
410410+ "blurs": "none",
411411+ "locales": [
412412+ {
413413+ "lang": "en",
414414+ "name": "Joined Jan ’25",
415415+ "description": "This profile was created in January of 2025."
416416+ }
417417+ ],
418418+ "severity": "inform",
419419+ "adultOnly": false,
420420+ "identifier": "joined-jan-d",
421421+ "defaultSetting": "warn"
422422+ },
423423+ {
424424+ "blurs": "none",
425425+ "locales": [
426426+ {
427427+ "lang": "en",
428428+ "name": "Joined Feb ’25",
429429+ "description": "This profile was created in February of 2025."
430430+ }
431431+ ],
432432+ "severity": "inform",
433433+ "adultOnly": false,
434434+ "identifier": "joined-feb-d",
435435+ "defaultSetting": "warn"
436436+ },
437437+ {
438438+ "blurs": "none",
439439+ "locales": [
440440+ {
441441+ "lang": "en",
442442+ "name": "Joined Mar ’25",
443443+ "description": "This profile was created in March of 2025."
444444+ }
445445+ ],
446446+ "severity": "inform",
447447+ "adultOnly": false,
448448+ "identifier": "joined-mar-d",
449449+ "defaultSetting": "warn"
450450+ },
451451+ {
452452+ "blurs": "none",
453453+ "locales": [
454454+ {
455455+ "lang": "en",
456456+ "name": "Joined Apr ’25",
457457+ "description": "This profile was created in April of 2025."
458458+ }
459459+ ],
460460+ "severity": "inform",
461461+ "adultOnly": false,
462462+ "identifier": "joined-apr-d",
463463+ "defaultSetting": "warn"
464464+ },
465465+ {
466466+ "blurs": "none",
467467+ "locales": [
468468+ {
469469+ "lang": "en",
470470+ "name": "Joined May ’25",
471471+ "description": "This profile was created in May of 2025."
472472+ }
473473+ ],
474474+ "severity": "inform",
475475+ "adultOnly": false,
476476+ "identifier": "joined-may-d",
477477+ "defaultSetting": "warn"
478478+ },
479479+ {
480480+ "blurs": "none",
481481+ "locales": [
482482+ {
483483+ "lang": "en",
484484+ "name": "Joined Jun ’25",
485485+ "description": "This profile was created in June of 2025."
486486+ }
487487+ ],
488488+ "severity": "inform",
489489+ "adultOnly": false,
490490+ "identifier": "joined-jun-d",
491491+ "defaultSetting": "warn"
492492+ },
493493+ {
494494+ "blurs": "none",
495495+ "locales": [
496496+ {
497497+ "lang": "en",
498498+ "name": "Joined Jul ’25",
499499+ "description": "This profile was created in July of 2025."
500500+ }
501501+ ],
502502+ "severity": "inform",
503503+ "adultOnly": false,
504504+ "identifier": "joined-jul-d",
505505+ "defaultSetting": "warn"
506506+ },
507507+ {
508508+ "blurs": "none",
509509+ "locales": [
510510+ {
511511+ "lang": "en",
512512+ "name": "Joined Aug ’25",
513513+ "description": "This profile was created in August of 2025."
514514+ }
515515+ ],
516516+ "severity": "inform",
517517+ "adultOnly": false,
518518+ "identifier": "joined-aug-d",
519519+ "defaultSetting": "warn"
520520+ },
521521+ {
522522+ "blurs": "none",
523523+ "locales": [
524524+ {
525525+ "lang": "en",
526526+ "name": "Joined Sep ’25",
527527+ "description": "This profile was created in September of 2025."
528528+ }
529529+ ],
530530+ "severity": "inform",
531531+ "adultOnly": false,
532532+ "identifier": "joined-sep-d",
533533+ "defaultSetting": "warn"
534534+ },
535535+ {
536536+ "blurs": "none",
537537+ "locales": [
538538+ {
539539+ "lang": "en",
540540+ "name": "Joined Oct ’25",
541541+ "description": "This profile was created in October of 2025."
542542+ }
543543+ ],
544544+ "severity": "inform",
545545+ "adultOnly": false,
546546+ "identifier": "joined-oct-d",
547547+ "defaultSetting": "warn"
548548+ },
549549+ {
550550+ "blurs": "none",
551551+ "locales": [
552552+ {
553553+ "lang": "en",
554554+ "name": "Joined Nov ’25",
555555+ "description": "This profile was created in November of 2025."
556556+ }
557557+ ],
558558+ "severity": "inform",
559559+ "adultOnly": false,
560560+ "identifier": "joined-nov-d",
561561+ "defaultSetting": "warn"
562562+ },
563563+ {
564564+ "blurs": "none",
565565+ "locales": [
566566+ {
567567+ "lang": "en",
568568+ "name": "Joined Dec ’25",
569569+ "description": "This profile was created in December of 2025."
570570+ }
571571+ ],
572572+ "severity": "inform",
573573+ "adultOnly": false,
574574+ "identifier": "joined-dec-d",
575575+ "defaultSetting": "warn"
576576+ }
577577+ ]
578578+ },
579579+ "createdAt": "2025-02-10T06:08:05.000Z"
580580+}
+14
src/bin/database.rs
···11+//! Main entrypoint.
22+33+#[tokio::main(flavor = "current_thread")]
44+async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
55+ let subscriber = tracing_subscriber::fmt()
66+ .compact() // Use a more compact, abbreviated log format
77+ .with_file(true) // Display source code file paths
88+ .with_line_number(true) // Display source code line numbers
99+ .with_thread_ids(true) // Display the thread ID an event was recorded on
1010+ .with_target(false) // Don't display the event's target (module path)
1111+ .finish(); // Build the subscriber
1212+ tracing::subscriber::set_global_default(subscriber)?;
1313+ atproto_teq::database::main_database().await
1414+}
+23
src/bin/jetstream.rs
···11+//! Main entrypoint.
22+33+#[tokio::main(flavor = "current_thread")]
44+async fn main() -> Result<(), Box<dyn std::error::Error>> {
55+ let subscriber = tracing_subscriber::fmt()
66+ .compact() // Use a more compact, abbreviated log format
77+ .with_file(true) // Display source code file paths
88+ .with_line_number(true) // Display source code line numbers
99+ .with_thread_ids(true) // Display the thread ID an event was recorded on
1010+ .with_target(false) // Don't display the event's target (module path)
1111+ .finish(); // Build the subscriber
1212+ tracing::subscriber::set_global_default(subscriber)?;
1313+ loop {
1414+ if let Err(e) = atproto_teq::jetstream::main_jetstream().await {
1515+ tracing::error!("Error in main_jetstream: {:?}", e);
1616+ break;
1717+ } else {
1818+ tracing::info!("Restarting main_jetstream");
1919+ tokio::time::sleep(tokio::time::Duration::from_secs(15)).await;
2020+ }
2121+ }
2222+ Ok(())
2323+}
+24
src/bin/negation.rs
···11+//! Main entrypoint.
22+33+use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePool};
44+use std::str::FromStr;
55+66+#[tokio::main(flavor = "current_thread")]
77+async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
88+ let subscriber = tracing_subscriber::fmt()
99+ .compact() // Use a more compact, abbreviated log format
1010+ .with_file(true) // Display source code file paths
1111+ .with_line_number(true) // Display source code line numbers
1212+ .with_thread_ids(true) // Display the thread ID an event was recorded on
1313+ .with_target(false) // Don't display the event's target (module path)
1414+ .finish(); // Build the subscriber
1515+ tracing::subscriber::set_global_default(subscriber)?;
1616+1717+ const NEGATION_ID: &str = "did:plc:"; // TODO: Argumentize this.
1818+ let mut agent = atproto_teq::webrequest::Agent::default();
1919+ let pool_opts = SqliteConnectOptions::from_str("sqlite://prod.db").expect("Expected to be able to configure the database, but failed.")
2020+ .journal_mode(SqliteJournalMode::Wal);
2121+ let pool = SqlitePool::connect_with(pool_opts).await.expect("Expected to be able to connect to the database at sqlite://prod.db but failed.");
2222+ atproto_teq::database::negation(NEGATION_ID, &mut agent, &pool).await?;
2323+ Ok(())
2424+}
+16
src/bin/webserve.rs
···11+//! Main entrypoint.
22+#![expect(let_underscore_drop)]
33+44+#[tokio::main(flavor = "current_thread")]
55+async fn main() -> Result<(), Box<dyn std::error::Error>> {
66+ let subscriber = tracing_subscriber::fmt()
77+ .compact() // Use a more compact, abbreviated log format
88+ .with_file(true) // Display source code file paths
99+ .with_line_number(true) // Display source code line numbers
1010+ .with_thread_ids(true) // Display the thread ID an event was recorded on
1111+ .with_target(false) // Don't display the event's target (module path)
1212+ .finish(); // Build the subscriber
1313+ tracing::subscriber::set_global_default(subscriber)?;
1414+ let _ = tokio::spawn(atproto_teq::webserve::main_webserve()).await;
1515+ Ok(())
1616+}
+142
src/crypto.rs
···11+//! # Crypto
22+//! Sign messages using the secp256k1 elliptic curve algorithm.
33+44+use secp256k1::{Secp256k1, Message, SecretKey, PublicKey};
55+use secp256k1::hashes::{sha256, Hash};
66+use std::str::FromStr;
77+88+use crate::types::{AssignedLabelResponse, RetrievedLabelResponse, SignatureBytes, SignatureEnum};
99+1010+1111+/// Cryptographic signing and verification.
1212+pub struct Crypto {
1313+ private_key: SecretKey,
1414+ _public_key: PublicKey,
1515+}
1616+impl Default for Crypto {
1717+ fn default() -> Self {
1818+ Self::new()
1919+ }
2020+}
2121+impl Crypto {
2222+ /// Create a new `Crypto` instance.
2323+ pub fn new() -> Self {
2424+ let secp = Secp256k1::new();
2525+ drop(dotenvy::dotenv().expect("Failed to load .env file"));
2626+ let private_key_hex = std::env::var("PRIVATE_KEY_HEX").expect("Expected to be able to get a private key from the environment, but failed");
2727+ let private_key_vec = hex::decode(private_key_hex).expect("Expected to be able to decode a hex string, but failed");
2828+ let private_key_array: [u8; 32] = private_key_vec.as_slice().try_into().expect("Expected 32 bytes, within curve order, but failed");
2929+ let private_key = SecretKey::from_slice(&private_key_array).expect("Expected 32 bytes, within curve order, but failed");
3030+ let public_key = PublicKey::from_secret_key(&secp, &private_key);
3131+ Self {
3232+ private_key,
3333+ _public_key: public_key,
3434+ }
3535+ }
3636+ /// Create a new `Crypto` instance from a slice.
3737+ pub fn from_slice(slice: &[u8]) -> Self {
3838+ let secp = Secp256k1::new();
3939+ let private_key = SecretKey::from_slice(slice).expect("Expected 32 bytes, within curve order, but failed");
4040+ let public_key = PublicKey::from_secret_key(&secp, &private_key);
4141+ Self {
4242+ private_key,
4343+ _public_key: public_key,
4444+ }
4545+ }
4646+ /// Sign a message.
4747+ #[expect(clippy::cognitive_complexity)]
4848+ pub fn sign(&self, label: &mut AssignedLabelResponse) {
4949+ let secp = Secp256k1::new();
5050+ let label_for_serialization = RetrievedLabelResponse {
5151+ cts: label.cts.clone(),
5252+ neg: label.neg,
5353+ src: label.src.clone(),
5454+ uri: label.uri.clone(),
5555+ val: label.val.clone(),
5656+ ver: label.ver,
5757+ };
5858+ tracing::debug!("Label for serialization: {:?}", label_for_serialization);
5959+ label.sig = None;
6060+ // let label_json = serde_json::to_string(&label_for_serialization).unwrap();
6161+ // let digest = sha256::Hash::hash(msg.as_bytes());
6262+ // let message = Message::from_digest(digest.to_byte_array());
6363+ let label_cbor = serde_cbor::to_vec(&label_for_serialization).expect("Expected to be able to serialize a label, but failed");
6464+ tracing::debug!("Label CBOR: {:?}", label_cbor);
6565+ // decode the cbor we just made
6666+ let label_decoded: RetrievedLabelResponse = serde_cbor::from_slice(&label_cbor).expect("Expected to be able to deserialize a label, but failed");
6767+ tracing::debug!("Label decoded: {:?}", label_decoded);
6868+ let digest = sha256::Hash::hash(&label_cbor);
6969+ let message = Message::from_digest(digest.to_byte_array());
7070+ let sig = secp.sign_ecdsa(&message, &self.private_key);
7171+ // verify the sig we just made:
7272+ let verified = secp.verify_ecdsa(&message, &sig, &self._public_key);
7373+ assert!(verified.is_ok());
7474+ tracing::debug!("Verified: {:?}", verified);
7575+ tracing::debug!("Message: {:?}", message);
7676+ tracing::debug!("Signature: {:?}", sig);
7777+ tracing::debug!("Public key: {:?}", self._public_key);
7878+ // let serialized_sig = sig.serialize_der();
7979+ // return raw 64 byte sig not DER-encoded
8080+ let serialized_sig: [u8; 64] = sig.serialize_compact();
8181+ // serialized_sig.to_vec()
8282+ label.sig = Some(SignatureEnum::Bytes(SignatureBytes::from_bytes(serialized_sig)));
8383+ }
8484+ /// Consume a label response and validate the signature.
8585+ #[expect(clippy::cognitive_complexity)]
8686+ pub fn validate(&self,
8787+ label: RetrievedLabelResponse,
8888+ sig: &str,
8989+ public_key_string: &str, // multibase-encoded string
9090+ ) -> bool {
9191+ tracing::debug!("Retrieved label: {:?}", label);
9292+ // let public_key_vec = hex::decode(public_key_string).unwrap();
9393+ // When encoding public keys as strings, the preferred representation uses multibase (with base58btc specifically) and a multicode prefix to indicate the specific key type. By embedding metadata about the type of key in the encoding itself, they can be parsed unambiguously.
9494+ // The process for encoding a public key in this format is:
9595+ // Encode the public key curve "point" as bytes. Be sure to use the smaller "compact" or "compressed" representation.
9696+ // Prepend the appropriate curve multicodec value, as varint-encoded bytes, in front of the key bytes
9797+ // p256 (compressed, 33 byte key length): p256-pub, code 0x1200, varint-encoded bytes: [0x80, 0x24]
9898+ // k256 (compressed, 33 byte key length): secp256k1-pub, code 0xE7, varint bytes: [0xE7, 0x01]
9999+ // Encode the combined bytes with with base58btc, and prefix with a z character, yielding a multibase-encoded string
100100+ // The decoding process is the same in reverse, using the identified curve type as context.
101101+ let public_key_string = public_key_string.strip_prefix("z").expect("Expected to be able to strip a prefix, but failed");
102102+ let public_key_vec = bs58::decode(public_key_string).into_vec().expect("Expected to be able to decode a base58 string, but failed");
103103+ // // Remove the multicodec prefix
104104+ // let public_key_vec = public_key_vec[2..].to_vec();
105105+ // Determine which curve the key is for
106106+ match public_key_vec[0] {
107107+ 0x80 => {
108108+ tracing::debug!("p256");
109109+ // p256
110110+ },
111111+ 0xE7 => {
112112+ tracing::debug!("k256");
113113+ // k256
114114+ },
115115+ _ => {
116116+ panic!("Unknown curve");
117117+ },
118118+ };
119119+ let public_key_vec = public_key_vec[2..].to_vec();
120120+121121+ let public_key_array: [u8; 33] = public_key_vec.as_slice().try_into().expect("Expected 33 bytes, within curve order, but failed");
122122+ let public_key = PublicKey::from_slice(&public_key_array).expect("Expected 33 bytes, within curve order, but failed");
123123+ // use of the "low-S" signature variant is required
124124+ // let secp = Secp256k1::new();
125125+ let secp = Secp256k1::verification_only();
126126+ // let label_json = serde_json::to_string(&label).unwrap();
127127+ // tracing::debug!("Label JSON: {:?}", label_json);
128128+ let label_cbor = serde_cbor::to_vec(&label).expect("Expected to be able to serialize a label, but failed");
129129+ let digest = sha256::Hash::hash(&label_cbor);
130130+ tracing::debug!("Digest: {:?}", digest);
131131+ let message = Message::from_digest(digest.to_byte_array());
132132+ tracing::debug!("Signature: {:?}", sig);
133133+ let sig = SignatureBytes::from_str(sig).expect("Expected to be able to parse a signature from a string, but failed");
134134+ tracing::debug!("Signature bytes: {:?}", sig);
135135+ let signature = secp256k1::ecdsa::Signature::from_compact(&sig.as_vec()).expect("Expected to be able to parse a signature from a byte array, but failed");
136136+ tracing::debug!("Message: {:?}", message);
137137+ tracing::debug!("Signature: {:?}", signature);
138138+ tracing::debug!("Public key: {:?}", public_key);
139139+ secp.verify_ecdsa(&message, &signature, &public_key).is_ok()
140140+ }
141141+142142+}
+262
src/database.rs
···11+//! This module is responsible for handling the database operations.
22+33+use sqlx::{sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePool}, Executor};
44+use std::str::FromStr;
55+66+use crate::{types, webrequest::Agent};
77+88+/// The main function for the database module.
99+#[tracing::instrument]
1010+pub async fn main_database() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1111+ const STATE: u8 = 0;
1212+ let pool_opts = SqliteConnectOptions::from_str("sqlite://prod.db").expect("Expected to be able to configure the database, but failed.")
1313+ .journal_mode(SqliteJournalMode::Wal);
1414+ let pool = SqlitePool::connect_with(pool_opts).await.expect("Expected to be able to connect to the database at sqlite://prod.db but failed.");
1515+ match STATE {
1616+ 0 => {
1717+ if initialize_database(&pool).await.is_err() {
1818+ tracing::debug!("Database already initialized");
1919+ }
2020+ },
2121+ 1 => {
2222+ validate_labels(&mut Agent::default(), &pool).await?
2323+ },
2424+ _ => (),
2525+ }
2626+ Ok(())
2727+}
2828+async fn initialize_database(pool: &SqlitePool) -> Result<(), sqlx::Error> {
2929+ tracing::debug!("Initializing database");
3030+ let mut connection = pool.acquire().await?;
3131+ _ = connection.execute("PRAGMA foreign_keys=on").await?;
3232+ _ = connection
3333+ .execute("CREATE TABLE profile (did STRING PRIMARY KEY)")
3434+ .await?;
3535+ _ = connection
3636+ .execute(
3737+ "CREATE TABLE profile_stats (
3838+ did STRING PRIMARY KEY,
3939+ created_at DATETIME NOT NULL,
4040+ follower_count INTEGER NOT NULL,
4141+ post_count INTEGER NOT NULL,
4242+ checked_at DATETIME NOT NULL,
4343+ FOREIGN KEY(did) REFERENCES profile (did)
4444+ )",
4545+ )
4646+ .await?;
4747+ _ = connection
4848+ .execute(
4949+ "CREATE TABLE profile_labels (
5050+ seq INTEGER PRIMARY KEY AUTOINCREMENT,
5151+ uri STRING NOT NULL,
5252+ cid STRING,
5353+ val STRING NOT NULL,
5454+ neg BOOLEAN,
5555+ cts DATETIME NOT NULL,
5656+ exp DATETIME,
5757+ sig BLOB NOT NULL,
5858+ FOREIGN KEY(uri) REFERENCES profile (did)
5959+ )",
6060+ )
6161+ .await?;
6262+ tracing::info!("Database initialized");
6363+ Ok(())
6464+}
6565+/// Negate a label by its DID.
6666+pub async fn negation(negation_id: &str, agent: &mut Agent, pool: &SqlitePool) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
6767+ let existing_label = sqlx::query!(
6868+ r#"SELECT seq FROM profile_labels
6969+ WHERE uri = ? AND neg is not true
7070+ "#,
7171+ negation_id,
7272+ ).fetch_optional(pool)
7373+ .await.expect("Expected to be able to fetch the label, but failed.");
7474+ tracing::info!("Existing label: {:?}", existing_label);
7575+ if let Some(label) = existing_label {
7676+ tracing::info!("Removing label for {}", negation_id);
7777+ drop(types::Profile::remove_label(pool, label.seq).await);
7878+ }
7979+ tracing::info!("Inserting negate for {}", negation_id);
8080+ drop(types::Profile::new(negation_id).determine_stats(agent, pool).await.negate_label(pool).await);
8181+ Ok(())
8282+}
8383+/// Iterate over all rows of profile_labels and validate the labels.
8484+async fn validate_labels(agent: &mut Agent, pool: &SqlitePool) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
8585+ // // cleanup duplicates
8686+ // let uris_with_duplicates = sqlx::query!(
8787+ // r#"SELECT uri FROM profile_labels
8888+ // GROUP BY uri
8989+ // HAVING COUNT(*) > 1
9090+ // "#,
9191+ // ).fetch_all(pool)
9292+ // .await.expect("Expected to be able to fetch all labels, but failed.");
9393+ // for uri in uris_with_duplicates {
9494+ // let seqs = sqlx::query!(
9595+ // r#"SELECT seq FROM profile_labels
9696+ // WHERE uri = ?
9797+ // ORDER BY seq ASC
9898+ // "#,
9999+ // uri.uri,
100100+ // ).fetch_all(pool)
101101+ // .await.expect("Expected to be able to fetch all labels, but failed.");
102102+ // let mut first = true;
103103+ // for seq in seqs {
104104+ // if first {
105105+ // first = false;
106106+ // continue;
107107+ // }
108108+ // let seq = seq.seq;
109109+ // let _ = sqlx::query!(
110110+ // r#"DELETE FROM profile_labels
111111+ // WHERE seq = ?
112112+ // "#,
113113+ // seq,
114114+ // ).execute(pool).await.expect("Expected to be able to delete the label, but failed.");
115115+ // }
116116+ // }
117117+ let mut valid = 0;
118118+ let mut invalid = 0;
119119+ let mut invalid_list = Vec::new();
120120+ let mut unreachable = 0;
121121+ let start_from_seq = 222_081; // TODO: argumentize this
122122+ let go_to_seq = 222_144; // TODO: argumentize this
123123+ let labels = sqlx::query!(
124124+ r#"SELECT seq, uri "uri: String", neg FROM profile_labels
125125+ WHERE seq > ? AND seq < ?
126126+ "#,
127127+ start_from_seq,
128128+ go_to_seq,
129129+ )
130130+ .fetch_all(pool)
131131+ .await.expect("Expected to be able to fetch all labels, but failed.");
132132+ let num_of_labels = labels.len();
133133+ tracing::info!("Validating {} labels", num_of_labels);
134134+ let mut count: u32 = 0;
135135+ // We'll be collecting 25 labels at a time, and then checking them with get_profiles, to avoid rate limiting.
136136+ let mut label_list = Vec::new();
137137+ const FETCH_AMOUNT: usize = 25;
138138+ for label in labels {
139139+ if label.neg == Some(true) {
140140+ tracing::info!("Skipping negative label seq {} for https://bsky.app/profile/{}", label.seq, label.uri);
141141+ continue;
142142+ }
143143+ label_list.push((label.uri, label.seq));
144144+ if label_list.len() == FETCH_AMOUNT {
145145+ let fetched_labels = agent.check_profiles(label_list.as_slice()).await.expect("Expected to be able to check the profiles, but failed.");
146146+ if fetched_labels.len() != FETCH_AMOUNT {
147147+ tracing::warn!("Expected to get {} labels, but got {}", FETCH_AMOUNT, fetched_labels.len());
148148+ unreachable += FETCH_AMOUNT - fetched_labels.len();
149149+ // TODO: Figure out which ones are missing
150150+ }
151151+ for fetched_label in fetched_labels.iter() {
152152+ let uri = fetched_label.1.0.as_str();
153153+ let seq = fetched_label.1.1;
154154+ count += 1;
155155+ if count % 100 == 0 {
156156+ tracing::info!("Validating label count {}, seq {}", count, seq);
157157+ }
158158+ if fetched_label.0 {
159159+ tracing::debug!("Valid label seq {} for https://bsky.app/profile/{}", seq, uri);
160160+ valid += 1;
161161+ } else {
162162+ tracing::warn!("Invalid label seq {} for https://bsky.app/profile/{}", seq, uri);
163163+ invalid += 1;
164164+ invalid_list.push(format!("https://bsky.app/profile/{}", uri));
165165+ // let mut profile = types::Profile::new(uri);
166166+ // let spawned_pool = pool.clone();
167167+ // // drop(tokio::spawn(async move {
168168+ // if types::Profile::remove_label(&spawned_pool, seq).await.is_ok()
169169+ // && profile.determine_stats_exist(&spawned_pool.clone()).await.expect(
170170+ // "Expected to be able to determine if stats exist, but failed.").is_some() {
171171+ // _ = profile.determine_label(&spawned_pool).await;
172172+ // tracing::debug!("Label removed and profile revalidated for https://bsky.app/profile/{}", uri);
173173+ // } else {
174174+ // tracing::warn!("Failed to remove label {} for https://bsky.app/profile/{}", seq, uri);
175175+ // }
176176+ // }));
177177+ }
178178+ // {
179179+ // tracing::warn!("Failed to get profile https://bsky.app/profile/{}", uri);
180180+ // unreachable += 1;
181181+ // let spawned_pool = pool.clone();
182182+ // // drop(tokio::spawn(async move {
183183+ // if types::Profile::remove_label(&spawned_pool, seq).await.is_err() {
184184+ // tracing::warn!("Failed to remove label {} for https://bsky.app/profile/{}", seq, uri);
185185+ // }
186186+ // // }));
187187+ // }
188188+ }
189189+ label_list.clear();
190190+ // tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
191191+ }
192192+ }
193193+194194+ // for label in labels {
195195+ // tokio::time::sleep(tokio::time::Duration::from_millis(25)).await; // Sleep for 25ms to throttle rate limiting
196196+ // count += 1;
197197+ // if count % 100 == 0 {
198198+ // tracing::info!("Validating label count {}, seq {}", count, label.seq);
199199+ // }
200200+ // tracing::debug!("Validating label seq {} for {}", label.seq, label.uri);
201201+ // let valid_label = agent.check_profile(&label.uri).await;
202202+ // if valid_label.is_ok() {
203203+ // if valid_label.expect("Expected to be able to check the profile, but failed.") {
204204+ // valid += 1;
205205+ // } else {
206206+ // tracing::warn!("Invalid label seq {} for https://bsky.app/profile/{}", label.seq, label.uri);
207207+ // invalid += 1;
208208+ // invalid_list.push(format!("https://bsky.app/profile/{}", label.uri));
209209+ // let mut profile = crate::types::Profile::new(&label.uri);
210210+ // let spawned_pool = pool.clone();
211211+ // drop(tokio::spawn(async move {
212212+ // if crate::types::Profile::remove_label(&spawned_pool, label.seq).await.is_ok()
213213+ // && profile.determine_stats_exist(&spawned_pool.clone()).await.expect(
214214+ // "Expected to be able to determine if stats exist, but failed.").is_some() {
215215+ // _ = profile.determine_label(&spawned_pool).await;
216216+ // tracing::debug!("Label removed and profile revalidated for https://bsky.app/profile/{}", label.uri);
217217+ // } else {
218218+ // tracing::warn!("Failed to remove label {} for https://bsky.app/profile/{}", label.seq, label.uri);
219219+ // }
220220+ // }));
221221+ // }
222222+ // } else {
223223+ // let valid_label2 = agent.check_profile(&label.uri).await;
224224+ // if valid_label2.is_ok() {
225225+ // if valid_label2.expect("Expected to be able to check the profile, but failed.") {
226226+ // tracing::warn!("Had to retry for profile https://bsky.app/profile/{}", label.uri);
227227+ // valid += 1;
228228+ // } else {
229229+ // tracing::warn!("Invalid label {} for https://bsky.app/profile/{}", label.seq, label.uri);
230230+ // invalid += 1;
231231+ // invalid_list.push(format!("https://bsky.app/profile/{}", label.uri));
232232+ // let mut profile = crate::types::Profile::new(&label.uri);
233233+ // let spawned_pool = pool.clone();
234234+ // drop(tokio::spawn(async move {
235235+ // if crate::types::Profile::remove_label(&spawned_pool, label.seq).await.is_ok()
236236+ // && profile.determine_stats_exist(&spawned_pool.clone()).await.expect(
237237+ // "Expected to be able to determine if stats exist, but failed.").is_none() {
238238+ // _ = profile.determine_label(&spawned_pool).await;
239239+ // tracing::info!("Label removed and profile revalidated for https://bsky.app/profile/{}", label.uri);
240240+ // } else {
241241+ // tracing::warn!("Failed to remove label {} for https://bsky.app/profile/{}", label.seq, label.uri);
242242+ // }
243243+ // }));
244244+ // }
245245+ // } else {
246246+ // tracing::warn!("Failed to get profile https://bsky.app/profile/{}", label.uri);
247247+ // unreachable += 1;
248248+ // let spawned_pool = pool.clone();
249249+ // drop(tokio::spawn(async move {
250250+ // if crate::types::Profile::remove_label(&spawned_pool, label.seq).await.is_err() {
251251+ // tracing::warn!("Failed to remove label {} for https://bsky.app/profile/{}", label.seq, label.uri);
252252+ // }
253253+ // }));
254254+ // }
255255+ // }
256256+ // }
257257+ tracing::info!("Valid labels: {}", valid);
258258+ tracing::info!("Invalid labels: {}", invalid);
259259+ tracing::info!("List of invalid labels: {:?}", invalid_list);
260260+ tracing::info!("Unreachable labels: {}", unreachable);
261261+ Ok(())
262262+}
+197
src/jetstream.rs
···11+//! Consume Jetstream events.
22+use std::{str::FromStr, sync::Arc};
33+44+use atrium_api::{record::KnownRecord::AppBskyFeedPost, types::string};
55+use jetstream_oxide::{
66+ DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
77+ events::{JetstreamEvent::Commit, commit::CommitEvent},
88+};
99+use sqlx::{sqlite::{SqliteConnectOptions, SqliteJournalMode}, SqlitePool};
1010+use tokio::runtime::Handle;
1111+1212+use crate::{database::negation, types::{Profile, ProfileStats}, webrequest::Agent};
1313+/// Consume Jetstream events.
1414+#[tracing::instrument]
1515+pub async fn main_jetstream() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1616+ let metrics = Handle::current().metrics();
1717+ let pool_opts = SqliteConnectOptions::from_str("sqlite://prod.db").expect("Expected to be able to configure the database, but failed.")
1818+ .journal_mode(SqliteJournalMode::Wal);
1919+ let pool = SqlitePool::connect_with(pool_opts).await.expect("Expected to be able to connect to the database at sqlite://prod.db but failed.");
2020+ let mut agent: Agent = Agent::default();
2121+ let app_bsky_feed_post: string::Nsid = "app.bsky.feed.post".parse().expect("Expected to be able to parse a string, but failed");
2222+ let config = JetstreamConfig {
2323+ endpoint: DefaultJetstreamEndpoints::USEastOne.into(),
2424+ wanted_collections: vec![app_bsky_feed_post],
2525+ wanted_dids: vec![],
2626+ compression: JetstreamCompression::Zstd,
2727+ cursor: None,
2828+ };
2929+ let jetstream = JetstreamConnector::new(config).expect("Failed to connect to Jetstream");
3030+ let receiver = jetstream
3131+ .connect()
3232+ .await
3333+ .expect("Failed to connect to Jetstream");
3434+ let language = string::Language::from_str("en").expect("Expected to be able to parse a string, but failed");
3535+3636+ drop(dotenvy::dotenv().expect("Failed to load .env file"));
3737+ let negation_post_uri = dotenvy::var("NEGATION_POST_URI").expect("Expected to be able to read a variable, but failed.");
3838+ automated_removal(&pool, &mut agent, negation_post_uri.as_str()).await?;
3939+ let addition_post_uri = dotenvy::var("ADDITION_POST_URI").expect("Expected to be able to read a variable, but failed.");
4040+ manual_addition(&pool, &mut agent, addition_post_uri.as_str()).await?;
4141+4242+ const CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(20);
4343+ let mut last_check = tokio::time::Instant::now();
4444+4545+ let mut vec_of_profiles_to_check: Vec<String> = vec![];
4646+4747+ while let Ok(event) = receiver.recv_async().await {
4848+ if let Commit(CommitEvent::Create { info, commit }) = event {
4949+ match commit.record {
5050+ AppBskyFeedPost(record) => {
5151+ if record.langs == Some(vec![(language).clone()]) {
5252+ let profile = Profile::new(info.did.clone().as_str()).insert_profile(&pool.clone()).await;
5353+ match profile
5454+ {
5555+ Ok(mut profile) => {
5656+ if profile.determine_stats_exist(&pool.clone()).await?.is_none() {
5757+ vec_of_profiles_to_check.push(info.did.clone().to_string());
5858+ } else {
5959+ tracing::debug!("Stats already exist: {:?}", info.did.as_str());
6060+ }
6161+ }
6262+ Err(_e) => {
6363+ // tracing::debug!("Duplicate profile: {:?}", info.did.as_str());
6464+ }
6565+ }
6666+ }
6767+ }
6868+ // atrium_api::record::KnownRecord::AppBskyFeedLike(like) => {
6969+ // match like.subject.uri.as_str() {
7070+ // NEGATION_POST_URI => {
7171+ // negation(info.did.as_str(), &mut agent, &pool).await?;
7272+ // },
7373+ // ADDITION_POST_URI => {
7474+ // let mut profile = Profile::new(info.did.as_str()).insert_profile(&pool).await?;
7575+ // let mut profile_with_stats = profile.determine_stats_exist(&pool).await?;
7676+ // if profile_with_stats.is_none() {
7777+ // profile_with_stats = Some(profile.determine_stats(&mut agent, &pool).await);
7878+ // }
7979+ // if let Some(profile_with_stats) = profile_with_stats {
8080+ // _ = profile_with_stats.determine_label_agnostic(&pool).await;
8181+ // }
8282+ // }
8383+ // _ => {}
8484+ // }
8585+ // }
8686+ _ => {}
8787+ }
8888+ }
8989+ if vec_of_profiles_to_check.len() >= 25 {
9090+ match_profiles(&agent, &pool, &metrics, &vec_of_profiles_to_check).await?;
9191+ vec_of_profiles_to_check.clear();
9292+ }
9393+ if last_check.elapsed() > CHECK_INTERVAL {
9494+ automated_removal(&pool, &mut agent, negation_post_uri.as_str()).await?;
9595+ manual_addition(&pool, &mut agent, addition_post_uri.as_str()).await?;
9696+ last_check = tokio::time::Instant::now();
9797+ }
9898+ }
9999+ tracing::info!("Jetstream event stream ended unexpectedly.");
100100+ Ok(())
101101+}
102102+103103+/// Every so often, we'll check the `app.bsky.feed.getLikes` endpoint for new likes on our provided `uri` for requested negation labels.
104104+async fn automated_removal(
105105+ pool: &SqlitePool,
106106+ agent: &mut Agent,
107107+ negation_post_uri: &str,
108108+) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
109109+ let likes = agent.get_likes(negation_post_uri).await?;
110110+ for like in likes {
111111+ let did = like["actor"]["did"].as_str().expect("Expected to be able to parse a string, but failed");
112112+ negation(did, agent, pool).await?;
113113+ }
114114+ Ok(())
115115+}
116116+async fn manual_addition(
117117+ pool: &SqlitePool,
118118+ agent: &mut Agent,
119119+ addition_post_uri: &str,
120120+) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
121121+ let likes = agent.get_likes(addition_post_uri).await?;
122122+ for like in likes {
123123+ let did = like["actor"]["did"].as_str().expect("Expected to be able to parse a string, but failed");
124124+ let mut profile = Profile::new(did).insert_profile(pool).await?;
125125+ let mut profile_with_stats = profile.determine_stats_exist(pool).await?;
126126+ if profile_with_stats.is_none() {
127127+ profile_with_stats = Some(profile.determine_stats(agent, pool).await);
128128+ }
129129+ if let Some(profile_with_stats) = profile_with_stats {
130130+ _ = profile_with_stats.determine_label_agnostic(pool).await;
131131+ }
132132+ }
133133+ Ok(())
134134+}
135135+// async fn match_profile(
136136+// agent: &Agent,
137137+// pool: &SqlitePool,
138138+// metrics: &tokio::runtime::RuntimeMetrics,
139139+// did_str: &str,
140140+// ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
141141+// let mut profile = Profile::new(did_str);
142142+// let num_alive_tasks = metrics.num_alive_tasks();
143143+// if num_alive_tasks < 50 {
144144+// let spawned_pool = pool.clone();
145145+// let mut spawned_agent = agent.clone();
146146+// drop(tokio::spawn(async move {
147147+// _ = profile.determine_stats(&mut spawned_agent, &spawned_pool).await.determine_label(&spawned_pool).await;
148148+// }));
149149+// } else {
150150+// tracing::warn!("Too many tasks alive: {:?}", num_alive_tasks);
151151+// }
152152+// Ok(())
153153+// }
154154+/// Match a vec of profiles
155155+async fn match_profiles(
156156+ agent: &Agent,
157157+ pool: &SqlitePool,
158158+ metrics: &tokio::runtime::RuntimeMetrics,
159159+ did_strs: &[String],
160160+) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
161161+ let num_alive_tasks = metrics.num_alive_tasks();
162162+ if num_alive_tasks < 14 {
163163+ let spawned_pool = pool.clone();
164164+ let mut spawned_agent = Agent {
165165+ access_jwt: Arc::clone(&agent.access_jwt),
166166+ refresh_jwt: Arc::clone(&agent.refresh_jwt),
167167+ client: agent.client.clone(),
168168+ self_did: agent.self_did.clone(),
169169+ };
170170+ let did_strs = did_strs.to_owned();
171171+ drop(tokio::spawn(async move {
172172+ tracing::info!("Checking profiles: {:?}", did_strs);
173173+ let profiles_vec = spawned_agent.get_profiles(&did_strs).await.expect("Expected to be able to get profiles, but failed.");
174174+ let profiles_array = profiles_vec["profiles"].as_array();
175175+ if profiles_array.is_none() {
176176+ tracing::warn!("No profiles json found for profiles: {:?}", profiles_vec);
177177+ return;
178178+ }
179179+ for profile_stats in profiles_array.unwrap_or_else(|| panic!("Expected to be able to read profiles as array, but failed. Profiles: {:?}", profiles_vec)) {
180180+ let did = profile_stats["did"].as_str().expect("Expected to be able to parse a string, but failed");
181181+ let mut profile = Profile::new(did).insert_profile(&spawned_pool).await.expect("Expected to be able to insert a profile, but failed.");
182182+ let checked_at = chrono::Utc::now();
183183+ profile.stats = Some(ProfileStats {
184184+ follower_count: profile_stats["followersCount"].as_i64().expect("Expected to be able to parse an integer, but failed") as i32,
185185+ post_count: profile_stats["postsCount"].as_i64().expect("Expected to be able to parse an integer, but failed") as i32,
186186+ created_at: chrono::DateTime::parse_from_rfc3339(profile_stats["createdAt"].as_str().expect("Expected to be able to parse a string, but failed")).expect("Expected to be able to parse a string, but failed").into(),
187187+ checked_at,
188188+ });
189189+ profile.insert_profile_stats(&spawned_pool).await.expect("Expected to be able to insert profile stats, but failed.");
190190+ _ = profile.determine_label(&spawned_pool).await;
191191+ }
192192+ }));
193193+ } else {
194194+ tracing::warn!("Too many tasks alive: {:?}", num_alive_tasks);
195195+ }
196196+ Ok(())
197197+}
+7
src/lib.rs
···11+//! Library reexports.
22+pub mod database;
33+pub mod jetstream;
44+pub mod types;
55+pub mod webrequest;
66+pub mod webserve;
77+pub mod crypto;
+806
src/types.rs
···11+//! Structs, enums, and impls.
22+use std::str::FromStr;
33+use base64::Engine;
44+use chrono::{DateTime as Datetime, Datelike};
55+use jetstream_oxide::exports::Did;
66+use serde::{ser::{Serialize, Serializer}, Deserialize};
77+88+// /// How should a client visually convey this label?
99+// enum LabelDefinitionSeverity {
1010+// /// 'inform' means neutral and informational
1111+// Inform,
1212+// /// 'alert' means negative and warning
1313+// Alert,
1414+// /// 'none' means show nothing.
1515+// None,
1616+// }
1717+// impl LabelDefinitionSeverity {
1818+// fn to_string(&self) -> String {
1919+// match self {
2020+// Self::Inform => "inform".to_owned(),
2121+// Self::Alert => "alert".to_owned(),
2222+// Self::None => "none".to_owned(),
2323+// }
2424+// }
2525+// }
2626+// /// What should this label hide in the UI, if applied?
2727+// enum LabelDefinitionBlurs {
2828+// /// 'content' hides all of the target
2929+// Content,
3030+// /// 'media' hides the images/video/audio
3131+// Media,
3232+// /// 'none' hides nothing.
3333+// None,
3434+// }
3535+// impl LabelDefinitionBlurs {
3636+// fn to_string(&self) -> String {
3737+// match self {
3838+// Self::Content => "content".to_owned(),
3939+// Self::Media => "media".to_owned(),
4040+// Self::None => "none".to_owned(),
4141+// }
4242+// }
4343+// }
4444+// /// The default setting for this label.
4545+// enum LabelDefinitionDefaultSetting {
4646+// Hide,
4747+// Warn,
4848+// Ignore,
4949+// }
5050+// impl LabelDefinitionDefaultSetting {
5151+// fn to_string(&self) -> String {
5252+// match self {
5353+// Self::Hide => "hide".to_owned(),
5454+// Self::Warn => "warn".to_owned(),
5555+// Self::Ignore => "ignore".to_owned(),
5656+// }
5757+// }
5858+// }
5959+// /// Strings which describe the label in the UI, localized into a specific language.
6060+// struct LabelValueDefinitionStrings {
6161+// /// The code of the language these strings are written in.
6262+// lang: String,
6363+// /// A short human-readable name for the label.
6464+// name: String,
6565+// /// A longer description of what the label means and why it might be applied.
6666+// description: String,
6767+// }
6868+// /// Labels.
6969+// struct LabelDefinition {
7070+// /// The value of the label being defined. Must only include lowercase ascii and the '-' character (a-z-+).
7171+// identifier: String,
7272+// /// How should a client visually convey this label? 'inform' means neutral and informational; 'alert' means negative and warning; 'none' means show nothing.
7373+// severity: LabelDefinitionSeverity,
7474+// /// What should this label hide in the UI, if applied? 'content' hides all of the target; 'media' hides the images/video/audio; 'none' hides nothing.
7575+// blurs: LabelDefinitionBlurs,
7676+// /// The default setting for this label.
7777+// default_setting: LabelDefinitionDefaultSetting,
7878+// /// Does the user need to have adult content enabled in order to configure this label?
7979+// adult_content: Option<bool>,
8080+// /// Strings which describe the label in the UI, localized into a specific language.
8181+// locales: Vec<LabelValueDefinitionStrings>,
8282+// }
8383+// impl LabelDefinition {
8484+// fn new(identifier: String) -> Self {
8585+// let locales = vec![LabelValueDefinitionStrings {
8686+// lang: "en".to_owned(),
8787+// name: identifier.replace("joined-", "Joined "),
8888+// description: format!("Profile created {}", identifier.replace("joined-", "").replace("-", " ")),
8989+// }];
9090+// Self {
9191+// identifier,
9292+// severity: LabelDefinitionSeverity::Inform,
9393+// blurs: LabelDefinitionBlurs::None,
9494+// default_setting: LabelDefinitionDefaultSetting::Warn,
9595+// adult_content: Some(false),
9696+// locales,
9797+// }
9898+// }
9999+// }
100100+101101+102102+#[derive(Debug)]
103103+/// Signature bytes.
104104+pub struct SignatureBytes([u8; 64]);
105105+impl FromStr for SignatureBytes {
106106+ type Err = std::io::Error;
107107+ fn from_str(s: &str) -> Result<Self, Self::Err> {
108108+ let bytes = base64::engine::GeneralPurpose::new(
109109+ &base64::alphabet::STANDARD,
110110+ base64::engine::general_purpose::NO_PAD).decode(s).expect("Expected to be able to decode the base64 string as bytes but failed.");
111111+ let mut array = [0; 64];
112112+ array.copy_from_slice(&bytes);
113113+ Ok(Self(array))
114114+ }
115115+}
116116+impl SignatureBytes {
117117+ /// Create a new signature from a vector of bytes.
118118+ pub fn from_vec(vec: Vec<u8>) -> Self {
119119+ let mut array = [0; 64];
120120+ array.copy_from_slice(&vec);
121121+ Self(array)
122122+ }
123123+ /// Create a new signature from a slice of bytes.
124124+ pub const fn from_bytes(bytes: [u8; 64]) -> Self {
125125+ Self(bytes)
126126+ }
127127+ /// Create a new signature from a JSON value in the format of a $bytes object.
128128+ pub fn from_json(json: serde_json::Value) -> Self {
129129+ let byte_string = json["$bytes"].as_str().expect("Expected to be able to get the $bytes field from the JSON object as a string but failed.");
130130+ let bytes = base64::engine::GeneralPurpose::new(
131131+ &base64::alphabet::STANDARD,
132132+ base64::engine::general_purpose::NO_PAD).decode(byte_string).expect("Expected to be able to decode the base64 string as bytes but failed.");
133133+ Self::from_vec(bytes)
134134+ }
135135+ /// Get the signature as a vector of bytes.
136136+ pub fn as_vec(&self) -> Vec<u8> {
137137+ self.0.to_vec()
138138+ }
139139+ /// Get the signature as a base64 string.
140140+ pub fn as_base64(&self) -> String {
141141+ base64::engine::GeneralPurpose::new(
142142+ &base64::alphabet::STANDARD,
143143+ base64::engine::general_purpose::NO_PAD).encode(self.0)
144144+ }
145145+ /// Get the signature as a JSON object in the format of a $bytes object.
146146+ pub fn as_json_object(&self) -> serde_json::Value {
147147+ serde_json::json!({
148148+ "$bytes": self.as_base64()
149149+ })
150150+ }
151151+}
152152+impl Serialize for SignatureBytes {
153153+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
154154+ where
155155+ S: Serializer,
156156+ {
157157+ serializer.serialize_bytes(&self.0)
158158+ }
159159+}
160160+#[derive(Debug)]
161161+/// Signature bytes or JSON value.
162162+pub enum SignatureEnum {
163163+ /// Signature bytes.
164164+ Bytes(SignatureBytes),
165165+ /// Signature JSON value.
166166+ Json(serde_json::Value),
167167+}
168168+impl Serialize for SignatureEnum {
169169+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
170170+ where
171171+ S: Serializer,
172172+ {
173173+ match self {
174174+ Self::Bytes(bytes) => bytes.serialize(serializer),
175175+ Self::Json(json) => json.serialize(serializer),
176176+ }
177177+ }
178178+}
179179+180180+#[derive(serde::Serialize, Debug)]
181181+/// Label response content.
182182+pub struct AssignedLabelResponse {
183183+ // /// Timestamp at which this label expires (no longer applies, is no longer valid).
184184+ // exp: Option<DateTime<FixedOffset>>,
185185+ // /// Optionally, CID specifying the specific version of 'uri' resource this label applies to. \
186186+ // /// If provided, the label applies to a specific version of the subject uri
187187+ // cid: Option<String>,
188188+ /// Timestamp when this label was created.
189189+ /// Note that timestamps in a distributed system are not trustworthy or verified by default.
190190+ pub cts: String, // DateTime<Utc>,
191191+ /// If true, this is a negation label, indicates that this label "negates" an earlier label with the same src, uri, and val.
192192+ /// If the neg field is false, best practice is to simply not include the field at all.
193193+ #[serde(skip_serializing_if = "bool_is_false")]
194194+ pub neg: bool,
195195+ /// Signature of dag-cbor encoded label. \
196196+ /// cryptographic signature bytes. \
197197+ /// Uses the bytes type from the [Data Model](https://atproto.com/specs/data-model), which encodes in JSON as a $bytes object with base64 encoding
198198+ /// When labels are being transferred as full objects between services, the ver and sig fields are required.
199199+ pub sig: Option<SignatureEnum>,
200200+ /// DID of the actor authority (account) which generated this label. \
201201+ pub src: Did,
202202+ /// AT URI of the record, repository (account), or other resource that this label applies to. \
203203+ /// For a specific record, an `at://` URI. For an account, the `did:`.
204204+ pub uri: String,
205205+ /// The short (<=128 character) string name of the value or type of this label.
206206+ pub val: String,
207207+ /// The AT Protocol version of the label object schema version. \
208208+ /// Current version is always 1.
209209+ /// When labels are being transferred as full objects between services, the ver and sig fields are required.
210210+ pub ver: u64,
211211+}
212212+impl AssignedLabelResponse {
213213+ /// Create a new label.
214214+ pub fn generate(
215215+ src: Did,
216216+ uri: String,
217217+ val: String,
218218+ ) -> Self {
219219+ let sig = SignatureEnum::Bytes(SignatureBytes([0; 64]));
220220+ Self::reconstruct(src, uri, val, false, chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true), sig)
221221+ }
222222+ /// Reconstruct a label from parts.
223223+ pub const fn reconstruct(
224224+ src: Did,
225225+ uri: String,
226226+ val: String,
227227+ neg: bool,
228228+ cts: String,
229229+ sig: SignatureEnum,
230230+ ) -> Self {
231231+ Self {
232232+ ver: 1,
233233+ src,
234234+ uri,
235235+ // cid: None,
236236+ val,
237237+ neg,
238238+ sig: Some(sig),
239239+ cts,
240240+ // exp: None,
241241+ }
242242+ }
243243+ // The process to sign or verify a signature is to construct a complete version of the label, using only the specified schema fields, and not including the sig field.
244244+ // This means including the ver field, but not any $type field or other un-specified fields which may have been included in a Lexicon representation of the label. This data object is then encoded in CBOR, following the deterministic IPLD/DAG-CBOR normalization rules.
245245+ // The CBOR bytes are hashed with SHA-256, and then the direct hash bytes (not a hex-encoded string) are signed (or verified) using the appropriate cryptographic key. The signature bytes are stored in the sig field as bytes (see Data Model for details representing bytes).
246246+ /// Generate signature.
247247+ pub fn sign(mut self) -> Self {
248248+ crate::crypto::Crypto::new().sign(&mut self);
249249+ self
250250+ }
251251+252252+}
253253+#[derive(serde::Serialize)]
254254+/// A label response wrapper.
255255+pub struct AssignedLabelResponseWrapper {
256256+ /// The cursor to find the sequence number of this label. \
257257+ /// Returned as a string.
258258+ pub cursor: String,
259259+ /// Vector of labels.
260260+ pub labels: Vec<AssignedLabelResponse>,
261261+}
262262+#[derive(serde::Serialize, Debug)]
263263+/// A label response wrapper.
264264+pub struct SubscribeLabelsLabels {
265265+ /// The sequence number of this label. \
266266+ /// The seq field is a monotonically increasing integer, starting at 1 for the first label.
267267+ /// Returned as a long.
268268+ pub seq: i64,
269269+ /// Vector of labels.
270270+ pub labels: Vec<AssignedLabelResponse>,
271271+}
272272+#[derive(Deserialize)]
273273+#[expect(non_snake_case, reason = "Name matches URI parameter literally.")]
274274+/// URI parameters.
275275+pub struct UriParams {
276276+ /// URI patterns.
277277+ pub uriPatterns: Option<String>,
278278+ /// The DID of sources.
279279+ pub sources: Option<String>,
280280+ /// The limit of labels to fetch. Default is (50?).
281281+ pub limit: Option<i64>,
282282+ /// The cursor to use for seq.
283283+ pub cursor: Option<String>,
284284+ /// The actor to lookup.
285285+ pub actor: Option<String>,
286286+}
287287+const fn neg_default() -> bool {
288288+ false
289289+}
290290+291291+#[derive(serde::Serialize, serde::Deserialize, Debug)]
292292+/// A label retrieved from the atproto API.
293293+pub struct RetrievedLabelResponse {
294294+ /// The creation timestamp.
295295+ pub cts: String,
296296+ /// Whether the label is negative.
297297+ #[serde(skip_serializing_if = "bool_is_false", default = "neg_default")]
298298+ pub neg: bool,
299299+ /// The source DID.
300300+ pub src: Did,
301301+ /// The URI.
302302+ pub uri: String,
303303+ /// The value.
304304+ pub val: String,
305305+ /// The version.
306306+ pub ver: u64,
307307+}
308308+#[derive(serde::Serialize, serde::Deserialize, Debug)]
309309+/// A label retrieved from the atproto API.
310310+pub struct SignedRetrievedLabelResponse {
311311+ /// The creation timestamp.
312312+ pub cts: String,
313313+ /// Whether the label is negative.
314314+ #[serde(skip_serializing_if = "bool_is_false")]
315315+ pub neg: bool,
316316+ /// The source DID.
317317+ pub sig: serde_json::Value,
318318+ /// The source DID.
319319+ pub src: Did,
320320+ /// The URI.
321321+ pub uri: String,
322322+ /// The value.
323323+ pub val: String,
324324+ /// The version.
325325+ pub ver: u64,
326326+}
327327+fn bool_is_false(b: &bool) -> bool {
328328+ !b
329329+}
330330+#[derive(serde::Serialize, serde::Deserialize, Debug)]
331331+/// A label retrieved from the atproto API.
332332+pub struct SignedRetrievedLabelResponseWs {
333333+ /// The creation timestamp.
334334+ pub cts: String,
335335+ /// Whether the label is negative.
336336+ #[serde(skip_serializing_if = "bool_is_false")]
337337+ pub neg: bool,
338338+ /// The signature.
339339+ #[serde(with = "serde_bytes")]
340340+ pub sig: [u8; 64],
341341+ /// The source DID.
342342+ pub src: Did,
343343+ /// The URI.
344344+ pub uri: String,
345345+ /// The value.
346346+ pub val: String,
347347+ /// The version.
348348+ pub ver: u64,
349349+}
350350+#[derive(serde::Serialize, serde::Deserialize, Debug)]
351351+/// Labels with a sequence number.
352352+pub struct LabelsVecWithSeq {
353353+ /// The sequence number.
354354+ pub seq: u64,
355355+ /// The labels.
356356+ pub labels: Vec<SignedRetrievedLabelResponseWs>,
357357+}
358358+#[derive(Debug)]
359359+/// Profile stats.
360360+pub struct ProfileStats {
361361+ /// The number of followers.
362362+ pub follower_count: i32,
363363+ /// The number of posts.
364364+ pub post_count: i32,
365365+ /// The creation timestamp, as reported by actor.
366366+ pub created_at: Datetime<chrono::Utc>,
367367+ /// The timestamp at which the stats were checked.
368368+ pub checked_at: Datetime<chrono::Utc>,
369369+}
370370+impl ProfileStats {
371371+ fn new(
372372+ follower_count: i32,
373373+ post_count: i32,
374374+ created_at: Datetime<chrono::Utc>,
375375+ ) -> Self {
376376+ Self {
377377+ follower_count,
378378+ post_count,
379379+ created_at,
380380+ checked_at: chrono::Utc::now(),
381381+ }
382382+ }
383383+ /// Given a AT uri, lookup the profile and return the stats.
384384+ pub async fn from_at_url(
385385+ uri: String,
386386+ agent: &mut crate::webrequest::Agent,
387387+ ) -> Result<Self, Box<dyn std::error::Error>> {
388388+ let uri = uri.replace("at://","").replace("/app.bsky.actor.profile/self", "");
389389+ if let Ok(profile) = agent.get_profile(uri.as_str()).await {
390390+ tracing::debug!("{:?}", profile);
391391+392392+ // Begin enforce reasonable limits on the number of follows.
393393+ // https://jazco.dev/2025/02/19/imperfection/
394394+ let follows_count = profile["followsCount"].as_i64().expect("Expected to be able to parse an integer, but failed") as i32;
395395+ const MAX_FOLLOWS: i32 = 4_000;
396396+ if follows_count > MAX_FOLLOWS {
397397+ tracing::warn!("Profile {:?} has a suspicious number of follows: {:?}", uri, follows_count);
398398+ return Err(Box::new(std::io::Error::new(
399399+ std::io::ErrorKind::Other,
400400+ "Profile has a suspicious number of follows",
401401+ )));
402402+ }
403403+ // End
404404+405405+ let followers_count = profile["followersCount"].as_i64().expect("Expected to be able to parse an integer, but failed") as i32;
406406+ let posts_count = profile["postsCount"].as_i64().expect("Expected to be able to parse an integer, but failed") as i32;
407407+ let created_at = Datetime::parse_from_rfc3339(profile["createdAt"].as_str().expect("Expected to be able to parse a string, but failed"))?;
408408+ Ok(Self::new(followers_count, posts_count, created_at.into()))
409409+ } else {
410410+ Err(Box::new(std::io::Error::new(
411411+ std::io::ErrorKind::Other,
412412+ "Failed to get profile",
413413+ )))
414414+ }
415415+ }
416416+}
417417+#[derive(Debug, Clone, Copy, serde::Deserialize)]
418418+enum Year {
419419+ _2022,
420420+ _2023,
421421+ _2024,
422422+ _2025,
423423+}
424424+impl std::fmt::Display for Year {
425425+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
426426+ write!(
427427+ f,
428428+ "{}",
429429+ match self {
430430+ Self::_2022 => "a",
431431+ Self::_2023 => "b",
432432+ Self::_2024 => "c",
433433+ Self::_2025 => "d",
434434+ }
435435+ )
436436+ }
437437+}
438438+impl Serialize for Year {
439439+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
440440+ where
441441+ S: Serializer,
442442+ {
443443+ let val: char = self.to_string().chars().next().expect("Expected to be able to get the first character, but failed");
444444+ serializer.serialize_char(val)
445445+ }
446446+}
447447+#[derive(Debug, Clone, Copy, serde::Deserialize)]
448448+enum Month {
449449+ January,
450450+ February,
451451+ March,
452452+ April,
453453+ May,
454454+ June,
455455+ July,
456456+ August,
457457+ September,
458458+ October,
459459+ November,
460460+ December,
461461+}
462462+impl Serialize for Month {
463463+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
464464+ where
465465+ S: Serializer,
466466+ {
467467+ let val = self.to_string().to_lowercase();
468468+ serializer.serialize_str(&val)
469469+ }
470470+}
471471+impl std::fmt::Display for Month {
472472+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
473473+ write!(
474474+ f,
475475+ "{}",
476476+ match self {
477477+ Self::January => "jan",
478478+ Self::February => "feb",
479479+ Self::March => "mar",
480480+ Self::April => "apr",
481481+ Self::May => "may",
482482+ Self::June => "jun",
483483+ Self::July => "jul",
484484+ Self::August => "aug",
485485+ Self::September => "sep",
486486+ Self::October => "oct",
487487+ Self::November => "nov",
488488+ Self::December => "dec",
489489+ }
490490+ )
491491+ }
492492+}
493493+/// Profile labels for month+year.
494494+#[derive(Debug, Clone, Copy, serde::Deserialize, serde::Serialize)]
495495+pub struct ProfileLabel {
496496+ year: Year,
497497+ month: Month,
498498+}
499499+impl ProfileLabel {
500500+ /// Create a new profile label from a datetime.
501501+ #[allow(clippy::cognitive_complexity)]
502502+ pub fn from_datetime(datetime: Datetime<chrono::Utc>) -> Option<Self> {
503503+ Some(Self {
504504+ year: match datetime.year() {
505505+ 2023 => Year::_2023,
506506+ 2024 => Year::_2024,
507507+ 2025 => Year::_2025,
508508+ _ => {
509509+ tracing::debug!("Invalid year");
510510+ return None;
511511+ }
512512+ },
513513+ month: match datetime.month() {
514514+ 1 => Month::January,
515515+ 2 => Month::February,
516516+ 3 => Month::March,
517517+ 4 => Month::April,
518518+ 5 => Month::May,
519519+ 6 => Month::June,
520520+ 7 => Month::July,
521521+ 8 => Month::August,
522522+ 9 => Month::September,
523523+ 10 => Month::October,
524524+ 11 => Month::November,
525525+ 12 => Month::December,
526526+ _ => {
527527+ tracing::debug!("Invalid month");
528528+ return None;
529529+ }
530530+ },
531531+ })
532532+ }
533533+ /// Convert a profile label to a string.
534534+ pub fn to_label_val(self) -> String {
535535+ format!("joined-{}-{}", self.month, self.year)
536536+ .to_lowercase()
537537+ }
538538+}
539539+/// Profile with optional stats and label.
540540+#[derive(Debug)]
541541+pub struct Profile {
542542+ did: String,
543543+ /// Stats for a profile.
544544+ pub stats: Option<ProfileStats>,
545545+ label: Option<String>,
546546+}
547547+impl Profile {
548548+ /// Create a new profile, given a DID.
549549+ pub fn new(did: &str) -> Self {
550550+ Self {
551551+ did: {
552552+ // if did.starts_with("did:") {
553553+ // format!("at://{}{}", did, "/app.bsky.actor.profile/self")
554554+ // } else {
555555+ did.to_owned()
556556+ // }
557557+ },
558558+ stats: None,
559559+ label: None,
560560+ }
561561+ }
562562+ /// Fetch stats for a profile.
563563+ pub async fn determine_stats(&mut self, agent: &mut crate::webrequest::Agent, pool: &sqlx::sqlite::SqlitePool) -> &mut Self {
564564+ if let Ok(stats) = ProfileStats::from_at_url(self.did.clone(), agent).await {
565565+ self.stats = Some(stats);
566566+ } else {
567567+ tracing::warn!("Failed to get stats for profile {}", self.did);
568568+ }
569569+ self.insert_profile_stats(pool).await.expect("Expected to be able to insert profile stats, but failed");
570570+ self
571571+ }
572572+ /// Determine if stats exist for a profile.
573573+ pub async fn determine_stats_exist(&mut self, pool: &sqlx::Pool<sqlx::Sqlite>) -> Result<Option<&mut Self>, Box<dyn std::error::Error + Sync + Send>> {
574574+ if self.stats.is_some() {
575575+ return Ok(Some(self));
576576+ }
577577+ let profile_stats = sqlx::query!(
578578+ r#"
579579+ SELECT created_at "created_at: String", follower_count, post_count, checked_at "checked_at: String" FROM profile_stats WHERE did = ?
580580+ "#,
581581+ self.did
582582+ )
583583+ .fetch_one(pool)
584584+ .await;
585585+ if profile_stats.is_ok() {
586586+ let profile_stats = profile_stats.expect("Expected to be able to unwrap a profile_checked_at, but failed");
587587+ let created_at = Datetime::parse_from_rfc3339(profile_stats.created_at.as_str()).expect("Expected to be able to parse a string as a datetime, but failed").to_utc();
588588+ let follower_count = profile_stats.follower_count as i32;
589589+ let post_count = profile_stats.post_count as i32;
590590+ let checked_at = Datetime::parse_from_rfc3339(profile_stats.checked_at.as_str()).expect("Expected to be able to parse a string as a datetime, but failed").to_utc();
591591+ const TIMEPERIOD: i64 = 60 * 60 * 24 * 7 * 1000 * 4; // 4 weeks in milliseconds
592592+ if chrono::Utc::now().timestamp_millis() - checked_at.timestamp_millis() < TIMEPERIOD {
593593+ tracing::debug!("Stats exist for: {:?}", self.did);
594594+ self.stats = Some(ProfileStats {
595595+ follower_count,
596596+ post_count,
597597+ created_at,
598598+ checked_at,
599599+ });
600600+ return Ok(Some(self));
601601+ }
602602+ tracing::info!("Refetching stats for: {:?}", self.did);
603603+ return Ok(None);
604604+ }
605605+ tracing::info!("Stats do not exist for: {:?}", self.did);
606606+ Ok(None)
607607+ }
608608+ /// Determine the label of a profile.
609609+ pub async fn determine_label(&mut self, pool: &sqlx::sqlite::SqlitePool) -> &mut Self {
610610+ if self.stats.is_none() {
611611+ return self;
612612+ }
613613+ const MIN_POSTS: i32 = 30;
614614+ const SOME_POSTS: i32 = 200;
615615+ const MIN_FOLLOWERS: i32 = 400;
616616+ const SOME_FOLLOWERS: i32 = 2_500;
617617+ let post_count = self.stats.as_ref().expect("Expected stats to exist, but failed").post_count;
618618+ let follower_count = self.stats.as_ref().expect("Expected stats to exist, but failed").follower_count;
619619+ if (post_count >= MIN_POSTS && follower_count >= MIN_FOLLOWERS) && (post_count >= SOME_POSTS || follower_count >= SOME_FOLLOWERS)
620620+ {
621621+ match ProfileLabel::from_datetime(self.stats.as_ref().expect("Expected stats to exist, but failed").created_at) {
622622+ Some(label) => self.label = Some(label.to_label_val()),
623623+ None => {
624624+ tracing::debug!("Invalid datetime");
625625+ }
626626+ }
627627+ }
628628+ self.insert_profile_labels(pool).await.expect("Expected to be able to insert profile labels, but failed");
629629+ self
630630+ }
631631+ /// Determine the label of a profile, and insert it without checking stats reqs.
632632+ pub async fn determine_label_agnostic(&mut self, pool: &sqlx::sqlite::SqlitePool) -> &mut Self {
633633+ if self.stats.is_none() {
634634+ return self;
635635+ }
636636+ match ProfileLabel::from_datetime(self.stats.as_ref().expect("Expected stats to exist, but failed").created_at) {
637637+ Some(label) => self.label = Some(label.to_label_val()),
638638+ None => {
639639+ tracing::debug!("Invalid datetime");
640640+ }
641641+ }
642642+ self.insert_profile_labels(pool).await.expect("Expected to be able to insert profile labels, but failed");
643643+ self
644644+ }
645645+ /// Insert a profile into the database.
646646+ pub async fn insert_profile(self, pool: &sqlx::sqlite::SqlitePool) -> Result<Self, sqlx::Error> {
647647+ if (sqlx::query(&format!(
648648+ "INSERT INTO profile (did) VALUES ('{}')",
649649+ self.did
650650+ ))
651651+ .execute(pool)
652652+ .await).is_ok() {
653653+ tracing::debug!("Inserted profile {:?}", self.did);
654654+ } else {
655655+ tracing::debug!("Duplicate profile: {:?}", self.did);
656656+ }
657657+ Ok(self)
658658+ }
659659+ /// Insert profile stats into the database.
660660+ pub async fn insert_profile_stats(
661661+ &self,
662662+ pool: &sqlx::sqlite::SqlitePool,
663663+ ) -> Result<(), sqlx::Error> {
664664+ if self.stats.is_none() {
665665+ return Ok(());
666666+ }
667667+ // if sqlx::query!(
668668+ // r#"SELECT did "did: String" FROM profile_stats WHERE did = ? LIMIT 1"#,
669669+ // self.did
670670+ // )
671671+ // .fetch_one(pool)
672672+ // .await.is_ok() {
673673+ // tracing::debug!("Stats already exist for {:?}", self.did);
674674+ // return Ok(());
675675+ // }
676676+ let created_at = self.stats.as_ref().expect("Expected stats to exist, but failed").created_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
677677+ let follower_count = self.stats.as_ref().expect("Expected stats to exist, but failed").follower_count;
678678+ let post_count = self.stats.as_ref().expect("Expected stats to exist, but failed").post_count;
679679+ let checked_at = self.stats.as_ref().expect("Expected stats to exist, but failed").checked_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
680680+ _ = sqlx::query!(r#"
681681+ INSERT INTO profile_stats (did, created_at, follower_count, post_count, checked_at)
682682+ VALUES (?, ?, ?, ?, ?)
683683+ ON CONFLICT(did) DO UPDATE SET
684684+ created_at = ?,
685685+ follower_count = ?,
686686+ post_count = ?,
687687+ checked_at = ?
688688+ "#,
689689+ self.did,
690690+ created_at,
691691+ follower_count,
692692+ post_count,
693693+ checked_at,
694694+ created_at,
695695+ follower_count,
696696+ post_count,
697697+ checked_at,
698698+ )
699699+ .execute(pool).await.expect("Expected to be able to insert profile stats, but failed");
700700+ tracing::info!("Inserted profile stats for {:?} with {:?} followers", self.did, self.stats.as_ref().expect("Expected stats to exist, but failed").follower_count);
701701+ Ok(())
702702+ }
703703+ /// Negate a profile label.
704704+ pub async fn negate_label(
705705+ &mut self,
706706+ pool: &sqlx::sqlite::SqlitePool,
707707+ ) -> Result<(), sqlx::Error> {
708708+ if self.stats.is_none() {
709709+ tracing::warn!("No stats for {:?}", self.did);
710710+ return Ok(());
711711+ }
712712+ match ProfileLabel::from_datetime(self.stats.as_ref().expect("Expected stats to exist, but failed").created_at) {
713713+ Some(label) => self.label = Some(label.to_label_val()),
714714+ None => {
715715+ tracing::debug!("Invalid datetime");
716716+ }
717717+ }
718718+ let label = self.label.as_ref().expect("Expected label to exist, but failed");
719719+ let uri = self.did.as_str();
720720+ let val: &str = label.as_str();
721721+ drop(dotenvy::dotenv().expect("Failed to load .env file"));
722722+ let self_did = dotenvy::var("SELF_DID").expect("Expected to be able to get the SELF_DID from the environment, but failed");
723723+ let src = Did::new(self_did).expect("Expected to be able to create a valid DID but failed");
724724+ let mut label_response: AssignedLabelResponse = AssignedLabelResponse::generate(src, self.did.clone(), val.to_owned());
725725+ label_response.neg = true;
726726+ label_response = label_response.sign();
727727+ let sig_enum = label_response.sig.expect("Expected a signature, but failed");
728728+ if let SignatureEnum::Bytes(sig) = sig_enum {
729729+ let sig = sig.as_vec();
730730+ _ = sqlx::query!(
731731+ r#"INSERT INTO profile_labels (uri, val, neg, cts, sig) VALUES (?, ?, ?, ?, ?)"#,
732732+ uri,
733733+ val,
734734+ label_response.neg,
735735+ label_response.cts,
736736+ sig,
737737+ )
738738+ .execute(pool)
739739+ .await?;
740740+ }
741741+ tracing::info!("Negated profile label for {:?} with {:?}", self.did, self.label.as_ref().expect("Expected label to exist, but failed"));
742742+ Ok(())
743743+ }
744744+ /// Insert profile labels into the database.
745745+ async fn insert_profile_labels(
746746+ &self,
747747+ pool: &sqlx::sqlite::SqlitePool,
748748+ ) -> Result<(), sqlx::Error> {
749749+ if self.label.is_none() {
750750+ return Ok(());
751751+ }
752752+ if sqlx::query!(
753753+ r#"SELECT seq FROM profile_labels WHERE uri = ? LIMIT 1"#,
754754+ self.did
755755+ )
756756+ .fetch_one(pool)
757757+ .await.is_ok() {
758758+ tracing::debug!("Label already exists for {:?}", self.did);
759759+ return Ok(());
760760+ }
761761+ let label = self.label.as_ref().expect("Expected label to exist, but failed");
762762+ let uri = self.did.as_str();
763763+ let val: &str = label.as_str();
764764+ drop(dotenvy::dotenv().expect("Failed to load .env file"));
765765+ let self_did = dotenvy::var("SELF_DID").expect("Expected to be able to get the SELF_DID from the environment, but failed");
766766+ let src = Did::new(self_did).expect("Expected to be able to create a valid DID but failed");
767767+ let mut label_response: AssignedLabelResponse = AssignedLabelResponse::generate(src, self.did.clone(), val.to_owned());
768768+ label_response = label_response.sign();
769769+ let sig_enum = label_response.sig.expect("Expected a signature, but failed");
770770+ if let SignatureEnum::Bytes(sig) = sig_enum {
771771+ let sig = sig.as_vec();
772772+ let result = sqlx::query!(
773773+ r#"INSERT INTO profile_labels (uri, val, cts, sig) VALUES (?, ?, ?, ?)"#,
774774+ uri,
775775+ val,
776776+ label_response.cts,
777777+ sig,
778778+ )
779779+ .execute(pool)
780780+ .await;
781781+ if result.is_ok() {
782782+ tracing::info!("Inserted profile label for {:?} with {:?}", self.did, self.label.as_ref().expect("Expected label to exist, but failed"));
783783+ } else {
784784+ tracing::debug!("Duplicate profile label for {:?}", self.did);
785785+ }
786786+ return Ok(());
787787+ }
788788+ tracing::warn!("Failed to insert profile label for {:?}", self.did);
789789+ Ok(())
790790+ }
791791+ /// Remove label from profile_labels.
792792+ /// Used when a label needs to be regenerated.
793793+ pub async fn remove_label(
794794+ pool: &sqlx::sqlite::SqlitePool,
795795+ seq: i64,
796796+ ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
797797+ tracing::debug!("Removing label with seq: {:?}", seq);
798798+ _ = sqlx::query!(
799799+ r#"DELETE FROM profile_labels WHERE seq = ?"#,
800800+ seq,
801801+ )
802802+ .execute(pool)
803803+ .await.expect("Expected to be able to delete a label, but failed.");
804804+ Ok(())
805805+ }
806806+}
+340
src/webrequest.rs
···11+//! Requests to the atproto API.
22+use std::env;
33+use std::fs;
44+use std::sync::Arc;
55+66+use base64::Engine;
77+use reqwest::Client;
88+use tokio::sync::Mutex;
99+1010+use crate::types::{LabelsVecWithSeq, RetrievedLabelResponse, SignatureBytes};
1111+enum ApiEndpoint {
1212+ Authorized,
1313+ Public,
1414+}
1515+/// Agent for interactions with atproto.
1616+#[derive(Clone)]
1717+pub struct Agent {
1818+ /// The access JWT.
1919+ pub access_jwt: Arc<Mutex<String>>,
2020+ /// The refresh JWT.
2121+ pub refresh_jwt: Arc<Mutex<String>>,
2222+ /// The reqwest client.
2323+ pub client: Client,
2424+ /// The DID of the labeler.
2525+ pub self_did: Arc<String>,
2626+}
2727+impl Default for Agent {
2828+ fn default() -> Self {
2929+ drop(dotenvy::dotenv().expect("Failed to load .env file"));
3030+ Self {
3131+ access_jwt: Arc::new(Mutex::new(env::var("ACCESS_JWT").expect("ACCESS_JWT must be set"))),
3232+ refresh_jwt: Arc::new(Mutex::new(env::var("REFRESH_JWT").expect("REFRESH_JWT must be set"))),
3333+ client: Client::new(),
3434+ self_did: Arc::new(env::var("SELF_DID").expect("SELF_DID must be set")),
3535+ }
3636+ }
3737+}
3838+impl Agent {
3939+ /// The base URL of the atproto API's XRPC endpoint.
4040+ /// Rate limit: 3_000 per 5 minutes
4141+ const AUTH_URL: &'static str = "https://bsky.social/xrpc/";
4242+ const PUBLIC_URL: &'static str = "https://public.api.bsky.app/xrpc/";
4343+ async fn client_get(
4444+ &self,
4545+ path: &str,
4646+ parameters: &[(&str, &str)],
4747+ api_endpoint: &ApiEndpoint,
4848+ ) -> reqwest::Response {
4949+ self.client
5050+ .get(format!("{}{}", match api_endpoint {
5151+ ApiEndpoint::Authorized => Self::AUTH_URL,
5252+ ApiEndpoint::Public => Self::PUBLIC_URL,
5353+ }, &path))
5454+ .header("Content-Type", "application/json")
5555+ .header("Authorization", format!("Bearer {}", self.access_jwt.lock().await))
5656+ .header("atproto-accept-labelers", self.self_did.as_str())
5757+ .query(parameters)
5858+ .send()
5959+ .await.expect("Expected to be able to send request, but failed.")
6060+ }
6161+ async fn client_refresh(&self) {
6262+ tracing::warn!("Token expired, refreshing");
6363+ let response = self.client
6464+ .post(format!(
6565+ "{}{}",
6666+ Self::AUTH_URL,
6767+ "com.atproto.server.refreshSession"
6868+ ))
6969+ .header("Content-Type", "application/json")
7070+ .header("Authorization", format!("Bearer {}", self.refresh_jwt.lock().await))
7171+ .header("atproto-accept-labelers", self.self_did.as_str())
7272+ .send()
7373+ .await.expect("Expected to be able to send request, but failed.");
7474+ let json = response.json::<serde_json::Value>().await.expect("Expected to be able to read response as JSON, but failed.");
7575+ if let Some(error) = json["error"].as_str() {
7676+ match error {
7777+ "InvalidRequest" => {
7878+ tracing::warn!("Invalid request");
7979+ return;
8080+ },
8181+ "ExpiredToken" => {
8282+ tracing::warn!("Token expired");
8383+ return;
8484+ },
8585+ "AccountDeactivated" => {
8686+ tracing::warn!("Account deactivated");
8787+ return;
8888+ },
8989+ "AccountTakedown" => {
9090+ tracing::warn!("Account has been suspended (Takedown)");
9191+ return;
9292+ },
9393+ _ => {
9494+ tracing::warn!("Unknown error from HTTP response: {:?}", json);
9595+ return;
9696+ }
9797+ }
9898+ }
9999+ *self.refresh_jwt.lock().await = json["refreshJwt"].as_str().expect("Expected to be able to read refreshJwt as str, but failed.").to_owned();
100100+ *self.access_jwt.lock().await = json["accessJwt"].as_str().expect("Expected to be able to read accessJwt as str, but failed.").to_owned();
101101+ let new_env = format!(
102102+ "ACCESS_JWT={}\nREFRESH_JWT={}\n",
103103+ self.access_jwt.lock().await, self.refresh_jwt.lock().await
104104+ );
105105+ fs::write(".env", new_env).expect("Failed to write to .env");
106106+ tracing::info!("Token refreshed");
107107+ }
108108+ /// Get a JSON response from the atproto API. Used internal to this struct.
109109+ async fn get(
110110+ &self,
111111+ path: &str,
112112+ parameters: &[(&str, &str)],
113113+ api_endpoint: ApiEndpoint,
114114+ ) -> Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>> {
115115+ let response = self.client_get(path, parameters, &api_endpoint).await;
116116+ if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS {
117117+ tracing::warn!("Rate limited, sleeping for 5 minutes");
118118+ tracing::warn!("We were working on {} with parameters {:?}", path, parameters);
119119+ tokio::time::sleep(std::time::Duration::from_secs(305)).await; // 5 minutes and 5 seconds
120120+ let response = self.client_get(path, parameters, &api_endpoint).await;
121121+ return Ok(response.json::<serde_json::Value>().await.expect("Expected to be able to read response as JSON, but failed."));
122122+ }
123123+ if response.status() == reqwest::StatusCode::BAD_REQUEST {
124124+ let json = &response.json::<serde_json::Value>().await.expect("Expected to be able to read response as JSON, but failed.");
125125+ match json["error"].as_str().expect("Expected to be able to read error as str, but failed.") {
126126+ "ExpiredToken" => {
127127+ self.client_refresh().await;
128128+ let response = self.client_get(path, parameters, &api_endpoint).await;
129129+ return Ok(response.json::<serde_json::Value>().await.expect("Expected to be able to read response as JSON, but failed."));
130130+ },
131131+ "AccountDeactivated" => {
132132+ tracing::warn!("Account deactivated");
133133+ return Err(Box::new(std::io::Error::new(
134134+ std::io::ErrorKind::Other,
135135+ "Account deactivated",
136136+ )));
137137+ },
138138+ "AccountTakedown" => {
139139+ tracing::warn!("Account has been suspended (Takedown)");
140140+ return Err(Box::new(std::io::Error::new(
141141+ std::io::ErrorKind::Other,
142142+ "Account deactivated",
143143+ )));
144144+ },
145145+ "InvalidRequest" => {
146146+ // Check if the message is "Profile not found"
147147+ if json["message"].as_str().expect("Expected to be able to read message as str, but failed.") == "Profile not found" {
148148+ tracing::warn!("Profile not found");
149149+ return Err(Box::new(std::io::Error::new(
150150+ std::io::ErrorKind::NotFound,
151151+ "Profile not found",
152152+ )));
153153+ }
154154+ tracing::warn!("Unknown invalid request: {:?}", json);
155155+ return Err(Box::new(std::io::Error::new(
156156+ std::io::ErrorKind::Other,
157157+ "Unknown invalid request",
158158+ )));
159159+ },
160160+ _ => {
161161+ tracing::warn!("Unknown error from HTTP response: {:?}", json);
162162+ return Err(Box::new(std::io::Error::new(
163163+ std::io::ErrorKind::Other,
164164+ "Unknown bad request",
165165+ )));
166166+ }
167167+ };
168168+ }
169169+ if response.status() != reqwest::StatusCode::OK {
170170+ return Err(Box::new(std::io::Error::new(
171171+ std::io::ErrorKind::Other,
172172+ "Unknown HTTP error",
173173+ )));
174174+ }
175175+ let json = response.json::<serde_json::Value>().await.expect("Expected to be able to read response as JSON, but failed.");
176176+ Ok(json)
177177+ }
178178+ /// Get a profile from the atproto API.
179179+ pub async fn get_profile(
180180+ &mut self,
181181+ profile_id: &str,
182182+ ) -> Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>> {
183183+ let path = "app.bsky.actor.getProfile";
184184+ let parameters = [("actor", profile_id)];
185185+ self.get(path, ¶meters, ApiEndpoint::Public).await
186186+ }
187187+ /// Get multiple profiles.
188188+ pub async fn get_profiles(
189189+ &mut self,
190190+ profile_ids: &[String],
191191+ ) -> Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>> {
192192+ let path = "app.bsky.actor.getProfiles";
193193+ let mut parameters = Vec::new();
194194+ for profile_id in profile_ids {
195195+ parameters.push(("actors", profile_id.as_str()));
196196+ }
197197+ self.get(path, parameters.as_slice(), ApiEndpoint::Authorized).await
198198+ }
199199+ /// Check if a list of profiles has a label from us.
200200+ pub async fn check_profiles(
201201+ &mut self,
202202+ profile_ids: &[(String, i64)],
203203+ ) -> Result<Vec<(bool, (String, i64))>, Box<dyn std::error::Error + Send + Sync>> {
204204+ let mut found_labels: Vec<(bool, (String, i64))> = Vec::new();
205205+ let profile_ids_uris = profile_ids.iter().map(|(profile_id, _)| profile_id.clone()).collect::<Vec<String>>();
206206+ let profile_ids_seqs = profile_ids.iter().map(|(_, seq)| seq).collect::<Vec<&i64>>();
207207+ let profiles = self.get_profiles(profile_ids_uris.as_slice()).await?;
208208+ let profiles_array = profiles["profiles"].as_array();
209209+ if profiles_array.is_none() {
210210+ tracing::warn!("No profiles json found for profiles: {:?}", profiles);
211211+ return Ok(vec![]);
212212+ }
213213+ for profile in profiles_array.unwrap_or_else(|| panic!("Expected to be able to read profiles as array, but failed. Profiles: {:?}", profiles)) {
214214+ let labels = &profile["labels"];
215215+ let mut found = false;
216216+ let label_array = labels.as_array();
217217+ if label_array.is_none() {
218218+ tracing::warn!("No labels json found for profile: {:?}", profile);
219219+ continue;
220220+ }
221221+ let did = profile["did"].as_str().expect("Expected to be able to read did as str, but failed.");
222222+ let seq = profile_ids_seqs[profile_ids_uris.iter().position(|x| x == did).expect("Expected to be able to find the index of the uri.")];
223223+ for label in label_array.unwrap_or_else(|| panic!("Expected to be able to read labels as array, but failed. Profile: {:?}", profile)) {
224224+ if label["src"].as_str().expect("Expected to be able to read src as str, but failed.") == self.self_did.as_str() {
225225+ found = true;
226226+ break;
227227+ }
228228+ }
229229+ found_labels.push((found, (did.to_owned(), *seq)));
230230+ }
231231+ Ok(found_labels)
232232+ }
233233+ /// After getting a profile, check the labels on it, and see if one from us ("src:") is there.
234234+ pub async fn check_profile(
235235+ &mut self,
236236+ profile_did: &str,
237237+ ) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
238238+ let profile = self.get_profile(profile_did).await?;
239239+ let labels = &profile["labels"];
240240+ let label_array = labels.as_array();
241241+ if label_array.is_none() {
242242+ tracing::warn!("No labels json found for profile: {:?}", profile);
243243+ return Ok(false);
244244+ }
245245+ for label in label_array.unwrap_or_else(|| panic!("Expected to be able to read labels as array, but failed. Profile: {:?}", profile)) {
246246+ if label["src"].as_str().expect("Expected to be able to read src as str, but failed.") == self.self_did.as_str() {
247247+ return Ok(true);
248248+ }
249249+ }
250250+ Ok(false)
251251+ }
252252+ /// Get a label from the provided URL, then validate the signature.
253253+ pub async fn get_label_and_validate(
254254+ &self,
255255+ url: &str,
256256+ ) -> Result<(), Box<dyn std::error::Error>> {
257257+ tracing::debug!("Getting label from {}", url);
258258+ let response = reqwest::get(url).await.expect("Expected to be able to get response, but failed.");
259259+ tracing::debug!("Response: {:?}", response);
260260+ let response_json = response.json::<serde_json::Value>().await.expect("Expected to be able to read response as JSON, but failed.");
261261+ tracing::debug!("Response JSON: {:?}", response_json);
262262+ let sig = &response_json["labels"][0]["sig"];
263263+ tracing::debug!("Signature: {:?}", sig);
264264+ let retrieved_label = RetrievedLabelResponse {
265265+ // id: response_json["labels"][0]["id"].as_u64().unwrap(),
266266+ cts: response_json["labels"][0]["cts"].as_str().expect("Expected to be able to read cts as str, but failed.").to_owned(),
267267+ neg: response_json["labels"][0]["neg"].as_str() == Some("true"),
268268+ src: response_json["labels"][0]["src"].as_str().expect("Expected to be able to read src as str, but failed.").to_owned().parse().expect("Failed to parse DID"),
269269+ uri: response_json["labels"][0]["uri"].as_str().expect("Expected to be able to read uri as str, but failed.").to_owned().parse().expect("Failed to parse URI"),
270270+ val: response_json["labels"][0]["val"].as_str().expect("Expected to be able to read val as str, but failed.").to_owned(),
271271+ ver: response_json["labels"][0]["ver"].as_u64().expect("Expected to be able to read ver as u64, but failed."),
272272+ };
273273+ let crypto = crate::crypto::Crypto::new();
274274+ let pub_key = "zQ3shreqyXEdouQeEQSFKfoSEN5eig74BXuqQyTaiE9uzADqZ";
275275+ let sig_string = sig["$bytes"].as_str().expect("Expected to be able to read sig as str, but failed.");
276276+ if crypto.validate(retrieved_label, sig_string, pub_key) {
277277+ tracing::info!("Valid signature");
278278+ Ok(())
279279+ } else {
280280+ tracing::info!("Invalid signature");
281281+ Err(Box::new(std::io::Error::new(
282282+ std::io::ErrorKind::Other,
283283+ "Invalid signature",
284284+ )))
285285+ }
286286+ }
287287+ /// Get a label from a websocket URL, then validate the signature.
288288+ /// Similar to what's done in webserve.rs, but in reverse, we'll need to decode the message.
289289+ pub async fn get_label_and_validate_ws(
290290+ &self,
291291+ // url: &str,
292292+ ) -> Result<(), Box<dyn std::error::Error>> {
293293+ // For now, use this mock response, represented in base64:
294294+ let response = "omF0ZyNsYWJlbHNib3ABomNzZXEYG2ZsYWJlbHOBp2NjdHN4GzIwMjUtMDItMDlUMDM6MjU6MjcuOTI4MDIzWmNuZWf0Y3NpZ1hAXLIRXAG5mF5bCWWCwEhbYvC8YYVP9fWwbVVL6IBXXlIrZ6sr6MQ4DfNdpGhwRWawA4Mq44HlEDsJ7OvcGsDCDWNzcmN4IGRpZDpwbGM6bTZhZHB0bjYyZGNhaGZhcTM0dGNlM2o1Y3VyaXggZGlkOnBsYzptNmFkcHRuNjJkY2FoZmFxMzR0Y2UzajVjdmFsbmpvaW5lZC0yMDI1LTAyY3ZlcgE=";
295295+ tracing::debug!("Response: {:?}", response);
296296+ let response_bytes = base64::engine::GeneralPurpose::new(
297297+ &base64::alphabet::STANDARD,
298298+ base64::engine::general_purpose::PAD).decode(response).expect("Expected to be able to decode base64 response.");
299299+ tracing::debug!("Response bytes: {:?}", response_bytes);
300300+ let reponse_bytes_in_hex = hex::encode(&response_bytes);
301301+ tracing::debug!("Response bytes in hex: {:?}", reponse_bytes_in_hex);
302302+ let response_0 = &response_bytes[0..response_bytes.iter().position(|&r| r == 0x01).expect("Expected to find 0x01 in response bytes.")];
303303+ let response_1 = &response_bytes[response_bytes.iter().position(|&r| r == 0x01).expect("Expected to find 0x01 in response bytes.") + 1..];
304304+ tracing::debug!("Response 0: {:?}", hex::encode(response_0));
305305+ tracing::debug!("Response 1: {:?}", hex::encode(response_1));
306306+ let response_cbor: LabelsVecWithSeq = serde_cbor::from_slice(response_1).expect("Expected to be able to deserialize response 1 as LabelsVecWithSeq, but failed.");
307307+ tracing::debug!("Response CBOR: {:?}", response_cbor);
308308+ let unsigned_response = RetrievedLabelResponse {
309309+ cts: response_cbor.labels[0].cts.clone(),
310310+ neg: response_cbor.labels[0].neg,
311311+ src: response_cbor.labels[0].src.clone(),
312312+ uri: response_cbor.labels[0].uri.clone(),
313313+ val: response_cbor.labels[0].val.clone(),
314314+ ver: response_cbor.labels[0].ver,
315315+ };
316316+ let sig_base64 = SignatureBytes::from_bytes(response_cbor.labels[0].sig).as_base64();
317317+ tracing::debug!("Retrieved label: {:?}", response_cbor);
318318+ let crypto = crate::crypto::Crypto::new();
319319+ let public_key = "zQ3shreqyXEdouQeEQSFKfoSEN5eig74BXuqQyTaiE9uzADqZ";
320320+ if crypto.validate(unsigned_response, &sig_base64, public_key) {
321321+ tracing::info!("Valid signature");
322322+ Ok(())
323323+ } else {
324324+ tracing::info!("Invalid signature");
325325+ Err(Box::new(std::io::Error::new(
326326+ std::io::ErrorKind::Other,
327327+ "Invalid signature",
328328+ )))
329329+ }
330330+ }
331331+ /// getLikes
332332+ pub async fn get_likes(
333333+ &mut self,
334334+ uri: &str,
335335+ ) -> Result<Vec<serde_json::Value>, Box<dyn std::error::Error + Send + Sync>> {
336336+ let path = "app.bsky.feed.getLikes";
337337+ let parameters = [("uri", uri)];
338338+ self.get(path, ¶meters, ApiEndpoint::Public).await.map(|response| response["likes"].as_array().expect("Expected to be able to read likes as array, but failed.").to_owned())
339339+ }
340340+}
+488
src/webserve.rs
···11+//! Serving requests as a labeler.
22+33+use axum::{
44+ body::Bytes,
55+ extract::ws::{Message, Utf8Bytes, WebSocket, WebSocketUpgrade},
66+ response::IntoResponse,
77+ routing::any,
88+ Router,
99+};
1010+use axum_extra::TypedHeader;
1111+use headers::{Header, HeaderName, HeaderValue};
1212+use sqlx::{sqlite::{SqliteConnectOptions, SqliteJournalMode}, SqlitePool};
1313+use std::str::FromStr;
1414+use std::ops::ControlFlow;
1515+use std::net::SocketAddr;
1616+use axum::extract::connect_info::ConnectInfo;
1717+use axum::extract::ws::CloseFrame;
1818+use futures::{sink::SinkExt, stream::StreamExt};
1919+use axum::extract::Query;
2020+use axum::{Json, http::StatusCode, routing::get};
2121+use serde::Deserialize;
2222+use tower_http::decompression::RequestDecompressionLayer;
2323+use tower_http::compression::CompressionLayer;
2424+2525+use crate::{types::{AssignedLabelResponse, AssignedLabelResponseWrapper, SignatureBytes, SignatureEnum, SubscribeLabelsLabels, UriParams}, webrequest::Agent};
2626+2727+2828+/// Launch the web server to respond to label inquiries.
2929+#[tracing::instrument]
3030+pub async fn main_webserve() {
3131+ let app = Router::new()
3232+ .route("/xrpc/com.atproto.label.subscribeLabels", any(subscribe_labels))
3333+ .route("/xrpc/com.atproto.label.queryLabels", get(query_labels))
3434+ .route("/xrpc/app.bsky.actor.getProfile", get(get_profile))
3535+ .layer(RequestDecompressionLayer::new())
3636+ .layer(CompressionLayer::new().deflate(true));
3737+ let listener = tokio::net::TcpListener::bind("0.0.0.0:3000")
3838+ .await
3939+ .expect("Expected to bind to 0.0.0.0:3000 but failed.");
4040+ tracing::debug!("listening on {}", listener.local_addr().expect("Expected to get local address but failed."));
4141+ axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>()).await.expect("Expected to be able to use axum::serve but failed.");
4242+}
4343+4444+/// Querys by DID. \
4545+async fn query_labels(Query(params): Query<UriParams>) -> impl IntoResponse {
4646+ tracing::debug!("Querying labels: {:?}", params.uriPatterns);
4747+ drop(dotenvy::dotenv().expect("Failed to load .env file"));
4848+ let self_did = dotenvy::var("SELF_DID").expect("Expected to be able to get the SELF_DID from the environment, but failed");
4949+ let src = jetstream_oxide::exports::Did::new(self_did).expect("Expected to be able to create a valid DID but failed");
5050+ if let Some(uri_patterns) = params.uriPatterns {
5151+ let pool_opts = SqliteConnectOptions::from_str("sqlite://prod.db?mode=ro").expect("Expected to be able to configure the database, but failed.")
5252+ .journal_mode(SqliteJournalMode::Wal)
5353+ .read_only(true);
5454+ let pool = SqlitePool::connect_with(pool_opts).await.expect("Expected to be able to connect to the database at sqlite://prod.db but failed.");
5555+ let pattern = uri_patterns.replace("%", "").replace("_", "\\_"); // .replaceAll(/%/g, "").replaceAll(/_/g, "\\_");
5656+ let star_index = pattern.find('*');
5757+ let limit = params.limit.unwrap_or(50);
5858+ let cursor = params.cursor.unwrap_or_else(|| "0".to_owned());
5959+ if let Some(star_index) = star_index {
6060+ if star_index != pattern.len() - 1 {
6161+ return (StatusCode::BAD_REQUEST, Json(AssignedLabelResponseWrapper {
6262+ cursor: "0".to_owned(), // TODO: Other servers don't respond with a cursor in this scenario.
6363+ labels: Vec::<AssignedLabelResponse>::new()
6464+ }));
6565+ }
6666+ let labels = sqlx::query!(
6767+ r#"
6868+ SELECT seq, uri "uri: String", val "val: String", neg, cts "cts: String", sig
6969+ FROM profile_labels
7070+ WHERE seq > ?
7171+ LIMIT ?
7272+ "#,
7373+ cursor,
7474+ limit
7575+ )
7676+ .fetch_all(&pool)
7777+ .await
7878+ .expect("Expected to be able to fetch all labels from the database but failed.");
7979+ let smallest_cursor = labels.iter().map(|label| label.seq).min().unwrap_or(0);
8080+ return (StatusCode::OK, Json(AssignedLabelResponseWrapper {
8181+ cursor: smallest_cursor.to_string(),
8282+ labels: labels
8383+ .iter()
8484+ .map(|label| {
8585+ AssignedLabelResponse::reconstruct(
8686+ src.to_owned(),
8787+ label.uri.clone(),
8888+ label.val.clone(),
8989+ label.neg.unwrap_or(false),
9090+ label.cts.clone(),
9191+ SignatureEnum::Json(SignatureBytes::from_vec(label.sig.clone()).as_json_object()),
9292+ )
9393+ })
9494+ .collect(),
9595+ }));
9696+ }
9797+ let labels = sqlx::query!(
9898+ r#"
9999+ SELECT seq "seq: i64", uri "uri: String", val "val: String", neg, cts "cts: String", sig
100100+ FROM profile_labels WHERE uri = ? AND seq > ? LIMIT ?
101101+ "#,
102102+ uri_patterns,
103103+ cursor,
104104+ limit
105105+ )
106106+ .fetch_all(&pool)
107107+ .await
108108+ .expect("Expected to be able to fetch all missing labels from the database but failed.");
109109+ let largest_cursor = labels.iter().map(|label| label.seq).max().unwrap_or(0);
110110+ return (StatusCode::OK, Json(AssignedLabelResponseWrapper {
111111+ cursor: largest_cursor.to_string(),
112112+ labels: labels
113113+ .iter()
114114+ .map(|label| {
115115+ AssignedLabelResponse::reconstruct(
116116+ src.to_owned(),
117117+ label.uri.clone(),
118118+ label.val.clone(),
119119+ label.neg.unwrap_or(false),
120120+ label.cts.clone(),
121121+ SignatureEnum::Json(SignatureBytes::from_vec(label.sig.clone()).as_json_object()),
122122+ )
123123+ })
124124+ .collect(),
125125+ }));
126126+ }
127127+ (
128128+ StatusCode::OK,
129129+ Json(AssignedLabelResponseWrapper {
130130+ cursor: "0".to_owned(),
131131+ labels: Vec::<AssignedLabelResponse>::new()
132132+ }),
133133+ )
134134+}
135135+/// Querys by profile name.
136136+async fn get_profile(Query(params): Query<UriParams>) -> impl IntoResponse {
137137+ if let Some(actor) = ¶ms.actor {
138138+ let mut agent = Agent::default();
139139+ if let Ok(profile) = agent.get_profile(actor).await {
140140+ return (StatusCode::OK, Json(profile));
141141+ }
142142+ }
143143+ (StatusCode::OK, Json(serde_json::json!({})))
144144+}
145145+146146+/// Query parameters for subscribing to labels.
147147+#[derive(Deserialize)]
148148+struct SubscribeLabelsQueryParams {
149149+ /// The last known event seq number to backfill from.
150150+ cursor: Option<u64>,
151151+}
152152+153153+154154+#[derive(Debug)]
155155+struct XForwardedFor(String);
156156+157157+impl Header for XForwardedFor {
158158+ fn name() -> &'static HeaderName {
159159+ &http::header::FORWARDED
160160+ }
161161+162162+ fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
163163+ where
164164+ I: Iterator<Item = &'i HeaderValue>,
165165+ {
166166+ let value = values
167167+ .next()
168168+ .ok_or_else(headers::Error::invalid)?;
169169+170170+ // We are only interested in the first IP address in the list.
171171+ let ip = value
172172+ .to_str()
173173+ .map_err(|_| headers::Error::invalid())?
174174+ .split(',')
175175+ .next()
176176+ .ok_or_else(headers::Error::invalid)?;
177177+178178+ Ok(Self(ip.to_owned()))
179179+ }
180180+181181+ fn encode<E>(&self, values: &mut E)
182182+ where
183183+ E: Extend<HeaderValue>,
184184+ {
185185+ let value = HeaderValue::from_str(&self.0).expect("Expected to be able to convert the X-Forwarded-For header to a string but failed.");
186186+ values.extend(std::iter::once(value));
187187+ }
188188+}
189189+190190+/// The handler for the HTTP request.
191191+///
192192+/// This gets called when the HTTP request lands at the start
193193+/// of websocket negotiation. After this completes, the actual switching from HTTP to
194194+/// websocket protocol will occur.
195195+/// This is the last point where we can extract TCP/IP metadata such as IP address of the client
196196+/// as well as things from HTTP headers such as user-agent of the browser etc.
197197+async fn subscribe_labels(
198198+ ws: WebSocketUpgrade,
199199+ user_agent: Option<TypedHeader<headers::UserAgent>>,
200200+ x_forwarded_for: Option<TypedHeader<XForwardedFor>>,
201201+ ConnectInfo(connection_address): ConnectInfo<SocketAddr>,
202202+ Query(params): Query<SubscribeLabelsQueryParams>,
203203+) -> impl IntoResponse {
204204+ let user_agent = if let Some(TypedHeader(user_agent)) = user_agent {
205205+ user_agent.to_string()
206206+ } else {
207207+ String::from("Unknown browser")
208208+ };
209209+ // Check X-Forwarded-For header to get the apparent IP address of the client
210210+ // TODO: This header can be spoofed, and should only be trusted from a trusted proxy.
211211+ let apparent_ip = if let Some(TypedHeader(x_forwarded_for)) = x_forwarded_for {
212212+ SocketAddr::new(x_forwarded_for.0.parse().expect("Expected to be able to parse the X-Forwarded-For header as a socket address but failed."),
213213+ connection_address.port())
214214+ } else {
215215+ connection_address
216216+ };
217217+ tracing::debug!("`{user_agent}` at {apparent_ip} connected.");
218218+ let pool_opts = SqliteConnectOptions::from_str("sqlite://prod.db?mode=ro").expect("Expected to be able to configure the database, but failed.")
219219+ .journal_mode(SqliteJournalMode::Wal)
220220+ .read_only(true);
221221+ let pool = SqlitePool::connect_with(pool_opts).await.expect("Expected to be able to connect to the database at sqlite://prod.db but failed.");
222222+ let cursor = params.cursor.unwrap_or(
223223+ get_current_cursor_count(&pool)
224224+ .await.expect("Expected to be able to get the current cursor count but failed.")
225225+ .try_into().expect("Expected to be able to convert the current cursor count to a u64 but failed.")
226226+ ) as i64;
227227+ // finalize the upgrade process by returning upgrade callback.
228228+ // we can customize the callback by sending additional info such as address.
229229+ ws.on_upgrade(move |socket| handle_socket(socket, apparent_ip, cursor, pool))
230230+}
231231+232232+async fn get_current_cursor_count(
233233+ pool: &SqlitePool,
234234+) -> Result<i64, sqlx::Error> {
235235+ let current_cursor_count = sqlx::query!(
236236+ r#"
237237+ SELECT seq FROM profile_labels ORDER BY seq DESC LIMIT 1
238238+ "#
239239+ )
240240+ .fetch_one(pool)
241241+ .await?;
242242+ Ok(current_cursor_count.seq)
243243+}
244244+245245+/// Actual websocket statemachine (one will be spawned per connection)
246246+async fn handle_socket(socket: WebSocket, who: SocketAddr, cursor: i64, pool: SqlitePool) {
247247+ let _ = websocket_context(socket, who, cursor, pool).await;
248248+ // returning from the handler closes the websocket connection
249249+ tracing::debug!("Websocket context {who} destroyed");
250250+}
251251+252252+/// Get all missed messages, based on cursor.
253253+async fn get_missed_messages(
254254+ pool: &SqlitePool,
255255+ cursor: i64,
256256+) -> Result<Vec<SubscribeLabelsLabels>, sqlx::Error> {
257257+ let missed_messages = sqlx::query!(
258258+ r#"
259259+ SELECT seq, uri "uri: String", val "val: String", neg "neg: bool", cts "cts: String", sig
260260+ FROM profile_labels WHERE seq > ?
261261+ "#,
262262+ cursor
263263+ )
264264+ .fetch_all(pool)
265265+ .await?;
266266+ drop(dotenvy::dotenv().expect("Failed to load .env file"));
267267+ let self_did = dotenvy::var("SELF_DID").expect("Expected to be able to get the SELF_DID from the environment, but failed");
268268+ let src = jetstream_oxide::exports::Did::new(self_did).expect("Expected to be able to create a valid DID but failed");
269269+ Ok(missed_messages
270270+ .iter()
271271+ .map(|label| {
272272+ SubscribeLabelsLabels {
273273+ seq: label.seq,
274274+ labels: vec![AssignedLabelResponse::reconstruct(
275275+ src.to_owned(),
276276+ label.uri.clone(),
277277+ label.val.clone(),
278278+ label.neg.unwrap_or(false),
279279+ label.cts.clone(),
280280+ SignatureEnum::Bytes(SignatureBytes::from_vec(label.sig.clone())),
281281+ )],
282282+ }
283283+ })
284284+ .collect())
285285+}
286286+287287+async fn websocket_context(mut socket: WebSocket, who: SocketAddr, mut cursor: i64, pool: SqlitePool) -> ControlFlow<()> {
288288+ ws_send(
289289+ &mut socket,
290290+ who,
291291+ Message::Ping(Bytes::from_static(&[1, 2, 3]))
292292+ ).await?;
293293+ tracing::info!("{who} connected with cursor {cursor}");
294294+ let current_cursor_count = get_current_cursor_count(&pool).await.unwrap_or_default();
295295+ tracing::debug!("Current cursor count: {current_cursor_count}");
296296+ if cursor < current_cursor_count {
297297+ let missed_messages = get_missed_messages(&pool, cursor).await.expect("Expected to be able to get missed messages but failed.");
298298+ for message in missed_messages {
299299+ tracing::info!("Sending missed message to {who}: {:?}", message);
300300+ let message_header: Vec<u8> = Bytes::from_static(b"\xa2atg#labelsbop\x01").into();
301301+ let message_body = serde_cbor::to_vec(&message).expect("Expected to be able to serialize message to CBOR but failed.");
302302+ let message_combined = [message_header, message_body].concat();
303303+ let message_finished = Message::Binary(message_combined.into());
304304+ ws_send(
305305+ &mut socket,
306306+ who,
307307+ message_finished
308308+ ).await?;
309309+ }
310310+ cursor = current_cursor_count;
311311+ }
312312+ // By splitting socket we can send and receive at the same time. In this example we will send
313313+ // unsolicited messages to client based on some sort of server's internal event (i.e .timer).
314314+ let (mut sender, mut receiver) = socket.split();
315315+ // Spawn a task that will push several messages to the client (does not matter what client does)
316316+ let mut send_task = tokio::spawn(async move {
317317+ const PING_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);
318318+ const BROADCAST_INTERVAL: std::time::Duration = std::time::Duration::from_secs(20);
319319+ let mut last_ping = tokio::time::Instant::now();
320320+ let mut last_broadcast = tokio::time::Instant::now();
321321+ let mut n_msg = 0;
322322+ loop {
323323+ tokio::select! {
324324+ _ = tokio::time::sleep_until(last_ping + PING_INTERVAL) => {
325325+ tracing::debug!("Sending ping to {who}...");
326326+ if ws_send(
327327+ &mut sender,
328328+ who,
329329+ Message::Ping(Bytes::from_static(&[1, 2, 3]))
330330+ ).await.is_break() {
331331+ tracing::warn!("Client {who} failed to respond to ping");
332332+ break;
333333+ }
334334+ tracing::debug!("Sent ping to {who}");
335335+ last_ping = tokio::time::Instant::now();
336336+ },
337337+ _ = tokio::time::sleep_until(last_broadcast + BROADCAST_INTERVAL) => {
338338+ tracing::debug!("Polling for new messages to send to {who}...");
339339+ let current_cursor_count = get_current_cursor_count(&pool).await.unwrap_or_default();
340340+ if cursor < current_cursor_count {
341341+ let missed_messages = get_missed_messages(&pool, cursor).await;
342342+ if missed_messages.is_err() {
343343+ tracing::warn!("Error getting missed messages: {missed_messages:?}");
344344+ last_broadcast = tokio::time::Instant::now();
345345+ continue;
346346+ }
347347+ for message in missed_messages.expect("Expected to be able to get missed messages but failed.") {
348348+ let seq = message.seq;
349349+ let neg = message.labels[0].neg;
350350+ let uri = message.labels[0].uri.clone();
351351+ let val = message.labels[0].val.clone();
352352+ let prefix = if neg { "Negation" } else { "Emitting" };
353353+ tracing::info!("{prefix} label {seq} to {who}: {uri} {val}");
354354+ let message_header: Vec<u8> = Bytes::from_static(b"\xa2atg#labelsbop\x01").into();
355355+ let message_body = serde_cbor::to_vec(&message).expect("Expected to be able to serialize message to CBOR but failed.");
356356+ let message_combined = [message_header, message_body].concat();
357357+ let message_finished = Message::Binary(message_combined.into());
358358+ if ws_send(
359359+ &mut sender,
360360+ who,
361361+ message_finished
362362+ ).await.is_break() {
363363+ tracing::warn!("Client {who} failed to receive missed message");
364364+ break;
365365+ }
366366+ n_msg += 1;
367367+ }
368368+ cursor = current_cursor_count;
369369+ }
370370+ tracing::debug!("Finished poll for {who}");
371371+ last_broadcast = tokio::time::Instant::now();
372372+ }
373373+ }
374374+ }
375375+ tracing::info!("Sending close to {who}...");
376376+ ws_close(sender).await;
377377+ n_msg
378378+ });
379379+380380+ // This second task will receive messages from client and print them on server console
381381+ let mut recv_task = tokio::spawn(async move {
382382+ let mut cnt = 0;
383383+ while let Some(Ok(msg)) = receiver.next().await {
384384+ cnt += 1;
385385+ // print message and break if instructed to do so
386386+ if process_message(msg, who).is_break() {
387387+ break;
388388+ }
389389+ }
390390+ cnt
391391+ });
392392+393393+ // If any one of the tasks exit, abort the other.
394394+ tokio::select! {
395395+ rv_a = (&mut send_task) => {
396396+ match rv_a {
397397+ Ok(a) => tracing::info!("{a} messages sent to {who}"),
398398+ Err(a) => tracing::warn!("Error sending messages {a:?}")
399399+ }
400400+ recv_task.abort();
401401+ },
402402+ rv_b = (&mut recv_task) => {
403403+ match rv_b {
404404+ Ok(b) => tracing::info!("Received {b} messages"),
405405+ Err(b) => tracing::warn!("Error receiving messages {b:?}")
406406+ }
407407+ send_task.abort();
408408+ }
409409+ }
410410+ ControlFlow::Continue(())
411411+}
412412+413413+async fn ws_send(
414414+ socket: &mut (impl SinkExt<Message> + Unpin),
415415+ who: SocketAddr,
416416+ msg: Message,
417417+) -> ControlFlow<(), ()> {
418418+ if socket
419419+ .send(msg)
420420+ .await
421421+ .is_err()
422422+ {
423423+ tracing::warn!("client {who} abruptly disconnected");
424424+ return ControlFlow::Break(());
425425+ }
426426+ ControlFlow::Continue(())
427427+}
428428+429429+async fn ws_close(mut sender: futures::stream::SplitSink<WebSocket, Message>) {
430430+ if let Err(e) = sender
431431+ .send(Message::Close(Some(CloseFrame {
432432+ code: axum::extract::ws::close_code::NORMAL,
433433+ reason: Utf8Bytes::from_static("Goodbye"),
434434+ })))
435435+ .await
436436+ {
437437+ tracing::warn!("Could not send Close due to {e}, probably it is ok?");
438438+ }
439439+}
440440+441441+/// helper to print contents of messages to stdout. Has special treatment for Close.
442442+#[allow(clippy::cognitive_complexity)]
443443+fn process_message(msg: Message, who: SocketAddr) -> ControlFlow<(), ()> {
444444+ match msg {
445445+ Message::Text(t) => {
446446+ tracing::debug!(">>> {who} sent str: {t:?}");
447447+ }
448448+ Message::Binary(d) => {
449449+ tracing::debug!(">>> {} sent {} bytes: {:?}", who, d.len(), d);
450450+ }
451451+ Message::Close(c) => {
452452+ if let Some(cf) = c {
453453+ tracing::debug!(
454454+ ">>> {} sent close with code {} and reason `{}`",
455455+ who, cf.code, cf.reason
456456+ );
457457+ } else {
458458+ tracing::debug!(">>> {who} somehow sent close message without CloseFrame");
459459+ }
460460+ return ControlFlow::Break(());
461461+ }
462462+463463+ Message::Pong(v) => {
464464+ tracing::debug!(">>> {who} sent pong with {v:?}");
465465+ }
466466+ // You should never need to manually handle Message::Ping, as axum's websocket library
467467+ // will do so for you automagically by replying with Pong and copying the v according to
468468+ // spec. But if you need the contents of the pings you can see them here.
469469+ Message::Ping(v) => {
470470+ tracing::debug!(">>> {who} sent ping with {v:?}");
471471+ }
472472+ }
473473+ ControlFlow::Continue(())
474474+}
475475+476476+/// WIP: fetch likes from app.bsky.feed.like
477477+/// https://shimeji.us-east.host.bsky.network/xrpc/com.atproto.repo.listRecords?repo=did:plc:jrtgsidnmxaen4offglr5lsh&collection=app.bsky.feed.like&limit=100
478478+///
479479+/// then return them as a custom feed
480480+/// request path is at "/xrpc/app.bsky.feed.getFeedSkeleton?<feed>&<limit>&<cursor>"
481481+#[allow(dead_code)]
482482+async fn get_feed_skeleton(Query(params): Query<UriParams>) -> impl IntoResponse {
483483+ if let Some(_uri_patterns) = ¶ms.uriPatterns {
484484+ let mut _agent = Agent::default();
485485+486486+ }
487487+ (StatusCode::OK, Json(serde_json::json!({})))
488488+}