A tool for backing up ATProto-related data to S3.
1package main
2
3import (
4 "archive/zip"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "log/slog"
10 "net/http"
11 "os"
12 "path"
13 "path/filepath"
14 "time"
15
16 "github.com/bugsnag/bugsnag-go/v2"
17 "github.com/minio/minio-go/v7"
18)
19
20func (s *service) backupPDS(ctx context.Context) {
21 if s.pdsHost == "" || s.did == "" {
22 slog.Info("PDS_HOST or DID env not set - skipping PDS backup")
23 return
24 }
25
26 err := s.backupRepo(ctx)
27 if err != nil {
28 slog.Error("backup repo", "error", err)
29 bugsnag.Notify(err)
30 }
31
32 err = s.backupBlobs(ctx)
33 if err != nil {
34 slog.Error("backup blobs", "error", err)
35 bugsnag.Notify(err)
36 }
37
38 slog.Info("finished PDS backup")
39}
40
41func (s *service) backupRepo(ctx context.Context) error {
42 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", s.pdsHost, s.did)
43 req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
44 if err != nil {
45 return fmt.Errorf("create get repo request: %w", err)
46 }
47
48 req.Header.Add("ACCEPT", "application/vnd.ipld.car")
49 resp, err := s.httpClient.Do(req)
50 if err != nil {
51 return fmt.Errorf("get repo: %w", err)
52 }
53
54 defer resp.Body.Close()
55
56 b, err := io.ReadAll(resp.Body)
57 if err != nil {
58 return fmt.Errorf("reading repo response body: %w", err)
59 }
60
61 filename := path.Join(s.blobDir, fmt.Sprintf("%s-%d-repo.car", s.did, time.Now().UnixMilli()))
62 f, err := os.Create(filename)
63 if err != nil {
64 return fmt.Errorf("creating temp file: %w", err)
65 }
66 defer func() {
67 f.Close()
68
69 err = os.Remove(filename)
70 if err != nil {
71 slog.Error("failed to delete pds repo file after uploading", "error", err, "filename", f.Name())
72 metadata := bugsnag.MetaData{
73 "file": {
74 "filename": f.Name(),
75 },
76 }
77 bugsnag.Notify(fmt.Errorf("delete pds repo file after uploading: %w", err), metadata)
78 }
79 }()
80
81 zipWriter := zip.NewWriter(f)
82 zipFile, err := zipWriter.Create("repo.car")
83 if err != nil {
84 return fmt.Errorf("create zip file: %w", err)
85 }
86
87 _, err = zipFile.Write(b)
88 if err != nil {
89 return fmt.Errorf("write repo to file: %w", err)
90 }
91
92 zipWriter.Close()
93
94 // reset the reader back to the start so that the minio upload can read the data that's been written.
95 _, err = f.Seek(0, 0)
96 if err != nil {
97 return fmt.Errorf("setting seek on written file: %w", err)
98 }
99
100 fi, err := f.Stat()
101 if err != nil {
102 return fmt.Errorf("stat written file: %w", err)
103 }
104
105 _, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-repo.zip", f, fi.Size(), minio.PutObjectOptions{})
106 if err != nil {
107 return fmt.Errorf("put repo file to bucket: %w", err)
108 }
109
110 return nil
111}
112
113func (s *service) backupBlobs(ctx context.Context) error {
114 cids, err := s.getAllBlobCIDs(ctx)
115 if err != nil {
116 return fmt.Errorf("get all blob CIDs: %w", err)
117 }
118
119 if len(cids) == 0 {
120 return nil
121 }
122
123 filename := path.Join(s.blobDir, fmt.Sprintf("%s-%d-blobs.zip", s.did, time.Now().UnixMilli()))
124 defer os.Remove(filename)
125
126 f, err := os.Create(filename)
127 if err != nil {
128 return fmt.Errorf("creating zip file: %w", err)
129 }
130 defer f.Close()
131
132 zipWriter := zip.NewWriter(f)
133 for _, cid := range cids {
134 blob, err := s.getBlob(ctx, cid)
135 if err != nil {
136 slog.Error("failed to get blob", "cid", cid, "error", err)
137 bugsnag.Notify(err)
138 continue
139 }
140
141 zipFile, err := zipWriter.Create(cid)
142 if err != nil {
143 slog.Error("create new file in zipwriter", "cid", cid, "error", err)
144 bugsnag.Notify(err)
145 continue
146 }
147
148 zipFile.Write(blob)
149 }
150 err = zipWriter.Close()
151 if err != nil {
152 return fmt.Errorf("close zip writer: %w", err)
153 }
154
155 fi, err := f.Stat()
156 if err != nil {
157 return fmt.Errorf("stat zip file: %w", err)
158 }
159
160 _, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-blobs.zip", f, fi.Size(), minio.PutObjectOptions{})
161 if err != nil {
162 return fmt.Errorf("stream blobs to bucket: %w", err)
163 }
164
165 return nil
166}
167
168func (s *service) getAllBlobCIDs(ctx context.Context) ([]string, error) {
169 cursor := ""
170 limit := 100
171 var cids []string
172 for {
173 res, err := s.listBlobs(ctx, cursor, int64(limit))
174 if err != nil {
175 return nil, fmt.Errorf("list blobs: %w", err)
176 }
177 if len(res.CIDs) == 0 {
178 return cids, nil
179 }
180
181 cids = append(cids, res.CIDs...)
182
183 if len(res.CIDs) < limit {
184 return cids, nil
185 }
186
187 cursor = res.Cursor
188 }
189}
190
// listBlobsResponse is the JSON body that listBlobs decodes from a
// com.atproto.sync.listBlobs response.
type listBlobsResponse struct {
	// Cursor is sent back by getAllBlobCIDs as the cursor of the next request.
	Cursor string `json:"cursor"`
	// CIDs holds the blob CIDs returned in this page.
	CIDs []string `json:"cids"`
}
195
196func (s *service) listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) {
197 // TODO: do proper url encoding of query params
198 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listBlobs?did=%s&cursor=%s&limit=%d", s.pdsHost, s.did, cursor, limit)
199 req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
200 if err != nil {
201 return listBlobsResponse{}, fmt.Errorf("create list blobs request: %w", err)
202 }
203
204 resp, err := s.httpClient.Do(req)
205 if err != nil {
206 return listBlobsResponse{}, fmt.Errorf("list blobs: %w", err)
207 }
208
209 defer resp.Body.Close()
210
211 resBody, err := io.ReadAll(resp.Body)
212 if err != nil {
213 return listBlobsResponse{}, fmt.Errorf("failed to read response: %w", err)
214 }
215
216 var result listBlobsResponse
217 err = json.Unmarshal(resBody, &result)
218 if err != nil {
219 return listBlobsResponse{}, fmt.Errorf("failed to unmarshal response: %w", err)
220 }
221
222 return result, nil
223}
224
225func (s *service) getBlob(ctx context.Context, cid string) ([]byte, error) {
226 filename := filepath.Join(s.blobDir, fmt.Sprintf("blob-%s-%s", s.did, cid))
227 existing, err := os.ReadFile(filename)
228 if !os.IsNotExist(err) {
229 return existing, nil
230 }
231
232 slog.Info("blob not found locally - downloading", "did", s.did, "cid", cid)
233
234 // TODO: do proper url encoding of query params
235 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", s.pdsHost, s.did, cid)
236 req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
237 if err != nil {
238 return nil, fmt.Errorf("create get blob request: %w", err)
239 }
240
241 resp, err := s.httpClient.Do(req)
242 if err != nil {
243 return nil, fmt.Errorf("get blob: %w", err)
244 }
245 defer resp.Body.Close()
246
247 b, err := io.ReadAll(resp.Body)
248 if err != nil {
249 return nil, fmt.Errorf("read blob response body: %w", err)
250 }
251
252 err = os.WriteFile(filename, b, os.ModePerm)
253 if err != nil {
254 slog.Error("writing blob", "error", err, "cid", cid)
255 metadata := bugsnag.MetaData{
256 "blob": {
257 "filename": filename,
258 "cid": cid,
259 "did": s.did,
260 },
261 }
262 bugsnag.Notify(fmt.Errorf("writing blob to local storage: %w", err), metadata)
263 }
264
265 return b, nil
266}