1package server
2
3import (
4 "bytes"
5 "fmt"
6 "io"
7
8 "github.com/Azure/go-autorest/autorest/to"
9 "github.com/aws/aws-sdk-go/aws"
10 "github.com/aws/aws-sdk-go/aws/credentials"
11 "github.com/aws/aws-sdk-go/aws/session"
12 "github.com/aws/aws-sdk-go/service/s3"
13 "github.com/haileyok/cocoon/internal/helpers"
14 "github.com/haileyok/cocoon/models"
15 "github.com/ipfs/go-cid"
16 "github.com/labstack/echo/v4"
17)
18
19func (s *Server) handleSyncGetBlob(e echo.Context) error {
20 ctx := e.Request().Context()
21
22 did := e.QueryParam("did")
23 if did == "" {
24 return helpers.InputError(e, nil)
25 }
26
27 cstr := e.QueryParam("cid")
28 if cstr == "" {
29 return helpers.InputError(e, nil)
30 }
31
32 c, err := cid.Parse(cstr)
33 if err != nil {
34 return helpers.InputError(e, nil)
35 }
36
37 urepo, err := s.getRepoActorByDid(ctx, did)
38 if err != nil {
39 s.logger.Error("could not find user for requested blob", "error", err)
40 return helpers.InputError(e, nil)
41 }
42
43 status := urepo.Status()
44 if status != nil {
45 if *status == "deactivated" {
46 return helpers.InputError(e, to.StringPtr("RepoDeactivated"))
47 }
48 }
49
50 var blob models.Blob
51 if err := s.db.Raw(ctx, "SELECT * FROM blobs WHERE did = ? AND cid = ?", nil, did, c.Bytes()).Scan(&blob).Error; err != nil {
52 s.logger.Error("error looking up blob", "error", err)
53 return helpers.ServerError(e, nil)
54 }
55
56 buf := new(bytes.Buffer)
57
58 if blob.Storage == "sqlite" {
59 var parts []models.BlobPart
60 if err := s.db.Raw(ctx, "SELECT * FROM blob_parts WHERE blob_id = ? ORDER BY idx", nil, blob.ID).Scan(&parts).Error; err != nil {
61 s.logger.Error("error getting blob parts", "error", err)
62 return helpers.ServerError(e, nil)
63 }
64
65 // TODO: we can just stream this, don't need to make a buffer
66 for _, p := range parts {
67 buf.Write(p.Data)
68 }
69 } else if blob.Storage == "s3" {
70 if !(s.s3Config != nil && s.s3Config.BlobstoreEnabled) {
71 s.logger.Error("s3 storage disabled")
72 return helpers.ServerError(e, nil)
73 }
74
75 blobKey := fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())
76
77 if s.s3Config.CDNUrl != "" {
78 redirectUrl := fmt.Sprintf("%s/%s", s.s3Config.CDNUrl, blobKey)
79 return e.Redirect(302, redirectUrl)
80 }
81
82 config := &aws.Config{
83 Region: aws.String(s.s3Config.Region),
84 Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
85 }
86
87 if s.s3Config.Endpoint != "" {
88 config.Endpoint = aws.String(s.s3Config.Endpoint)
89 config.S3ForcePathStyle = aws.Bool(true)
90 }
91
92 sess, err := session.NewSession(config)
93 if err != nil {
94 s.logger.Error("error creating aws session", "error", err)
95 return helpers.ServerError(e, nil)
96 }
97
98 svc := s3.New(sess)
99 if result, err := svc.GetObject(&s3.GetObjectInput{
100 Bucket: aws.String(s.s3Config.Bucket),
101 Key: aws.String(blobKey),
102 }); err != nil {
103 s.logger.Error("error getting blob from s3", "error", err)
104 return helpers.ServerError(e, nil)
105 } else {
106 read := 0
107 part := 0
108 partBuf := make([]byte, 0x10000)
109
110 for {
111 n, err := io.ReadFull(result.Body, partBuf)
112 if err == io.ErrUnexpectedEOF || err == io.EOF {
113 if n == 0 {
114 break
115 }
116 } else if err != nil && err != io.ErrUnexpectedEOF {
117 s.logger.Error("error reading blob", "error", err)
118 return helpers.ServerError(e, nil)
119 }
120
121 data := partBuf[:n]
122 read += n
123 buf.Write(data)
124 part++
125 }
126 }
127 } else {
128 s.logger.Error("unknown storage", "storage", blob.Storage)
129 return helpers.ServerError(e, nil)
130 }
131
132 e.Response().Header().Set(echo.HeaderContentDisposition, "attachment; filename="+c.String())
133
134 return e.Stream(200, "application/octet-stream", buf)
135}