A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
atcr.io
docker
container
atproto
go
1// Package oci provides OCI registry endpoints for the hold service.
2package oci
3
4import (
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "net/http"
9 "strings"
10
11 "atcr.io/pkg/atproto"
12 "atcr.io/pkg/hold/pds"
13 "atcr.io/pkg/hold/quota"
14 "atcr.io/pkg/s3"
15 "github.com/go-chi/chi/v5"
16 "github.com/go-chi/render"
17)
18
// XRPCHandler handles OCI-specific XRPC endpoints for multipart uploads.
// It is constructed via NewXRPCHandler; the scan broadcaster is optional
// and wired in afterwards with SetScanBroadcaster.
type XRPCHandler struct {
	s3Service          s3.S3Service         // Blob storage backend used for multipart part uploads
	MultipartMgr       *MultipartManager    // Exported for access in route handlers
	pds                *pds.HoldPDS         // PDS used for records, stats, and auth validation
	httpClient         pds.HTTPClient       // HTTP client passed to token/identity validation
	enableBlueskyPosts bool                 // Env-var fallback when the captain record is unavailable
	quotaMgr           *quota.Manager       // Quota manager for tier-based limits (nil = quotas disabled)
	scanBroadcaster    *pds.ScanBroadcaster // Scan job dispatcher (nil = scanning disabled)
}
29
30// NewXRPCHandler creates a new OCI XRPC handler
31func NewXRPCHandler(holdPDS *pds.HoldPDS, s3Service s3.S3Service, enableBlueskyPosts bool, httpClient pds.HTTPClient, quotaMgr *quota.Manager) *XRPCHandler {
32 return &XRPCHandler{
33 MultipartMgr: NewMultipartManager(),
34 s3Service: s3Service,
35 pds: holdPDS,
36 httpClient: httpClient,
37 enableBlueskyPosts: enableBlueskyPosts,
38 quotaMgr: quotaMgr,
39 }
40}
41
42// SetScanBroadcaster sets the scan broadcaster for triggering scans on push
43func (h *XRPCHandler) SetScanBroadcaster(sb *pds.ScanBroadcaster) {
44 h.scanBroadcaster = sb
45}
46
47// RegisterHandlers registers all OCI XRPC endpoints with the chi router
48func (h *XRPCHandler) RegisterHandlers(r chi.Router) {
49 // All multipart upload endpoints require blob:write permission
50 r.Group(func(r chi.Router) {
51 r.Use(h.requireBlobWriteAccess)
52
53 r.Post(atproto.HoldInitiateUpload, h.HandleInitiateUpload)
54 r.Post(atproto.HoldGetPartUploadURL, h.HandleGetPartUploadURL)
55 r.Post(atproto.HoldCompleteUpload, h.HandleCompleteUpload)
56 r.Post(atproto.HoldAbortUpload, h.HandleAbortUpload)
57 r.Post(atproto.HoldNotifyManifest, h.HandleNotifyManifest)
58 })
59}
60
61// HandleInitiateUpload starts a new multipart upload
62// Replaces the old "action: start" pattern
63func (h *XRPCHandler) HandleInitiateUpload(w http.ResponseWriter, r *http.Request) {
64 var req struct {
65 Digest string `json:"digest"`
66 }
67
68 if err := render.Decode(r, &req); err != nil {
69 render.Status(r, http.StatusBadRequest)
70 render.JSON(w, r, map[string]string{"error": err.Error()})
71 return
72 }
73
74 if req.Digest == "" {
75 render.Status(r, http.StatusBadRequest)
76 render.JSON(w, r, map[string]string{"error": "digest is required"})
77 return
78 }
79
80 uploadID, err := h.StartMultipartUploadWithManager(r.Context(), req.Digest)
81 if err != nil {
82 render.Status(r, http.StatusInternalServerError)
83 render.JSON(w, r, map[string]string{"error": fmt.Sprintf("failed to initiate upload: %v", err)})
84 return
85 }
86
87 render.JSON(w, r, map[string]any{
88 "uploadId": uploadID,
89 })
90}
91
92// HandleGetPartUploadURL returns a presigned URL or endpoint info for uploading a part
93// Replaces the old "action: part" pattern
94func (h *XRPCHandler) HandleGetPartUploadURL(w http.ResponseWriter, r *http.Request) {
95 var req struct {
96 UploadID string `json:"uploadId"`
97 PartNumber int `json:"partNumber"`
98 }
99
100 if err := render.Decode(r, &req); err != nil {
101 render.Status(r, http.StatusBadRequest)
102 render.JSON(w, r, map[string]string{"error": err.Error()})
103 return
104 }
105
106 if req.UploadID == "" || req.PartNumber == 0 {
107 render.Status(r, http.StatusBadRequest)
108 render.JSON(w, r, map[string]string{"error": "uploadId and partNumber are required"})
109 return
110 }
111
112 uploadInfo, err := h.GetPartUploadURL(r.Context(), req.UploadID, req.PartNumber)
113 if err != nil {
114 render.Status(r, http.StatusInternalServerError)
115 render.JSON(w, r, map[string]string{"error": fmt.Sprintf("failed to get part upload URL: %v", err)})
116 return
117 }
118
119 render.JSON(w, r, uploadInfo)
120}
121
122// HandleCompleteUpload finalizes a multipart upload
123// Replaces the old "action: complete" pattern
124func (h *XRPCHandler) HandleCompleteUpload(w http.ResponseWriter, r *http.Request) {
125 var req struct {
126 UploadID string `json:"uploadId"`
127 Digest string `json:"digest"`
128 Parts []PartInfo `json:"parts"`
129 }
130
131 if err := render.Decode(r, &req); err != nil {
132 render.Status(r, http.StatusBadRequest)
133 render.JSON(w, r, map[string]string{"error": err.Error()})
134 return
135 }
136
137 if req.UploadID == "" || req.Digest == "" || len(req.Parts) == 0 {
138 render.Status(r, http.StatusBadRequest)
139 render.JSON(w, r, map[string]string{"error": "uploadId, digest, and parts are required"})
140 return
141 }
142
143 err := h.CompleteMultipartUploadWithManager(r.Context(), req.UploadID, req.Digest, req.Parts)
144 if err != nil {
145 render.Status(r, http.StatusInternalServerError)
146 render.JSON(w, r, map[string]string{"error": fmt.Sprintf("failed to complete upload: %v", err)})
147 return
148 }
149
150 render.JSON(w, r, map[string]any{
151 "status": "completed",
152 "digest": req.Digest,
153 })
154}
155
156// HandleAbortUpload cancels a multipart upload
157// Replaces the old "action: abort" pattern
158func (h *XRPCHandler) HandleAbortUpload(w http.ResponseWriter, r *http.Request) {
159 var req struct {
160 UploadID string `json:"uploadId"`
161 }
162
163 if err := render.Decode(r, &req); err != nil {
164 render.Status(r, http.StatusBadRequest)
165 render.JSON(w, r, map[string]string{"error": err.Error()})
166 return
167 }
168
169 if req.UploadID == "" {
170 render.Status(r, http.StatusBadRequest)
171 render.JSON(w, r, map[string]string{"error": "uploadId is required"})
172 return
173 }
174
175 err := h.AbortMultipartUploadWithManager(r.Context(), req.UploadID)
176 if err != nil {
177 render.Status(r, http.StatusInternalServerError)
178 render.JSON(w, r, map[string]string{"error": fmt.Sprintf("failed to abort upload: %v", err)})
179 return
180 }
181
182 render.JSON(w, r, map[string]any{
183 "status": "aborted",
184 })
185}
186
// HandleNotifyManifest handles manifest notifications from AppView.
// For pushes: Creates layer records and optionally posts to Bluesky
// For pulls: Just increments stats (no layer records or posts)
// Always increments stats (pull or push counts)
func (h *XRPCHandler) HandleNotifyManifest(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	// Validate service token (same auth as blob:write endpoints)
	validatedUser, err := pds.ValidateBlobWriteAccess(r, h.pds, h.httpClient)
	if err != nil {
		render.Status(r, http.StatusForbidden)
		render.JSON(w, r, map[string]string{"error": fmt.Sprintf("authorization failed: %v", err)})
		return
	}

	// Parse request. The manifest sub-structure mirrors an OCI image
	// manifest: single-arch images carry Config + Layers; multi-arch
	// index manifests carry a Manifests list instead.
	var req struct {
		Repository     string `json:"repository"`
		Tag            string `json:"tag"`
		UserDID        string `json:"userDid"`
		ManifestDigest string `json:"manifestDigest"` // For building layer record AT-URIs
		Operation      string `json:"operation"`      // "push" or "pull", defaults to "push" for backward compatibility
		Manifest       struct {
			MediaType string `json:"mediaType"`
			Config    struct {
				Digest    string `json:"digest"`
				Size      int64  `json:"size"`
				MediaType string `json:"mediaType"`
			} `json:"config"`
			Layers []struct {
				Digest    string `json:"digest"`
				Size      int64  `json:"size"`
				MediaType string `json:"mediaType"`
			} `json:"layers"`
			Manifests []struct {
				Digest    string `json:"digest"`
				Size      int64  `json:"size"`
				MediaType string `json:"mediaType"`
				Platform  *struct {
					OS           string `json:"os"`
					Architecture string `json:"architecture"`
				} `json:"platform"`
			} `json:"manifests"`
		} `json:"manifest"`
	}

	if err := render.Decode(r, &req); err != nil {
		render.Status(r, http.StatusBadRequest)
		render.JSON(w, r, map[string]string{"error": err.Error()})
		return
	}

	// Default operation to "push" for backward compatibility
	operation := req.Operation
	if operation == "" {
		operation = "push"
	}

	// Validate operation
	if operation != "push" && operation != "pull" {
		render.Status(r, http.StatusBadRequest)
		render.JSON(w, r, map[string]string{"error": fmt.Sprintf("invalid operation: %s (must be 'push' or 'pull')", operation)})
		return
	}

	// Verify user DID matches token - only for pushes
	// For pulls: userDID is the repo owner (for stats), but the token belongs to the puller
	// This allows anyone to pull from a public repo and have stats tracked under the owner
	if operation == "push" && req.UserDID != validatedUser.DID {
		render.Status(r, http.StatusForbidden)
		render.JSON(w, r, map[string]string{"error": "user DID mismatch"})
		return
	}

	// Accumulated results reported back in the response for push operations.
	var layersCreated int
	var postCreated bool
	var postURI string

	// Only create layer records and Bluesky posts for pushes
	if operation == "push" {
		// Soft limit check: block if ALREADY over quota
		// (blobs already uploaded to S3 by this point, no sense rejecting)
		// NOTE: a quota-lookup error is deliberately ignored here (push
		// proceeds); `stats` is reused further down for scan-tier checks
		// and may be nil if this lookup failed.
		stats, err := h.pds.GetQuotaForUserWithTier(ctx, req.UserDID, h.quotaMgr)
		if err == nil && stats.Limit != nil && stats.TotalSize > *stats.Limit {
			slog.Warn("Quota exceeded for push",
				"userDid", req.UserDID,
				"currentUsage", stats.TotalSize,
				"limit", *stats.Limit,
				"repository", req.Repository,
				"tag", req.Tag,
			)
			render.Status(r, http.StatusForbidden)
			render.JSON(w, r, map[string]string{"error": fmt.Sprintf(
				"quota exceeded: current=%d bytes, limit=%d bytes. Delete images to free space.",
				stats.TotalSize, *stats.Limit,
			)})
			return
		}

		// Check if manifest posts are enabled
		// Read from captain record (which is synced with HOLD_BLUESKY_POSTS_ENABLED env var)
		postsEnabled := false
		_, captain, err := h.pds.GetCaptainRecord(ctx)
		if err == nil {
			postsEnabled = captain.EnableBlueskyPosts
		} else {
			// Fallback to env var if captain record doesn't exist (shouldn't happen in normal operation)
			postsEnabled = h.enableBlueskyPosts
		}

		// Build manifest AT-URI for layer records
		manifestURI := atproto.BuildManifestURI(req.UserDID, req.ManifestDigest)

		// Create layer records for each blob. Failures are logged but do
		// not abort the loop: partial record creation is preferred over
		// losing all layer records for one bad entry.
		for _, layer := range req.Manifest.Layers {
			record := atproto.NewLayerRecord(
				layer.Digest,
				layer.Size,
				layer.MediaType,
				req.UserDID,
				manifestURI,
			)

			_, _, err := h.pds.CreateLayerRecord(ctx, record)
			if err != nil {
				slog.Error("Failed to create layer record", "error", err)
				// Continue creating other records
			} else {
				layersCreated++
			}
		}

		// Check if this is a multi-arch image (has manifests instead of layers)
		isMultiArch := len(req.Manifest.Manifests) > 0

		// Calculate total size from all layers (for single-arch images)
		var totalSize int64
		for _, layer := range req.Manifest.Layers {
			totalSize += layer.Size
		}
		totalSize += req.Manifest.Config.Size // Add config blob size

		// Extract platforms for multi-arch images
		// Filter out attestation manifests which have unknown/unknown or empty platforms
		var platforms []string
		if isMultiArch {
			for _, m := range req.Manifest.Manifests {
				if m.Platform != nil &&
					m.Platform.OS != "" && m.Platform.OS != "unknown" &&
					m.Platform.Architecture != "" && m.Platform.Architecture != "unknown" {
					platforms = append(platforms, m.Platform.OS+"/"+m.Platform.Architecture)
				}
			}
		}

		// Detect artifact type from config media type
		artifactType := "container-image"
		if strings.Contains(req.Manifest.Config.MediaType, "helm.config") {
			artifactType = "helm-chart"
		}

		// Create Bluesky post if enabled and tag is present
		// Skip posts for tagless pushes (e.g., buildx platform manifests pushed by digest)
		if postsEnabled && req.Tag != "" {
			// Resolve handle from DID (cached, 24-hour TTL)
			_, userHandle, _, resolveErr := atproto.ResolveIdentity(ctx, req.UserDID)
			if resolveErr != nil {
				slog.Warn("Failed to resolve handle for user", "did", req.UserDID, "error", resolveErr)
				userHandle = req.UserDID // Fallback to DID if resolution fails
			}

			// Extract manifest digest from first layer (or use config digest as fallback)
			manifestDigest := req.Manifest.Config.Digest
			if len(req.Manifest.Layers) > 0 {
				manifestDigest = req.Manifest.Layers[0].Digest
			}

			postURI, err = h.pds.CreateManifestPost(
				ctx,
				&h.s3Service,
				req.Repository,
				req.Tag,
				userHandle,
				req.UserDID,
				manifestDigest,
				totalSize,
				platforms,
				artifactType,
			)
			if err != nil {
				slog.Error("Failed to create manifest post", "error", err)
			} else {
				postCreated = true
			}
		}

		// Enqueue scan job if scanner is connected (skip manifest lists — children get their own jobs)
		if h.scanBroadcaster != nil && !isMultiArch {
			// Default to the lowest tier when quota stats were unavailable.
			tier := "deckhand"
			if stats != nil && stats.Tier != "" {
				tier = stats.Tier
			}

			// Check if this tier gets scan-on-push.
			// Captain ("owner") always gets scan-on-push.
			// When quotas are disabled, all pushes trigger scans (backwards compat).
			shouldScan := tier == "owner" ||
				h.quotaMgr == nil || !h.quotaMgr.IsEnabled() ||
				h.quotaMgr.ScanOnPush(tier)

			if shouldScan {
				// Marshal errors intentionally ignored: these structs are
				// plain JSON-decodable data and cannot fail to re-encode.
				configJSON, _ := json.Marshal(req.Manifest.Config)
				layersJSON, _ := json.Marshal(req.Manifest.Layers)

				// Resolve handle for scanner context
				_, userHandle, _, resolveErr := atproto.ResolveIdentity(ctx, req.UserDID)
				if resolveErr != nil {
					userHandle = req.UserDID
				}

				if err := h.scanBroadcaster.Enqueue(&pds.ScanJobEvent{
					ManifestDigest: req.ManifestDigest,
					Repository:     req.Repository,
					Tag:            req.Tag,
					UserDID:        req.UserDID,
					UserHandle:     userHandle,
					Tier:           tier,
					Config:         configJSON,
					Layers:         layersJSON,
				}); err != nil {
					slog.Error("Failed to enqueue scan job",
						"repository", req.Repository,
						"error", err)
				}
			} else {
				slog.Debug("Scan-on-push skipped for tier",
					"tier", tier,
					"repository", req.Repository,
					"userDid", req.UserDID)
			}
		}
	}

	// ALWAYS increment stats (even if Bluesky posts disabled, even for pulls)
	statsUpdated := false
	if err := h.pds.IncrementStats(ctx, req.UserDID, req.Repository, operation); err != nil {
		slog.Error("Failed to increment stats", "operation", operation, "error", err)
	} else {
		statsUpdated = true
	}

	// Return response. "success" is true if anything useful happened;
	// individual failures above are logged rather than surfaced as errors.
	resp := map[string]any{
		"success":      statsUpdated || layersCreated > 0 || postCreated,
		"operation":    operation,
		"statsUpdated": statsUpdated,
	}

	// Only include push-specific fields for push operations
	if operation == "push" {
		resp["layersCreated"] = layersCreated
		resp["postCreated"] = postCreated
		if postURI != "" {
			resp["postUri"] = postURI
		}
	}

	render.JSON(w, r, resp)
}
456
457// requireBlobWriteAccess middleware - validates DPoP + OAuth and checks for blob:write permission
458func (h *XRPCHandler) requireBlobWriteAccess(next http.Handler) http.Handler {
459 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
460 _, err := pds.ValidateBlobWriteAccess(r, h.pds, h.httpClient)
461 if err != nil {
462 http.Error(w, fmt.Sprintf("authorization failed: %v", err), http.StatusForbidden)
463 return
464 }
465
466 // Validation successful - user has blob:write permission
467 // No need to store user in context since handlers don't need it
468 next.ServeHTTP(w, r)
469 })
470}