this string has no description
export_atproto.py
1import requests
2import os
3from tqdm import tqdm
4
5# ================= Config =================
6
7PDS_URL = ""
8
9DID = ""
10
11ACCESS_JWT = ""
12
13OUTPUT_DIR = "backup"
14
15# ========================================
16
17session = requests.Session()
18headers = {}
19
20if ACCESS_JWT:
21 headers["Authorization"] = f"Bearer {ACCESS_JWT}"
22
23def export_repo():
24 print("Downloading repo.car...")
25 url = f"{PDS_URL}/xrpc/com.atproto.sync.getRepo?did={DID}"
26 r = session.get(url, headers=headers, stream=True)
27 r.raise_for_status()
28 os.makedirs(OUTPUT_DIR, exist_ok=True)
29 path = os.path.join(OUTPUT_DIR, "repo.car")
30 with open(path, "wb") as f:
31 for chunk in r.iter_content(8192):
32 f.write(chunk)
33 print(f"repo.car saved to {path}")
34
35def list_blobs():
36 print("Listing blobs...")
37 blobs = []
38 cursor = None
39 while True:
40 params = {}
41 if cursor:
42 params["cursor"] = cursor
43 url = f"{PDS_URL}/xrpc/com.atproto.sync.listBlobs?did={DID}"
44 r = session.get(url, headers=headers, params=params)
45 r.raise_for_status()
46 data = r.json()
47 blobs.extend(data.get("cids", []))
48 cursor = data.get("cursor")
49 if not cursor:
50 break
51 print(f"Found {len(blobs)} blobs")
52 return blobs
53
54def download_blobs(blobs):
55 os.makedirs(os.path.join(OUTPUT_DIR, "blobs"), exist_ok=True)
56 print(f"Downloading {len(blobs)} blobs...")
57 for cid in tqdm(blobs):
58 path = os.path.join(OUTPUT_DIR, "blobs", cid)
59 if os.path.exists(path):
60 continue
61 url = f"{PDS_URL}/xrpc/com.atproto.sync.getBlob?did={DID}&cid={cid}"
62 r = session.get(url, headers=headers)
63 if r.status_code == 200:
64 with open(path, "wb") as f:
65 f.write(r.content)
66
67def main():
68 export_repo()
69 blobs = list_blobs()
70 download_blobs(blobs)
71 print("Backup complete!")
72
73if __name__ == "__main__":
74 main()