A tool for backing up ATProto related data to S3

Improve the flow of the app #1

Merged — opened by willdot.net, targeting `main` from `store-cids-locally`

This stores blobs locally before uploading, so that only new blobs are downloaded and sent to Minio, instead of re-downloading every blob on each run.

This also stops forcing streamed uploads of unknown size to Minio, since that approach has memory implications.

Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:dadhhalkfcq3gucaq25hjqon/sh.tangled.repo.pull/3mfjzh7bdcr22
+161 -65
Diff #0
+1
.env.example
··· 7 7 TANGLED_KNOT_DATABASE_DIRECTORY="/path/to/database/directory" 8 8 TANGLED_KNOT_REPOSITORY_DIRECTORY="/path/to/repository/directory" 9 9 BUGSNAG_API_KEY="enter-api-key-to-enable" 10 + BLOB_DIR="./blobs"
+1
.gitignore
··· 1 1 .env 2 + .DS_Store
+19
Dockerfile
··· 1 + FROM golang:alpine AS builder 2 + 3 + WORKDIR /app 4 + 5 + COPY . . 6 + RUN go mod download 7 + 8 + COPY . . 9 + 10 + RUN CGO_ENABLED=0 go build -o back-at-it . 11 + 12 + FROM alpine:latest 13 + 14 + RUN apk --no-cache add ca-certificates 15 + 16 + WORKDIR /app/ 17 + COPY --from=builder /app/back-at-it . 18 + 19 + ENTRYPOINT ["./back-at-it"]
+9
docker-compose.yaml
··· 1 + services: 2 + back-at-it: 3 + container_name: back-at-it 4 + image: willdot/back-at-it:latest 5 + environment: 6 + ENV_LOCATION: "/app/data/ back-at-it.env" 7 + volumes: 8 + - ./data:/app/data 9 + restart: always
+1
go.mod
··· 21 21 github.com/minio/md5-simd v1.1.2 // indirect 22 22 github.com/philhofer/fwd v1.2.0 // indirect 23 23 github.com/pkg/errors v0.9.1 // indirect 24 + github.com/robfig/cron v1.2.0 // indirect 24 25 github.com/rs/xid v1.6.0 // indirect 25 26 github.com/stretchr/testify v1.10.0 // indirect 26 27 github.com/tinylib/msgp v1.3.0 // indirect
+2
go.sum
··· 34 34 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= 35 35 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= 36 36 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 37 + github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= 38 + github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= 37 39 github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= 38 40 github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= 39 41 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
+43 -2
main.go
··· 3 3 import ( 4 4 "context" 5 5 "log/slog" 6 + "net/http" 6 7 "os" 8 + "time" 7 9 8 10 "github.com/bugsnag/bugsnag-go/v2" 9 11 "github.com/joho/godotenv" 10 12 "github.com/minio/minio-go/v7" 11 13 "github.com/minio/minio-go/v7/pkg/credentials" 14 + "github.com/robfig/cron" 12 15 ) 13 16 17 + type service struct { 18 + pdsHost string 19 + did string 20 + blobDir string 21 + bucketName string 22 + httpClient *http.Client 23 + minioClient *minio.Client 24 + } 25 + 14 26 func main() { 15 27 ctx := context.Background() 16 28 ··· 40 52 return 41 53 } 42 54 43 - backupPDS(ctx, minioClient, bucketName) 44 - backupTangledKnot(ctx, minioClient, bucketName) 55 + pdsHost := os.Getenv("PDS_HOST") 56 + did := os.Getenv("DID") 57 + blobDir := os.Getenv("BLOB_DIR") 58 + 59 + service := service{ 60 + pdsHost: pdsHost, 61 + did: did, 62 + blobDir: blobDir, 63 + bucketName: bucketName, 64 + minioClient: minioClient, 65 + httpClient: &http.Client{ 66 + Timeout: time.Second * 5, 67 + Transport: &http.Transport{ 68 + IdleConnTimeout: time.Second * 90, 69 + }, 70 + }, 71 + } 72 + 73 + service.backupPDS(ctx) 74 + service.backupTangledKnot(ctx) 75 + 76 + c := cron.New() 77 + 78 + c.AddFunc("@hourly", func() { 79 + service.backupPDS(ctx) 80 + }) 81 + c.AddFunc("@hourly", func() { 82 + service.backupTangledKnot(ctx) 83 + }) 84 + 85 + c.Start() 45 86 } 46 87 47 88 func createMinioClient() (*minio.Client, error) {
+72 -53
pds.go
··· 9 9 "log/slog" 10 10 "net/http" 11 11 "os" 12 + "path/filepath" 13 + "time" 12 14 13 15 "github.com/bugsnag/bugsnag-go/v2" 14 16 "github.com/minio/minio-go/v7" 15 17 ) 16 18 17 - func backupPDS(ctx context.Context, minioClient *minio.Client, bucketName string) { 18 - if os.Getenv("PDS_HOST") == "" || os.Getenv("DID") == "" { 19 + func (s *service) backupPDS(ctx context.Context) { 20 + if s.pdsHost == "" || s.did == "" { 19 21 slog.Info("PDS_HOST or DID env not set - skipping PDS backup") 20 22 return 21 23 } 22 24 23 - err := backupRepo(ctx, minioClient, bucketName) 25 + err := s.backupRepo(ctx) 24 26 if err != nil { 25 27 slog.Error("backup repo", "error", err) 26 28 bugsnag.Notify(err) 27 29 return 28 30 } 29 31 30 - err = backupBlobs(ctx, minioClient, bucketName) 32 + err = s.backupBlobs(ctx) 31 33 if err != nil { 32 34 slog.Error("backup blobs", "error", err) 33 35 bugsnag.Notify(err) 34 36 return 35 37 } 36 - } 37 38 38 - func backupRepo(ctx context.Context, minioClient *minio.Client, bucketName string) error { 39 - pdsHost := os.Getenv("PDS_HOST") 40 - did := os.Getenv("DID") 39 + slog.Info("finished PDS backup") 40 + } 41 41 42 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", pdsHost, did) 42 + func (s *service) backupRepo(ctx context.Context) error { 43 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", s.pdsHost, s.did) 43 44 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 44 45 if err != nil { 45 46 return fmt.Errorf("create get repo request: %w", err) 46 47 } 47 48 48 49 req.Header.Add("ACCEPT", "application/vnd.ipld.car") 49 - resp, err := http.DefaultClient.Do(req) 50 + resp, err := s.httpClient.Do(req) 50 51 if err != nil { 51 52 return fmt.Errorf("get repo: %w", err) 52 53 } 53 54 54 55 defer resp.Body.Close() 55 56 56 - _, err = minioClient.PutObject(ctx, bucketName, "pds-repo", resp.Body, -1, minio.PutObjectOptions{}) 57 + _, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-repo", resp.Body, -1, 
minio.PutObjectOptions{}) 57 58 if err != nil { 58 59 return fmt.Errorf("stream repo to bucket: %w", err) 59 60 } ··· 61 62 return nil 62 63 } 63 64 64 - func backupBlobs(ctx context.Context, minioClient *minio.Client, bucketName string) error { 65 - cids, err := getAllBlobCIDs(ctx) 65 + func (s *service) backupBlobs(ctx context.Context) error { 66 + cids, err := s.getAllBlobCIDs(ctx) 66 67 if err != nil { 67 68 return fmt.Errorf("get all blob CIDs: %w", err) 68 69 } 69 70 70 - reader, writer := io.Pipe() 71 - defer reader.Close() 71 + filename := fmt.Sprintf("%s-%s-blobs.zip", s.did, time.Now()) 72 + defer os.Remove(filename) 72 73 73 - zipWriter := zip.NewWriter(writer) 74 + f, err := os.Create(filename) 75 + if err != nil { 76 + return fmt.Errorf("creating zip file: %w", err) 77 + } 78 + defer f.Close() 74 79 75 - go func() { 76 - defer writer.Close() 77 - defer zipWriter.Close() 80 + zipWriter := zip.NewWriter(f) 81 + for _, cid := range cids { 82 + slog.Info("processing cid", "cid", cid) 83 + blob, err := s.getBlob(ctx, cid) 84 + if err != nil { 85 + slog.Error("failed to get blob", "cid", cid, "error", err) 86 + bugsnag.Notify(err) 87 + continue 88 + } 78 89 79 - for _, cid := range cids { 80 - slog.Info("processing cid", "cid", cid) 81 - blob, err := getBlob(ctx, cid) 82 - if err != nil { 83 - slog.Error("failed to get blob", "cid", cid, "error", err) 84 - bugsnag.Notify(err) 85 - continue 86 - } 90 + zipFile, err := zipWriter.Create(cid) 91 + if err != nil { 92 + slog.Error("create new file in zipwriter", "cid", cid, "error", err) 93 + bugsnag.Notify(err) 94 + continue 95 + } 87 96 88 - zipFile, err := zipWriter.Create(cid) 89 - if err != nil { 90 - slog.Error("create new file in zipwriter", "cid", cid, "error", err) 91 - bugsnag.Notify(err) 92 - blob.Close() 93 - continue 94 - } 97 + zipFile.Write(blob) 98 + } 99 + err = zipWriter.Close() 100 + if err != nil { 101 + return fmt.Errorf("close zip writer: %w", err) 102 + } 95 103 96 - io.Copy(zipFile, blob) 
97 - blob.Close() 98 - } 99 - }() 104 + fi, err := f.Stat() 105 + if err != nil { 106 + return fmt.Errorf("stat zip file: %w", err) 107 + } 100 108 101 - _, err = minioClient.PutObject(ctx, bucketName, "pds-blobs.zip", reader, -1, minio.PutObjectOptions{}) 109 + _, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-blobs.zip", f, fi.Size(), minio.PutObjectOptions{}) 102 110 if err != nil { 103 111 return fmt.Errorf("stream blobs to bucket: %w", err) 104 112 } ··· 106 114 return nil 107 115 } 108 116 109 - func getAllBlobCIDs(ctx context.Context) ([]string, error) { 117 + func (s *service) getAllBlobCIDs(ctx context.Context) ([]string, error) { 110 118 cursor := "" 111 119 limit := 100 112 120 var cids []string 113 121 for { 114 - res, err := listBlobs(ctx, cursor, int64(limit)) 122 + res, err := s.listBlobs(ctx, cursor, int64(limit)) 115 123 if err != nil { 116 124 return nil, fmt.Errorf("list blobs: %w", err) 117 125 } ··· 134 142 CIDs []string `json:"cids"` 135 143 } 136 144 137 - func listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) { 138 - pdsHost := os.Getenv("PDS_HOST") 139 - did := os.Getenv("DID") 140 - 145 + func (s *service) listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) { 141 146 // TODO: do proper url encoding of query params 142 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listBlobs?did=%s&cursor=%s&limit=%d", pdsHost, did, cursor, limit) 147 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listBlobs?did=%s&cursor=%s&limit=%d", s.pdsHost, s.did, cursor, limit) 143 148 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 144 149 if err != nil { 145 150 return listBlobsResponse{}, fmt.Errorf("create list blobs request: %w", err) 146 151 } 147 152 148 - resp, err := http.DefaultClient.Do(req) 153 + resp, err := s.httpClient.Do(req) 149 154 if err != nil { 150 155 return listBlobsResponse{}, fmt.Errorf("list blobs: %w", err) 151 156 } ··· 166 171 return result, nil 167 
172 } 168 173 169 - func getBlob(ctx context.Context, cid string) (io.ReadCloser, error) { 170 - pdsHost := os.Getenv("PDS_HOST") 171 - did := os.Getenv("DID") 174 + func (s *service) getBlob(ctx context.Context, cid string) ([]byte, error) { 175 + filename := filepath.Join(s.blobDir, fmt.Sprintf("blob-%s-%s", s.did, cid)) 176 + existing, err := os.ReadFile(filename) 177 + if !os.IsNotExist(err) { 178 + return existing, nil 179 + } 172 180 173 181 // TODO: do proper url encoding of query params 174 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", pdsHost, did, cid) 182 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", s.pdsHost, s.did, cid) 175 183 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 176 184 if err != nil { 177 185 return nil, fmt.Errorf("create get blob request: %w", err) 178 186 } 179 187 180 - resp, err := http.DefaultClient.Do(req) 188 + resp, err := s.httpClient.Do(req) 181 189 if err != nil { 182 190 return nil, fmt.Errorf("get blob: %w", err) 183 191 } 192 + defer resp.Body.Close() 193 + 194 + b, err := io.ReadAll(resp.Body) 195 + if err != nil { 196 + return nil, fmt.Errorf("read blob response body: %w", err) 197 + } 198 + 199 + err = os.WriteFile(filename, b, os.ModePerm) 200 + if err != nil { 201 + slog.Error("writing blob", "error", err, "cid", cid) 202 + } 184 203 185 - return resp.Body, nil 204 + return b, nil 186 205 }
+2 -1
readme.md
··· 24 24 25 25 ### Todo 26 26 27 - - [ ] - Turn this into a long running app using a cron library perhaps 27 + - [x] - Turn this into a long running app using a cron library perhaps 28 + - [ ] - Work out how to tar just the directory requested, not the full path (ie `/home/will/tangled/repo` should back up `/repo` only and not create the full path of empty directories) 28 29 - [ ] - User query params properly when creating the URLs to fetch repo and blobs 29 30 - [ ] - Allow configuring the backup of knot repo data per users DID maybe?
+11 -9
tangled_knot.go
··· 13 13 "github.com/minio/minio-go/v7" 14 14 ) 15 15 16 - func backupTangledKnot(ctx context.Context, minioClient *minio.Client, bucketName string) { 17 - backupKnotDB(ctx, minioClient, bucketName) 18 - backupKnotRepos(ctx, minioClient, bucketName) 16 + func (s *service) backupTangledKnot(ctx context.Context) { 17 + s.backupKnotDB(ctx) 18 + s.backupKnotRepos(ctx) 19 + 20 + slog.Info("finished tangled knot backup") 19 21 } 20 22 21 - func backupKnotDB(ctx context.Context, minioClient *minio.Client, bucketName string) { 23 + func (s *service) backupKnotDB(ctx context.Context) { 22 24 dir := os.Getenv("TANGLED_KNOT_DATABASE_DIRECTORY") 23 25 if dir == "" { 24 26 slog.Info("TANGLED_KNOT_DATABASE_DIRECTORY env not set - skipping knot DB backup") ··· 29 31 30 32 go compress(dir, pipeWriter) 31 33 32 - _, err := minioClient.PutObject(ctx, bucketName, "knot-db.zip", pipeReader, -1, minio.PutObjectOptions{}) 34 + _, err := s.minioClient.PutObject(ctx, s.bucketName, "knot-db.zip", pipeReader, -1, minio.PutObjectOptions{}) 33 35 if err != nil { 34 - slog.Error("stream knot DB to bucket: %w") 36 + slog.Error("stream knot DB to bucket", "error", err) 35 37 bugsnag.Notify(err) 36 38 } 37 39 } 38 40 39 - func backupKnotRepos(ctx context.Context, minioClient *minio.Client, bucketName string) { 41 + func (s *service) backupKnotRepos(ctx context.Context) { 40 42 dir := os.Getenv("TANGLED_KNOT_REPOSITORY_DIRECTORY") 41 43 if dir == "" { 42 44 slog.Info("TANGLED_KNOT_REPOSITORY_DIRECTORY env not set - skipping knot repo backup") ··· 47 49 48 50 go compress(dir, pipeWriter) 49 51 50 - _, err := minioClient.PutObject(ctx, bucketName, "knot-repos.zip", pipeReader, -1, minio.PutObjectOptions{}) 52 + _, err := s.minioClient.PutObject(ctx, s.bucketName, "knot-repos.zip", pipeReader, -1, minio.PutObjectOptions{}) 51 53 if err != nil { 52 - slog.Error("stream knot repos to bucket: %w") 54 + slog.Error("stream knot repos to bucket", "error", err) 53 55 bugsnag.Notify(err) 54 56 } 55 57 }

History

2 rounds 0 comments
sign up or login to add to the discussion
3 commits
expand
Run on cron job. Store blobs locally as a cache.
Improve and fix the Tangled Knot backup to not stream but store locally and then upload after.
improve error handling
expand 0 comments
pull request successfully merged
willdot.net submitted #0
1 commit
expand
Run on cron job. Store blobs locally as a cache.
expand 0 comments