A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
at main 470 lines 15 kB view raw
1// Package oci provides OCI registry endpoints for the hold service. 2package oci 3 4import ( 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "net/http" 9 "strings" 10 11 "atcr.io/pkg/atproto" 12 "atcr.io/pkg/hold/pds" 13 "atcr.io/pkg/hold/quota" 14 "atcr.io/pkg/s3" 15 "github.com/go-chi/chi/v5" 16 "github.com/go-chi/render" 17) 18 19// XRPCHandler handles OCI-specific XRPC endpoints for multipart uploads 20type XRPCHandler struct { 21 s3Service s3.S3Service 22 MultipartMgr *MultipartManager // Exported for access in route handlers 23 pds *pds.HoldPDS 24 httpClient pds.HTTPClient 25 enableBlueskyPosts bool 26 quotaMgr *quota.Manager // Quota manager for tier-based limits 27 scanBroadcaster *pds.ScanBroadcaster // Scan job dispatcher (nil = scanning disabled) 28} 29 30// NewXRPCHandler creates a new OCI XRPC handler 31func NewXRPCHandler(holdPDS *pds.HoldPDS, s3Service s3.S3Service, enableBlueskyPosts bool, httpClient pds.HTTPClient, quotaMgr *quota.Manager) *XRPCHandler { 32 return &XRPCHandler{ 33 MultipartMgr: NewMultipartManager(), 34 s3Service: s3Service, 35 pds: holdPDS, 36 httpClient: httpClient, 37 enableBlueskyPosts: enableBlueskyPosts, 38 quotaMgr: quotaMgr, 39 } 40} 41 42// SetScanBroadcaster sets the scan broadcaster for triggering scans on push 43func (h *XRPCHandler) SetScanBroadcaster(sb *pds.ScanBroadcaster) { 44 h.scanBroadcaster = sb 45} 46 47// RegisterHandlers registers all OCI XRPC endpoints with the chi router 48func (h *XRPCHandler) RegisterHandlers(r chi.Router) { 49 // All multipart upload endpoints require blob:write permission 50 r.Group(func(r chi.Router) { 51 r.Use(h.requireBlobWriteAccess) 52 53 r.Post(atproto.HoldInitiateUpload, h.HandleInitiateUpload) 54 r.Post(atproto.HoldGetPartUploadURL, h.HandleGetPartUploadURL) 55 r.Post(atproto.HoldCompleteUpload, h.HandleCompleteUpload) 56 r.Post(atproto.HoldAbortUpload, h.HandleAbortUpload) 57 r.Post(atproto.HoldNotifyManifest, h.HandleNotifyManifest) 58 }) 59} 60 61// HandleInitiateUpload starts a new multipart upload 62// Replaces the old "action: start" pattern 63func (h *XRPCHandler) HandleInitiateUpload(w http.ResponseWriter, r *http.Request) { 64 var req struct { 65 Digest string `json:"digest"` 66 } 67 68 if err := render.Decode(r, &req); err != nil { 69 render.Status(r, http.StatusBadRequest) 70 render.JSON(w, r, map[string]string{"error": err.Error()}) 71 return 72 } 73 74 if req.Digest == "" { 75 render.Status(r, http.StatusBadRequest) 76 render.JSON(w, r, map[string]string{"error": "digest is required"}) 77 return 78 } 79 80 uploadID, err := h.StartMultipartUploadWithManager(r.Context(), req.Digest) 81 if err != nil { 82 render.Status(r, http.StatusInternalServerError) 83 render.JSON(w, r, map[string]string{"error": fmt.Sprintf("failed to initiate upload: %v", err)}) 84 return 85 } 86 87 render.JSON(w, r, map[string]any{ 88 "uploadId": uploadID, 89 }) 90} 91 92// HandleGetPartUploadURL returns a presigned URL or endpoint info for uploading a part 93// Replaces the old "action: part" pattern 94func (h *XRPCHandler) HandleGetPartUploadURL(w http.ResponseWriter, r *http.Request) { 95 var req struct { 96 UploadID string `json:"uploadId"` 97 PartNumber int `json:"partNumber"` 98 } 99 100 if err := render.Decode(r, &req); err != nil { 101 render.Status(r, http.StatusBadRequest) 102 render.JSON(w, r, map[string]string{"error": err.Error()}) 103 return 104 } 105 106 if req.UploadID == "" || req.PartNumber == 0 { 107 render.Status(r, http.StatusBadRequest) 108 render.JSON(w, r, map[string]string{"error": "uploadId and partNumber are required"}) 109 return 110 } 111 112 uploadInfo, err := h.GetPartUploadURL(r.Context(), req.UploadID, req.PartNumber) 113 if err != nil { 114 render.Status(r, http.StatusInternalServerError) 115 render.JSON(w, r, map[string]string{"error": fmt.Sprintf("failed to get part upload URL: %v", err)}) 116 return 117 } 118 119 render.JSON(w, r, uploadInfo) 120} 121 122// HandleCompleteUpload finalizes a multipart upload 123// Replaces the old "action: complete" pattern 124func (h *XRPCHandler) HandleCompleteUpload(w http.ResponseWriter, r *http.Request) { 125 var req struct { 126 UploadID string `json:"uploadId"` 127 Digest string `json:"digest"` 128 Parts []PartInfo `json:"parts"` 129 } 130 131 if err := render.Decode(r, &req); err != nil { 132 render.Status(r, http.StatusBadRequest) 133 render.JSON(w, r, map[string]string{"error": err.Error()}) 134 return 135 } 136 137 if req.UploadID == "" || req.Digest == "" || len(req.Parts) == 0 { 138 render.Status(r, http.StatusBadRequest) 139 render.JSON(w, r, map[string]string{"error": "uploadId, digest, and parts are required"}) 140 return 141 } 142 143 err := h.CompleteMultipartUploadWithManager(r.Context(), req.UploadID, req.Digest, req.Parts) 144 if err != nil { 145 render.Status(r, http.StatusInternalServerError) 146 render.JSON(w, r, map[string]string{"error": fmt.Sprintf("failed to complete upload: %v", err)}) 147 return 148 } 149 150 render.JSON(w, r, map[string]any{ 151 "status": "completed", 152 "digest": req.Digest, 153 }) 154} 155 156// HandleAbortUpload cancels a multipart upload 157// Replaces the old "action: abort" pattern 158func (h *XRPCHandler) HandleAbortUpload(w http.ResponseWriter, r *http.Request) { 159 var req struct { 160 UploadID string `json:"uploadId"` 161 } 162 163 if err := render.Decode(r, &req); err != nil { 164 render.Status(r, http.StatusBadRequest) 165 render.JSON(w, r, map[string]string{"error": err.Error()}) 166 return 167 } 168 169 if req.UploadID == "" { 170 render.Status(r, http.StatusBadRequest) 171 render.JSON(w, r, map[string]string{"error": "uploadId is required"}) 172 return 173 } 174 175 err := h.AbortMultipartUploadWithManager(r.Context(), req.UploadID) 176 if err != nil { 177 render.Status(r, http.StatusInternalServerError) 178 render.JSON(w, r, map[string]string{"error": fmt.Sprintf("failed to abort upload: %v", err)}) 179 return 180 } 181 182 render.JSON(w, r, map[string]any{ 183 "status": "aborted", 184 }) 185} 186 187// HandleNotifyManifest handles manifest notifications from AppView 188// For pushes: Creates layer records and optionally posts to Bluesky 189// For pulls: Just increments stats (no layer records or posts) 190// Always increments stats (pull or push counts) 191func (h *XRPCHandler) HandleNotifyManifest(w http.ResponseWriter, r *http.Request) { 192 ctx := r.Context() 193 194 // Validate service token (same auth as blob:write endpoints) 195 validatedUser, err := pds.ValidateBlobWriteAccess(r, h.pds, h.httpClient) 196 if err != nil { 197 render.Status(r, http.StatusForbidden) 198 render.JSON(w, r, map[string]string{"error": fmt.Sprintf("authorization failed: %v", err)}) 199 return 200 } 201 202 // Parse request 203 var req struct { 204 Repository string `json:"repository"` 205 Tag string `json:"tag"` 206 UserDID string `json:"userDid"` 207 ManifestDigest string `json:"manifestDigest"` // For building layer record AT-URIs 208 Operation string `json:"operation"` // "push" or "pull", defaults to "push" for backward compatibility 209 Manifest struct { 210 MediaType string `json:"mediaType"` 211 Config struct { 212 Digest string `json:"digest"` 213 Size int64 `json:"size"` 214 MediaType string `json:"mediaType"` 215 } `json:"config"` 216 Layers []struct { 217 Digest string `json:"digest"` 218 Size int64 `json:"size"` 219 MediaType string `json:"mediaType"` 220 } `json:"layers"` 221 Manifests []struct { 222 Digest string `json:"digest"` 223 Size int64 `json:"size"` 224 MediaType string `json:"mediaType"` 225 Platform *struct { 226 OS string `json:"os"` 227 Architecture string `json:"architecture"` 228 } `json:"platform"` 229 } `json:"manifests"` 230 } `json:"manifest"` 231 } 232 233 if err := render.Decode(r, &req); err != nil { 234 render.Status(r, http.StatusBadRequest) 235 render.JSON(w, r, map[string]string{"error": err.Error()}) 236 return 237 } 238 239 // Default operation to "push" for backward compatibility 240 operation := req.Operation 241 if operation == "" { 242 operation = "push" 243 } 244 245 // Validate operation 246 if operation != "push" && operation != "pull" { 247 render.Status(r, http.StatusBadRequest) 248 render.JSON(w, r, map[string]string{"error": fmt.Sprintf("invalid operation: %s (must be 'push' or 'pull')", operation)}) 249 return 250 } 251 252 // Verify user DID matches token - only for pushes 253 // For pulls: userDID is the repo owner (for stats), but the token belongs to the puller 254 // This allows anyone to pull from a public repo and have stats tracked under the owner 255 if operation == "push" && req.UserDID != validatedUser.DID { 256 render.Status(r, http.StatusForbidden) 257 render.JSON(w, r, map[string]string{"error": "user DID mismatch"}) 258 return 259 } 260 261 var layersCreated int 262 var postCreated bool 263 var postURI string 264 265 // Only create layer records and Bluesky posts for pushes 266 if operation == "push" { 267 // Soft limit check: block if ALREADY over quota 268 // (blobs already uploaded to S3 by this point, no sense rejecting) 269 stats, err := h.pds.GetQuotaForUserWithTier(ctx, req.UserDID, h.quotaMgr) 270 if err == nil && stats.Limit != nil && stats.TotalSize > *stats.Limit { 271 slog.Warn("Quota exceeded for push", 272 "userDid", req.UserDID, 273 "currentUsage", stats.TotalSize, 274 "limit", *stats.Limit, 275 "repository", req.Repository, 276 "tag", req.Tag, 277 ) 278 render.Status(r, http.StatusForbidden) 279 render.JSON(w, r, map[string]string{"error": fmt.Sprintf( 280 "quota exceeded: current=%d bytes, limit=%d bytes. Delete images to free space.", 281 stats.TotalSize, *stats.Limit, 282 )}) 283 return 284 } 285 286 // Check if manifest posts are enabled 287 // Read from captain record (which is synced with HOLD_BLUESKY_POSTS_ENABLED env var) 288 postsEnabled := false 289 _, captain, err := h.pds.GetCaptainRecord(ctx) 290 if err == nil { 291 postsEnabled = captain.EnableBlueskyPosts 292 } else { 293 // Fallback to env var if captain record doesn't exist (shouldn't happen in normal operation) 294 postsEnabled = h.enableBlueskyPosts 295 } 296 297 // Build manifest AT-URI for layer records 298 manifestURI := atproto.BuildManifestURI(req.UserDID, req.ManifestDigest) 299 300 // Create layer records for each blob 301 for _, layer := range req.Manifest.Layers { 302 record := atproto.NewLayerRecord( 303 layer.Digest, 304 layer.Size, 305 layer.MediaType, 306 req.UserDID, 307 manifestURI, 308 ) 309 310 _, _, err := h.pds.CreateLayerRecord(ctx, record) 311 if err != nil { 312 slog.Error("Failed to create layer record", "error", err) 313 // Continue creating other records 314 } else { 315 layersCreated++ 316 } 317 } 318 319 // Check if this is a multi-arch image (has manifests instead of layers) 320 isMultiArch := len(req.Manifest.Manifests) > 0 321 322 // Calculate total size from all layers (for single-arch images) 323 var totalSize int64 324 for _, layer := range req.Manifest.Layers { 325 totalSize += layer.Size 326 } 327 totalSize += req.Manifest.Config.Size // Add config blob size 328 329 // Extract platforms for multi-arch images 330 // Filter out attestation manifests which have unknown/unknown or empty platforms 331 var platforms []string 332 if isMultiArch { 333 for _, m := range req.Manifest.Manifests { 334 if m.Platform != nil && 335 m.Platform.OS != "" && m.Platform.OS != "unknown" && 336 m.Platform.Architecture != "" && m.Platform.Architecture != "unknown" { 337 platforms = append(platforms, m.Platform.OS+"/"+m.Platform.Architecture) 338 } 339 } 340 } 341 342 // Detect artifact type from config media type 343 artifactType := "container-image" 344 if strings.Contains(req.Manifest.Config.MediaType, "helm.config") { 345 artifactType = "helm-chart" 346 } 347 348 // Create Bluesky post if enabled and tag is present 349 // Skip posts for tagless pushes (e.g., buildx platform manifests pushed by digest) 350 if postsEnabled && req.Tag != "" { 351 // Resolve handle from DID (cached, 24-hour TTL) 352 _, userHandle, _, resolveErr := atproto.ResolveIdentity(ctx, req.UserDID) 353 if resolveErr != nil { 354 slog.Warn("Failed to resolve handle for user", "did", req.UserDID, "error", resolveErr) 355 userHandle = req.UserDID // Fallback to DID if resolution fails 356 } 357 358 // Extract manifest digest from first layer (or use config digest as fallback) 359 manifestDigest := req.Manifest.Config.Digest 360 if len(req.Manifest.Layers) > 0 { 361 manifestDigest = req.Manifest.Layers[0].Digest 362 } 363 364 postURI, err = h.pds.CreateManifestPost( 365 ctx, 366 &h.s3Service, 367 req.Repository, 368 req.Tag, 369 userHandle, 370 req.UserDID, 371 manifestDigest, 372 totalSize, 373 platforms, 374 artifactType, 375 ) 376 if err != nil { 377 slog.Error("Failed to create manifest post", "error", err) 378 } else { 379 postCreated = true 380 } 381 } 382 383 // Enqueue scan job if scanner is connected (skip manifest lists — children get their own jobs) 384 if h.scanBroadcaster != nil && !isMultiArch { 385 tier := "deckhand" 386 if stats != nil && stats.Tier != "" { 387 tier = stats.Tier 388 } 389 390 // Check if this tier gets scan-on-push. 391 // Captain ("owner") always gets scan-on-push. 392 // When quotas are disabled, all pushes trigger scans (backwards compat). 393 shouldScan := tier == "owner" || 394 h.quotaMgr == nil || !h.quotaMgr.IsEnabled() || 395 h.quotaMgr.ScanOnPush(tier) 396 397 if shouldScan { 398 configJSON, _ := json.Marshal(req.Manifest.Config) 399 layersJSON, _ := json.Marshal(req.Manifest.Layers) 400 401 // Resolve handle for scanner context 402 _, userHandle, _, resolveErr := atproto.ResolveIdentity(ctx, req.UserDID) 403 if resolveErr != nil { 404 userHandle = req.UserDID 405 } 406 407 if err := h.scanBroadcaster.Enqueue(&pds.ScanJobEvent{ 408 ManifestDigest: req.ManifestDigest, 409 Repository: req.Repository, 410 Tag: req.Tag, 411 UserDID: req.UserDID, 412 UserHandle: userHandle, 413 Tier: tier, 414 Config: configJSON, 415 Layers: layersJSON, 416 }); err != nil { 417 slog.Error("Failed to enqueue scan job", 418 "repository", req.Repository, 419 "error", err) 420 } 421 } else { 422 slog.Debug("Scan-on-push skipped for tier", 423 "tier", tier, 424 "repository", req.Repository, 425 "userDid", req.UserDID) 426 } 427 } 428 } 429 430 // ALWAYS increment stats (even if Bluesky posts disabled, even for pulls) 431 statsUpdated := false 432 if err := h.pds.IncrementStats(ctx, req.UserDID, req.Repository, operation); err != nil { 433 slog.Error("Failed to increment stats", "operation", operation, "error", err) 434 } else { 435 statsUpdated = true 436 } 437 438 // Return response 439 resp := map[string]any{ 440 "success": statsUpdated || layersCreated > 0 || postCreated, 441 "operation": operation, 442 "statsUpdated": statsUpdated, 443 } 444 445 // Only include push-specific fields for push operations 446 if operation == "push" { 447 resp["layersCreated"] = layersCreated 448 resp["postCreated"] = postCreated 449 if postURI != "" { 450 resp["postUri"] = postURI 451 } 452 } 453 454 render.JSON(w, r, resp) 455} 456 457// requireBlobWriteAccess middleware - validates DPoP + OAuth and checks for blob:write permission 458func (h *XRPCHandler) requireBlobWriteAccess(next http.Handler) http.Handler { 459 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 460 _, err := pds.ValidateBlobWriteAccess(r, h.pds, h.httpClient) 461 if err != nil { 462 http.Error(w, fmt.Sprintf("authorization failed: %v", err), http.StatusForbidden) 463 return 464 } 465 466 // Validation successful - user has blob:write permission 467 // No need to store user in context since handlers don't need it 468 next.ServeHTTP(w, r) 469 }) 470}