A tool for backing up ATProto related data to S3

Improve the flow of the app #1

merged opened by willdot.net targeting main from store-cids-locally

This will store the blobs locally before uploading so that only new blobs are downloaded and sent to minio instead of downloading each of them every time.

This also tries to stop using the forced streaming of data to Minio as it has memory implications.

Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:dadhhalkfcq3gucaq25hjqon/sh.tangled.repo.pull/3mfjzh7bdcr22
+161 -65
Diff #0
+1
.env.example
··· 7 TANGLED_KNOT_DATABASE_DIRECTORY="/path/to/database/directory" 8 TANGLED_KNOT_REPOSITORY_DIRECTORY="/path/to/repository/directory" 9 BUGSNAG_API_KEY="enter-api-key-to-enable"
··· 7 TANGLED_KNOT_DATABASE_DIRECTORY="/path/to/database/directory" 8 TANGLED_KNOT_REPOSITORY_DIRECTORY="/path/to/repository/directory" 9 BUGSNAG_API_KEY="enter-api-key-to-enable" 10 + BLOB_DIR="./blobs"
+1
.gitignore
··· 1 .env
··· 1 .env 2 + .DS_Store
+19
Dockerfile
···
··· 1 + FROM golang:alpine AS builder 2 + 3 + WORKDIR /app 4 + 5 + COPY . . 6 + RUN go mod download 7 + 8 + COPY . . 9 + 10 + RUN CGO_ENABLED=0 go build -o back-at-it . 11 + 12 + FROM alpine:latest 13 + 14 + RUN apk --no-cache add ca-certificates 15 + 16 + WORKDIR /app/ 17 + COPY --from=builder /app/back-at-it . 18 + 19 + ENTRYPOINT ["./back-at-it"]
+9
docker-compose.yaml
···
··· 1 + services: 2 + back-at-it: 3 + container_name: back-at-it 4 + image: willdot/back-at-it:latest 5 + environment: 6 + ENV_LOCATION: "/app/data/ back-at-it.env" 7 + volumes: 8 + - ./data:/app/data 9 + restart: always
+1
go.mod
··· 21 github.com/minio/md5-simd v1.1.2 // indirect 22 github.com/philhofer/fwd v1.2.0 // indirect 23 github.com/pkg/errors v0.9.1 // indirect 24 github.com/rs/xid v1.6.0 // indirect 25 github.com/stretchr/testify v1.10.0 // indirect 26 github.com/tinylib/msgp v1.3.0 // indirect
··· 21 github.com/minio/md5-simd v1.1.2 // indirect 22 github.com/philhofer/fwd v1.2.0 // indirect 23 github.com/pkg/errors v0.9.1 // indirect 24 + github.com/robfig/cron v1.2.0 // indirect 25 github.com/rs/xid v1.6.0 // indirect 26 github.com/stretchr/testify v1.10.0 // indirect 27 github.com/tinylib/msgp v1.3.0 // indirect
+2
go.sum
··· 34 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= 35 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= 36 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 37 github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= 38 github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= 39 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
··· 34 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= 35 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= 36 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 37 + github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= 38 + github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= 39 github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= 40 github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= 41 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
+43 -2
main.go
··· 3 import ( 4 "context" 5 "log/slog" 6 "os" 7 8 "github.com/bugsnag/bugsnag-go/v2" 9 "github.com/joho/godotenv" 10 "github.com/minio/minio-go/v7" 11 "github.com/minio/minio-go/v7/pkg/credentials" 12 ) 13 14 func main() { 15 ctx := context.Background() 16 ··· 40 return 41 } 42 43 - backupPDS(ctx, minioClient, bucketName) 44 - backupTangledKnot(ctx, minioClient, bucketName) 45 } 46 47 func createMinioClient() (*minio.Client, error) {
··· 3 import ( 4 "context" 5 "log/slog" 6 + "net/http" 7 "os" 8 + "time" 9 10 "github.com/bugsnag/bugsnag-go/v2" 11 "github.com/joho/godotenv" 12 "github.com/minio/minio-go/v7" 13 "github.com/minio/minio-go/v7/pkg/credentials" 14 + "github.com/robfig/cron" 15 ) 16 17 + type service struct { 18 + pdsHost string 19 + did string 20 + blobDir string 21 + bucketName string 22 + httpClient *http.Client 23 + minioClient *minio.Client 24 + } 25 + 26 func main() { 27 ctx := context.Background() 28 ··· 52 return 53 } 54 55 + pdsHost := os.Getenv("PDS_HOST") 56 + did := os.Getenv("DID") 57 + blobDir := os.Getenv("BLOB_DIR") 58 + 59 + service := service{ 60 + pdsHost: pdsHost, 61 + did: did, 62 + blobDir: blobDir, 63 + bucketName: bucketName, 64 + minioClient: minioClient, 65 + httpClient: &http.Client{ 66 + Timeout: time.Second * 5, 67 + Transport: &http.Transport{ 68 + IdleConnTimeout: time.Second * 90, 69 + }, 70 + }, 71 + } 72 + 73 + service.backupPDS(ctx) 74 + service.backupTangledKnot(ctx) 75 + 76 + c := cron.New() 77 + 78 + c.AddFunc("@hourly", func() { 79 + service.backupPDS(ctx) 80 + }) 81 + c.AddFunc("@hourly", func() { 82 + service.backupTangledKnot(ctx) 83 + }) 84 + 85 + c.Start() 86 } 87 88 func createMinioClient() (*minio.Client, error) {
+72 -53
pds.go
··· 9 "log/slog" 10 "net/http" 11 "os" 12 13 "github.com/bugsnag/bugsnag-go/v2" 14 "github.com/minio/minio-go/v7" 15 ) 16 17 - func backupPDS(ctx context.Context, minioClient *minio.Client, bucketName string) { 18 - if os.Getenv("PDS_HOST") == "" || os.Getenv("DID") == "" { 19 slog.Info("PDS_HOST or DID env not set - skipping PDS backup") 20 return 21 } 22 23 - err := backupRepo(ctx, minioClient, bucketName) 24 if err != nil { 25 slog.Error("backup repo", "error", err) 26 bugsnag.Notify(err) 27 return 28 } 29 30 - err = backupBlobs(ctx, minioClient, bucketName) 31 if err != nil { 32 slog.Error("backup blobs", "error", err) 33 bugsnag.Notify(err) 34 return 35 } 36 - } 37 38 - func backupRepo(ctx context.Context, minioClient *minio.Client, bucketName string) error { 39 - pdsHost := os.Getenv("PDS_HOST") 40 - did := os.Getenv("DID") 41 42 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", pdsHost, did) 43 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 44 if err != nil { 45 return fmt.Errorf("create get repo request: %w", err) 46 } 47 48 req.Header.Add("ACCEPT", "application/vnd.ipld.car") 49 - resp, err := http.DefaultClient.Do(req) 50 if err != nil { 51 return fmt.Errorf("get repo: %w", err) 52 } 53 54 defer resp.Body.Close() 55 56 - _, err = minioClient.PutObject(ctx, bucketName, "pds-repo", resp.Body, -1, minio.PutObjectOptions{}) 57 if err != nil { 58 return fmt.Errorf("stream repo to bucket: %w", err) 59 } ··· 61 return nil 62 } 63 64 - func backupBlobs(ctx context.Context, minioClient *minio.Client, bucketName string) error { 65 - cids, err := getAllBlobCIDs(ctx) 66 if err != nil { 67 return fmt.Errorf("get all blob CIDs: %w", err) 68 } 69 70 - reader, writer := io.Pipe() 71 - defer reader.Close() 72 73 - zipWriter := zip.NewWriter(writer) 74 75 - go func() { 76 - defer writer.Close() 77 - defer zipWriter.Close() 78 79 - for _, cid := range cids { 80 - slog.Info("processing cid", "cid", cid) 81 - blob, err := getBlob(ctx, cid) 82 - if err != nil { 83 - slog.Error("failed to get blob", "cid", cid, "error", err) 84 - bugsnag.Notify(err) 85 - continue 86 - } 87 88 - zipFile, err := zipWriter.Create(cid) 89 - if err != nil { 90 - slog.Error("create new file in zipwriter", "cid", cid, "error", err) 91 - bugsnag.Notify(err) 92 - blob.Close() 93 - continue 94 - } 95 96 - io.Copy(zipFile, blob) 97 - blob.Close() 98 - } 99 - }() 100 101 - _, err = minioClient.PutObject(ctx, bucketName, "pds-blobs.zip", reader, -1, minio.PutObjectOptions{}) 102 if err != nil { 103 return fmt.Errorf("stream blobs to bucket: %w", err) 104 } ··· 106 return nil 107 } 108 109 - func getAllBlobCIDs(ctx context.Context) ([]string, error) { 110 cursor := "" 111 limit := 100 112 var cids []string 113 for { 114 - res, err := listBlobs(ctx, cursor, int64(limit)) 115 if err != nil { 116 return nil, fmt.Errorf("list blobs: %w", err) 117 } ··· 134 CIDs []string `json:"cids"` 135 } 136 137 - func listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) { 138 - pdsHost := os.Getenv("PDS_HOST") 139 - did := os.Getenv("DID") 140 - 141 // TODO: do proper url encoding of query params 142 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listBlobs?did=%s&cursor=%s&limit=%d", pdsHost, did, cursor, limit) 143 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 144 if err != nil { 145 return listBlobsResponse{}, fmt.Errorf("create list blobs request: %w", err) 146 } 147 148 - resp, err := http.DefaultClient.Do(req) 149 if err != nil { 150 return listBlobsResponse{}, fmt.Errorf("list blobs: %w", err) 151 } ··· 166 return result, nil 167 } 168 169 - func getBlob(ctx context.Context, cid string) (io.ReadCloser, error) { 170 - pdsHost := os.Getenv("PDS_HOST") 171 - did := os.Getenv("DID") 172 173 // TODO: do proper url encoding of query params 174 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", pdsHost, did, cid) 175 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 176 if err != nil { 177 return nil, fmt.Errorf("create get blob request: %w", err) 178 } 179 180 - resp, err := http.DefaultClient.Do(req) 181 if err != nil { 182 return nil, fmt.Errorf("get blob: %w", err) 183 } 184 185 - return resp.Body, nil 186 }
··· 9 "log/slog" 10 "net/http" 11 "os" 12 + "path/filepath" 13 + "time" 14 15 "github.com/bugsnag/bugsnag-go/v2" 16 "github.com/minio/minio-go/v7" 17 ) 18 19 + func (s *service) backupPDS(ctx context.Context) { 20 + if s.pdsHost == "" || s.did == "" { 21 slog.Info("PDS_HOST or DID env not set - skipping PDS backup") 22 return 23 } 24 25 + err := s.backupRepo(ctx) 26 if err != nil { 27 slog.Error("backup repo", "error", err) 28 bugsnag.Notify(err) 29 return 30 } 31 32 + err = s.backupBlobs(ctx) 33 if err != nil { 34 slog.Error("backup blobs", "error", err) 35 bugsnag.Notify(err) 36 return 37 } 38 39 + slog.Info("finished PDS backup") 40 + } 41 42 + func (s *service) backupRepo(ctx context.Context) error { 43 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", s.pdsHost, s.did) 44 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 45 if err != nil { 46 return fmt.Errorf("create get repo request: %w", err) 47 } 48 49 req.Header.Add("ACCEPT", "application/vnd.ipld.car") 50 + resp, err := s.httpClient.Do(req) 51 if err != nil { 52 return fmt.Errorf("get repo: %w", err) 53 } 54 55 defer resp.Body.Close() 56 57 + _, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-repo", resp.Body, -1, minio.PutObjectOptions{}) 58 if err != nil { 59 return fmt.Errorf("stream repo to bucket: %w", err) 60 } ··· 62 return nil 63 } 64 65 + func (s *service) backupBlobs(ctx context.Context) error { 66 + cids, err := s.getAllBlobCIDs(ctx) 67 if err != nil { 68 return fmt.Errorf("get all blob CIDs: %w", err) 69 } 70 71 + filename := fmt.Sprintf("%s-%s-blobs.zip", s.did, time.Now()) 72 + defer os.Remove(filename) 73 74 + f, err := os.Create(filename) 75 + if err != nil { 76 + return fmt.Errorf("creating zip file: %w", err) 77 + } 78 + defer f.Close() 79 80 + zipWriter := zip.NewWriter(f) 81 + for _, cid := range cids { 82 + slog.Info("processing cid", "cid", cid) 83 + blob, err := s.getBlob(ctx, cid) 84 + if err != nil { 85 + slog.Error("failed to get blob", "cid", cid, "error", err) 86 + bugsnag.Notify(err) 87 + continue 88 + } 89 90 + zipFile, err := zipWriter.Create(cid) 91 + if err != nil { 92 + slog.Error("create new file in zipwriter", "cid", cid, "error", err) 93 + bugsnag.Notify(err) 94 + continue 95 + } 96 97 + zipFile.Write(blob) 98 + } 99 + err = zipWriter.Close() 100 + if err != nil { 101 + return fmt.Errorf("close zip writer: %w", err) 102 + } 103 104 + fi, err := f.Stat() 105 + if err != nil { 106 + return fmt.Errorf("stat zip file: %w", err) 107 + } 108 109 + _, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-blobs.zip", f, fi.Size(), minio.PutObjectOptions{}) 110 if err != nil { 111 return fmt.Errorf("stream blobs to bucket: %w", err) 112 } ··· 114 return nil 115 } 116 117 + func (s *service) getAllBlobCIDs(ctx context.Context) ([]string, error) { 118 cursor := "" 119 limit := 100 120 var cids []string 121 for { 122 + res, err := s.listBlobs(ctx, cursor, int64(limit)) 123 if err != nil { 124 return nil, fmt.Errorf("list blobs: %w", err) 125 } ··· 142 CIDs []string `json:"cids"` 143 } 144 145 + func (s *service) listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) { 146 // TODO: do proper url encoding of query params 147 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listBlobs?did=%s&cursor=%s&limit=%d", s.pdsHost, s.did, cursor, limit) 148 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 149 if err != nil { 150 return listBlobsResponse{}, fmt.Errorf("create list blobs request: %w", err) 151 } 152 153 + resp, err := s.httpClient.Do(req) 154 if err != nil { 155 return listBlobsResponse{}, fmt.Errorf("list blobs: %w", err) 156 } ··· 171 return result, nil 172 } 173 174 + func (s *service) getBlob(ctx context.Context, cid string) ([]byte, error) { 175 + filename := filepath.Join(s.blobDir, fmt.Sprintf("blob-%s-%s", s.did, cid)) 176 + existing, err := os.ReadFile(filename) 177 + if !os.IsNotExist(err) { 178 + return existing, nil 179 + } 180 181 // TODO: do proper url encoding of query params 182 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", s.pdsHost, s.did, cid) 183 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 184 if err != nil { 185 return nil, fmt.Errorf("create get blob request: %w", err) 186 } 187 188 + resp, err := s.httpClient.Do(req) 189 if err != nil { 190 return nil, fmt.Errorf("get blob: %w", err) 191 } 192 + defer resp.Body.Close() 193 + 194 + b, err := io.ReadAll(resp.Body) 195 + if err != nil { 196 + return nil, fmt.Errorf("read blob response body: %w", err) 197 + } 198 + 199 + err = os.WriteFile(filename, b, os.ModePerm) 200 + if err != nil { 201 + slog.Error("writing blob", "error", err, "cid", cid) 202 + } 203 204 + return b, nil 205 }
+2 -1
readme.md
··· 24 25 ### Todo 26 27 - - [ ] - Turn this into a long running app using a cron library perhaps 28 - [ ] - User query params properly when creating the URLs to fetch repo and blobs 29 - [ ] - Allow configuring the backup of knot repo data per users DID maybe?
··· 24 25 ### Todo 26 27 + - [x] - Turn this into a long running app using a cron library perhaps 28 + - [ ] - Work out how to tar just the directory requested, not the full path (ie `/home/will/tangled/repo` should back up `/repo` only and not create the full path of empty directories) 29 - [ ] - User query params properly when creating the URLs to fetch repo and blobs 30 - [ ] - Allow configuring the backup of knot repo data per users DID maybe?
+11 -9
tangled_knot.go
··· 13 "github.com/minio/minio-go/v7" 14 ) 15 16 - func backupTangledKnot(ctx context.Context, minioClient *minio.Client, bucketName string) { 17 - backupKnotDB(ctx, minioClient, bucketName) 18 - backupKnotRepos(ctx, minioClient, bucketName) 19 } 20 21 - func backupKnotDB(ctx context.Context, minioClient *minio.Client, bucketName string) { 22 dir := os.Getenv("TANGLED_KNOT_DATABASE_DIRECTORY") 23 if dir == "" { 24 slog.Info("TANGLED_KNOT_DATABASE_DIRECTORY env not set - skipping knot DB backup") ··· 29 30 go compress(dir, pipeWriter) 31 32 - _, err := minioClient.PutObject(ctx, bucketName, "knot-db.zip", pipeReader, -1, minio.PutObjectOptions{}) 33 if err != nil { 34 - slog.Error("stream knot DB to bucket: %w") 35 bugsnag.Notify(err) 36 } 37 } 38 39 - func backupKnotRepos(ctx context.Context, minioClient *minio.Client, bucketName string) { 40 dir := os.Getenv("TANGLED_KNOT_REPOSITORY_DIRECTORY") 41 if dir == "" { 42 slog.Info("TANGLED_KNOT_REPOSITORY_DIRECTORY env not set - skipping knot repo backup") ··· 47 48 go compress(dir, pipeWriter) 49 50 - _, err := minioClient.PutObject(ctx, bucketName, "knot-repos.zip", pipeReader, -1, minio.PutObjectOptions{}) 51 if err != nil { 52 - slog.Error("stream knot repos to bucket: %w") 53 bugsnag.Notify(err) 54 } 55 }
··· 13 "github.com/minio/minio-go/v7" 14 ) 15 16 + func (s *service) backupTangledKnot(ctx context.Context) { 17 + s.backupKnotDB(ctx) 18 + s.backupKnotRepos(ctx) 19 + 20 + slog.Info("finished tangled knot backup") 21 } 22 23 + func (s *service) backupKnotDB(ctx context.Context) { 24 dir := os.Getenv("TANGLED_KNOT_DATABASE_DIRECTORY") 25 if dir == "" { 26 slog.Info("TANGLED_KNOT_DATABASE_DIRECTORY env not set - skipping knot DB backup") ··· 31 32 go compress(dir, pipeWriter) 33 34 + _, err := s.minioClient.PutObject(ctx, s.bucketName, "knot-db.zip", pipeReader, -1, minio.PutObjectOptions{}) 35 if err != nil { 36 + slog.Error("stream knot DB to bucket", "error", err) 37 bugsnag.Notify(err) 38 } 39 } 40 41 + func (s *service) backupKnotRepos(ctx context.Context) { 42 dir := os.Getenv("TANGLED_KNOT_REPOSITORY_DIRECTORY") 43 if dir == "" { 44 slog.Info("TANGLED_KNOT_REPOSITORY_DIRECTORY env not set - skipping knot repo backup") ··· 49 50 go compress(dir, pipeWriter) 51 52 + _, err := s.minioClient.PutObject(ctx, s.bucketName, "knot-repos.zip", pipeReader, -1, minio.PutObjectOptions{}) 53 if err != nil { 54 + slog.Error("stream knot repos to bucket", "error", err) 55 bugsnag.Notify(err) 56 } 57 }

History

2 rounds 0 comments
sign up or login to add to the discussion
3 commits
expand
Run on cron job. Store blobs locally as a cache.
Improve and fix the Tangled Knot backup to not stream but store locally and then upload after.
improve error handling
expand 0 comments
pull request successfully merged
willdot.net submitted #0
1 commit
expand
Run on cron job. Store blobs locally as a cache.
expand 0 comments