A tool for backing up ATProto-related data to S3.
1package main
2
import (
	"archive/zip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"time"

	"github.com/bugsnag/bugsnag-go/v2"
	"github.com/minio/minio-go/v7"
)
19
20func (s *service) backupPDS(ctx context.Context) {
21 if s.pdsHost == "" || s.did == "" {
22 slog.Info("PDS_HOST or DID env not set - skipping PDS backup")
23 return
24 }
25
26 err := s.backupRepo(ctx)
27 if err != nil {
28 slog.Error("backup repo", "error", err)
29 bugsnag.Notify(err)
30 }
31
32 err = s.backupBlobs(ctx)
33 if err != nil {
34 slog.Error("backup blobs", "error", err)
35 bugsnag.Notify(err)
36 }
37
38 slog.Info("finished PDS backup")
39}
40
41func (s *service) backupRepo(ctx context.Context) error {
42 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", s.pdsHost, s.did)
43 req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
44 if err != nil {
45 return fmt.Errorf("create get repo request: %w", err)
46 }
47
48 req.Header.Add("ACCEPT", "application/vnd.ipld.car")
49 resp, err := s.httpClient.Do(req)
50 if err != nil {
51 return fmt.Errorf("get repo: %w", err)
52 }
53
54 defer resp.Body.Close()
55
56 b, err := io.ReadAll(resp.Body)
57 if err != nil {
58 return fmt.Errorf("reading repo response body: %w", err)
59 }
60
61 filename := path.Join(s.blobDir, fmt.Sprintf("%s-%d-repo.car", s.did, time.Now().UnixMilli()))
62 f, err := os.Create(filename)
63 if err != nil {
64 return fmt.Errorf("creating temp file: %w", err)
65 }
66 defer func() {
67 f.Close()
68
69 err = os.Remove(filename)
70 if err != nil {
71 slog.Error("failed to delete pds repo file after uploading", "error", err, "filename", f.Name())
72 metadata := bugsnag.MetaData{
73 "file": {
74 "filename": f.Name(),
75 },
76 }
77 bugsnag.Notify(fmt.Errorf("delete pds repo file after uploading: %w", err), metadata)
78 }
79 }()
80
81 zipWriter := zip.NewWriter(f)
82 zipFile, err := zipWriter.Create("repo.car")
83 if err != nil {
84 return fmt.Errorf("create zip file: %w", err)
85 }
86
87 _, err = zipFile.Write(b)
88 if err != nil {
89 return fmt.Errorf("write repo to file: %w", err)
90 }
91
92 zipWriter.Close()
93
94 // reset the reader back to the start so that the minio upload can read the data that's been written.
95 _, err = f.Seek(0, 0)
96 if err != nil {
97 return fmt.Errorf("setting seek on written file: %w", err)
98 }
99
100 fi, err := f.Stat()
101 if err != nil {
102 return fmt.Errorf("stat written file: %w", err)
103 }
104
105 _, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-repo.zip", f, fi.Size(), minio.PutObjectOptions{})
106 if err != nil {
107 return fmt.Errorf("put repo file to bucket: %w", err)
108 }
109
110 return nil
111}
112
113func (s *service) backupBlobs(ctx context.Context) error {
114 cids, err := s.getAllBlobCIDs(ctx)
115 if err != nil {
116 return fmt.Errorf("get all blob CIDs: %w", err)
117 }
118
119 filename := fmt.Sprintf("%s-%s-blobs.zip", s.did, time.Now())
120 defer os.Remove(filename)
121
122 f, err := os.Create(filename)
123 if err != nil {
124 return fmt.Errorf("creating zip file: %w", err)
125 }
126 defer f.Close()
127
128 zipWriter := zip.NewWriter(f)
129 for _, cid := range cids {
130 blob, err := s.getBlob(ctx, cid)
131 if err != nil {
132 slog.Error("failed to get blob", "cid", cid, "error", err)
133 bugsnag.Notify(err)
134 continue
135 }
136
137 zipFile, err := zipWriter.Create(cid)
138 if err != nil {
139 slog.Error("create new file in zipwriter", "cid", cid, "error", err)
140 bugsnag.Notify(err)
141 continue
142 }
143
144 zipFile.Write(blob)
145 }
146 err = zipWriter.Close()
147 if err != nil {
148 return fmt.Errorf("close zip writer: %w", err)
149 }
150
151 fi, err := f.Stat()
152 if err != nil {
153 return fmt.Errorf("stat zip file: %w", err)
154 }
155
156 _, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-blobs.zip", f, fi.Size(), minio.PutObjectOptions{})
157 if err != nil {
158 return fmt.Errorf("stream blobs to bucket: %w", err)
159 }
160
161 return nil
162}
163
164func (s *service) getAllBlobCIDs(ctx context.Context) ([]string, error) {
165 cursor := ""
166 limit := 100
167 var cids []string
168 for {
169 res, err := s.listBlobs(ctx, cursor, int64(limit))
170 if err != nil {
171 return nil, fmt.Errorf("list blobs: %w", err)
172 }
173 if len(res.CIDs) == 0 {
174 return cids, nil
175 }
176
177 cids = append(cids, res.CIDs...)
178
179 if len(res.CIDs) < limit {
180 return cids, nil
181 }
182
183 cursor = res.Cursor
184 }
185}
186
// listBlobsResponse is the shape listBlobs decodes the JSON body of a
// com.atproto.sync.listBlobs response into.
type listBlobsResponse struct {
	// Cursor is decoded from the "cursor" key; getAllBlobCIDs passes it to
	// the next listBlobs call to continue paging.
	Cursor string `json:"cursor"`
	// CIDs is decoded from the "cids" key and holds the CIDs of one page.
	CIDs []string `json:"cids"`
}
191
192func (s *service) listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) {
193 // TODO: do proper url encoding of query params
194 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listBlobs?did=%s&cursor=%s&limit=%d", s.pdsHost, s.did, cursor, limit)
195 req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
196 if err != nil {
197 return listBlobsResponse{}, fmt.Errorf("create list blobs request: %w", err)
198 }
199
200 resp, err := s.httpClient.Do(req)
201 if err != nil {
202 return listBlobsResponse{}, fmt.Errorf("list blobs: %w", err)
203 }
204
205 defer resp.Body.Close()
206
207 resBody, err := io.ReadAll(resp.Body)
208 if err != nil {
209 return listBlobsResponse{}, fmt.Errorf("failed to read response: %w", err)
210 }
211
212 var result listBlobsResponse
213 err = json.Unmarshal(resBody, &result)
214 if err != nil {
215 return listBlobsResponse{}, fmt.Errorf("failed to unmarshal response: %w", err)
216 }
217
218 return result, nil
219}
220
221func (s *service) getBlob(ctx context.Context, cid string) ([]byte, error) {
222 filename := filepath.Join(s.blobDir, fmt.Sprintf("blob-%s-%s", s.did, cid))
223 existing, err := os.ReadFile(filename)
224 if !os.IsNotExist(err) {
225 return existing, nil
226 }
227
228 slog.Info("blob not found locally - downloading", "did", s.did, "cid", cid)
229
230 // TODO: do proper url encoding of query params
231 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", s.pdsHost, s.did, cid)
232 req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
233 if err != nil {
234 return nil, fmt.Errorf("create get blob request: %w", err)
235 }
236
237 resp, err := s.httpClient.Do(req)
238 if err != nil {
239 return nil, fmt.Errorf("get blob: %w", err)
240 }
241 defer resp.Body.Close()
242
243 b, err := io.ReadAll(resp.Body)
244 if err != nil {
245 return nil, fmt.Errorf("read blob response body: %w", err)
246 }
247
248 err = os.WriteFile(filename, b, os.ModePerm)
249 if err != nil {
250 slog.Error("writing blob", "error", err, "cid", cid)
251 metadata := bugsnag.MetaData{
252 "blob": {
253 "filename": filename,
254 "cid": cid,
255 "did": s.did,
256 },
257 }
258 bugsnag.Notify(fmt.Errorf("writing blob to local storage: %w", err), metadata)
259 }
260
261 return b, nil
262}