A tool for backing up ATProto-related data to S3.
at store-cids-locally 262 lines 6.3 kB view raw
1package main 2 3import ( 4 "archive/zip" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log/slog" 10 "net/http" 11 "os" 12 "path" 13 "path/filepath" 14 "time" 15 16 "github.com/bugsnag/bugsnag-go/v2" 17 "github.com/minio/minio-go/v7" 18) 19 20func (s *service) backupPDS(ctx context.Context) { 21 if s.pdsHost == "" || s.did == "" { 22 slog.Info("PDS_HOST or DID env not set - skipping PDS backup") 23 return 24 } 25 26 err := s.backupRepo(ctx) 27 if err != nil { 28 slog.Error("backup repo", "error", err) 29 bugsnag.Notify(err) 30 } 31 32 err = s.backupBlobs(ctx) 33 if err != nil { 34 slog.Error("backup blobs", "error", err) 35 bugsnag.Notify(err) 36 } 37 38 slog.Info("finished PDS backup") 39} 40 41func (s *service) backupRepo(ctx context.Context) error { 42 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", s.pdsHost, s.did) 43 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 44 if err != nil { 45 return fmt.Errorf("create get repo request: %w", err) 46 } 47 48 req.Header.Add("ACCEPT", "application/vnd.ipld.car") 49 resp, err := s.httpClient.Do(req) 50 if err != nil { 51 return fmt.Errorf("get repo: %w", err) 52 } 53 54 defer resp.Body.Close() 55 56 b, err := io.ReadAll(resp.Body) 57 if err != nil { 58 return fmt.Errorf("reading repo response body: %w", err) 59 } 60 61 filename := path.Join(s.blobDir, fmt.Sprintf("%s-%d-repo.car", s.did, time.Now().UnixMilli())) 62 f, err := os.Create(filename) 63 if err != nil { 64 return fmt.Errorf("creating temp file: %w", err) 65 } 66 defer func() { 67 f.Close() 68 69 err = os.Remove(filename) 70 if err != nil { 71 slog.Error("failed to delete pds repo file after uploading", "error", err, "filename", f.Name()) 72 metadata := bugsnag.MetaData{ 73 "file": { 74 "filename": f.Name(), 75 }, 76 } 77 bugsnag.Notify(fmt.Errorf("delete pds repo file after uploading: %w", err), metadata) 78 } 79 }() 80 81 zipWriter := zip.NewWriter(f) 82 zipFile, err := zipWriter.Create("repo.car") 83 if err != nil { 84 return 
fmt.Errorf("create zip file: %w", err) 85 } 86 87 _, err = zipFile.Write(b) 88 if err != nil { 89 return fmt.Errorf("write repo to file: %w", err) 90 } 91 92 zipWriter.Close() 93 94 // reset the reader back to the start so that the minio upload can read the data that's been written. 95 _, err = f.Seek(0, 0) 96 if err != nil { 97 return fmt.Errorf("setting seek on written file: %w", err) 98 } 99 100 fi, err := f.Stat() 101 if err != nil { 102 return fmt.Errorf("stat written file: %w", err) 103 } 104 105 _, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-repo.zip", f, fi.Size(), minio.PutObjectOptions{}) 106 if err != nil { 107 return fmt.Errorf("put repo file to bucket: %w", err) 108 } 109 110 return nil 111} 112 113func (s *service) backupBlobs(ctx context.Context) error { 114 cids, err := s.getAllBlobCIDs(ctx) 115 if err != nil { 116 return fmt.Errorf("get all blob CIDs: %w", err) 117 } 118 119 filename := fmt.Sprintf("%s-%s-blobs.zip", s.did, time.Now()) 120 defer os.Remove(filename) 121 122 f, err := os.Create(filename) 123 if err != nil { 124 return fmt.Errorf("creating zip file: %w", err) 125 } 126 defer f.Close() 127 128 zipWriter := zip.NewWriter(f) 129 for _, cid := range cids { 130 blob, err := s.getBlob(ctx, cid) 131 if err != nil { 132 slog.Error("failed to get blob", "cid", cid, "error", err) 133 bugsnag.Notify(err) 134 continue 135 } 136 137 zipFile, err := zipWriter.Create(cid) 138 if err != nil { 139 slog.Error("create new file in zipwriter", "cid", cid, "error", err) 140 bugsnag.Notify(err) 141 continue 142 } 143 144 zipFile.Write(blob) 145 } 146 err = zipWriter.Close() 147 if err != nil { 148 return fmt.Errorf("close zip writer: %w", err) 149 } 150 151 fi, err := f.Stat() 152 if err != nil { 153 return fmt.Errorf("stat zip file: %w", err) 154 } 155 156 _, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-blobs.zip", f, fi.Size(), minio.PutObjectOptions{}) 157 if err != nil { 158 return fmt.Errorf("stream blobs to bucket: %w", err) 159 } 
160 161 return nil 162} 163 164func (s *service) getAllBlobCIDs(ctx context.Context) ([]string, error) { 165 cursor := "" 166 limit := 100 167 var cids []string 168 for { 169 res, err := s.listBlobs(ctx, cursor, int64(limit)) 170 if err != nil { 171 return nil, fmt.Errorf("list blobs: %w", err) 172 } 173 if len(res.CIDs) == 0 { 174 return cids, nil 175 } 176 177 cids = append(cids, res.CIDs...) 178 179 if len(res.CIDs) < limit { 180 return cids, nil 181 } 182 183 cursor = res.Cursor 184 } 185} 186 187type listBlobsResponse struct { 188 Cursor string `json:"cursor"` 189 CIDs []string `json:"cids"` 190} 191 192func (s *service) listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) { 193 // TODO: do proper url encoding of query params 194 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listBlobs?did=%s&cursor=%s&limit=%d", s.pdsHost, s.did, cursor, limit) 195 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 196 if err != nil { 197 return listBlobsResponse{}, fmt.Errorf("create list blobs request: %w", err) 198 } 199 200 resp, err := s.httpClient.Do(req) 201 if err != nil { 202 return listBlobsResponse{}, fmt.Errorf("list blobs: %w", err) 203 } 204 205 defer resp.Body.Close() 206 207 resBody, err := io.ReadAll(resp.Body) 208 if err != nil { 209 return listBlobsResponse{}, fmt.Errorf("failed to read response: %w", err) 210 } 211 212 var result listBlobsResponse 213 err = json.Unmarshal(resBody, &result) 214 if err != nil { 215 return listBlobsResponse{}, fmt.Errorf("failed to unmarshal response: %w", err) 216 } 217 218 return result, nil 219} 220 221func (s *service) getBlob(ctx context.Context, cid string) ([]byte, error) { 222 filename := filepath.Join(s.blobDir, fmt.Sprintf("blob-%s-%s", s.did, cid)) 223 existing, err := os.ReadFile(filename) 224 if !os.IsNotExist(err) { 225 return existing, nil 226 } 227 228 slog.Info("blob not found locally - downloading", "did", s.did, "cid", cid) 229 230 // TODO: do proper url encoding of 
query params 231 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", s.pdsHost, s.did, cid) 232 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 233 if err != nil { 234 return nil, fmt.Errorf("create get blob request: %w", err) 235 } 236 237 resp, err := s.httpClient.Do(req) 238 if err != nil { 239 return nil, fmt.Errorf("get blob: %w", err) 240 } 241 defer resp.Body.Close() 242 243 b, err := io.ReadAll(resp.Body) 244 if err != nil { 245 return nil, fmt.Errorf("read blob response body: %w", err) 246 } 247 248 err = os.WriteFile(filename, b, os.ModePerm) 249 if err != nil { 250 slog.Error("writing blob", "error", err, "cid", cid) 251 metadata := bugsnag.MetaData{ 252 "blob": { 253 "filename": filename, 254 "cid": cid, 255 "did": s.did, 256 }, 257 } 258 bugsnag.Notify(fmt.Errorf("writing blob to local storage: %w", err), metadata) 259 } 260 261 return b, nil 262}