1package server
2
3import (
4 "fmt"
5 "strconv"
6
7 "github.com/bluesky-social/indigo/atproto/atdata"
8 "github.com/haileyok/cocoon/internal/helpers"
9 "github.com/haileyok/cocoon/models"
10 "github.com/ipfs/go-cid"
11 "github.com/labstack/echo/v4"
12)
13
14type ComAtprotoRepoListMissingBlobsResponse struct {
15 Cursor *string `json:"cursor,omitempty"`
16 Blobs []ComAtprotoRepoListMissingBlobsRecordBlob `json:"blobs"`
17}
18
19type ComAtprotoRepoListMissingBlobsRecordBlob struct {
20 Cid string `json:"cid"`
21 RecordUri string `json:"recordUri"`
22}
23
24func (s *Server) handleListMissingBlobs(e echo.Context) error {
25 ctx := e.Request().Context()
26
27 urepo := e.Get("repo").(*models.RepoActor)
28
29 limitStr := e.QueryParam("limit")
30 cursor := e.QueryParam("cursor")
31
32 limit := 500
33 if limitStr != "" {
34 if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 1000 {
35 limit = l
36 }
37 }
38
39 var records []models.Record
40 if err := s.db.Raw(ctx, "SELECT * FROM records WHERE did = ?", nil, urepo.Repo.Did).Scan(&records).Error; err != nil {
41 s.logger.Error("failed to get records for listMissingBlobs", "error", err)
42 return helpers.ServerError(e, nil)
43 }
44
45 type blobRef struct {
46 cid cid.Cid
47 recordUri string
48 }
49 var allBlobRefs []blobRef
50
51 for _, rec := range records {
52 blobs := getBlobsFromRecord(rec.Value)
53 recordUri := fmt.Sprintf("at://%s/%s/%s", urepo.Repo.Did, rec.Nsid, rec.Rkey)
54 for _, b := range blobs {
55 allBlobRefs = append(allBlobRefs, blobRef{cid: cid.Cid(b.Ref), recordUri: recordUri})
56 }
57 }
58
59 missingBlobs := make([]ComAtprotoRepoListMissingBlobsRecordBlob, 0)
60 seenCids := make(map[string]bool)
61
62 for _, ref := range allBlobRefs {
63 cidStr := ref.cid.String()
64
65 if seenCids[cidStr] {
66 continue
67 }
68
69 if cursor != "" && cidStr <= cursor {
70 continue
71 }
72
73 var count int64
74 if err := s.db.Raw(ctx, "SELECT COUNT(*) FROM blobs WHERE did = ? AND cid = ?", nil, urepo.Repo.Did, ref.cid.Bytes()).Scan(&count).Error; err != nil {
75 continue
76 }
77
78 if count == 0 {
79 missingBlobs = append(missingBlobs, ComAtprotoRepoListMissingBlobsRecordBlob{
80 Cid: cidStr,
81 RecordUri: ref.recordUri,
82 })
83 seenCids[cidStr] = true
84
85 if len(missingBlobs) >= limit {
86 break
87 }
88 }
89 }
90
91 var nextCursor *string
92 if len(missingBlobs) > 0 && len(missingBlobs) >= limit {
93 lastCid := missingBlobs[len(missingBlobs)-1].Cid
94 nextCursor = &lastCid
95 }
96
97 return e.JSON(200, ComAtprotoRepoListMissingBlobsResponse{
98 Cursor: nextCursor,
99 Blobs: missingBlobs,
100 })
101}
102
103func getBlobsFromRecord(data []byte) []atdata.Blob {
104 if len(data) == 0 {
105 return nil
106 }
107
108 decoded, err := atdata.UnmarshalCBOR(data)
109 if err != nil {
110 return nil
111 }
112
113 return atdata.ExtractBlobs(decoded)
114}