A tool for backing up ATProto-related data to S3
at main 266 lines 6.4 kB view raw
1package main 2 3import ( 4 "archive/zip" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log/slog" 10 "net/http" 11 "os" 12 "path" 13 "path/filepath" 14 "time" 15 16 "github.com/bugsnag/bugsnag-go/v2" 17 "github.com/minio/minio-go/v7" 18) 19 20func (s *service) backupPDS(ctx context.Context) { 21 if s.pdsHost == "" || s.did == "" { 22 slog.Info("PDS_HOST or DID env not set - skipping PDS backup") 23 return 24 } 25 26 err := s.backupRepo(ctx) 27 if err != nil { 28 slog.Error("backup repo", "error", err) 29 bugsnag.Notify(err) 30 } 31 32 err = s.backupBlobs(ctx) 33 if err != nil { 34 slog.Error("backup blobs", "error", err) 35 bugsnag.Notify(err) 36 } 37 38 slog.Info("finished PDS backup") 39} 40 41func (s *service) backupRepo(ctx context.Context) error { 42 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", s.pdsHost, s.did) 43 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 44 if err != nil { 45 return fmt.Errorf("create get repo request: %w", err) 46 } 47 48 req.Header.Add("ACCEPT", "application/vnd.ipld.car") 49 resp, err := s.httpClient.Do(req) 50 if err != nil { 51 return fmt.Errorf("get repo: %w", err) 52 } 53 54 defer resp.Body.Close() 55 56 b, err := io.ReadAll(resp.Body) 57 if err != nil { 58 return fmt.Errorf("reading repo response body: %w", err) 59 } 60 61 filename := path.Join(s.blobDir, fmt.Sprintf("%s-%d-repo.car", s.did, time.Now().UnixMilli())) 62 f, err := os.Create(filename) 63 if err != nil { 64 return fmt.Errorf("creating temp file: %w", err) 65 } 66 defer func() { 67 f.Close() 68 69 err = os.Remove(filename) 70 if err != nil { 71 slog.Error("failed to delete pds repo file after uploading", "error", err, "filename", f.Name()) 72 metadata := bugsnag.MetaData{ 73 "file": { 74 "filename": f.Name(), 75 }, 76 } 77 bugsnag.Notify(fmt.Errorf("delete pds repo file after uploading: %w", err), metadata) 78 } 79 }() 80 81 zipWriter := zip.NewWriter(f) 82 zipFile, err := zipWriter.Create("repo.car") 83 if err != nil { 84 return 
fmt.Errorf("create zip file: %w", err) 85 } 86 87 _, err = zipFile.Write(b) 88 if err != nil { 89 return fmt.Errorf("write repo to file: %w", err) 90 } 91 92 zipWriter.Close() 93 94 // reset the reader back to the start so that the minio upload can read the data that's been written. 95 _, err = f.Seek(0, 0) 96 if err != nil { 97 return fmt.Errorf("setting seek on written file: %w", err) 98 } 99 100 fi, err := f.Stat() 101 if err != nil { 102 return fmt.Errorf("stat written file: %w", err) 103 } 104 105 _, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-repo.zip", f, fi.Size(), minio.PutObjectOptions{}) 106 if err != nil { 107 return fmt.Errorf("put repo file to bucket: %w", err) 108 } 109 110 return nil 111} 112 113func (s *service) backupBlobs(ctx context.Context) error { 114 cids, err := s.getAllBlobCIDs(ctx) 115 if err != nil { 116 return fmt.Errorf("get all blob CIDs: %w", err) 117 } 118 119 if len(cids) == 0 { 120 return nil 121 } 122 123 filename := path.Join(s.blobDir, fmt.Sprintf("%s-%d-blobs.zip", s.did, time.Now().UnixMilli())) 124 defer os.Remove(filename) 125 126 f, err := os.Create(filename) 127 if err != nil { 128 return fmt.Errorf("creating zip file: %w", err) 129 } 130 defer f.Close() 131 132 zipWriter := zip.NewWriter(f) 133 for _, cid := range cids { 134 blob, err := s.getBlob(ctx, cid) 135 if err != nil { 136 slog.Error("failed to get blob", "cid", cid, "error", err) 137 bugsnag.Notify(err) 138 continue 139 } 140 141 zipFile, err := zipWriter.Create(cid) 142 if err != nil { 143 slog.Error("create new file in zipwriter", "cid", cid, "error", err) 144 bugsnag.Notify(err) 145 continue 146 } 147 148 zipFile.Write(blob) 149 } 150 err = zipWriter.Close() 151 if err != nil { 152 return fmt.Errorf("close zip writer: %w", err) 153 } 154 155 fi, err := f.Stat() 156 if err != nil { 157 return fmt.Errorf("stat zip file: %w", err) 158 } 159 160 _, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-blobs.zip", f, fi.Size(), minio.PutObjectOptions{}) 
161 if err != nil { 162 return fmt.Errorf("stream blobs to bucket: %w", err) 163 } 164 165 return nil 166} 167 168func (s *service) getAllBlobCIDs(ctx context.Context) ([]string, error) { 169 cursor := "" 170 limit := 100 171 var cids []string 172 for { 173 res, err := s.listBlobs(ctx, cursor, int64(limit)) 174 if err != nil { 175 return nil, fmt.Errorf("list blobs: %w", err) 176 } 177 if len(res.CIDs) == 0 { 178 return cids, nil 179 } 180 181 cids = append(cids, res.CIDs...) 182 183 if len(res.CIDs) < limit { 184 return cids, nil 185 } 186 187 cursor = res.Cursor 188 } 189} 190 191type listBlobsResponse struct { 192 Cursor string `json:"cursor"` 193 CIDs []string `json:"cids"` 194} 195 196func (s *service) listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) { 197 // TODO: do proper url encoding of query params 198 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listBlobs?did=%s&cursor=%s&limit=%d", s.pdsHost, s.did, cursor, limit) 199 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 200 if err != nil { 201 return listBlobsResponse{}, fmt.Errorf("create list blobs request: %w", err) 202 } 203 204 resp, err := s.httpClient.Do(req) 205 if err != nil { 206 return listBlobsResponse{}, fmt.Errorf("list blobs: %w", err) 207 } 208 209 defer resp.Body.Close() 210 211 resBody, err := io.ReadAll(resp.Body) 212 if err != nil { 213 return listBlobsResponse{}, fmt.Errorf("failed to read response: %w", err) 214 } 215 216 var result listBlobsResponse 217 err = json.Unmarshal(resBody, &result) 218 if err != nil { 219 return listBlobsResponse{}, fmt.Errorf("failed to unmarshal response: %w", err) 220 } 221 222 return result, nil 223} 224 225func (s *service) getBlob(ctx context.Context, cid string) ([]byte, error) { 226 filename := filepath.Join(s.blobDir, fmt.Sprintf("blob-%s-%s", s.did, cid)) 227 existing, err := os.ReadFile(filename) 228 if !os.IsNotExist(err) { 229 return existing, nil 230 } 231 232 slog.Info("blob not found locally - 
downloading", "did", s.did, "cid", cid) 233 234 // TODO: do proper url encoding of query params 235 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", s.pdsHost, s.did, cid) 236 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 237 if err != nil { 238 return nil, fmt.Errorf("create get blob request: %w", err) 239 } 240 241 resp, err := s.httpClient.Do(req) 242 if err != nil { 243 return nil, fmt.Errorf("get blob: %w", err) 244 } 245 defer resp.Body.Close() 246 247 b, err := io.ReadAll(resp.Body) 248 if err != nil { 249 return nil, fmt.Errorf("read blob response body: %w", err) 250 } 251 252 err = os.WriteFile(filename, b, os.ModePerm) 253 if err != nil { 254 slog.Error("writing blob", "error", err, "cid", cid) 255 metadata := bugsnag.MetaData{ 256 "blob": { 257 "filename": filename, 258 "cid": cid, 259 "did": s.did, 260 }, 261 } 262 bugsnag.Notify(fmt.Errorf("writing blob to local storage: %w", err), metadata) 263 } 264 265 return b, nil 266}