Monorepo for Tangled
1package repo
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "time"
11
12 "tangled.org/core/api/tangled"
13 "tangled.org/core/appview/db"
14 "tangled.org/core/appview/models"
15 "tangled.org/core/appview/pages"
16 "tangled.org/core/appview/xrpcclient"
17 "tangled.org/core/orm"
18 "tangled.org/core/tid"
19 "tangled.org/core/types"
20 "tangled.org/core/xrpc"
21
22 comatproto "github.com/bluesky-social/indigo/api/atproto"
23 lexutil "github.com/bluesky-social/indigo/lex/util"
24 indigoxrpc "github.com/bluesky-social/indigo/xrpc"
25 "github.com/dustin/go-humanize"
26 "github.com/go-chi/chi/v5"
27 "github.com/go-git/go-git/v5/plumbing"
28 "github.com/ipfs/go-cid"
29)
30
31// TODO: proper statuses here on early exit
32func (rp *Repo) AttachArtifact(w http.ResponseWriter, r *http.Request) {
33 l := rp.logger.With("handler", "AttachArtifact")
34
35 user := rp.oauth.GetMultiAccountUser(r)
36 tagParam := chi.URLParam(r, "tag")
37 f, err := rp.repoResolver.Resolve(r)
38 if err != nil {
39 l.Error("failed to get repo and knot", "err", err)
40 rp.pages.Notice(w, "upload", "failed to upload artifact, error in repo resolution")
41 return
42 }
43
44 tag, err := rp.resolveTag(r.Context(), f, tagParam)
45 if err != nil {
46 l.Error("failed to resolve tag", "err", err)
47 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution")
48 return
49 }
50
51 file, header, err := r.FormFile("artifact")
52 if err != nil {
53 l.Error("failed to upload artifact", "err", err)
54 rp.pages.Notice(w, "upload", "failed to upload artifact")
55 return
56 }
57 defer file.Close()
58
59 client, err := rp.oauth.AuthorizedClient(r)
60 if err != nil {
61 l.Error("failed to get authorized client", "err", err)
62 rp.pages.Notice(w, "upload", "failed to get authorized client")
63 return
64 }
65
66 uploadBlobResp, err := xrpc.RepoUploadBlob(r.Context(), client, file, header.Header.Get("Content-Type"))
67 if err != nil {
68 l.Error("failed to upload blob", "err", err)
69 rp.pages.Notice(w, "upload", "Failed to upload blob to your PDS. Try again later.")
70 return
71 }
72
73 l.Info("uploaded blob", "size", humanize.Bytes(uint64(uploadBlobResp.Blob.Size)), "blobRef", uploadBlobResp.Blob.Ref.String())
74
75 rkey := tid.TID()
76 createdAt := time.Now()
77
78 putRecordResp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
79 Collection: tangled.RepoArtifactNSID,
80 Repo: user.Active.Did,
81 Rkey: rkey,
82 Record: &lexutil.LexiconTypeDecoder{
83 Val: repoArtifactRecord(f, uploadBlobResp.Blob, createdAt, header.Filename, tag.Tag.Hash[:]),
84 },
85 })
86 if err != nil {
87 l.Error("failed to create record", "err", err)
88 rp.pages.Notice(w, "upload", "Failed to create artifact record. Try again later.")
89 return
90 }
91
92 l.Debug("created record for blob", "aturi", putRecordResp.Uri)
93
94 tx, err := rp.db.BeginTx(r.Context(), nil)
95 if err != nil {
96 l.Error("failed to start tx")
97 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.")
98 return
99 }
100 defer tx.Rollback()
101
102 artifact := models.Artifact{
103 Did: user.Active.Did,
104 Rkey: rkey,
105 RepoAt: f.RepoAt(),
106 RepoDid: f.RepoDid,
107 Tag: tag.Tag.Hash,
108 CreatedAt: createdAt,
109 BlobCid: cid.Cid(uploadBlobResp.Blob.Ref),
110 Name: header.Filename,
111 Size: uint64(uploadBlobResp.Blob.Size),
112 MimeType: uploadBlobResp.Blob.MimeType,
113 }
114
115 err = db.AddArtifact(tx, artifact)
116 if err != nil {
117 l.Error("failed to add artifact record to db", "err", err)
118 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.")
119 return
120 }
121
122 err = tx.Commit()
123 if err != nil {
124 l.Error("failed to add artifact record to db")
125 rp.pages.Notice(w, "upload", "Failed to create artifact. Try again later.")
126 return
127 }
128
129 rp.pages.RepoArtifactFragment(w, pages.RepoArtifactParams{
130 LoggedInUser: user,
131 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
132 Artifact: artifact,
133 })
134}
135
136func (rp *Repo) DownloadArtifact(w http.ResponseWriter, r *http.Request) {
137 l := rp.logger.With("handler", "DownloadArtifact")
138
139 f, err := rp.repoResolver.Resolve(r)
140 if err != nil {
141 l.Error("failed to get repo and knot", "err", err)
142 http.Error(w, "failed to resolve repo", http.StatusInternalServerError)
143 return
144 }
145
146 tagParam := chi.URLParam(r, "tag")
147 filename := chi.URLParam(r, "file")
148
149 tag, err := rp.resolveTag(r.Context(), f, tagParam)
150 if err != nil {
151 l.Error("failed to resolve tag", "err", err)
152 rp.pages.Notice(w, "upload", "failed to upload artifact, error in tag resolution")
153 return
154 }
155
156 artifacts, err := db.GetArtifact(
157 rp.db,
158 orm.FilterEq("repo_at", f.RepoAt()),
159 orm.FilterEq("tag", tag.Tag.Hash[:]),
160 orm.FilterEq("name", filename),
161 )
162 if err != nil {
163 l.Error("failed to get artifacts", "err", err)
164 http.Error(w, "failed to get artifact", http.StatusInternalServerError)
165 return
166 }
167
168 if len(artifacts) != 1 {
169 l.Error("too many or too few artifacts found")
170 http.Error(w, "artifact not found", http.StatusNotFound)
171 return
172 }
173
174 artifact := artifacts[0]
175
176 ownerId, err := rp.idResolver.ResolveIdent(r.Context(), f.Did)
177 if err != nil {
178 l.Error("failed to resolve repo owner did", "did", f.Did, "err", err)
179 http.Error(w, "repository owner not found", http.StatusNotFound)
180 return
181 }
182
183 ownerPds := ownerId.PDSEndpoint()
184 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
185 q := url.Query()
186 q.Set("cid", artifact.BlobCid.String())
187 q.Set("did", artifact.Did)
188 url.RawQuery = q.Encode()
189
190 req, err := http.NewRequest(http.MethodGet, url.String(), nil)
191 if err != nil {
192 l.Error("failed to create request", "err", err)
193 http.Error(w, "failed to create request", http.StatusInternalServerError)
194 return
195 }
196 req.Header.Set("Content-Type", "application/json")
197
198 resp, err := http.DefaultClient.Do(req)
199 if err != nil {
200 l.Error("failed to make request", "err", err)
201 http.Error(w, "failed to make request to PDS", http.StatusInternalServerError)
202 return
203 }
204 defer resp.Body.Close()
205
206 // copy status code and relevant headers from upstream response
207 w.WriteHeader(resp.StatusCode)
208 for key, values := range resp.Header {
209 for _, v := range values {
210 w.Header().Add(key, v)
211 }
212 }
213
214 // stream the body directly to the client
215 if _, err := io.Copy(w, resp.Body); err != nil {
216 l.Error("error streaming response to client:", "err", err)
217 }
218}
219
220// TODO: proper statuses here on early exit
221func (rp *Repo) DeleteArtifact(w http.ResponseWriter, r *http.Request) {
222 l := rp.logger.With("handler", "DeleteArtifact")
223
224 user := rp.oauth.GetMultiAccountUser(r)
225 tagParam := chi.URLParam(r, "tag")
226 filename := chi.URLParam(r, "file")
227 f, err := rp.repoResolver.Resolve(r)
228 if err != nil {
229 l.Error("failed to get repo and knot", "err", err)
230 return
231 }
232
233 client, _ := rp.oauth.AuthorizedClient(r)
234
235 tag := plumbing.NewHash(tagParam)
236
237 artifacts, err := db.GetArtifact(
238 rp.db,
239 orm.FilterEq("repo_at", f.RepoAt()),
240 orm.FilterEq("tag", tag[:]),
241 orm.FilterEq("name", filename),
242 )
243 if err != nil {
244 l.Error("failed to get artifacts", "err", err)
245 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
246 return
247 }
248 if len(artifacts) != 1 {
249 rp.pages.Notice(w, "remove", "Unable to find artifact.")
250 return
251 }
252
253 artifact := artifacts[0]
254
255 if user.Active.Did != artifact.Did {
256 l.Error("user not authorized to delete artifact", "err", err)
257 rp.pages.Notice(w, "remove", "Unauthorized deletion of artifact.")
258 return
259 }
260
261 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
262 Collection: tangled.RepoArtifactNSID,
263 Repo: user.Active.Did,
264 Rkey: artifact.Rkey,
265 })
266 if err != nil {
267 l.Error("failed to get blob from pds", "err", err)
268 rp.pages.Notice(w, "remove", "Failed to remove blob from PDS.")
269 return
270 }
271
272 tx, err := rp.db.BeginTx(r.Context(), nil)
273 if err != nil {
274 l.Error("failed to start tx")
275 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
276 return
277 }
278 defer tx.Rollback()
279
280 err = db.DeleteArtifact(tx,
281 orm.FilterEq("repo_at", f.RepoAt()),
282 orm.FilterEq("tag", artifact.Tag[:]),
283 orm.FilterEq("name", filename),
284 )
285 if err != nil {
286 l.Error("failed to remove artifact record from db", "err", err)
287 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
288 return
289 }
290
291 err = tx.Commit()
292 if err != nil {
293 l.Error("failed to remove artifact record from db")
294 rp.pages.Notice(w, "remove", "Failed to delete artifact. Try again later.")
295 return
296 }
297
298 l.Info("successfully deleted artifact", "tag", tagParam, "file", filename)
299
300 w.Write([]byte{})
301}
302
303func (rp *Repo) resolveTag(ctx context.Context, f *models.Repo, tagParam string) (*types.TagReference, error) {
304 l := rp.logger.With("handler", "resolveTag")
305
306 tagParam, err := url.QueryUnescape(tagParam)
307 if err != nil {
308 return nil, err
309 }
310
311 scheme := "http"
312 if !rp.config.Core.Dev {
313 scheme = "https"
314 }
315 host := fmt.Sprintf("%s://%s", scheme, f.Knot)
316 xrpcc := &indigoxrpc.Client{
317 Host: host,
318 }
319
320 xrpcBytes, err := tangled.RepoTags(ctx, xrpcc, "", 0, f.RepoIdentifier())
321 if err != nil {
322 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil {
323 l.Error("failed to call XRPC repo.tags", "err", xrpcerr)
324 return nil, xrpcerr
325 }
326 l.Error("failed to reach knotserver", "err", err)
327 return nil, err
328 }
329
330 var result types.RepoTagsResponse
331 if err := json.Unmarshal(xrpcBytes, &result); err != nil {
332 l.Error("failed to decode XRPC tags response", "err", err)
333 return nil, err
334 }
335
336 var tag *types.TagReference
337 for _, t := range result.Tags {
338 if t.Tag != nil {
339 if t.Reference.Name == tagParam || t.Reference.Hash == tagParam {
340 tag = t
341 }
342 }
343 }
344
345 if tag == nil {
346 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts")
347 }
348
349 if tag.Tag.Target.IsZero() {
350 return nil, fmt.Errorf("invalid tag, only annotated tags are supported for artifacts")
351 }
352
353 return tag, nil
354}
355
356func repoArtifactRecord(f *models.Repo, blob *lexutil.LexBlob, createdAt time.Time, name string, tag []byte) *tangled.RepoArtifact {
357 rec := &tangled.RepoArtifact{
358 Artifact: blob,
359 CreatedAt: createdAt.Format(time.RFC3339),
360 Name: name,
361 Tag: tag,
362 }
363 if f.RepoDid != "" {
364 rec.RepoDid = &f.RepoDid
365 } else {
366 s := f.RepoAt().String()
367 rec.Repo = &s
368 }
369 return rec
370}