package server

import (
	"bytes"
	"fmt"
	"io"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/haileyok/cocoon/internal/helpers"
	"github.com/haileyok/cocoon/models"
	"github.com/ipfs/go-cid"
	"github.com/labstack/echo/v4"
	"github.com/multiformats/go-multihash"
)

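// blockSize is the maximum number of bytes stored in a single blob part (64 KiB).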
const (
	blockSize = 0x10000
)

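// ComAtprotoRepoUploadBlobResponse mirrors the output shape of
// com.atproto.repo.uploadBlob: a blob ref carrying the CID link,
// MIME type, and size of the uploaded blob.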
type ComAtprotoRepoUploadBlobResponse struct {
	Blob struct {
		Type string `json:"$type"`
		Ref  struct {
			Link string `json:"$link"`
		} `json:"ref"`
		MimeType string `json:"mimeType"`
		Size     int    `json:"size"`
	} `json:"blob"`
}

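// handleRepoUploadBlob implements com.atproto.repo.uploadBlob. It reads the
// request body in blockSize chunks, stores the bytes either as blob parts in
// the database or as a single object in S3, computes a CID over the full blob,
// and returns a lexicon-shaped blob ref.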
func (s *Server) handleRepoUploadBlob(e echo.Context) error {
	ctx := e.Request().Context()

	urepo := e.Get("repo").(*models.RepoActor)

	mime := e.Request().Header.Get("content-type")
	if mime == "" {
		mime = "application/octet-stream"
	}

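	// Decide where the blob bytes will live: S3 when a blobstore is configured
	// and enabled, otherwise chunked rows in the local database.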
	storage := "sqlite"
	s3Upload := s.s3Config != nil && s.s3Config.BlobstoreEnabled
	if s3Upload {
		storage = "s3"
	}
	blob := models.Blob{
		Did:       urepo.Repo.Did,
		RefCount:  0,
		CreatedAt: s.repoman.clock.Next().String(),
		Storage:   storage,
	}

	if err := s.db.Create(ctx, &blob, nil).Error; err != nil {
		s.logger.Error("error creating new blob in db", "error", err)
		return helpers.ServerError(e, nil)
	}

	read := 0
	part := 0

	buf := make([]byte, blockSize)
	fulldata := new(bytes.Buffer)

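	// Read the body in blockSize chunks. Every chunk is appended to fulldata so
	// the CID can be computed over the whole blob; for database-backed storage,
	// each chunk is also persisted as an individual BlobPart row.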
	for {
		n, err := io.ReadFull(e.Request().Body, buf)
		if err == io.ErrUnexpectedEOF || err == io.EOF {
			if n == 0 {
				break
			}
		} else if err != nil {
			s.logger.Error("error reading blob", "error", err)
			return helpers.ServerError(e, nil)
		}

		data := buf[:n]
		read += n
		fulldata.Write(data)

		if !s3Upload {
			blobPart := models.BlobPart{
				BlobID: blob.ID,
				Idx:    part,
				Data:   data,
			}

			if err := s.db.Create(ctx, &blobPart, nil).Error; err != nil {
				s.logger.Error("error adding blob part to db", "error", err)
				return helpers.ServerError(e, nil)
			}
		}
		part++

		if n < blockSize {
			break
		}
	}

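	// Content-address the blob: a CIDv1 with the raw codec and a SHA2-256
	// multihash over the complete blob bytes.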
	c, err := cid.NewPrefixV1(cid.Raw, multihash.SHA2_256).Sum(fulldata.Bytes())
	if err != nil {
		s.logger.Error("error computing blob cid", "error", err)
		return helpers.ServerError(e, nil)
	}

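	// For S3-backed storage the whole blob is uploaded as one object, keyed by
	// the repo DID and the blob CID. A custom endpoint (e.g. an S3-compatible
	// store) forces path-style addressing.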
	if s3Upload {
		config := &aws.Config{
			Region:      aws.String(s.s3Config.Region),
			Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
		}

		if s.s3Config.Endpoint != "" {
			config.Endpoint = aws.String(s.s3Config.Endpoint)
			config.S3ForcePathStyle = aws.Bool(true)
		}

		sess, err := session.NewSession(config)
		if err != nil {
			s.logger.Error("error creating aws session", "error", err)
			return helpers.ServerError(e, nil)
		}

		svc := s3.New(sess)

		if _, err := svc.PutObject(&s3.PutObjectInput{
			Bucket: aws.String(s.s3Config.Bucket),
			Key:    aws.String(fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())),
			Body:   bytes.NewReader(fulldata.Bytes()),
		}); err != nil {
			s.logger.Error("error uploading blob to s3", "error", err)
			return helpers.ServerError(e, nil)
		}
	}

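	// The blob row was created before its contents were hashed, so attach the
	// computed CID to it now.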
	if err := s.db.Exec(ctx, "UPDATE blobs SET cid = ? WHERE id = ?", nil, c.Bytes(), blob.ID).Error; err != nil {
		// TODO: there should probably be some cleanup handling here if this update fails...
		s.logger.Error("error updating blob", "error", err)
		return helpers.ServerError(e, nil)
	}

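	// Return the lexicon-shaped blob ref so the client can embed it in a record.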
	resp := ComAtprotoRepoUploadBlobResponse{}
	resp.Blob.Type = "blob"
	resp.Blob.Ref.Link = c.String()
	resp.Blob.MimeType = mime
	resp.Blob.Size = read

	return e.JSON(200, resp)
}