+50
-8
server/handle_repo_upload_blob.go
+50
-8
server/handle_repo_upload_blob.go
···
2
3
import (
4
"bytes"
5
"io"
6
7
"github.com/haileyok/cocoon/internal/helpers"
8
"github.com/haileyok/cocoon/models"
9
"github.com/ipfs/go-cid"
···
34
mime = "application/octet-stream"
35
}
36
37
blob := models.Blob{
38
Did: urepo.Repo.Did,
39
RefCount: 0,
40
CreatedAt: s.repoman.clock.Next().String(),
41
}
42
43
if err := s.db.Create(&blob, nil).Error; err != nil {
···
66
read += n
67
fulldata.Write(data)
68
69
-
blobPart := models.BlobPart{
70
-
BlobID: blob.ID,
71
-
Idx: part,
72
-
Data: data,
73
-
}
74
75
-
if err := s.db.Create(&blobPart, nil).Error; err != nil {
76
-
s.logger.Error("error adding blob part to db", "error", err)
77
-
return helpers.ServerError(e, nil)
78
}
79
part++
80
···
87
if err != nil {
88
s.logger.Error("error creating cid prefix", "error", err)
89
return helpers.ServerError(e, nil)
90
}
91
92
if err := s.db.Exec("UPDATE blobs SET cid = ? WHERE id = ?", nil, c.Bytes(), blob.ID).Error; err != nil {
···
2
3
import (
4
"bytes"
5
+
"fmt"
6
"io"
7
8
+
"github.com/aws/aws-sdk-go/aws"
9
+
"github.com/aws/aws-sdk-go/aws/credentials"
10
+
"github.com/aws/aws-sdk-go/aws/session"
11
+
"github.com/aws/aws-sdk-go/service/s3"
12
"github.com/haileyok/cocoon/internal/helpers"
13
"github.com/haileyok/cocoon/models"
14
"github.com/ipfs/go-cid"
···
39
mime = "application/octet-stream"
40
}
41
42
+
storage := "sqlite"
43
+
s3Upload := s.s3Config != nil && s.s3Config.BlobstoreEnabled
44
+
if s3Upload {
45
+
storage = "s3"
46
+
}
47
blob := models.Blob{
48
Did: urepo.Repo.Did,
49
RefCount: 0,
50
CreatedAt: s.repoman.clock.Next().String(),
51
+
Storage: storage,
52
}
53
54
if err := s.db.Create(&blob, nil).Error; err != nil {
···
77
read += n
78
fulldata.Write(data)
79
80
+
if !s3Upload {
81
+
blobPart := models.BlobPart{
82
+
BlobID: blob.ID,
83
+
Idx: part,
84
+
Data: data,
85
+
}
86
87
+
if err := s.db.Create(&blobPart, nil).Error; err != nil {
88
+
s.logger.Error("error adding blob part to db", "error", err)
89
+
return helpers.ServerError(e, nil)
90
+
}
91
}
92
part++
93
···
100
if err != nil {
101
s.logger.Error("error creating cid prefix", "error", err)
102
return helpers.ServerError(e, nil)
103
+
}
104
+
105
+
if s3Upload {
106
+
config := &aws.Config{
107
+
Region: aws.String(s.s3Config.Region),
108
+
Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
109
+
}
110
+
111
+
if s.s3Config.Endpoint != "" {
112
+
config.Endpoint = aws.String(s.s3Config.Endpoint)
113
+
config.S3ForcePathStyle = aws.Bool(true)
114
+
}
115
+
116
+
sess, err := session.NewSession(config)
117
+
if err != nil {
118
+
s.logger.Error("error creating aws session", "error", err)
119
+
return helpers.ServerError(e, nil)
120
+
}
121
+
122
+
svc := s3.New(sess)
123
+
124
+
if _, err := svc.PutObject(&s3.PutObjectInput{
125
+
Bucket: aws.String(s.s3Config.Bucket),
126
+
Key: aws.String(fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())),
127
+
Body: bytes.NewReader(fulldata.Bytes()),
128
+
}); err != nil {
129
+
s.logger.Error("error uploading blob to s3", "error", err)
130
+
return helpers.ServerError(e, nil)
131
+
}
132
}
133
134
if err := s.db.Exec("UPDATE blobs SET cid = ? WHERE id = ?", nil, c.Bytes(), blob.ID).Error; err != nil {
+65
-8
server/handle_sync_get_blob.go
+65
-8
server/handle_sync_get_blob.go
···
2
3
import (
4
"bytes"
5
6
"github.com/Azure/go-autorest/autorest/to"
7
"github.com/haileyok/cocoon/internal/helpers"
8
"github.com/haileyok/cocoon/models"
9
"github.com/ipfs/go-cid"
···
47
48
buf := new(bytes.Buffer)
49
50
-
var parts []models.BlobPart
51
-
if err := s.db.Raw("SELECT * FROM blob_parts WHERE blob_id = ? ORDER BY idx", nil, blob.ID).Scan(&parts).Error; err != nil {
52
-
s.logger.Error("error getting blob parts", "error", err)
53
-
return helpers.ServerError(e, nil)
54
-
}
55
56
-
// TODO: we can just stream this, don't need to make a buffer
57
-
for _, p := range parts {
58
-
buf.Write(p.Data)
59
}
60
61
e.Response().Header().Set(echo.HeaderContentDisposition, "attachment; filename="+c.String())
···
2
3
import (
4
"bytes"
5
+
"fmt"
6
+
"io"
7
8
"github.com/Azure/go-autorest/autorest/to"
9
+
"github.com/aws/aws-sdk-go/aws"
10
+
"github.com/aws/aws-sdk-go/aws/credentials"
11
+
"github.com/aws/aws-sdk-go/aws/session"
12
+
"github.com/aws/aws-sdk-go/service/s3"
13
"github.com/haileyok/cocoon/internal/helpers"
14
"github.com/haileyok/cocoon/models"
15
"github.com/ipfs/go-cid"
···
53
54
buf := new(bytes.Buffer)
55
56
+
if blob.Storage == "sqlite" {
57
+
var parts []models.BlobPart
58
+
if err := s.db.Raw("SELECT * FROM blob_parts WHERE blob_id = ? ORDER BY idx", nil, blob.ID).Scan(&parts).Error; err != nil {
59
+
s.logger.Error("error getting blob parts", "error", err)
60
+
return helpers.ServerError(e, nil)
61
+
}
62
+
63
+
// TODO: we can just stream this, don't need to make a buffer
64
+
for _, p := range parts {
65
+
buf.Write(p.Data)
66
+
}
67
+
} else if blob.Storage == "s3" && s.s3Config != nil && s.s3Config.BlobstoreEnabled {
68
+
config := &aws.Config{
69
+
Region: aws.String(s.s3Config.Region),
70
+
Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
71
+
}
72
+
73
+
if s.s3Config.Endpoint != "" {
74
+
config.Endpoint = aws.String(s.s3Config.Endpoint)
75
+
config.S3ForcePathStyle = aws.Bool(true)
76
+
}
77
+
78
+
sess, err := session.NewSession(config)
79
+
if err != nil {
80
+
s.logger.Error("error creating aws session", "error", err)
81
+
return helpers.ServerError(e, nil)
82
+
}
83
+
84
+
svc := s3.New(sess)
85
+
if result, err := svc.GetObject(&s3.GetObjectInput{
86
+
Bucket: aws.String(s.s3Config.Bucket),
87
+
Key: aws.String(fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())),
88
+
}); err != nil {
89
+
s.logger.Error("error getting blob from s3", "error", err)
90
+
return helpers.ServerError(e, nil)
91
+
} else {
92
+
read := 0
93
+
part := 0
94
+
partBuf := make([]byte, 0x10000)
95
96
+
for {
97
+
n, err := io.ReadFull(result.Body, partBuf)
98
+
if err == io.ErrUnexpectedEOF || err == io.EOF {
99
+
if n == 0 {
100
+
break
101
+
}
102
+
} else if err != nil && err != io.ErrUnexpectedEOF {
103
+
s.logger.Error("error reading blob", "error", err)
104
+
return helpers.ServerError(e, nil)
105
+
}
106
+
107
+
data := partBuf[:n]
108
+
read += n
109
+
buf.Write(data)
110
+
part++
111
+
}
112
+
}
113
+
} else {
114
+
s.logger.Error("unknown storage", "storage", blob.Storage)
115
+
return helpers.ServerError(e, nil)
116
}
117
118
e.Response().Header().Set(echo.HeaderContentDisposition, "attachment; filename="+c.String())