Monorepo for Tangled
at 74318eac9fdd72cf69e916276814351931ed0dcb 360 lines 10 kB view raw
1package repo 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "time" 11 12 "tangled.org/core/api/tangled" 13 "tangled.org/core/appview/db" 14 "tangled.org/core/appview/models" 15 "tangled.org/core/appview/pages" 16 "tangled.org/core/appview/xrpcclient" 17 "tangled.org/core/orm" 18 "tangled.org/core/tid" 19 "tangled.org/core/types" 20 "tangled.org/core/xrpc" 21 22 comatproto "github.com/bluesky-social/indigo/api/atproto" 23 lexutil "github.com/bluesky-social/indigo/lex/util" 24 indigoxrpc "github.com/bluesky-social/indigo/xrpc" 25 "github.com/dustin/go-humanize" 26 "github.com/go-chi/chi/v5" 27 "github.com/go-git/go-git/v5/plumbing" 28 "github.com/ipfs/go-cid" 29) 30 31// TODO: proper statuses here on early exit 32func (rp *Repo) AttachArtifact(w http.ResponseWriter, r *http.Request) { 33 l := rp.logger.With("handler", "AttachArtifact") 34 35 user := rp.oauth.GetMultiAccountUser(r) 36 tagParam := chi.URLParam(r, "tag") 37 f, err := rp.repoResolver.Resolve(r) 38 if err != nil { 39 l.Error("failed to get repo and knot", "err", err) 40 rp.pages.Notice(w, "upload", "failed to upload artifact, error in repo resolution") 41 return 42 } 43 44 tag, err := rp.resolveTag(r.Context(), f, tagParam) 45 if err != nil { 46 l.Error("failed to resolve tag", "err", err) 47 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution") 48 return 49 } 50 51 file, header, err := r.FormFile("artifact") 52 if err != nil { 53 l.Error("failed to upload artifact", "err", err) 54 rp.pages.Notice(w, "upload", "failed to upload artifact") 55 return 56 } 57 defer file.Close() 58 59 client, err := rp.oauth.AuthorizedClient(r) 60 if err != nil { 61 l.Error("failed to get authorized client", "err", err) 62 rp.pages.Notice(w, "upload", "failed to get authorized client") 63 return 64 } 65 66 uploadBlobResp, err := xrpc.RepoUploadBlob(r.Context(), client, file, header.Header.Get("Content-Type")) 67 if err != nil { 68 l.Error("failed to upload blob", "err", err) 69 rp.pages.Notice(w, "upload", "Failed to upload blob to your PDS. Try again later.") 70 return 71 } 72 73 l.Info("uploaded blob", "size", humanize.Bytes(uint64(uploadBlobResp.Blob.Size)), "blobRef", uploadBlobResp.Blob.Ref.String()) 74 75 rkey := tid.TID() 76 createdAt := time.Now() 77 78 putRecordResp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 79 Collection: tangled.RepoArtifactNSID, 80 Repo: user.Active.Did, 81 Rkey: rkey, 82 Record: &lexutil.LexiconTypeDecoder{ 83 Val: &tangled.RepoArtifact{ 84 Artifact: uploadBlobResp.Blob, 85 CreatedAt: createdAt.Format(time.RFC3339), 86 Name: header.Filename, 87 Repo: f.RepoAt().String(), 88 Tag: tag.Tag.Hash[:], 89 }, 90 }, 91 }) 92 if err != nil { 93 l.Error("failed to create record", "err", err) 94 rp.pages.Notice(w, "upload", "Failed to create artifact record. Try again later.") 95 return 96 } 97 98 l.Debug("created record for blob", "aturi", putRecordResp.Uri) 99 100 tx, err := rp.db.BeginTx(r.Context(), nil) 101 if err != nil { 102 l.Error("failed to start tx") 103 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.") 104 return 105 } 106 defer tx.Rollback() 107 108 artifact := models.Artifact{ 109 Did: user.Active.Did, 110 Rkey: rkey, 111 RepoAt: f.RepoAt(), 112 Tag: tag.Tag.Hash, 113 CreatedAt: createdAt, 114 BlobCid: cid.Cid(uploadBlobResp.Blob.Ref), 115 Name: header.Filename, 116 Size: uint64(uploadBlobResp.Blob.Size), 117 MimeType: uploadBlobResp.Blob.MimeType, 118 } 119 120 err = db.AddArtifact(tx, artifact) 121 if err != nil { 122 l.Error("failed to add artifact record to db", "err", err) 123 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.") 124 return 125 } 126 127 err = tx.Commit() 128 if err != nil { 129 l.Error("failed to add artifact record to db") 130 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.") 131 return 132 } 133 134 rp.pages.RepoArtifactFragment(w, pages.RepoArtifactParams{ 135 LoggedInUser: user, 136 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 137 Artifact: artifact, 138 }) 139} 140 141func (rp *Repo) DownloadArtifact(w http.ResponseWriter, r *http.Request) { 142 l := rp.logger.With("handler", "DownloadArtifact") 143 144 f, err := rp.repoResolver.Resolve(r) 145 if err != nil { 146 l.Error("failed to get repo and knot", "err", err) 147 http.Error(w, "failed to resolve repo", http.StatusInternalServerError) 148 return 149 } 150 151 tagParam := chi.URLParam(r, "tag") 152 filename := chi.URLParam(r, "file") 153 154 tag, err := rp.resolveTag(r.Context(), f, tagParam) 155 if err != nil { 156 l.Error("failed to resolve tag", "err", err) 157 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution") 158 return 159 } 160 161 artifacts, err := db.GetArtifact( 162 rp.db, 163 orm.FilterEq("repo_at", f.RepoAt()), 164 orm.FilterEq("tag", tag.Tag.Hash[:]), 165 orm.FilterEq("name", filename), 166 ) 167 if err != nil { 168 l.Error("failed to get artifacts", "err", err) 169 http.Error(w, "failed to get artifact", http.StatusInternalServerError) 170 return 171 } 172 173 if len(artifacts) != 1 { 174 l.Error("too many or too few artifacts found") 175 http.Error(w, "artifact not found", http.StatusNotFound) 176 return 177 } 178 179 artifact := artifacts[0] 180 181 ownerId, err := rp.idResolver.ResolveIdent(r.Context(), f.Did) 182 if err != nil { 183 l.Error("failed to resolve repo owner did", "did", f.Did, "err", err) 184 http.Error(w, "repository owner not found", http.StatusNotFound) 185 return 186 } 187 188 ownerPds := ownerId.PDSEndpoint() 189 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 190 q := url.Query() 191 q.Set("cid", artifact.BlobCid.String()) 192 q.Set("did", artifact.Did) 193 url.RawQuery = q.Encode() 194 195 req, err := http.NewRequest(http.MethodGet, url.String(), nil) 196 if err != nil { 197 l.Error("failed to create request", "err", err) 198 http.Error(w, "failed to create request", http.StatusInternalServerError) 199 return 200 } 201 req.Header.Set("Content-Type", "application/json") 202 203 resp, err := http.DefaultClient.Do(req) 204 if err != nil { 205 l.Error("failed to make request", "err", err) 206 http.Error(w, "failed to make request to PDS", http.StatusInternalServerError) 207 return 208 } 209 defer resp.Body.Close() 210 211 // copy status code and relevant headers from upstream response 212 w.WriteHeader(resp.StatusCode) 213 for key, values := range resp.Header { 214 for _, v := range values { 215 w.Header().Add(key, v) 216 } 217 } 218 219 // stream the body directly to the client 220 if _, err := io.Copy(w, resp.Body); err != nil { 221 l.Error("error streaming response to client:", "err", err) 222 } 223} 224 225// TODO: proper statuses here on early exit 226func (rp *Repo) DeleteArtifact(w http.ResponseWriter, r *http.Request) { 227 l := rp.logger.With("handler", "DeleteArtifact") 228 229 user := rp.oauth.GetMultiAccountUser(r) 230 tagParam := chi.URLParam(r, "tag") 231 filename := chi.URLParam(r, "file") 232 f, err := rp.repoResolver.Resolve(r) 233 if err != nil { 234 l.Error("failed to get repo and knot", "err", err) 235 return 236 } 237 238 client, _ := rp.oauth.AuthorizedClient(r) 239 240 tag := plumbing.NewHash(tagParam) 241 242 artifacts, err := db.GetArtifact( 243 rp.db, 244 orm.FilterEq("repo_at", f.RepoAt()), 245 orm.FilterEq("tag", tag[:]), 246 orm.FilterEq("name", filename), 247 ) 248 if err != nil { 249 l.Error("failed to get artifacts", "err", err) 250 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 251 return 252 } 253 if len(artifacts) != 1 { 254 rp.pages.Notice(w, "remove", "Unable to find artifact.") 255 return 256 } 257 258 artifact := artifacts[0] 259 260 if user.Active.Did != artifact.Did { 261 l.Error("user not authorized to delete artifact", "err", err) 262 rp.pages.Notice(w, "remove", "Unauthorized deletion of artifact.") 263 return 264 } 265 266 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 267 Collection: tangled.RepoArtifactNSID, 268 Repo: user.Active.Did, 269 Rkey: artifact.Rkey, 270 }) 271 if err != nil { 272 l.Error("failed to get blob from pds", "err", err) 273 rp.pages.Notice(w, "remove", "Failed to remove blob from PDS.") 274 return 275 } 276 277 tx, err := rp.db.BeginTx(r.Context(), nil) 278 if err != nil { 279 l.Error("failed to start tx") 280 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 281 return 282 } 283 defer tx.Rollback() 284 285 err = db.DeleteArtifact(tx, 286 orm.FilterEq("repo_at", f.RepoAt()), 287 orm.FilterEq("tag", artifact.Tag[:]), 288 orm.FilterEq("name", filename), 289 ) 290 if err != nil { 291 l.Error("failed to remove artifact record from db", "err", err) 292 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 293 return 294 } 295 296 err = tx.Commit() 297 if err != nil { 298 l.Error("failed to remove artifact record from db") 299 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 300 return 301 } 302 303 l.Info("successfully deleted artifact", "tag", tagParam, "file", filename) 304 305 w.Write([]byte{}) 306} 307 308func (rp *Repo) resolveTag(ctx context.Context, f *models.Repo, tagParam string) (*types.TagReference, error) { 309 l := rp.logger.With("handler", "resolveTag") 310 311 tagParam, err := url.QueryUnescape(tagParam) 312 if err != nil { 313 return nil, err 314 } 315 316 scheme := "http" 317 if !rp.config.Core.Dev { 318 scheme = "https" 319 } 320 host := fmt.Sprintf("%s://%s", scheme, f.Knot) 321 xrpcc := &indigoxrpc.Client{ 322 Host: host, 323 } 324 325 repo := fmt.Sprintf("%s/%s", f.Did, f.Name) 326 xrpcBytes, err := tangled.RepoTags(ctx, xrpcc, "", 0, repo) 327 if err != nil { 328 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 329 l.Error("failed to call XRPC repo.tags", "err", xrpcerr) 330 return nil, xrpcerr 331 } 332 l.Error("failed to reach knotserver", "err", err) 333 return nil, err 334 } 335 336 var result types.RepoTagsResponse 337 if err := json.Unmarshal(xrpcBytes, &result); err != nil { 338 l.Error("failed to decode XRPC tags response", "err", err) 339 return nil, err 340 } 341 342 var tag *types.TagReference 343 for _, t := range result.Tags { 344 if t.Tag != nil { 345 if t.Reference.Name == tagParam || t.Reference.Hash == tagParam { 346 tag = t 347 } 348 } 349 } 350 351 if tag == nil { 352 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts") 353 } 354 355 if tag.Tag.Target.IsZero() { 356 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts") 357 } 358 359 return tag, nil 360}