Monorepo for Tangled
at c2049613392b034666f88e5c2b55d991f0c51040 351 lines 9.6 kB view raw
1package repo 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "log" 9 "net/http" 10 "net/url" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview/db" 15 "tangled.org/core/appview/models" 16 "tangled.org/core/appview/pages" 17 "tangled.org/core/appview/xrpcclient" 18 "tangled.org/core/orm" 19 "tangled.org/core/tid" 20 "tangled.org/core/types" 21 "tangled.org/core/xrpc" 22 23 comatproto "github.com/bluesky-social/indigo/api/atproto" 24 lexutil "github.com/bluesky-social/indigo/lex/util" 25 indigoxrpc "github.com/bluesky-social/indigo/xrpc" 26 "github.com/dustin/go-humanize" 27 "github.com/go-chi/chi/v5" 28 "github.com/go-git/go-git/v5/plumbing" 29 "github.com/ipfs/go-cid" 30) 31 32// TODO: proper statuses here on early exit 33func (rp *Repo) AttachArtifact(w http.ResponseWriter, r *http.Request) { 34 user := rp.oauth.GetMultiAccountUser(r) 35 tagParam := chi.URLParam(r, "tag") 36 f, err := rp.repoResolver.Resolve(r) 37 if err != nil { 38 log.Println("failed to get repo and knot", err) 39 rp.pages.Notice(w, "upload", "failed to upload artifact, error in repo resolution") 40 return 41 } 42 43 tag, err := rp.resolveTag(r.Context(), f, tagParam) 44 if err != nil { 45 log.Println("failed to resolve tag", err) 46 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution") 47 return 48 } 49 50 file, header, err := r.FormFile("artifact") 51 if err != nil { 52 log.Println("failed to upload artifact", err) 53 rp.pages.Notice(w, "upload", "failed to upload artifact") 54 return 55 } 56 defer file.Close() 57 58 client, err := rp.oauth.AuthorizedClient(r) 59 if err != nil { 60 log.Println("failed to get authorized client", err) 61 rp.pages.Notice(w, "upload", "failed to get authorized client") 62 return 63 } 64 65 uploadBlobResp, err := xrpc.RepoUploadBlob(r.Context(), client, file, header.Header.Get("Content-Type")) 66 if err != nil { 67 log.Println("failed to upload blob", err) 68 rp.pages.Notice(w, "upload", "Failed to upload blob to your PDS. Try again later.") 69 return 70 } 71 72 log.Println("uploaded blob", humanize.Bytes(uint64(uploadBlobResp.Blob.Size)), uploadBlobResp.Blob.Ref.String()) 73 74 rkey := tid.TID() 75 createdAt := time.Now() 76 77 putRecordResp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 78 Collection: tangled.RepoArtifactNSID, 79 Repo: user.Active.Did, 80 Rkey: rkey, 81 Record: &lexutil.LexiconTypeDecoder{ 82 Val: &tangled.RepoArtifact{ 83 Artifact: uploadBlobResp.Blob, 84 CreatedAt: createdAt.Format(time.RFC3339), 85 Name: header.Filename, 86 Repo: f.RepoAt().String(), 87 Tag: tag.Tag.Hash[:], 88 }, 89 }, 90 }) 91 if err != nil { 92 log.Println("failed to create record", err) 93 rp.pages.Notice(w, "upload", "Failed to create artifact record. Try again later.") 94 return 95 } 96 97 log.Println(putRecordResp.Uri) 98 99 tx, err := rp.db.BeginTx(r.Context(), nil) 100 if err != nil { 101 log.Println("failed to start tx") 102 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.") 103 return 104 } 105 defer tx.Rollback() 106 107 artifact := models.Artifact{ 108 Did: user.Active.Did, 109 Rkey: rkey, 110 RepoAt: f.RepoAt(), 111 Tag: tag.Tag.Hash, 112 CreatedAt: createdAt, 113 BlobCid: cid.Cid(uploadBlobResp.Blob.Ref), 114 Name: header.Filename, 115 Size: uint64(uploadBlobResp.Blob.Size), 116 MimeType: uploadBlobResp.Blob.MimeType, 117 } 118 119 err = db.AddArtifact(tx, artifact) 120 if err != nil { 121 log.Println("failed to add artifact record to db", err) 122 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.") 123 return 124 } 125 126 err = tx.Commit() 127 if err != nil { 128 log.Println("failed to add artifact record to db") 129 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.") 130 return 131 } 132 133 rp.pages.RepoArtifactFragment(w, pages.RepoArtifactParams{ 134 LoggedInUser: user, 135 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 136 Artifact: artifact, 137 }) 138} 139 140func (rp *Repo) DownloadArtifact(w http.ResponseWriter, r *http.Request) { 141 f, err := rp.repoResolver.Resolve(r) 142 if err != nil { 143 log.Println("failed to get repo and knot", err) 144 http.Error(w, "failed to resolve repo", http.StatusInternalServerError) 145 return 146 } 147 148 tagParam := chi.URLParam(r, "tag") 149 filename := chi.URLParam(r, "file") 150 151 tag, err := rp.resolveTag(r.Context(), f, tagParam) 152 if err != nil { 153 log.Println("failed to resolve tag", err) 154 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution") 155 return 156 } 157 158 artifacts, err := db.GetArtifact( 159 rp.db, 160 orm.FilterEq("repo_at", f.RepoAt()), 161 orm.FilterEq("tag", tag.Tag.Hash[:]), 162 orm.FilterEq("name", filename), 163 ) 164 if err != nil { 165 log.Println("failed to get artifacts", err) 166 http.Error(w, "failed to get artifact", http.StatusInternalServerError) 167 return 168 } 169 170 if len(artifacts) != 1 { 171 log.Printf("too many or too few artifacts found") 172 http.Error(w, "artifact not found", http.StatusNotFound) 173 return 174 } 175 176 artifact := artifacts[0] 177 178 ownerId, err := rp.idResolver.ResolveIdent(r.Context(), f.Did) 179 if err != nil { 180 log.Println("failed to resolve repo owner did", f.Did, err) 181 http.Error(w, "repository owner not found", http.StatusNotFound) 182 return 183 } 184 185 ownerPds := ownerId.PDSEndpoint() 186 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 187 q := url.Query() 188 q.Set("cid", artifact.BlobCid.String()) 189 q.Set("did", artifact.Did) 190 url.RawQuery = q.Encode() 191 192 req, err := http.NewRequest(http.MethodGet, url.String(), nil) 193 if err != nil { 194 log.Println("failed to create request", err) 195 http.Error(w, "failed to create request", http.StatusInternalServerError) 196 return 197 } 198 req.Header.Set("Content-Type", "application/json") 199 200 resp, err := http.DefaultClient.Do(req) 201 if err != nil { 202 log.Println("failed to make request", err) 203 http.Error(w, "failed to make request to PDS", http.StatusInternalServerError) 204 return 205 } 206 defer resp.Body.Close() 207 208 // copy status code and relevant headers from upstream response 209 w.WriteHeader(resp.StatusCode) 210 for key, values := range resp.Header { 211 for _, v := range values { 212 w.Header().Add(key, v) 213 } 214 } 215 216 // stream the body directly to the client 217 if _, err := io.Copy(w, resp.Body); err != nil { 218 log.Println("error streaming response to client:", err) 219 } 220} 221 222// TODO: proper statuses here on early exit 223func (rp *Repo) DeleteArtifact(w http.ResponseWriter, r *http.Request) { 224 user := rp.oauth.GetMultiAccountUser(r) 225 tagParam := chi.URLParam(r, "tag") 226 filename := chi.URLParam(r, "file") 227 f, err := rp.repoResolver.Resolve(r) 228 if err != nil { 229 log.Println("failed to get repo and knot", err) 230 return 231 } 232 233 client, _ := rp.oauth.AuthorizedClient(r) 234 235 tag := plumbing.NewHash(tagParam) 236 237 artifacts, err := db.GetArtifact( 238 rp.db, 239 orm.FilterEq("repo_at", f.RepoAt()), 240 orm.FilterEq("tag", tag[:]), 241 orm.FilterEq("name", filename), 242 ) 243 if err != nil { 244 log.Println("failed to get artifacts", err) 245 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 246 return 247 } 248 if len(artifacts) != 1 { 249 rp.pages.Notice(w, "remove", "Unable to find artifact.") 250 return 251 } 252 253 artifact := artifacts[0] 254 255 if user.Active.Did != artifact.Did { 256 log.Println("user not authorized to delete artifact", err) 257 rp.pages.Notice(w, "remove", "Unauthorized deletion of artifact.") 258 return 259 } 260 261 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 262 Collection: tangled.RepoArtifactNSID, 263 Repo: user.Active.Did, 264 Rkey: artifact.Rkey, 265 }) 266 if err != nil { 267 log.Println("failed to get blob from pds", err) 268 rp.pages.Notice(w, "remove", "Failed to remove blob from PDS.") 269 return 270 } 271 272 tx, err := rp.db.BeginTx(r.Context(), nil) 273 if err != nil { 274 log.Println("failed to start tx") 275 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 276 return 277 } 278 defer tx.Rollback() 279 280 err = db.DeleteArtifact(tx, 281 orm.FilterEq("repo_at", f.RepoAt()), 282 orm.FilterEq("tag", artifact.Tag[:]), 283 orm.FilterEq("name", filename), 284 ) 285 if err != nil { 286 log.Println("failed to remove artifact record from db", err) 287 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 288 return 289 } 290 291 err = tx.Commit() 292 if err != nil { 293 log.Println("failed to remove artifact record from db") 294 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.") 295 return 296 } 297 298 w.Write([]byte{}) 299} 300 301func (rp *Repo) resolveTag(ctx context.Context, f *models.Repo, tagParam string) (*types.TagReference, error) { 302 tagParam, err := url.QueryUnescape(tagParam) 303 if err != nil { 304 return nil, err 305 } 306 307 scheme := "http" 308 if !rp.config.Core.Dev { 309 scheme = "https" 310 } 311 host := fmt.Sprintf("%s://%s", scheme, f.Knot) 312 xrpcc := &indigoxrpc.Client{ 313 Host: host, 314 } 315 316 repo := fmt.Sprintf("%s/%s", f.Did, f.Name) 317 xrpcBytes, err := tangled.RepoTags(ctx, xrpcc, "", 0, repo) 318 if err != nil { 319 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 320 log.Println("failed to call XRPC repo.tags", xrpcerr) 321 return nil, xrpcerr 322 } 323 log.Println("failed to reach knotserver", err) 324 return nil, err 325 } 326 327 var result types.RepoTagsResponse 328 if err := json.Unmarshal(xrpcBytes, &result); err != nil { 329 log.Println("failed to decode XRPC tags response", err) 330 return nil, err 331 } 332 333 var tag *types.TagReference 334 for _, t := range result.Tags { 335 if t.Tag != nil { 336 if t.Reference.Name == tagParam || t.Reference.Hash == tagParam { 337 tag = t 338 } 339 } 340 } 341 342 if tag == nil { 343 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts") 344 } 345 346 if tag.Tag.Target.IsZero() { 347 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts") 348 } 349 350 return tag, nil 351}