A tool for backing up ATProto-related data to S3

Improve the flow of the app #1

merged opened by willdot.net targeting main from store-cids-locally

This will store the blobs locally before uploading, so that only new blobs are downloaded and sent to MinIO instead of every blob being downloaded on every run.

This also tries to avoid force-streaming data to MinIO, as streaming has memory implications.

Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:dadhhalkfcq3gucaq25hjqon/sh.tangled.repo.pull/3mfjzh7bdcr22
+321 -90
Diff #1
+3 -2
.env.example
··· 4 4 S3_BUCKET_NAME="my-super-duper-bucket" 5 5 DID="the-did-to-backup" 6 6 PDS_HOST="https://your-pds.com" 7 - TANGLED_KNOT_DATABASE_DIRECTORY="/path/to/database/directory" 8 - TANGLED_KNOT_REPOSITORY_DIRECTORY="/path/to/repository/directory" 7 + TANGLED_KNOT_DATABASE_DIRECTORY="./tangled/path/to/database/directory" # ./tangled is from the docker-compose volume 8 + TANGLED_KNOT_REPOSITORY_DIRECTORY="./tangled/path/to/repository/directory" # ./tangled is from the docker-compose volume 9 9 BUGSNAG_API_KEY="enter-api-key-to-enable" 10 + BLOB_DIR="./blobs"
+1
.gitignore
··· 1 1 .env 2 + .DS_Store
+19
Dockerfile
··· 1 + FROM golang:alpine AS builder 2 + 3 + WORKDIR /app 4 + 5 + COPY . . 6 + RUN go mod download 7 + 8 + COPY . . 9 + 10 + RUN CGO_ENABLED=0 go build -o back-at-it . 11 + 12 + FROM alpine:latest 13 + 14 + RUN apk --no-cache add ca-certificates 15 + 16 + WORKDIR /app/ 17 + COPY --from=builder /app/back-at-it . 18 + 19 + ENTRYPOINT ["./back-at-it"]
+10
docker-compose.yaml
··· 1 + services: 2 + back-at-it: 3 + container_name: back-at-it 4 + image: willdot/back-at-it:latest 5 + environment: 6 + ENV_LOCATION: "/app/data/back-at-it.env" 7 + volumes: 8 + - ./data:/app/data 9 + - /home/will/apps/tangled:/app/tangled:ro 10 + restart: always
+3 -2
go.mod
··· 1 1 module tangled.sh/willdot.net/backatit 2 2 3 - go 1.25.0 3 + go 1.26.0 4 4 5 5 require ( 6 + github.com/bugsnag/bugsnag-go/v2 v2.6.2 6 7 github.com/joho/godotenv v1.5.1 7 8 github.com/minio/minio-go/v7 v7.0.95 9 + github.com/robfig/cron v1.2.0 8 10 ) 9 11 10 12 require ( 11 - github.com/bugsnag/bugsnag-go/v2 v2.6.2 // indirect 12 13 github.com/bugsnag/panicwrap v1.3.4 // indirect 13 14 github.com/dustin/go-humanize v1.0.1 // indirect 14 15 github.com/go-ini/ini v1.67.0 // indirect
+3
go.sum
··· 1 + github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pgdoow= 1 2 github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q= 2 3 github.com/bugsnag/bugsnag-go/v2 v2.6.2 h1:gGjr8txMtPYWKovEBC+4o6tthYveuE7fjzu6XYVIApg= 3 4 github.com/bugsnag/bugsnag-go/v2 v2.6.2/go.mod h1:S9njhE7l6XCiKycOZ2zp0x1zoEE5nL3HjROCSsKc/3c= ··· 34 35 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= 35 36 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= 36 37 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 38 + github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= 39 + github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= 37 40 github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= 38 41 github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= 39 42 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
+55 -4
main.go
··· 3 3 import ( 4 4 "context" 5 5 "log/slog" 6 + "net/http" 6 7 "os" 8 + "os/signal" 9 + "time" 7 10 8 11 "github.com/bugsnag/bugsnag-go/v2" 9 12 "github.com/joho/godotenv" 10 13 "github.com/minio/minio-go/v7" 11 14 "github.com/minio/minio-go/v7/pkg/credentials" 15 + "github.com/robfig/cron" 12 16 ) 13 17 18 + type service struct { 19 + pdsHost string 20 + did string 21 + blobDir string 22 + bucketName string 23 + httpClient *http.Client 24 + minioClient *minio.Client 25 + } 26 + 14 27 func main() { 15 - ctx := context.Background() 28 + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) 29 + defer stop() 16 30 17 - err := godotenv.Load(".env") 31 + envLocation := os.Getenv("ENV_LOCATION") 32 + if envLocation == "" { 33 + envLocation = ".env" 34 + } 35 + 36 + err := godotenv.Load(envLocation) 18 37 if err != nil { 19 38 if !os.IsNotExist(err) { 20 39 slog.Error("load env", "error", err) ··· 40 59 return 41 60 } 42 61 43 - backupPDS(ctx, minioClient, bucketName) 44 - backupTangledKnot(ctx, minioClient, bucketName) 62 + pdsHost := os.Getenv("PDS_HOST") 63 + did := os.Getenv("DID") 64 + blobDir := os.Getenv("BLOB_DIR") 65 + 66 + service := service{ 67 + pdsHost: pdsHost, 68 + did: did, 69 + blobDir: blobDir, 70 + bucketName: bucketName, 71 + minioClient: minioClient, 72 + httpClient: &http.Client{ 73 + Timeout: time.Second * 5, 74 + Transport: &http.Transport{ 75 + IdleConnTimeout: time.Second * 90, 76 + }, 77 + }, 78 + } 79 + 80 + service.backupPDS(ctx) 81 + service.backupTangledKnot(ctx) 82 + 83 + c := cron.New() 84 + 85 + c.AddFunc("@hourly", func() { 86 + service.backupPDS(ctx) 87 + }) 88 + c.AddFunc("@hourly", func() { 89 + service.backupTangledKnot(ctx) 90 + }) 91 + 92 + c.Start() 93 + defer c.Stop() 94 + 95 + <-ctx.Done() 45 96 } 46 97 47 98 func createMinioClient() (*minio.Client, error) {
+132 -56
pds.go
··· 9 9 "log/slog" 10 10 "net/http" 11 11 "os" 12 + "path" 13 + "path/filepath" 14 + "time" 12 15 13 16 "github.com/bugsnag/bugsnag-go/v2" 14 17 "github.com/minio/minio-go/v7" 15 18 ) 16 19 17 - func backupPDS(ctx context.Context, minioClient *minio.Client, bucketName string) { 18 - if os.Getenv("PDS_HOST") == "" || os.Getenv("DID") == "" { 20 + func (s *service) backupPDS(ctx context.Context) { 21 + if s.pdsHost == "" || s.did == "" { 19 22 slog.Info("PDS_HOST or DID env not set - skipping PDS backup") 20 23 return 21 24 } 22 25 23 - err := backupRepo(ctx, minioClient, bucketName) 26 + err := s.backupRepo(ctx) 24 27 if err != nil { 25 28 slog.Error("backup repo", "error", err) 26 29 bugsnag.Notify(err) 27 - return 28 30 } 29 31 30 - err = backupBlobs(ctx, minioClient, bucketName) 32 + err = s.backupBlobs(ctx) 31 33 if err != nil { 32 34 slog.Error("backup blobs", "error", err) 33 35 bugsnag.Notify(err) 34 - return 35 36 } 36 - } 37 37 38 - func backupRepo(ctx context.Context, minioClient *minio.Client, bucketName string) error { 39 - pdsHost := os.Getenv("PDS_HOST") 40 - did := os.Getenv("DID") 38 + slog.Info("finished PDS backup") 39 + } 41 40 42 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", pdsHost, did) 41 + func (s *service) backupRepo(ctx context.Context) error { 42 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", s.pdsHost, s.did) 43 43 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 44 44 if err != nil { 45 45 return fmt.Errorf("create get repo request: %w", err) 46 46 } 47 47 48 48 req.Header.Add("ACCEPT", "application/vnd.ipld.car") 49 - resp, err := http.DefaultClient.Do(req) 49 + resp, err := s.httpClient.Do(req) 50 50 if err != nil { 51 51 return fmt.Errorf("get repo: %w", err) 52 52 } 53 53 54 54 defer resp.Body.Close() 55 55 56 - _, err = minioClient.PutObject(ctx, bucketName, "pds-repo", resp.Body, -1, minio.PutObjectOptions{}) 56 + b, err := io.ReadAll(resp.Body) 57 57 if err != nil { 58 - return 
fmt.Errorf("stream repo to bucket: %w", err) 58 + return fmt.Errorf("reading repo response body: %w", err) 59 + } 60 + 61 + filename := path.Join(s.blobDir, fmt.Sprintf("%s-%d-repo.car", s.did, time.Now().UnixMilli())) 62 + f, err := os.Create(filename) 63 + if err != nil { 64 + return fmt.Errorf("creating temp file: %w", err) 65 + } 66 + defer func() { 67 + f.Close() 68 + 69 + err = os.Remove(filename) 70 + if err != nil { 71 + slog.Error("failed to delete pds repo file after uploading", "error", err, "filename", f.Name()) 72 + metadata := bugsnag.MetaData{ 73 + "file": { 74 + "filename": f.Name(), 75 + }, 76 + } 77 + bugsnag.Notify(fmt.Errorf("delete pds repo file after uploading: %w", err), metadata) 78 + } 79 + }() 80 + 81 + zipWriter := zip.NewWriter(f) 82 + zipFile, err := zipWriter.Create("repo.car") 83 + if err != nil { 84 + return fmt.Errorf("create zip file: %w", err) 85 + } 86 + 87 + _, err = zipFile.Write(b) 88 + if err != nil { 89 + return fmt.Errorf("write repo to file: %w", err) 90 + } 91 + 92 + zipWriter.Close() 93 + 94 + // reset the reader back to the start so that the minio upload can read the data that's been written. 
95 + _, err = f.Seek(0, 0) 96 + if err != nil { 97 + return fmt.Errorf("setting seek on written file: %w", err) 98 + } 99 + 100 + fi, err := f.Stat() 101 + if err != nil { 102 + return fmt.Errorf("stat written file: %w", err) 103 + } 104 + 105 + _, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-repo.zip", f, fi.Size(), minio.PutObjectOptions{}) 106 + if err != nil { 107 + return fmt.Errorf("put repo file to bucket: %w", err) 59 108 } 60 109 61 110 return nil 62 111 } 63 112 64 - func backupBlobs(ctx context.Context, minioClient *minio.Client, bucketName string) error { 65 - cids, err := getAllBlobCIDs(ctx) 113 + func (s *service) backupBlobs(ctx context.Context) error { 114 + cids, err := s.getAllBlobCIDs(ctx) 66 115 if err != nil { 67 116 return fmt.Errorf("get all blob CIDs: %w", err) 68 117 } 69 118 70 - reader, writer := io.Pipe() 71 - defer reader.Close() 119 + filename := fmt.Sprintf("%s-%s-blobs.zip", s.did, time.Now()) 120 + defer os.Remove(filename) 72 121 73 - zipWriter := zip.NewWriter(writer) 122 + f, err := os.Create(filename) 123 + if err != nil { 124 + return fmt.Errorf("creating zip file: %w", err) 125 + } 126 + defer f.Close() 74 127 75 - go func() { 76 - defer writer.Close() 77 - defer zipWriter.Close() 128 + zipWriter := zip.NewWriter(f) 129 + for _, cid := range cids { 130 + blob, err := s.getBlob(ctx, cid) 131 + if err != nil { 132 + slog.Error("failed to get blob", "cid", cid, "error", err) 133 + bugsnag.Notify(err) 134 + continue 135 + } 78 136 79 - for _, cid := range cids { 80 - slog.Info("processing cid", "cid", cid) 81 - blob, err := getBlob(ctx, cid) 82 - if err != nil { 83 - slog.Error("failed to get blob", "cid", cid, "error", err) 84 - bugsnag.Notify(err) 85 - continue 86 - } 137 + zipFile, err := zipWriter.Create(cid) 138 + if err != nil { 139 + slog.Error("create new file in zipwriter", "cid", cid, "error", err) 140 + bugsnag.Notify(err) 141 + continue 142 + } 87 143 88 - zipFile, err := zipWriter.Create(cid) 89 - if err != 
nil { 90 - slog.Error("create new file in zipwriter", "cid", cid, "error", err) 91 - bugsnag.Notify(err) 92 - blob.Close() 93 - continue 94 - } 144 + zipFile.Write(blob) 145 + } 146 + err = zipWriter.Close() 147 + if err != nil { 148 + return fmt.Errorf("close zip writer: %w", err) 149 + } 95 150 96 - io.Copy(zipFile, blob) 97 - blob.Close() 98 - } 99 - }() 151 + fi, err := f.Stat() 152 + if err != nil { 153 + return fmt.Errorf("stat zip file: %w", err) 154 + } 100 155 101 - _, err = minioClient.PutObject(ctx, bucketName, "pds-blobs.zip", reader, -1, minio.PutObjectOptions{}) 156 + _, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-blobs.zip", f, fi.Size(), minio.PutObjectOptions{}) 102 157 if err != nil { 103 158 return fmt.Errorf("stream blobs to bucket: %w", err) 104 159 } ··· 106 161 return nil 107 162 } 108 163 109 - func getAllBlobCIDs(ctx context.Context) ([]string, error) { 164 + func (s *service) getAllBlobCIDs(ctx context.Context) ([]string, error) { 110 165 cursor := "" 111 166 limit := 100 112 167 var cids []string 113 168 for { 114 - res, err := listBlobs(ctx, cursor, int64(limit)) 169 + res, err := s.listBlobs(ctx, cursor, int64(limit)) 115 170 if err != nil { 116 171 return nil, fmt.Errorf("list blobs: %w", err) 117 172 } ··· 134 189 CIDs []string `json:"cids"` 135 190 } 136 191 137 - func listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) { 138 - pdsHost := os.Getenv("PDS_HOST") 139 - did := os.Getenv("DID") 140 - 192 + func (s *service) listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) { 141 193 // TODO: do proper url encoding of query params 142 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listBlobs?did=%s&cursor=%s&limit=%d", pdsHost, did, cursor, limit) 194 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listBlobs?did=%s&cursor=%s&limit=%d", s.pdsHost, s.did, cursor, limit) 143 195 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 144 196 if err != nil { 145 
197 return listBlobsResponse{}, fmt.Errorf("create list blobs request: %w", err) 146 198 } 147 199 148 - resp, err := http.DefaultClient.Do(req) 200 + resp, err := s.httpClient.Do(req) 149 201 if err != nil { 150 202 return listBlobsResponse{}, fmt.Errorf("list blobs: %w", err) 151 203 } ··· 166 218 return result, nil 167 219 } 168 220 169 - func getBlob(ctx context.Context, cid string) (io.ReadCloser, error) { 170 - pdsHost := os.Getenv("PDS_HOST") 171 - did := os.Getenv("DID") 221 + func (s *service) getBlob(ctx context.Context, cid string) ([]byte, error) { 222 + filename := filepath.Join(s.blobDir, fmt.Sprintf("blob-%s-%s", s.did, cid)) 223 + existing, err := os.ReadFile(filename) 224 + if !os.IsNotExist(err) { 225 + return existing, nil 226 + } 227 + 228 + slog.Info("blob not found locally - downloading", "did", s.did, "cid", cid) 172 229 173 230 // TODO: do proper url encoding of query params 174 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", pdsHost, did, cid) 231 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", s.pdsHost, s.did, cid) 175 232 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 176 233 if err != nil { 177 234 return nil, fmt.Errorf("create get blob request: %w", err) 178 235 } 179 236 180 - resp, err := http.DefaultClient.Do(req) 237 + resp, err := s.httpClient.Do(req) 181 238 if err != nil { 182 239 return nil, fmt.Errorf("get blob: %w", err) 183 240 } 241 + defer resp.Body.Close() 184 242 185 - return resp.Body, nil 243 + b, err := io.ReadAll(resp.Body) 244 + if err != nil { 245 + return nil, fmt.Errorf("read blob response body: %w", err) 246 + } 247 + 248 + err = os.WriteFile(filename, b, os.ModePerm) 249 + if err != nil { 250 + slog.Error("writing blob", "error", err, "cid", cid) 251 + metadata := bugsnag.MetaData{ 252 + "blob": { 253 + "filename": filename, 254 + "cid": cid, 255 + "did": s.did, 256 + }, 257 + } 258 + bugsnag.Notify(fmt.Errorf("writing blob to local storage: %w", 
err), metadata) 259 + } 260 + 261 + return b, nil 186 262 }
+6 -3
readme.md
··· 18 18 19 19 For PDS data backup you need to ensure that `DID` and `PDS_HOST` are populated. (You can run this tool on any machine to back PDS data up) 20 20 21 - For Knot data backup you need to ensure that `TANGLED_KNOT_DATABASE_DIRECTORY` and `TANGLED_KNOT_REPOSITORY_DIRECTORY` are populated. (You need to run this tool on your Knot server to back up Knot data) 21 + For Knot data backup you need to ensure that `TANGLED_KNOT_DATABASE_DIRECTORY` and `TANGLED_KNOT_REPOSITORY_DIRECTORY` are populated. (You need to run this tool on your Knot server to back up Knot data). 22 + 23 + If using Docker, in the `docker-compose.yaml` file there is a mount that needs to be set which should point to the directory on your host machine that contains your Tangled Knot application running (it has read only permissions, don't worry). Then inside the `.env` file you need to set the `TANGLED_KNOT_DATABASE_DIRECTORY` and `TANGLED_KNOT_REPOSITORY_DIRECTORY` envs to be the directories inside that Knot application volume. 22 24 23 - Run `go run .` 25 + Run `go run .` or use Docker. 24 26 25 27 ### Todo 26 28 27 - - [ ] - Turn this into a long running app using a cron library perhaps 29 + - [x] - Turn this into a long running app using a cron library perhaps 30 + - [ ] - Work out how to tar just the directory requested, not the full path (ie `/home/will/tangled/repo` should back up `/repo` only and not create the full path of empty directories) 28 31 - [ ] - User query params properly when creating the URLs to fetch repo and blobs 29 32 - [ ] - Allow configuring the backup of knot repo data per users DID maybe?
+89 -23
tangled_knot.go
··· 4 4 "archive/tar" 5 5 "compress/gzip" 6 6 "context" 7 + "fmt" 7 8 "io" 8 9 "log/slog" 9 10 "os" 11 + "path" 10 12 "path/filepath" 13 + "time" 11 14 12 15 "github.com/bugsnag/bugsnag-go/v2" 13 16 "github.com/minio/minio-go/v7" 14 17 ) 15 18 16 - func backupTangledKnot(ctx context.Context, minioClient *minio.Client, bucketName string) { 17 - backupKnotDB(ctx, minioClient, bucketName) 18 - backupKnotRepos(ctx, minioClient, bucketName) 19 + func (s *service) backupTangledKnot(ctx context.Context) { 20 + err := s.backupKnotDB(ctx) 21 + if err != nil { 22 + bugsnag.Notify(fmt.Errorf("backup knot db: %w", err)) 23 + slog.Error("failed to backup knot db", "error", err) 24 + } 25 + err = s.backupKnotRepos(ctx) 26 + if err != nil { 27 + bugsnag.Notify(fmt.Errorf("backup knot db: %w", err)) 28 + slog.Error("failed to backup knot db", "error", err) 29 + } 30 + 31 + slog.Info("finished tangled knot backup") 19 32 } 20 33 21 - func backupKnotDB(ctx context.Context, minioClient *minio.Client, bucketName string) { 34 + func (s *service) backupKnotDB(ctx context.Context) error { 22 35 dir := os.Getenv("TANGLED_KNOT_DATABASE_DIRECTORY") 23 36 if dir == "" { 24 37 slog.Info("TANGLED_KNOT_DATABASE_DIRECTORY env not set - skipping knot DB backup") 25 38 } 26 39 27 - pipeReader, pipeWriter := io.Pipe() 28 - defer pipeReader.Close() 40 + filename := path.Join(s.blobDir, fmt.Sprintf("%d-knot.zip", time.Now().UnixMilli())) 41 + f, err := os.Create(filename) 42 + if err != nil { 43 + return fmt.Errorf("creating temp file: %w", err) 44 + } 45 + defer func() { 46 + f.Close() 29 47 30 - go compress(dir, pipeWriter) 48 + err = os.Remove(filename) 49 + if err != nil { 50 + slog.Error("failed to delete knot db zip file after uploading", "error", err, "filename", f.Name()) 51 + metadata := bugsnag.MetaData{ 52 + "file": { 53 + "filename": f.Name(), 54 + }, 55 + } 56 + bugsnag.Notify(fmt.Errorf("delete knot db zip file after uploading: %w", err), metadata) 57 + } 58 + }() 31 59 32 - _, err := 
minioClient.PutObject(ctx, bucketName, "knot-db.zip", pipeReader, -1, minio.PutObjectOptions{}) 60 + compress(dir, f) 61 + 62 + // reset the reader back to the start so that the minio upload can read the data that's been written. 63 + _, err = f.Seek(0, 0) 33 64 if err != nil { 34 - slog.Error("stream knot DB to bucket: %w") 35 - bugsnag.Notify(err) 65 + return fmt.Errorf("setting seek on written file: %w", err) 36 66 } 67 + 68 + fi, err := f.Stat() 69 + if err != nil { 70 + return fmt.Errorf("stat written file: %w", err) 71 + } 72 + 73 + _, err = s.minioClient.PutObject(ctx, s.bucketName, "knot-db.zip", f, fi.Size(), minio.PutObjectOptions{}) 74 + if err != nil { 75 + return fmt.Errorf("put knot db zip file to bucket: %w", err) 76 + } 77 + return nil 37 78 } 38 79 39 - func backupKnotRepos(ctx context.Context, minioClient *minio.Client, bucketName string) { 80 + func (s *service) backupKnotRepos(ctx context.Context) error { 40 81 dir := os.Getenv("TANGLED_KNOT_REPOSITORY_DIRECTORY") 41 82 if dir == "" { 42 83 slog.Info("TANGLED_KNOT_REPOSITORY_DIRECTORY env not set - skipping knot repo backup") 43 84 } 44 85 45 - pipeReader, pipeWriter := io.Pipe() 46 - defer pipeReader.Close() 86 + filename := path.Join(s.blobDir, fmt.Sprintf("%d-knot-repos.zip", time.Now().UnixMilli())) 87 + f, err := os.Create(filename) 88 + if err != nil { 89 + return fmt.Errorf("creating temp file: %w", err) 90 + } 91 + defer func() { 92 + f.Close() 47 93 48 - go compress(dir, pipeWriter) 94 + err = os.Remove(filename) 95 + if err != nil { 96 + slog.Error("failed to delete knot repos zip file after uploading", "error", err, "filename", f.Name()) 97 + metadata := bugsnag.MetaData{ 98 + "file": { 99 + "filename": f.Name(), 100 + }, 101 + } 102 + bugsnag.Notify(fmt.Errorf("delete knot repos zip file after uploading: %w", err), metadata) 103 + } 104 + }() 49 105 50 - _, err := minioClient.PutObject(ctx, bucketName, "knot-repos.zip", pipeReader, -1, minio.PutObjectOptions{}) 106 + compress(dir, f) 
107 + 108 + // reset the reader back to the start so that the minio upload can read the data that's been written. 109 + _, err = f.Seek(0, 0) 51 110 if err != nil { 52 - slog.Error("stream knot repos to bucket: %w") 53 - bugsnag.Notify(err) 111 + return fmt.Errorf("setting seek on written file: %w", err) 54 112 } 113 + 114 + fi, err := f.Stat() 115 + if err != nil { 116 + return fmt.Errorf("stat written file: %w", err) 117 + } 118 + 119 + _, err = s.minioClient.PutObject(ctx, s.bucketName, "knot-repos.zip", f, fi.Size(), minio.PutObjectOptions{}) 120 + if err != nil { 121 + return fmt.Errorf("put knot repo file to bucket: %w", err) 122 + } 123 + return nil 55 124 } 56 125 57 - func compress(src string, writer io.WriteCloser) error { 126 + func compress(src string, writer io.Writer) error { 58 127 zipWriter := gzip.NewWriter(writer) 59 128 tarWriter := tar.NewWriter(zipWriter) 60 - 61 - defer writer.Close() 62 - defer zipWriter.Close() 63 - defer tarWriter.Close() 64 129 65 130 filepath.Walk(src, func(file string, fi os.FileInfo, err error) error { 66 131 header, err := tar.FileInfoHeader(fi, file) ··· 85 150 return err 86 151 } 87 152 } 153 + 88 154 return nil 89 155 }) 90 156 ··· 96 162 if err := zipWriter.Close(); err != nil { 97 163 return err 98 164 } 99 - // 165 + 100 166 return nil 101 167 }

History

2 rounds 0 comments
sign up or login to add to the discussion
3 commits
expand
Run on cron job. Store blobs locally as a cache.
Improve and fix the Tangled Knot backup to not stream but store locally and then upload after.
improve error handling
expand 0 comments
pull request successfully merged
1 commit
expand
Run on cron job. Store blobs locally as a cache.
expand 0 comments