this repo has no description
1package server 2 3import ( 4 "bytes" 5 "fmt" 6 "io" 7 8 "github.com/aws/aws-sdk-go/aws" 9 "github.com/aws/aws-sdk-go/aws/credentials" 10 "github.com/aws/aws-sdk-go/aws/session" 11 "github.com/aws/aws-sdk-go/service/s3" 12 "github.com/haileyok/cocoon/internal/helpers" 13 "github.com/haileyok/cocoon/models" 14 "github.com/ipfs/go-cid" 15 "github.com/labstack/echo/v4" 16 "github.com/multiformats/go-multihash" 17) 18 19const ( 20 blockSize = 0x10000 21) 22 23type ComAtprotoRepoUploadBlobResponse struct { 24 Blob struct { 25 Type string `json:"$type"` 26 Ref struct { 27 Link string `json:"$link"` 28 } `json:"ref"` 29 MimeType string `json:"mimeType"` 30 Size int `json:"size"` 31 } `json:"blob"` 32} 33 34func (s *Server) handleRepoUploadBlob(e echo.Context) error { 35 ctx := e.Request().Context() 36 37 urepo := e.Get("repo").(*models.RepoActor) 38 39 mime := e.Request().Header.Get("content-type") 40 if mime == "" { 41 mime = "application/octet-stream" 42 } 43 44 storage := "sqlite" 45 s3Upload := s.s3Config != nil && s.s3Config.BlobstoreEnabled 46 if s3Upload { 47 storage = "s3" 48 } 49 blob := models.Blob{ 50 Did: urepo.Repo.Did, 51 RefCount: 0, 52 CreatedAt: s.repoman.clock.Next().String(), 53 Storage: storage, 54 } 55 56 if err := s.db.Create(ctx, &blob, nil).Error; err != nil { 57 s.logger.Error("error creating new blob in db", "error", err) 58 return helpers.ServerError(e, nil) 59 } 60 61 read := 0 62 part := 0 63 64 buf := make([]byte, 0x10000) 65 fulldata := new(bytes.Buffer) 66 67 for { 68 n, err := io.ReadFull(e.Request().Body, buf) 69 if err == io.ErrUnexpectedEOF || err == io.EOF { 70 if n == 0 { 71 break 72 } 73 } else if err != nil && err != io.ErrUnexpectedEOF { 74 s.logger.Error("error reading blob", "error", err) 75 return helpers.ServerError(e, nil) 76 } 77 78 data := buf[:n] 79 read += n 80 fulldata.Write(data) 81 82 if !s3Upload { 83 blobPart := models.BlobPart{ 84 BlobID: blob.ID, 85 Idx: part, 86 Data: data, 87 } 88 89 if err := s.db.Create(ctx, &blobPart, nil).Error; err != nil { 90 s.logger.Error("error adding blob part to db", "error", err) 91 return helpers.ServerError(e, nil) 92 } 93 } 94 part++ 95 96 if n < blockSize { 97 break 98 } 99 } 100 101 c, err := cid.NewPrefixV1(cid.Raw, multihash.SHA2_256).Sum(fulldata.Bytes()) 102 if err != nil { 103 s.logger.Error("error creating cid prefix", "error", err) 104 return helpers.ServerError(e, nil) 105 } 106 107 if s3Upload { 108 config := &aws.Config{ 109 Region: aws.String(s.s3Config.Region), 110 Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""), 111 } 112 113 if s.s3Config.Endpoint != "" { 114 config.Endpoint = aws.String(s.s3Config.Endpoint) 115 config.S3ForcePathStyle = aws.Bool(true) 116 } 117 118 sess, err := session.NewSession(config) 119 if err != nil { 120 s.logger.Error("error creating aws session", "error", err) 121 return helpers.ServerError(e, nil) 122 } 123 124 svc := s3.New(sess) 125 126 if _, err := svc.PutObject(&s3.PutObjectInput{ 127 Bucket: aws.String(s.s3Config.Bucket), 128 Key: aws.String(fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())), 129 Body: bytes.NewReader(fulldata.Bytes()), 130 }); err != nil { 131 s.logger.Error("error uploading blob to s3", "error", err) 132 return helpers.ServerError(e, nil) 133 } 134 } 135 136 if err := s.db.Exec(ctx, "UPDATE blobs SET cid = ? WHERE id = ?", nil, c.Bytes(), blob.ID).Error; err != nil { 137 // there should probably be somme handling here if this fails... 138 s.logger.Error("error updating blob", "error", err) 139 return helpers.ServerError(e, nil) 140 } 141 142 resp := ComAtprotoRepoUploadBlobResponse{} 143 resp.Blob.Type = "blob" 144 resp.Blob.Ref.Link = c.String() 145 resp.Blob.MimeType = mime 146 resp.Blob.Size = read 147 148 return e.JSON(200, resp) 149}