Monorepo for Tangled
at push-zpskmntwpyxz 370 lines 10 kB view raw
1package repo 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "time" 11 12 "tangled.org/core/api/tangled" 13 "tangled.org/core/appview/db" 14 "tangled.org/core/appview/models" 15 "tangled.org/core/appview/pages" 16 "tangled.org/core/appview/xrpcclient" 17 "tangled.org/core/orm" 18 "tangled.org/core/tid" 19 "tangled.org/core/types" 20 "tangled.org/core/xrpc" 21 22 comatproto "github.com/bluesky-social/indigo/api/atproto" 23 lexutil "github.com/bluesky-social/indigo/lex/util" 24 indigoxrpc "github.com/bluesky-social/indigo/xrpc" 25 "github.com/dustin/go-humanize" 26 "github.com/go-chi/chi/v5" 27 "github.com/go-git/go-git/v5/plumbing" 28 "github.com/ipfs/go-cid" 29) 30 31// TODO: proper statuses here on early exit 32func (rp *Repo) AttachArtifact(w http.ResponseWriter, r *http.Request) { 33 l := rp.logger.With("handler", "AttachArtifact") 34 35 user := rp.oauth.GetMultiAccountUser(r) 36 tagParam := chi.URLParam(r, "tag") 37 f, err := rp.repoResolver.Resolve(r) 38 if err != nil { 39 l.Error("failed to get repo and knot", "err", err) 40 rp.pages.Notice(w, "upload", "failed to upload artifact, error in repo resolution") 41 return 42 } 43 44 tag, err := rp.resolveTag(r.Context(), f, tagParam) 45 if err != nil { 46 l.Error("failed to resolve tag", "err", err) 47 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution") 48 return 49 } 50 51 file, header, err := r.FormFile("artifact") 52 if err != nil { 53 l.Error("failed to upload artifact", "err", err) 54 rp.pages.Notice(w, "upload", "failed to upload artifact") 55 return 56 } 57 defer file.Close() 58 59 client, err := rp.oauth.AuthorizedClient(r) 60 if err != nil { 61 l.Error("failed to get authorized client", "err", err) 62 rp.pages.Notice(w, "upload", "failed to get authorized client") 63 return 64 } 65 66 uploadBlobResp, err := xrpc.RepoUploadBlob(r.Context(), client, file, header.Header.Get("Content-Type")) 67 if err != nil { 68 l.Error("failed to upload blob", "err", err) 69 rp.pages.Notice(w, "upload", "Failed to upload blob to your PDS. Try again later.") 70 return 71 } 72 73 l.Info("uploaded blob", "size", humanize.Bytes(uint64(uploadBlobResp.Blob.Size)), "blobRef", uploadBlobResp.Blob.Ref.String()) 74 75 rkey := tid.TID() 76 createdAt := time.Now() 77 78 putRecordResp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 79 Collection: tangled.RepoArtifactNSID, 80 Repo: user.Active.Did, 81 Rkey: rkey, 82 Record: &lexutil.LexiconTypeDecoder{ 83 Val: repoArtifactRecord(f, uploadBlobResp.Blob, createdAt, header.Filename, tag.Tag.Hash[:]), 84 }, 85 }) 86 if err != nil { 87 l.Error("failed to create record", "err", err) 88 rp.pages.Notice(w, "upload", "Failed to create artifact record. Try again later.") 89 return 90 } 91 92 l.Debug("created record for blob", "aturi", putRecordResp.Uri) 93 94 tx, err := rp.db.BeginTx(r.Context(), nil) 95 if err != nil { 96 l.Error("failed to start tx") 97 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.") 98 return 99 } 100 defer tx.Rollback() 101 102 artifact := models.Artifact{ 103 Did: user.Active.Did, 104 Rkey: rkey, 105 RepoAt: f.RepoAt(), 106 RepoDid: f.RepoDid, 107 Tag: tag.Tag.Hash, 108 CreatedAt: createdAt, 109 BlobCid: cid.Cid(uploadBlobResp.Blob.Ref), 110 Name: header.Filename, 111 Size: uint64(uploadBlobResp.Blob.Size), 112 MimeType: uploadBlobResp.Blob.MimeType, 113 } 114 115 err = db.AddArtifact(tx, artifact) 116 if err != nil { 117 l.Error("failed to add artifact record to db", "err", err) 118 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.") 119 return 120 } 121 122 err = tx.Commit() 123 if err != nil { 124 l.Error("failed to add artifact record to db") 125 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.") 126 return 127 } 128 129 rp.pages.RepoArtifactFragment(w, pages.RepoArtifactParams{ 130 LoggedInUser: user, 131 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 132 Artifact: artifact, 133 }) 134} 135 136func (rp *Repo) DownloadArtifact(w http.ResponseWriter, r *http.Request) { 137 l := rp.logger.With("handler", "DownloadArtifact") 138 139 f, err := rp.repoResolver.Resolve(r) 140 if err != nil { 141 l.Error("failed to get repo and knot", "err", err) 142 http.Error(w, "failed to resolve repo", http.StatusInternalServerError) 143 return 144 } 145 146 tagParam := chi.URLParam(r, "tag") 147 filename := chi.URLParam(r, "file") 148 149 tag, err := rp.resolveTag(r.Context(), f, tagParam) 150 if err != nil { 151 l.Error("failed to resolve tag", "err", err) 152 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution") 153 return 154 } 155 156 artifacts, err := db.GetArtifact( 157 rp.db, 158 orm.FilterEq("repo_at", f.RepoAt()), 159 orm.FilterEq("tag", tag.Tag.Hash[:]), 160 orm.FilterEq("name", filename), 161 ) 162 if err != nil { 163 l.Error("failed to get artifacts", "err", err) 164 http.Error(w, "failed to get artifact", http.StatusInternalServerError) 165 return 166 } 167 168 if len(artifacts) != 1 { 169 l.Error("too many or too few artifacts found") 170 http.Error(w, "artifact not found", http.StatusNotFound) 171 return 172 } 173 174 artifact := artifacts[0] 175 176 ownerId, err := rp.idResolver.ResolveIdent(r.Context(), f.Did) 177 if err != nil { 178 l.Error("failed to resolve repo owner did", "did", f.Did, "err", err) 179 http.Error(w, "repository owner not found", http.StatusNotFound) 180 return 181 } 182 183 ownerPds := ownerId.PDSEndpoint() 184 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 185 q := url.Query() 186 q.Set("cid", artifact.BlobCid.String()) 187 q.Set("did", artifact.Did) 188 url.RawQuery = q.Encode() 189 190 req, err := http.NewRequest(http.MethodGet, url.String(), nil) 191 if err != nil { 192 l.Error("failed to create request", "err", err) 193 http.Error(w, "failed to create request", http.StatusInternalServerError) 194 return 195 } 196 req.Header.Set("Content-Type", "application/json") 197 198 resp, err := http.DefaultClient.Do(req) 199 if err != nil { 200 l.Error("failed to make request", "err", err) 201 http.Error(w, "failed to make request to PDS", http.StatusInternalServerError) 202 return 203 } 204 defer resp.Body.Close() 205 206 // copy status code and relevant headers from upstream response 207 w.WriteHeader(resp.StatusCode) 208 for key, values := range resp.Header { 209 for _, v := range values { 210 w.Header().Add(key, v) 211 } 212 } 213 214 // stream the body directly to the client 215 if _, err := io.Copy(w, resp.Body); err != nil { 216 l.Error("error streaming response to client:", "err", err) 217 } 218} 219 220// TODO: proper statuses here on early exit 221func (rp *Repo) DeleteArtifact(w http.ResponseWriter, r *http.Request) { 222 l := rp.logger.With("handler", "DeleteArtifact") 223 224 user := rp.oauth.GetMultiAccountUser(r) 225 tagParam := chi.URLParam(r, "tag") 226 filename := chi.URLParam(r, "file") 227 f, err := rp.repoResolver.Resolve(r) 228 if err != nil { 229 l.Error("failed to get repo and knot", "err", err) 230 return 231 } 232 233 client, _ := rp.oauth.AuthorizedClient(r) 234 235 tag := plumbing.NewHash(tagParam) 236 237 artifacts, err := db.GetArtifact( 238 rp.db, 239 orm.FilterEq("repo_at", f.RepoAt()), 240 orm.FilterEq("tag", tag[:]), 241 orm.FilterEq("name", filename), 242 ) 243 if err != nil { 244 l.Error("failed to get artifacts", "err", err) 245 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 246 return 247 } 248 if len(artifacts) != 1 { 249 rp.pages.Notice(w, "remove", "Unable to find artifact.") 250 return 251 } 252 253 artifact := artifacts[0] 254 255 if user.Active.Did != artifact.Did { 256 l.Error("user not authorized to delete artifact", "err", err) 257 rp.pages.Notice(w, "remove", "Unauthorized deletion of artifact.") 258 return 259 } 260 261 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 262 Collection: tangled.RepoArtifactNSID, 263 Repo: user.Active.Did, 264 Rkey: artifact.Rkey, 265 }) 266 if err != nil { 267 l.Error("failed to get blob from pds", "err", err) 268 rp.pages.Notice(w, "remove", "Failed to remove blob from PDS.") 269 return 270 } 271 272 tx, err := rp.db.BeginTx(r.Context(), nil) 273 if err != nil { 274 l.Error("failed to start tx") 275 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 276 return 277 } 278 defer tx.Rollback() 279 280 err = db.DeleteArtifact(tx, 281 orm.FilterEq("repo_at", f.RepoAt()), 282 orm.FilterEq("tag", artifact.Tag[:]), 283 orm.FilterEq("name", filename), 284 ) 285 if err != nil { 286 l.Error("failed to remove artifact record from db", "err", err) 287 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 288 return 289 } 290 291 err = tx.Commit() 292 if err != nil { 293 l.Error("failed to remove artifact record from db") 294 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 295 return 296 } 297 298 l.Info("successfully deleted artifact", "tag", tagParam, "file", filename) 299 300 w.Write([]byte{}) 301} 302 303func (rp *Repo) resolveTag(ctx context.Context, f *models.Repo, tagParam string) (*types.TagReference, error) { 304 l := rp.logger.With("handler", "resolveTag") 305 306 tagParam, err := url.QueryUnescape(tagParam) 307 if err != nil { 308 return nil, err 309 } 310 311 scheme := "http" 312 if !rp.config.Core.Dev { 313 scheme = "https" 314 } 315 host := fmt.Sprintf("%s://%s", scheme, f.Knot) 316 xrpcc := &indigoxrpc.Client{ 317 Host: host, 318 } 319 320 xrpcBytes, err := tangled.RepoTags(ctx, xrpcc, "", 0, f.RepoIdentifier()) 321 if err != nil { 322 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 323 l.Error("failed to call XRPC repo.tags", "err", xrpcerr) 324 return nil, xrpcerr 325 } 326 l.Error("failed to reach knotserver", "err", err) 327 return nil, err 328 } 329 330 var result types.RepoTagsResponse 331 if err := json.Unmarshal(xrpcBytes, &result); err != nil { 332 l.Error("failed to decode XRPC tags response", "err", err) 333 return nil, err 334 } 335 336 var tag *types.TagReference 337 for _, t := range result.Tags { 338 if t.Tag != nil { 339 if t.Reference.Name == tagParam || t.Reference.Hash == tagParam { 340 tag = t 341 } 342 } 343 } 344 345 if tag == nil { 346 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts") 347 } 348 349 if tag.Tag.Target.IsZero() { 350 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts") 351 } 352 353 return tag, nil 354} 355 356func repoArtifactRecord(f *models.Repo, blob *lexutil.LexBlob, createdAt time.Time, name string, tag []byte) *tangled.RepoArtifact { 357 rec := &tangled.RepoArtifact{ 358 Artifact: blob, 359 CreatedAt: createdAt.Format(time.RFC3339), 360 Name: name, 361 Tag: tag, 362 } 363 if f.RepoDid != "" { 364 rec.RepoDid = &f.RepoDid 365 } else { 366 s := f.RepoAt().String() 367 rec.Repo = &s 368 } 369 return rec 370}