A tool for backing up ATProto related data to S3

improve error handling

authored by willdot.net and committed by

Tangled c39156f1 079a1de0

+52 -25
+18 -5
pds.go
··· 27 if err != nil { 28 slog.Error("backup repo", "error", err) 29 bugsnag.Notify(err) 30 - return 31 } 32 33 err = s.backupBlobs(ctx) 34 if err != nil { 35 slog.Error("backup blobs", "error", err) 36 bugsnag.Notify(err) 37 - return 38 } 39 40 slog.Info("finished PDS backup") ··· 71 err = os.Remove(filename) 72 if err != nil { 73 slog.Error("failed to delete pds repo file after uploading", "error", err, "filename", f.Name()) 74 } 75 }() 76 ··· 95 96 fi, err := f.Stat() 97 if err != nil { 98 - return fmt.Errorf("stat file: %w", err) 99 } 100 101 _, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-repo.zip", f, fi.Size(), minio.PutObjectOptions{}) 102 if err != nil { 103 - return fmt.Errorf("stream repo to bucket: %w", err) 104 } 105 106 return nil ··· 123 124 zipWriter := zip.NewWriter(f) 125 for _, cid := range cids { 126 - slog.Info("processing cid", "cid", cid) 127 blob, err := s.getBlob(ctx, cid) 128 if err != nil { 129 slog.Error("failed to get blob", "cid", cid, "error", err) ··· 222 return existing, nil 223 } 224 225 // TODO: do proper url encoding of query params 226 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", s.pdsHost, s.did, cid) 227 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) ··· 243 err = os.WriteFile(filename, b, os.ModePerm) 244 if err != nil { 245 slog.Error("writing blob", "error", err, "cid", cid) 246 } 247 248 return b, nil
··· 27 if err != nil { 28 slog.Error("backup repo", "error", err) 29 bugsnag.Notify(err) 30 } 31 32 err = s.backupBlobs(ctx) 33 if err != nil { 34 slog.Error("backup blobs", "error", err) 35 bugsnag.Notify(err) 36 } 37 38 slog.Info("finished PDS backup") ··· 69 err = os.Remove(filename) 70 if err != nil { 71 slog.Error("failed to delete pds repo file after uploading", "error", err, "filename", f.Name()) 72 + metadata := bugsnag.MetaData{ 73 + "file": { 74 + "filename": f.Name(), 75 + }, 76 + } 77 + bugsnag.Notify(fmt.Errorf("delete pds repo file after uploading: %w", err), metadata) 78 } 79 }() 80 ··· 99 100 fi, err := f.Stat() 101 if err != nil { 102 + return fmt.Errorf("stat written file: %w", err) 103 } 104 105 _, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-repo.zip", f, fi.Size(), minio.PutObjectOptions{}) 106 if err != nil { 107 + return fmt.Errorf("put repo file to bucket: %w", err) 108 } 109 110 return nil ··· 127 128 zipWriter := zip.NewWriter(f) 129 for _, cid := range cids { 130 blob, err := s.getBlob(ctx, cid) 131 if err != nil { 132 slog.Error("failed to get blob", "cid", cid, "error", err) ··· 225 return existing, nil 226 } 227 228 + slog.Info("blob not found locally - downloading", "did", s.did, "cid", cid) 229 + 230 // TODO: do proper url encoding of query params 231 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", s.pdsHost, s.did, cid) 232 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) ··· 248 err = os.WriteFile(filename, b, os.ModePerm) 249 if err != nil { 250 slog.Error("writing blob", "error", err, "cid", cid) 251 + metadata := bugsnag.MetaData{ 252 + "blob": { 253 + "filename": filename, 254 + "cid": cid, 255 + "did": s.did, 256 + }, 257 + } 258 + bugsnag.Notify(fmt.Errorf("writing blob to local storage: %w", err), metadata) 259 } 260 261 return b, nil
+34 -20
tangled_knot.go
··· 17 ) 18 19 func (s *service) backupTangledKnot(ctx context.Context) { 20 - s.backupKnotDB(ctx) 21 - s.backupKnotRepos(ctx) 22 23 slog.Info("finished tangled knot backup") 24 } 25 26 - func (s *service) backupKnotDB(ctx context.Context) { 27 dir := os.Getenv("TANGLED_KNOT_DATABASE_DIRECTORY") 28 if dir == "" { 29 slog.Info("TANGLED_KNOT_DATABASE_DIRECTORY env not set - skipping knot DB backup") ··· 32 filename := path.Join(s.blobDir, fmt.Sprintf("%d-knot.zip", time.Now().UnixMilli())) 33 f, err := os.Create(filename) 34 if err != nil { 35 - slog.Error("creating temp file", "error", err) 36 - return 37 } 38 defer func() { 39 f.Close() ··· 41 err = os.Remove(filename) 42 if err != nil { 43 slog.Error("failed to delete knot db zip file after uploading", "error", err, "filename", f.Name()) 44 } 45 }() 46 ··· 49 // reset the reader back to the start so that the minio upload can read the data that's been written. 50 _, err = f.Seek(0, 0) 51 if err != nil { 52 - slog.Error("setting seek on written file", "error", err) 53 - return 54 } 55 56 fi, err := f.Stat() 57 if err != nil { 58 - slog.Error("failed to stat knot db zip file", "error", err) 59 - return 60 } 61 62 _, err = s.minioClient.PutObject(ctx, s.bucketName, "knot-db.zip", f, fi.Size(), minio.PutObjectOptions{}) 63 if err != nil { 64 - slog.Error("stream knot DB to bucket", "error", err) 65 - bugsnag.Notify(err) 66 } 67 } 68 69 - func (s *service) backupKnotRepos(ctx context.Context) { 70 dir := os.Getenv("TANGLED_KNOT_REPOSITORY_DIRECTORY") 71 if dir == "" { 72 slog.Info("TANGLED_KNOT_REPOSITORY_DIRECTORY env not set - skipping knot repo backup") ··· 75 filename := path.Join(s.blobDir, fmt.Sprintf("%d-knot-repos.zip", time.Now().UnixMilli())) 76 f, err := os.Create(filename) 77 if err != nil { 78 - slog.Error("creating temp file", "error", err) 79 - return 80 } 81 defer func() { 82 f.Close() ··· 84 err = os.Remove(filename) 85 if err != nil { 86 slog.Error("failed to delete knot repos zip file after uploading", "error", err, "filename", f.Name()) 87 } 88 }() 89 ··· 92 // reset the reader back to the start so that the minio upload can read the data that's been written. 93 _, err = f.Seek(0, 0) 94 if err != nil { 95 - slog.Error("setting seek on written file", "error", err) 96 - return 97 } 98 99 fi, err := f.Stat() 100 if err != nil { 101 - slog.Error("failed to stat knot db zip file", "error", err) 102 - return 103 } 104 105 _, err = s.minioClient.PutObject(ctx, s.bucketName, "knot-repos.zip", f, fi.Size(), minio.PutObjectOptions{}) 106 if err != nil { 107 - slog.Error("write knot repos to bucket", "error", err) 108 - bugsnag.Notify(err) 109 } 110 } 111 112 func compress(src string, writer io.Writer) error {
··· 17 ) 18 19 func (s *service) backupTangledKnot(ctx context.Context) { 20 + err := s.backupKnotDB(ctx) 21 + if err != nil { 22 + bugsnag.Notify(fmt.Errorf("backup knot db: %w", err)) 23 + slog.Error("failed to backup knot db", "error", err) 24 + } 25 + err = s.backupKnotRepos(ctx) 26 + if err != nil { 27 + bugsnag.Notify(fmt.Errorf("backup knot db: %w", err)) 28 + slog.Error("failed to backup knot db", "error", err) 29 + } 30 31 slog.Info("finished tangled knot backup") 32 } 33 34 + func (s *service) backupKnotDB(ctx context.Context) error { 35 dir := os.Getenv("TANGLED_KNOT_DATABASE_DIRECTORY") 36 if dir == "" { 37 slog.Info("TANGLED_KNOT_DATABASE_DIRECTORY env not set - skipping knot DB backup") ··· 40 filename := path.Join(s.blobDir, fmt.Sprintf("%d-knot.zip", time.Now().UnixMilli())) 41 f, err := os.Create(filename) 42 if err != nil { 43 + return fmt.Errorf("creating temp file: %w", err) 44 } 45 defer func() { 46 f.Close() ··· 48 err = os.Remove(filename) 49 if err != nil { 50 slog.Error("failed to delete knot db zip file after uploading", "error", err, "filename", f.Name()) 51 + metadata := bugsnag.MetaData{ 52 + "file": { 53 + "filename": f.Name(), 54 + }, 55 + } 56 + bugsnag.Notify(fmt.Errorf("delete knot db zip file after uploading: %w", err), metadata) 57 } 58 }() 59 ··· 62 // reset the reader back to the start so that the minio upload can read the data that's been written. 63 _, err = f.Seek(0, 0) 64 if err != nil { 65 + return fmt.Errorf("setting seek on written file: %w", err) 66 } 67 68 fi, err := f.Stat() 69 if err != nil { 70 + return fmt.Errorf("stat written file: %w", err) 71 } 72 73 _, err = s.minioClient.PutObject(ctx, s.bucketName, "knot-db.zip", f, fi.Size(), minio.PutObjectOptions{}) 74 if err != nil { 75 + return fmt.Errorf("put knot db zip file to bucket: %w", err) 76 } 77 + return nil 78 } 79 80 + func (s *service) backupKnotRepos(ctx context.Context) error { 81 dir := os.Getenv("TANGLED_KNOT_REPOSITORY_DIRECTORY") 82 if dir == "" { 83 slog.Info("TANGLED_KNOT_REPOSITORY_DIRECTORY env not set - skipping knot repo backup") ··· 86 filename := path.Join(s.blobDir, fmt.Sprintf("%d-knot-repos.zip", time.Now().UnixMilli())) 87 f, err := os.Create(filename) 88 if err != nil { 89 + return fmt.Errorf("creating temp file: %w", err) 90 } 91 defer func() { 92 f.Close() ··· 94 err = os.Remove(filename) 95 if err != nil { 96 slog.Error("failed to delete knot repos zip file after uploading", "error", err, "filename", f.Name()) 97 + metadata := bugsnag.MetaData{ 98 + "file": { 99 + "filename": f.Name(), 100 + }, 101 + } 102 + bugsnag.Notify(fmt.Errorf("delete knot repos zip file after uploading: %w", err), metadata) 103 } 104 }() 105 ··· 108 // reset the reader back to the start so that the minio upload can read the data that's been written. 109 _, err = f.Seek(0, 0) 110 if err != nil { 111 + return fmt.Errorf("setting seek on written file: %w", err) 112 } 113 114 fi, err := f.Stat() 115 if err != nil { 116 + return fmt.Errorf("stat written file: %w", err) 117 } 118 119 _, err = s.minioClient.PutObject(ctx, s.bucketName, "knot-repos.zip", f, fi.Size(), minio.PutObjectOptions{}) 120 if err != nil { 121 + return fmt.Errorf("put knot repo file to bucket: %w", err) 122 } 123 + return nil 124 } 125 126 func compress(src string, writer io.Writer) error {