···11+use jacquard::api::com_atproto;
22+use jacquard::client::Agent;
33+use jacquard::types::{did::Did, tid::Tid};
44+use jacquard::url::Url;
55+use jacquard::xrpc::XrpcExt;
66+use jacquard_repo::commit::Commit;
77+use jacquard_repo::{BlockStore, MemoryBlockStore, Mst};
88+use std::sync::Arc;
99+use thiserror::Error;
1010+1111+#[derive(Error, Debug)]
1212+pub enum Error {
1313+ #[error("Error loading car file: {}", .0)]
1414+ ClientError(#[from] jacquard::error::ClientError),
1515+ #[error("Error loading car file: {}", .0)]
1616+ RepoError(#[from] jacquard_repo::RepoError),
1717+ #[error("Missing root block from car file (malformed car file)")]
1818+ MissingRoot,
1919+}
2020+2121+pub struct Car {
2222+ pub storage: MemoryBlockStore,
2323+ pub mst: Mst<MemoryBlockStore>,
2424+ pub rev: Tid,
2525+}
2626+2727+impl PartialEq<Tid> for Car {
2828+ fn eq(&self, other: &Tid) -> bool {
2929+ &self.rev == other
3030+ }
3131+}
3232+3333+use std::cmp::Ordering;
3434+impl PartialOrd<Tid> for Car {
3535+ fn partial_cmp(&self, other: &Tid) -> Option<Ordering> {
3636+ return match self.rev.compare_to(other) {
3737+ 1 => Some(Ordering::Greater),
3838+ 0 => Some(Ordering::Equal),
3939+ -1 => Some(Ordering::Less),
4040+ _ => None,
4141+ };
4242+ }
4343+}
4444+4545+pub async fn load_car(user: Did<'_>, pds: Url) -> Result<Car, Error> {
4646+ let agent = Agent::unauthenticated();
4747+ let req = com_atproto::sync::get_repo::GetRepo::new()
4848+ .did(user)
4949+ .build();
5050+ let res = agent.xrpc(pds).send(&req).await?;
5151+5252+ let car = res.buffer();
5353+ let car = jacquard_repo::car::parse_car_bytes(&car).await?;
5454+5555+ let storage = jacquard_repo::storage::MemoryBlockStore::new_from_blocks(car.blocks);
5656+5757+ let root = storage
5858+ .get(&car.root)
5959+ .await?
6060+ .ok_or_else(|| Error::MissingRoot)?;
6161+ let root = Commit::from_cbor(&root)?;
6262+ let rev = root.rev().to_owned();
6363+ let root = root.data();
6464+6565+ let mst = Mst::load(Arc::new(storage.clone()), *root, None);
6666+6767+ Ok(Car { storage, mst, rev })
6868+}
+138
src/backfill/mod.rs
···11+use std::{cmp::Ordering, str::FromStr};
22+33+use jacquard::{types::tid::Tid, url::Url};
44+use sqlx::{Pool, Postgres, query};
55+use thiserror::Error;
66+77+use crate::{
88+ backfill::{load_car::load_car, parse_car::parse_car},
99+ config,
1010+};
1111+1212+pub mod load_car;
1313+pub mod parse_car;
1414+1515+const DB_MAX_REQ: usize = 65535;
1616+1717+#[derive(Error, Debug)]
1818+pub enum Error {
1919+ #[error("Error parsing TID: {}", .0)]
2020+ TidParse(#[from] jacquard::types::string::AtStrError),
2121+ #[error("{}", .0)]
2222+ GetCarError(#[from] crate::backfill::load_car::Error),
2323+ #[error(
2424+ "The database claims to be more up to date than the PDS.
2525+Most likely either the PDS or repo is broken, or the database has been corrupted.
2626+Check your PDS repo is working and/or drop the database."
2727+ )]
2828+ DbTidTooLow,
2929+ #[error("Database error: {}", .0)]
3030+ DbError(#[from] sqlx::Error),
3131+ #[error("{}", .0)]
3232+ ParseCarError(#[from] crate::backfill::parse_car::Error),
3333+}
3434+3535+/// backfill works as follows (https://docs.bsky.app/docs/advanced-guides/backfill)
3636+///
3737+/// 1. resolve did -> pds
3838+/// 2. stream com.atproto.sync.subscribeRepos to a buffer
3939+/// 3. get a car file from com.atproto.sync.getRepo (diff if a rev is stored in database)
4040+/// 4. apply car file diff to database (incl rev)
4141+/// 5. start playing events from buffer
4242+/// 1. drop all events from other users
4343+/// 2. drop all events with a lower rev than current rev
4444+/// 3. apply event & update rev
4545+/// 4. (non blocking) get blobs if missing
4646+/// 5. (non blocking) parse for strongref and store strongrefs
4747+/// 6. (non blocking) trigger garbage collection of blobs and strongref
4848+/// 6. once buffer is empty, parse events live
4949+pub async fn backfill(pds: &str, conn: &Pool<Postgres>) -> Result<(), Error> {
5050+ let db_rev = if let Some(rev) = query!(
5151+ "SELECT (rev) FROM meta WHERE did = $1",
5252+ config::USER.to_string()
5353+ )
5454+ .fetch_one(conn)
5555+ .await
5656+ .ok()
5757+ .and_then(|x| x.rev)
5858+ {
5959+ Tid::from_str(&rev)?
6060+ } else {
6161+ Tid::from_time(0, 0)
6262+ };
6363+6464+ let pds = Url::from_str(&format!("https://{pds}/")).unwrap();
6565+ let car = load_car(config::USER.clone(), pds).await?;
6666+6767+ match car.partial_cmp(&db_rev) {
6868+ Some(val) => match val {
6969+ // car rev newer than db rev
7070+ // continue on; every other branch diverges
7171+ Ordering::Greater => {}
7272+ // revisions are the same so we can skip backfill
7373+ Ordering::Equal => return Ok(()),
7474+ // db rev newer than car rev
7575+ // this means the db or car file is borked
7676+ // panic out and let the user deal with things
7777+ Ordering::Less => return Err(Error::DbTidTooLow),
7878+ // panic!(
7979+ // r"The database claims to be more up to date than the PDS.
8080+ // Most likely either the PDS or repo is broken, or the database has been corrupted.
8181+ // Check your PDS repo is working and/or drop the database."
8282+ // ),
8383+ },
8484+ // cant compare rev so assume all is ok and continue
8585+ None => {}
8686+ };
8787+8888+ // erase all old records and return if it fails
8989+ // we dont use diffs bc theyre complex and the overhead is minimal rn
9090+ // only real overhead is network latency which would be ~= anyway
9191+ let _ = query!("DELETE FROM records").execute(conn).await?;
9292+9393+ let data = parse_car(&car).await?;
9494+ let mut data = data.chunks(DB_MAX_REQ / 4);
9595+9696+ while let Some(data) = data.next() {
9797+ let mut query = sqlx::QueryBuilder::new("INSERT INTO records(collection, rkey, record) ");
9898+ query.push_values(
9999+ data,
100100+ |mut b: sqlx::query_builder::Separated<'_, '_, Postgres, &'static str>, data| {
101101+ b.push_bind(data.0.0.clone())
102102+ .push_bind(data.0.1.clone())
103103+ .push_bind(data.1.clone());
104104+ },
105105+ );
106106+107107+ match query.build().execute(conn).await {
108108+ Err(err) => {
109109+ // couldnt backfill so go nuclear
110110+ // this is program startup so its prolly safe lol
111111+ println!("Got error \"{}\"\nDeleting records and exiting...", err);
112112+ let _ = query!("DELETE FROM records").execute(conn).await?;
113113+ panic!()
114114+ }
115115+ _ => {}
116116+ };
117117+ }
118118+119119+ match query!(
120120+ "UPDATE meta SET rev = $1 WHERE did = $2",
121121+ car.rev.to_string(),
122122+ config::USER.to_string()
123123+ )
124124+ .execute(conn)
125125+ .await
126126+ {
127127+ Err(err) => {
128128+ // couldnt save tid so go nuclear
129129+ // this is program startup so its prolly safe lol
130130+ println!("Got error \"{}\"\nDeleting records and exiting...", err);
131131+ let _ = query!("DELETE FROM records").execute(conn).await?;
132132+ panic!()
133133+ }
134134+ _ => {}
135135+ };
136136+137137+ Ok(())
138138+}
+69
src/backfill/parse_car.rs
···11+use std::iter::zip;
22+33+use ipld_core::{cid::CidGeneric, ipld::Ipld};
44+use jacquard::smol_str::SmolStr;
55+use jacquard_repo::BlockStore;
66+use serde_json::Value;
77+use thiserror::Error;
88+99+use crate::{backfill::load_car::Car, utils::ipld_json::ipld_to_json_value};
1010+1111+#[derive(Debug, Error)]
1212+pub enum Error {
1313+ #[error("Error getting records from car file: {}", .0)]
1414+ RepoError(#[from] jacquard_repo::RepoError),
1515+ #[error("Missing CID from car file")]
1616+ MissingCid,
1717+ #[error("Could not decode record: {}", .0)]
1818+ DecodeError(#[from] serde_ipld_dagcbor::DecodeError<std::convert::Infallible>),
1919+ #[error("Could not convert into json: {}", .0)]
2020+ IpldToJsonError(#[from] crate::utils::ipld_json::Error),
2121+ #[error("Could not break {} into a collection and rkey", .0)]
2222+ MalformedRecordKey(SmolStr),
2323+}
2424+2525+pub type AccountData = Vec<((String, String), Value)>;
2626+2727+pub async fn parse_car(car: &Car) -> Result<AccountData, Error> {
2828+ let (keys, records): (Vec<SmolStr>, Vec<CidGeneric<64>>) =
2929+ car.mst.leaves().await?.into_iter().unzip();
3030+3131+ // convert keys into (collection, rkey)
3232+ let keys = keys
3333+ .into_iter()
3434+ .map(|x| {
3535+ let mut parts = x.split('/');
3636+ let collection = parts.next();
3737+ let rkey = parts.next();
3838+ if parts.next().is_none()
3939+ && let Some(collection) = collection
4040+ && let Some(rkey) = rkey
4141+ {
4242+ Ok::<_, Error>((collection.to_string(), rkey.to_string()))
4343+ } else {
4444+ Err(Error::MalformedRecordKey(x))
4545+ }
4646+ })
4747+ .collect::<Result<Vec<_>, _>>()?;
4848+4949+ // convert records into Value
5050+ let records = &records[..];
5151+ let records = car
5252+ .storage
5353+ .get_many(records)
5454+ .await?
5555+ .into_iter()
5656+ .collect::<Option<Vec<_>>>()
5757+ .ok_or_else(|| Error::MissingCid)?
5858+ .into_iter()
5959+ .map(|x| {
6060+ let data = serde_ipld_dagcbor::from_slice::<Ipld>(&x)?;
6161+ let value = ipld_to_json_value(&data)?;
6262+ Ok::<_, Error>(value)
6363+ })
6464+ .collect::<Result<Vec<_>, _>>()?;
6565+6666+ let data = zip(keys, records).collect::<Vec<((_, _), _)>>();
6767+6868+ Ok(data)
6969+}