···11+//! backfill works as follows (https://docs.bsky.app/docs/advanced-guides/backfill)
22+//!
33+//! 1. resolve did -> pds
44+//! 2. get a car file from com.atproto.sync.getRepo
55+//! 3. extract collection, rkey, and cbor data from each leaf
66+//! 4. convert cbor data to json
77+//! 5. store in db (limit to DB_MAX_REQ / 4 to avoid err)
88+19use std::{cmp::Ordering, str::FromStr};
210311use jacquard::{types::tid::Tid, url::Url};
···3240 ParseCarError(#[from] crate::backfill::parse_car::Error),
3341}
34423535-/// backfill works as follows (https://docs.bsky.app/docs/advanced-guides/backfill)
3636-///
3737-/// 1. resolve did -> pds
3838-/// 2. stream com.atproto.sync.subscribeRepos to a buffer
3939-/// 3. get a car file from com.atproto.sync.getRepo (diff if a rev is stored in database)
4040-/// 4. apply car file diff to database (incl rev)
4141-/// 5. start playing events from buffer
4242-/// 1. drop all events from other users
4343-/// 2. drop all events with a lower rev than current rev
4444-/// 3. apply event & update rev
4545-/// 4. (non blocking) get blobs if missing
4646-/// 5. (non blocking) parse for strongref and store strongrefs
4747-/// 6. (non blocking) trigger garbage collection of blobs and strongref
4848-/// 6. once buffer is empty, parse events live
4949-pub async fn backfill(pds: &str, conn: &Pool<Postgres>) -> Result<(), Error> {
4343+pub async fn backfill(
4444+ pds: &str,
4545+ conn: &Pool<Postgres>,
4646+ time: Option<std::time::Instant>,
4747+) -> Result<(), Error> {
5048 let db_rev = if let Some(rev) = query!(
5149 "SELECT (rev) FROM meta WHERE did = $1",
5250 config::USER.to_string()
···6462 let pds = Url::from_str(&format!("https://{pds}/")).unwrap();
6563 let car = load_car(config::USER.clone(), pds).await?;
66646767- match car.partial_cmp(&db_rev) {
6868- Some(val) => match val {
6565+ if let Some(time) = time {
6666+ println!("Downloaded car file ({:?})", time.elapsed());
6767+ }
6868+ let time = time.map(|_| std::time::Instant::now());
6969+7070+ if let Some(val) = car.partial_cmp(&db_rev) {
7171+ match val {
6972 // car rev newer than db rev
7073 // continue on; every other branch diverges
7174 Ordering::Greater => {}
···8083 // Most likely either the PDS or repo is broken, or the database has been corrupted.
8184 // Check your PDS repo is working and/or drop the database."
8285 // ),
8383- },
8484- // cant compare rev so assume all is ok and continue
8585- None => {}
8686+ };
8687 };
87888889 // erase all old records and return if it fails
···92939394 let data = parse_car(&car).await?;
9495 let mut data = data.chunks(DB_MAX_REQ / 4);
9696+9797+ if let Some(time) = time {
9898+ println!("Parsed car file ({:?})", time.elapsed());
9999+ }
100100+ let time = time.map(|_| std::time::Instant::now());
9510196102 while let Some(data) = data.next() {
97103 let mut query = sqlx::QueryBuilder::new("INSERT INTO records(collection, rkey, record) ");
···114120 }
115121 _ => {}
116122 };
123123+ }
124124+125125+ if let Some(time) = time {
126126+ println!("Saved to database ({:?})", time.elapsed());
117127 }
118128119129 match query!(
+5
src/config.rs
···11+//! get static and parsed environment variables
22+//!
33+//! USER is from env variable USER and parsed into a jacquard Did
44+//! POSTGRES_URL is from POSTGRES_USER, POSTGRES_PASSWORD, and POSTGRES_HOST
55+16use jacquard::types::string::Did;
27use std::env;
38use std::sync::LazyLock;
+2-16
src/db.rs
···11+//! create a connection pool and set up tables before making it available
22+13use crate::config;
24use sqlx::{Pool, Postgres, postgres::PgPool, query};
35···2527 {
2628 println!("Creating table `records`: \n{err}");
2729 panic!("Could not instantiate db");
2828- };
2929-3030- if let Err(err) = query!(
3131- "CREATE TABLE IF NOT EXISTS foreign_records (
3232- did TEXT,
3333- collection TEXT,
3434- rkey TEXT,
3535- record JSON NOT NULL,
3636- PRIMARY KEY (did, collection, rkey)
3737- );"
3838- )
3939- .execute(&conn)
4040- .await
4141- {
4242- println!("Creating table `foreign_records`: \n{err}");
4343- panic!();
4430 };
45314632 if let Err(err) = query!(
···11+//! convert an ipld_core::ipld::Ipld enum into a serde_json::value::Value in the atproto data model
22+//!
33+//! a specific helper is required for this as Bytes and Link have differing representations to how serde_json handles them by default
44+//!
55+//! in general, types are naively converted. the following types have special cases:
66+//! - `integer`: this could throw an error if the number is `x` in `i64::MIN < x < u64::MAX`
77+//! - `float`: always issues a warning since this is technically illegal. If it's NaN or infinity, this errors as they can't be represented in json
88+//! - `bytes`: atproto JSON represents them as `{"$bytes": "BASE 64 NO PADDING"}`, but serde_json defaults to `[u8]`
99+//! - `link`: atproto JSON represents them as `{"$link": "BASE 32 NO PADDING"}`, but serde_json defaults to `[u8]`
1010+111use base64::{Engine, prelude::BASE64_STANDARD_NO_PAD};
212use ipld_core::{cid::multibase::Base, ipld::Ipld};
313use log::warn;
···4858 .map(|(k, v)| Ok::<_, Error>((k.clone(), ipld_to_json_value(v)?)))
4959 .collect::<Result<Map<String, Value>, _>>()?,
5060 ),
5151- Ipld::Link(cid) => json!({"$link":
5252- cid.to_string_of_base(Base::Base32Lower)? }),
6161+ Ipld::Link(cid) => json!({
6262+ "$link": cid.to_string_of_base(Base::Base32Lower)?
6363+ }),
5364 })
5465}
+4
src/utils/mod.rs
···11+//! contains utility functions
22+//!
33+//! see sub modules for more details
44+15pub mod ipld_json;
26pub mod resolver;
+2
src/utils/resolver.rs
···11+//! resolve a Did to a pds domain
22+13use jacquard::prelude::IdentityResolver;
24use jacquard::types::did::Did;
35use thiserror::Error;