A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go

fix all the places where did used to be an endpoint

evan.jarrett.net 80b65ee6 606c8a84

verified
+169 -185
+6 -4
.env.appview.example
··· 26 # Storage Configuration 27 # ============================================================================== 28 29 - # Default hold service endpoint for users without their own storage (REQUIRED) 30 # Users with a sailor profile defaultHold setting will override this 31 - # Docker: Use container name (http://atcr-hold:8080) 32 - # Local dev: Use localhost (http://127.0.0.1:8080) 33 - ATCR_DEFAULT_HOLD=http://127.0.0.1:8080 34 35 # ============================================================================== 36 # Authentication Configuration
··· 26 # Storage Configuration 27 # ============================================================================== 28 29 + # Default hold service DID for users without their own storage (REQUIRED) 30 # Users with a sailor profile defaultHold setting will override this 31 + # Format: did:web:hostname[:port] 32 + # Docker: did:web:atcr-hold:8080 33 + # Local dev: did:web:127.0.0.1:8080 34 + # Production: did:web:hold01.atcr.io 35 + ATCR_DEFAULT_HOLD_DID=did:web:127.0.0.1:8080 36 37 # ============================================================================== 38 # Authentication Configuration
+7 -7
cmd/appview/config.go
··· 34 config.Storage = buildStorageConfig() 35 36 // Middleware (ATProto resolver) 37 - defaultHold := os.Getenv("ATCR_DEFAULT_HOLD") 38 - if defaultHold == "" { 39 - return nil, fmt.Errorf("ATCR_DEFAULT_HOLD is required") 40 } 41 - config.Middleware = buildMiddlewareConfig(defaultHold) 42 43 // Auth 44 baseURL := getBaseURL(httpConfig.Addr) ··· 123 } 124 125 // buildMiddlewareConfig creates middleware configuration 126 - func buildMiddlewareConfig(defaultHold string) map[string][]configuration.Middleware { 127 // Check test mode 128 testMode := os.Getenv("TEST_MODE") == "true" 129 ··· 132 { 133 Name: "atproto-resolver", 134 Options: configuration.Parameters{ 135 - "default_storage_endpoint": defaultHold, 136 - "test_mode": testMode, 137 }, 138 }, 139 },
··· 34 config.Storage = buildStorageConfig() 35 36 // Middleware (ATProto resolver) 37 + defaultHoldDID := os.Getenv("ATCR_DEFAULT_HOLD_DID") 38 + if defaultHoldDID == "" { 39 + return nil, fmt.Errorf("ATCR_DEFAULT_HOLD_DID is required") 40 } 41 + config.Middleware = buildMiddlewareConfig(defaultHoldDID) 42 43 // Auth 44 baseURL := getBaseURL(httpConfig.Addr) ··· 123 } 124 125 // buildMiddlewareConfig creates middleware configuration 126 + func buildMiddlewareConfig(defaultHoldDID string) map[string][]configuration.Middleware { 127 // Check test mode 128 testMode := os.Getenv("TEST_MODE") == "true" 129 ··· 132 { 133 Name: "atproto-resolver", 134 Options: configuration.Parameters{ 135 + "default_hold_did": defaultHoldDID, 136 + "test_mode": testMode, 137 }, 138 }, 139 },
+16 -20
cmd/appview/serve.go
··· 20 "github.com/spf13/cobra" 21 22 "atcr.io/pkg/appview/middleware" 23 - "atcr.io/pkg/atproto" 24 "atcr.io/pkg/auth" 25 "atcr.io/pkg/auth/oauth" 26 "atcr.io/pkg/auth/token" ··· 110 // Initialize OAuth components 111 fmt.Println("Initializing OAuth components...") 112 113 - // 1. Create OAuth session storage (SQLite-backed) 114 oauthStore := db.NewOAuthStore(uiDatabase) 115 fmt.Println("Using SQLite for OAuth session storage") 116 117 - // 2. Create device store (SQLite-backed) 118 deviceStore := db.NewDeviceStore(uiDatabase) 119 fmt.Println("Using SQLite for device storage") 120 121 - // 3. Get base URL from config or environment 122 baseURL := os.Getenv("ATCR_BASE_URL") 123 if baseURL == "" { 124 // If addr is just a port (e.g., ":5000"), prepend localhost ··· 132 133 fmt.Printf("DEBUG: Base URL for OAuth: %s\n", baseURL) 134 135 - // 4. Create OAuth app (indigo client) 136 oauthApp, err := oauth.NewApp(baseURL, oauthStore) 137 if err != nil { 138 return fmt.Errorf("failed to create OAuth app: %w", err) 139 } 140 fmt.Println("Using full OAuth scopes (including blob: scope)") 141 142 - // 5. Create refresher 143 refresher := oauth.NewRefresher(oauthApp) 144 145 - // 6. Set global refresher for middleware 146 middleware.SetGlobalRefresher(refresher) 147 148 - // 6.5. Set global database for pull/push metrics tracking 149 metricsDB := db.NewMetricsDB(uiDatabase) 150 middleware.SetGlobalDatabase(metricsDB) 151 152 - // 6.6. Create RemoteHoldAuthorizer for hold authorization with caching 153 holdAuthorizer := auth.NewRemoteHoldAuthorizer(uiDatabase) 154 middleware.SetGlobalAuthorizer(holdAuthorizer) 155 fmt.Println("Hold authorizer initialized with database caching") ··· 161 // The extraction function normalizes URLs to DIDs for consistency 162 defaultHoldDID := extractDefaultHoldDID(config) 163 164 - // 7. Initialize UI routes with OAuth app, refresher, and device store 165 uiTemplates, uiRouter := initializeUIRoutes(uiDatabase, uiReadOnlyDB, uiSessionStore, oauthApp, refresher, baseURL, deviceStore, defaultHoldDID) 166 167 - // 8. Create OAuth server 168 oauthServer := oauth.NewServer(oauthApp) 169 // Connect server to refresher for cache invalidation 170 oauthServer.SetRefresher(refresher) ··· 175 // Connect database for user avatar management 176 oauthServer.SetDatabase(uiDatabase) 177 178 - // 8.5. Set default hold DID on OAuth server (extracted earlier) 179 // This is used to create sailor profiles on first login 180 if defaultHoldDID != "" { 181 oauthServer.SetDefaultHoldDID(defaultHoldDID) 182 fmt.Printf("OAuth server will create profiles with default hold: %s\n", defaultHoldDID) 183 } 184 185 - // 9. Initialize auth keys and create token issuer 186 var issuer *token.Issuer 187 if config.Auth["token"] != nil { 188 if err := initializeAuthKeys(config); err != nil { ··· 365 } 366 367 // extractDefaultHoldDID extracts the default hold DID from middleware config 368 - // Returns a DID (e.g., "did:web:hold01.atcr.io") for consistency 369 - // Accepts both DIDs and URLs in config for backward compatibility 370 // To find a hold's DID, visit: https://hold-url/.well-known/did.json 371 func extractDefaultHoldDID(config *configuration.Configuration) string { 372 - // Navigate through: middleware.registry[].options.default_storage_endpoint 373 registryMiddleware, ok := config.Middleware["registry"] 374 if !ok { 375 return "" ··· 384 385 // Extract options - options is configuration.Parameters which is map[string]any 386 if mw.Options != nil { 387 - if endpoint, ok := mw.Options["default_storage_endpoint"].(string); ok { 388 - // Normalize to DID (handles both URLs and DIDs) 389 - // This ensures we store DIDs consistently 390 - return atproto.ResolveHoldDIDFromURL(endpoint) 391 } 392 } 393 }
··· 20 "github.com/spf13/cobra" 21 22 "atcr.io/pkg/appview/middleware" 23 "atcr.io/pkg/auth" 24 "atcr.io/pkg/auth/oauth" 25 "atcr.io/pkg/auth/token" ··· 109 // Initialize OAuth components 110 fmt.Println("Initializing OAuth components...") 111 112 + // Create OAuth session storage (SQLite-backed) 113 oauthStore := db.NewOAuthStore(uiDatabase) 114 fmt.Println("Using SQLite for OAuth session storage") 115 116 + // Create device store (SQLite-backed) 117 deviceStore := db.NewDeviceStore(uiDatabase) 118 fmt.Println("Using SQLite for device storage") 119 120 + // Get base URL from config or environment 121 baseURL := os.Getenv("ATCR_BASE_URL") 122 if baseURL == "" { 123 // If addr is just a port (e.g., ":5000"), prepend localhost ··· 131 132 fmt.Printf("DEBUG: Base URL for OAuth: %s\n", baseURL) 133 134 + // Create OAuth app (indigo client) 135 oauthApp, err := oauth.NewApp(baseURL, oauthStore) 136 if err != nil { 137 return fmt.Errorf("failed to create OAuth app: %w", err) 138 } 139 fmt.Println("Using full OAuth scopes (including blob: scope)") 140 141 + // Create oauth token refresher 142 refresher := oauth.NewRefresher(oauthApp) 143 144 + // Set global refresher for middleware 145 middleware.SetGlobalRefresher(refresher) 146 147 + // Set global database for pull/push metrics tracking 148 metricsDB := db.NewMetricsDB(uiDatabase) 149 middleware.SetGlobalDatabase(metricsDB) 150 151 + // Create RemoteHoldAuthorizer for hold authorization with caching 152 holdAuthorizer := auth.NewRemoteHoldAuthorizer(uiDatabase) 153 middleware.SetGlobalAuthorizer(holdAuthorizer) 154 fmt.Println("Hold authorizer initialized with database caching") ··· 160 // The extraction function normalizes URLs to DIDs for consistency 161 defaultHoldDID := extractDefaultHoldDID(config) 162 163 + // Initialize UI routes with OAuth app, refresher, and device store 164 uiTemplates, uiRouter := initializeUIRoutes(uiDatabase, uiReadOnlyDB, uiSessionStore, oauthApp, refresher, baseURL, deviceStore, defaultHoldDID) 165 166 + // Create OAuth server 167 oauthServer := oauth.NewServer(oauthApp) 168 // Connect server to refresher for cache invalidation 169 oauthServer.SetRefresher(refresher) ··· 174 // Connect database for user avatar management 175 oauthServer.SetDatabase(uiDatabase) 176 177 + // Set default hold DID on OAuth server (extracted earlier) 178 // This is used to create sailor profiles on first login 179 if defaultHoldDID != "" { 180 oauthServer.SetDefaultHoldDID(defaultHoldDID) 181 fmt.Printf("OAuth server will create profiles with default hold: %s\n", defaultHoldDID) 182 } 183 184 + // Initialize auth keys and create token issuer 185 var issuer *token.Issuer 186 if config.Auth["token"] != nil { 187 if err := initializeAuthKeys(config); err != nil { ··· 364 } 365 366 // extractDefaultHoldDID extracts the default hold DID from middleware config 367 + // Returns a DID (e.g., "did:web:hold01.atcr.io") 368 // To find a hold's DID, visit: https://hold-url/.well-known/did.json 369 func extractDefaultHoldDID(config *configuration.Configuration) string { 370 + // Navigate through: middleware.registry[].options.default_hold_did 371 registryMiddleware, ok := config.Middleware["registry"] 372 if !ok { 373 return "" ··· 382 383 // Extract options - options is configuration.Parameters which is map[string]any 384 if mw.Options != nil { 385 + if holdDID, ok := mw.Options["default_hold_did"].(string); ok { 386 + return holdDID 387 } 388 } 389 }
+7
deploy/.env.prod.template
··· 126 # AppView Configuration 127 # ============================================================================== 128 129 # JWT token expiration in seconds 130 # Default: 300 (5 minutes) 131 ATCR_TOKEN_EXPIRATION=300
··· 126 # AppView Configuration 127 # ============================================================================== 128 129 + # Default hold service DID (REQUIRED) 130 + # This is automatically set by docker-compose.prod.yml to did:web:${HOLD_DOMAIN} 131 + # Only override this if you want to use a different default hold 132 + # Format: did:web:hostname[:port] 133 + # Example: did:web:hold01.atcr.io 134 + # Note: This is set automatically - no need to configure manually 135 + 136 # JWT token expiration in seconds 137 # Default: 300 (5 minutes) 138 ATCR_TOKEN_EXPIRATION=300
+1 -1
deploy/docker-compose.prod.yml
··· 51 ATCR_SERVICE_NAME: ${APPVIEW_DOMAIN:-atcr.io} 52 53 # Storage configuration 54 - ATCR_DEFAULT_HOLD: https://${HOLD_DOMAIN:-hold01.atcr.io} 55 56 # Authentication 57 ATCR_AUTH_KEY_PATH: /var/lib/atcr/auth/private-key.pem
··· 51 ATCR_SERVICE_NAME: ${APPVIEW_DOMAIN:-atcr.io} 52 53 # Storage configuration 54 + ATCR_DEFAULT_HOLD_DID: did:web:${HOLD_DOMAIN:-hold01.atcr.io} 55 56 # Authentication 57 ATCR_AUTH_KEY_PATH: /var/lib/atcr/auth/private-key.pem
+1 -1
docker-compose.yml
··· 13 environment: 14 # Server configuration 15 ATCR_HTTP_ADDR: :5000 16 - ATCR_DEFAULT_HOLD: http://172.28.0.3:8080 17 # UI configuration 18 ATCR_UI_ENABLED: true 19 ATCR_BACKFILL_ENABLED: true
··· 13 environment: 14 # Server configuration 15 ATCR_HTTP_ADDR: :5000 16 + ATCR_DEFAULT_HOLD_DID: did:web:172.28.0.3:8080 17 # UI configuration 18 ATCR_UI_ENABLED: true 19 ATCR_BACKFILL_ENABLED: true
+31 -33
pkg/appview/middleware/registry.go
··· 58 // NamespaceResolver wraps a namespace and resolves names 59 type NamespaceResolver struct { 60 distribution.Namespace 61 - directory identity.Directory 62 - defaultStorageEndpoint string 63 - testMode bool // If true, fallback to default hold when user's hold is unreachable 64 - repositories sync.Map // Cache of RoutingRepository instances by key (did:reponame) 65 } 66 67 // initATProtoResolver initializes the name resolution middleware ··· 69 // Use indigo's default directory (includes caching) 70 directory := identity.DefaultDirectory() 71 72 - // Get default storage endpoint from config (optional) 73 - // Normalize to DID format for consistency 74 - defaultStorageEndpoint := "" 75 - if endpoint, ok := options["default_storage_endpoint"].(string); ok { 76 - // Convert URL to DID if needed (or pass through if already a DID) 77 - defaultStorageEndpoint = atproto.ResolveHoldDIDFromURL(endpoint) 78 } 79 80 // Check test mode from options (passed via env var) ··· 84 } 85 86 return &NamespaceResolver{ 87 - Namespace: ns, 88 - directory: directory, 89 - defaultStorageEndpoint: defaultStorageEndpoint, 90 - testMode: testMode, 91 }, nil 92 } 93 ··· 128 129 fmt.Printf("DEBUG [registry/middleware]: Resolved identity: did=%s, pds=%s, handle=%s\n", did, pdsEndpoint, ident.Handle.String()) 130 131 - // Query for storage endpoint - either user's hold or default hold service 132 - storageEndpoint := nr.findStorageEndpoint(ctx, did, pdsEndpoint) 133 - if storageEndpoint == "" { 134 // This is a fatal configuration error - registry cannot function without a hold service 135 - return nil, fmt.Errorf("no storage endpoint configured: ensure default_storage_endpoint is set in middleware config") 136 } 137 - ctx = context.WithValue(ctx, "storage.endpoint", storageEndpoint) 138 139 // Create a new reference with identity/image format 140 // Use the identity (or DID) as the namespace to ensure canonical format ··· 195 196 // Create routing repository - routes manifests to ATProto, blobs to hold service 197 // The registry is stateless - no local storage is used 198 - // Pass storage endpoint, DID, and authorizer as parameters (can't use context as it gets lost) 199 - routingRepo := storage.NewRoutingRepository(repo, atprotoClient, repositoryName, storageEndpoint, did, globalDatabase, globalAuthorizer) 200 201 // Cache the repository 202 nr.repositories.Store(cacheKey, routingRepo) ··· 219 return nr.Namespace.BlobStatter() 220 } 221 222 - // findStorageEndpoint determines which hold endpoint to use for blob storage 223 // Priority order: 224 // 1. User's sailor profile defaultHold (if set) 225 // 2. User's own hold record (io.atcr.hold) 226 - // 3. AppView's default hold endpoint 227 // Returns a hold DID (e.g., "did:web:hold01.atcr.io"), or empty string if none configured 228 - // Note: Despite returning a DID, this is used as the "storage endpoint" throughout the code 229 - func (nr *NamespaceResolver) findStorageEndpoint(ctx context.Context, did, pdsEndpoint string) string { 230 // Create ATProto client (without auth - reading public records) 231 client := atproto.NewClient(pdsEndpoint, did, "") 232 233 - // 1. Check for sailor profile 234 profile, err := atproto.GetProfile(ctx, client) 235 if err != nil { 236 // Error reading profile (not a 404) - log and continue ··· 245 return profile.DefaultHold 246 } 247 fmt.Printf("DEBUG [registry/middleware/testmode]: User's defaultHold %s unreachable, falling back to default\n", profile.DefaultHold) 248 - return nr.defaultStorageEndpoint 249 } 250 return profile.DefaultHold 251 } 252 253 - // 2. Profile doesn't exist or defaultHold is null/empty 254 // Check for user's own hold records 255 records, err := client.ListRecords(ctx, atproto.HoldCollection, 10) 256 if err != nil { 257 // Failed to query holds, use default 258 - return nr.defaultStorageEndpoint 259 } 260 261 // Find the first hold record ··· 265 continue 266 } 267 268 - // Return the endpoint from the first hold 269 if holdRecord.Endpoint != "" { 270 - return holdRecord.Endpoint 271 } 272 } 273 274 - // 3. No profile defaultHold and no own hold records - use AppView default 275 - return nr.defaultStorageEndpoint 276 } 277 278 // isHoldReachable checks if a hold service is reachable
··· 58 // NamespaceResolver wraps a namespace and resolves names 59 type NamespaceResolver struct { 60 distribution.Namespace 61 + directory identity.Directory 62 + defaultHoldDID string // Default hold DID (e.g., "did:web:hold01.atcr.io") 63 + testMode bool // If true, fallback to default hold when user's hold is unreachable 64 + repositories sync.Map // Cache of RoutingRepository instances by key (did:reponame) 65 } 66 67 // initATProtoResolver initializes the name resolution middleware ··· 69 // Use indigo's default directory (includes caching) 70 directory := identity.DefaultDirectory() 71 72 + // Get default hold DID from config (required) 73 + // Expected format: "did:web:hold01.atcr.io" 74 + defaultHoldDID := "" 75 + if holdDID, ok := options["default_hold_did"].(string); ok { 76 + defaultHoldDID = holdDID 77 } 78 79 // Check test mode from options (passed via env var) ··· 83 } 84 85 return &NamespaceResolver{ 86 + Namespace: ns, 87 + directory: directory, 88 + defaultHoldDID: defaultHoldDID, 89 + testMode: testMode, 90 }, nil 91 } 92 ··· 127 128 fmt.Printf("DEBUG [registry/middleware]: Resolved identity: did=%s, pds=%s, handle=%s\n", did, pdsEndpoint, ident.Handle.String()) 129 130 + // Query for hold DID - either user's hold or default hold service 131 + holdDID := nr.findHoldDID(ctx, did, pdsEndpoint) 132 + if holdDID == "" { 133 // This is a fatal configuration error - registry cannot function without a hold service 134 + return nil, fmt.Errorf("no hold DID configured: ensure default_hold_did is set in middleware config") 135 } 136 + ctx = context.WithValue(ctx, "hold.did", holdDID) 137 138 // Create a new reference with identity/image format 139 // Use the identity (or DID) as the namespace to ensure canonical format ··· 194 195 // Create routing repository - routes manifests to ATProto, blobs to hold service 196 // The registry is stateless - no local storage is used 197 + // Pass hold DID, user DID, and authorizer as parameters (can't use context as it gets lost) 198 + routingRepo := storage.NewRoutingRepository(repo, atprotoClient, repositoryName, holdDID, did, globalDatabase, globalAuthorizer) 199 200 // Cache the repository 201 nr.repositories.Store(cacheKey, routingRepo) ··· 218 return nr.Namespace.BlobStatter() 219 } 220 221 + // findHoldDID determines which hold DID to use for blob storage 222 // Priority order: 223 // 1. User's sailor profile defaultHold (if set) 224 // 2. User's own hold record (io.atcr.hold) 225 + // 3. AppView's default hold DID 226 // Returns a hold DID (e.g., "did:web:hold01.atcr.io"), or empty string if none configured 227 + func (nr *NamespaceResolver) findHoldDID(ctx context.Context, did, pdsEndpoint string) string { 228 // Create ATProto client (without auth - reading public records) 229 client := atproto.NewClient(pdsEndpoint, did, "") 230 231 + // Check for sailor profile 232 profile, err := atproto.GetProfile(ctx, client) 233 if err != nil { 234 // Error reading profile (not a 404) - log and continue ··· 243 return profile.DefaultHold 244 } 245 fmt.Printf("DEBUG [registry/middleware/testmode]: User's defaultHold %s unreachable, falling back to default\n", profile.DefaultHold) 246 + return nr.defaultHoldDID 247 } 248 return profile.DefaultHold 249 } 250 251 + // Profile doesn't exist or defaultHold is null/empty 252 // Check for user's own hold records 253 records, err := client.ListRecords(ctx, atproto.HoldCollection, 10) 254 if err != nil { 255 // Failed to query holds, use default 256 + return nr.defaultHoldDID 257 } 258 259 // Find the first hold record ··· 263 continue 264 } 265 266 + // Return the endpoint from the first hold (normalize to DID if URL) 267 if holdRecord.Endpoint != "" { 268 + return atproto.ResolveHoldDIDFromURL(holdRecord.Endpoint) 269 } 270 } 271 272 + // No profile defaultHold and no own hold records - use AppView default 273 + return nr.defaultHoldDID 274 } 275 276 // isHoldReachable checks if a hold service is reachable
+9 -9
pkg/appview/storage/hold_cache.go
··· 5 "time" 6 ) 7 8 - // HoldCache caches hold endpoints for (DID, repository) pairs 9 // This avoids expensive ATProto lookups on every blob request during pulls 10 // 11 // NOTE: This is a simple in-memory cache for MVP. For production deployments: ··· 18 } 19 20 type holdCacheEntry struct { 21 - holdEndpoint string 22 - expiresAt time.Time 23 } 24 25 var globalHoldCache = &HoldCache{ ··· 42 return globalHoldCache 43 } 44 45 - // Set stores a hold endpoint for a (DID, repository) pair with a TTL 46 - func (c *HoldCache) Set(did, repository, holdEndpoint string, ttl time.Duration) { 47 c.mu.Lock() 48 defer c.mu.Unlock() 49 50 key := did + ":" + repository 51 c.cache[key] = &holdCacheEntry{ 52 - holdEndpoint: holdEndpoint, 53 - expiresAt: time.Now().Add(ttl), 54 } 55 } 56 57 - // Get retrieves a hold endpoint for a (DID, repository) pair 58 // Returns empty string and false if not found or expired 59 func (c *HoldCache) Get(did, repository string) (string, bool) { 60 c.mu.RLock() ··· 72 return "", false 73 } 74 75 - return entry.holdEndpoint, true 76 } 77 78 // Cleanup removes expired entries (called automatically every 5 minutes)
··· 5 "time" 6 ) 7 8 + // HoldCache caches hold DIDs for (DID, repository) pairs 9 // This avoids expensive ATProto lookups on every blob request during pulls 10 // 11 // NOTE: This is a simple in-memory cache for MVP. For production deployments: ··· 18 } 19 20 type holdCacheEntry struct { 21 + holdDID string 22 + expiresAt time.Time 23 } 24 25 var globalHoldCache = &HoldCache{ ··· 42 return globalHoldCache 43 } 44 45 + // Set stores a hold DID for a (DID, repository) pair with a TTL 46 + func (c *HoldCache) Set(did, repository, holdDID string, ttl time.Duration) { 47 c.mu.Lock() 48 defer c.mu.Unlock() 49 50 key := did + ":" + repository 51 c.cache[key] = &holdCacheEntry{ 52 + holdDID: holdDID, 53 + expiresAt: time.Now().Add(ttl), 54 } 55 } 56 57 + // Get retrieves a hold DID for a (DID, repository) pair 58 // Returns empty string and false if not found or expired 59 func (c *HoldCache) Get(did, repository string) (string, bool) { 60 c.mu.RLock() ··· 72 return "", false 73 } 74 75 + return entry.holdDID, true 76 } 77 78 // Cleanup removes expired entries (called automatically every 5 minutes)
+39 -21
pkg/appview/storage/proxy_blob_store.go
··· 7 "fmt" 8 "io" 9 "net/http" 10 "sync" 11 "time" 12 13 - "atcr.io/pkg/atproto" 14 "atcr.io/pkg/auth" 15 "github.com/distribution/distribution/v3" 16 "github.com/opencontainers/go-digest" ··· 31 32 // ProxyBlobStore proxies blob requests to an external storage service 33 type ProxyBlobStore struct { 34 - storageEndpoint string 35 - httpClient *http.Client 36 - did string 37 - database DatabaseMetrics 38 - repository string 39 - authorizer auth.HoldAuthorizer 40 - holdDID string 41 } 42 43 // NewProxyBlobStore creates a new proxy blob store 44 - func NewProxyBlobStore(storageEndpoint, did string, database DatabaseMetrics, repository string, authorizer auth.HoldAuthorizer) *ProxyBlobStore { 45 - // Convert storage endpoint URL to did:web DID for authorization 46 - holdDID := atproto.ResolveHoldDIDFromURL(storageEndpoint) 47 - fmt.Printf("DEBUG [proxy_blob_store]: NewProxyBlobStore created with endpoint=%s, holdDID=%s, userDID=%s, repo=%s\n", 48 - storageEndpoint, holdDID, did, repository) 49 50 return &ProxyBlobStore{ 51 - storageEndpoint: storageEndpoint, 52 httpClient: &http.Client{ 53 Timeout: 5 * time.Minute, // Timeout for presigned URL requests and uploads 54 Transport: &http.Transport{ ··· 63 database: database, 64 repository: repository, 65 authorizer: authorizer, 66 - holdDID: holdDID, 67 } 68 } 69 70 // checkReadAccess verifies the user has read access to the hold ··· 347 // Use XRPC endpoint: GET /xrpc/com.atproto.sync.getBlob?did={holdDID}&cid={digest} 348 // Per migration doc: hold accepts OCI digest directly as cid parameter (checks for sha256: prefix) 349 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", 350 - p.storageEndpoint, p.holdDID, dgst.String()) 351 return url, nil 352 } 353 ··· 356 func (p *ProxyBlobStore) getHeadURL(ctx context.Context, dgst digest.Digest) (string, error) { 357 // Same as GET - hold service handles HEAD method on getBlob endpoint 358 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", 359 - p.storageEndpoint, p.holdDID, dgst.String()) 360 return url, nil 361 } 362 ··· 378 return "", err 379 } 380 381 - url := fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", p.storageEndpoint) 382 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 383 if err != nil { 384 return "", err ··· 428 return nil, err 429 } 430 431 - url := fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", p.storageEndpoint) 432 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 433 if err != nil { 434 return nil, err ··· 478 return err 479 } 480 481 - url := fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", p.storageEndpoint) 482 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 483 if err != nil { 484 return err ··· 512 return err 513 } 514 515 - url := fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", p.storageEndpoint) 516 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 517 if err != nil { 518 return err
··· 7 "fmt" 8 "io" 9 "net/http" 10 + "strings" 11 "sync" 12 "time" 13 14 "atcr.io/pkg/auth" 15 "github.com/distribution/distribution/v3" 16 "github.com/opencontainers/go-digest" ··· 31 32 // ProxyBlobStore proxies blob requests to an external storage service 33 type ProxyBlobStore struct { 34 + holdDID string // Hold DID (e.g., "did:web:hold01.atcr.io") 35 + holdURL string // Resolved HTTP URL for XRPC requests 36 + httpClient *http.Client 37 + did string 38 + database DatabaseMetrics 39 + repository string 40 + authorizer auth.HoldAuthorizer 41 } 42 43 // NewProxyBlobStore creates a new proxy blob store 44 + func NewProxyBlobStore(holdDID, did string, database DatabaseMetrics, repository string, authorizer auth.HoldAuthorizer) *ProxyBlobStore { 45 + // Resolve DID to URL once at construction time 46 + holdURL := resolveHoldURL(holdDID) 47 + 48 + fmt.Printf("DEBUG [proxy_blob_store]: NewProxyBlobStore created with holdDID=%s, holdURL=%s, userDID=%s, repo=%s\n", 49 + holdDID, holdURL, did, repository) 50 51 return &ProxyBlobStore{ 52 + holdDID: holdDID, 53 + holdURL: holdURL, 54 httpClient: &http.Client{ 55 Timeout: 5 * time.Minute, // Timeout for presigned URL requests and uploads 56 Transport: &http.Transport{ ··· 65 database: database, 66 repository: repository, 67 authorizer: authorizer, 68 } 69 + } 70 + 71 + // resolveHoldURL converts a hold DID to an HTTP URL for XRPC requests 72 + // did:web:hold01.atcr.io → https://hold01.atcr.io 73 + // did:web:172.28.0.3:8080 → http://172.28.0.3:8080 74 + func resolveHoldURL(holdDID string) string { 75 + hostname := strings.TrimPrefix(holdDID, "did:web:") 76 + 77 + // Use HTTP for localhost/IP addresses with ports, HTTPS for domains 78 + if strings.Contains(hostname, ":") || 79 + strings.Contains(hostname, "127.0.0.1") || 80 + strings.Contains(hostname, "localhost") || 81 + // Check if it's an IP address (contains only digits and dots) 82 + (len(hostname) > 0 && (hostname[0] >= '0' && hostname[0] <= '9')) { 83 + return "http://" + hostname 84 + } 85 + return "https://" + hostname 86 } 87 88 // checkReadAccess verifies the user has read access to the hold ··· 365 // Use XRPC endpoint: GET /xrpc/com.atproto.sync.getBlob?did={holdDID}&cid={digest} 366 // Per migration doc: hold accepts OCI digest directly as cid parameter (checks for sha256: prefix) 367 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", 368 + p.holdURL, p.holdDID, dgst.String()) 369 return url, nil 370 } 371 ··· 374 func (p *ProxyBlobStore) getHeadURL(ctx context.Context, dgst digest.Digest) (string, error) { 375 // Same as GET - hold service handles HEAD method on getBlob endpoint 376 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", 377 + p.holdURL, p.holdDID, dgst.String()) 378 return url, nil 379 } 380 ··· 396 return "", err 397 } 398 399 + url := fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", p.holdURL) 400 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 401 if err != nil { 402 return "", err ··· 446 return nil, err 447 } 448 449 + url := fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", p.holdURL) 450 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 451 if err != nil { 452 return nil, err ··· 496 return err 497 } 498 499 + url := fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", p.holdURL) 500 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 501 if err != nil { 502 return err ··· 530 return err 531 } 532 533 + url := fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", p.holdURL) 534 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 535 if err != nil { 536 return err
+35 -36
pkg/appview/storage/routing_repository.go
··· 20 // The registry (AppView) is stateless and NEVER stores blobs locally 21 type RoutingRepository struct { 22 distribution.Repository 23 - atprotoClient *atproto.Client 24 - repositoryName string 25 - storageEndpoint string // Hold service endpoint for blobs (from discovery for push) 26 - did string // User's DID for authorization 27 - manifestStore *atproto.ManifestStore // Cached manifest store instance 28 - blobStore *ProxyBlobStore // Cached blob store instance 29 - database DatabaseMetrics // Database for metrics tracking 30 - authorizer auth.HoldAuthorizer // Authorization for hold access 31 } 32 33 // NewRoutingRepository creates a new routing repository ··· 35 baseRepo distribution.Repository, 36 atprotoClient *atproto.Client, 37 repoName string, 38 - storageEndpoint string, 39 did string, 40 database DatabaseMetrics, 41 authorizer auth.HoldAuthorizer, 42 ) *RoutingRepository { 43 return &RoutingRepository{ 44 - Repository: baseRepo, 45 - atprotoClient: atprotoClient, 46 - repositoryName: repoName, 47 - storageEndpoint: storageEndpoint, 48 - did: did, 49 - database: database, 50 - authorizer: authorizer, 51 } 52 } 53 ··· 58 // Ensure blob store is created first (needed for label extraction during push) 59 blobStore := r.Blobs(ctx) 60 61 - // Resolve hold endpoint URL to DID 62 - holdDID := atproto.ResolveHoldDIDFromURL(r.storageEndpoint) 63 - 64 - r.manifestStore = atproto.NewManifestStore(r.atprotoClient, r.repositoryName, r.storageEndpoint, holdDID, r.did, blobStore, r.database) 65 } 66 67 - // After any manifest operation, cache the hold endpoint for blob fetches 68 // We use a goroutine to avoid blocking, and check after a short delay to allow the operation to complete 69 go func() { 70 time.Sleep(100 * time.Millisecond) // Brief delay to let manifest fetch complete 71 - if holdEndpoint := r.manifestStore.GetLastFetchedHoldEndpoint(); holdEndpoint != "" { 72 // Cache for 10 minutes - should cover typical pull operations 73 - GetGlobalHoldCache().Set(r.did, r.repositoryName, holdEndpoint, 10*time.Minute) 74 - fmt.Printf("DEBUG [storage/routing]: Cached hold endpoint: did=%s, repo=%s, hold=%s\n", 75 - r.did, r.repositoryName, holdEndpoint) 76 } 77 }() 78 ··· 89 return r.blobStore 90 } 91 92 - // For pull operations, check if we have a cached hold endpoint from a recent manifest fetch 93 // This ensures blobs are fetched from the hold recorded in the manifest, not re-discovered 94 - holdEndpoint := r.storageEndpoint // Default to discovery-based endpoint 95 96 - if cachedHold, ok := GetGlobalHoldCache().Get(r.did, r.repositoryName); ok { 97 - // Use cached hold from manifest 98 - holdEndpoint = cachedHold 99 fmt.Printf("DEBUG [storage/blobs]: Using cached hold from manifest: did=%s, repo=%s, hold=%s\n", 100 - r.did, r.repositoryName, cachedHold) 101 } else { 102 - // No cached hold, use discovery-based endpoint (for push or first pull) 103 fmt.Printf("DEBUG [storage/blobs]: Using discovery-based hold: did=%s, repo=%s, hold=%s\n", 104 - r.did, r.repositoryName, holdEndpoint) 105 } 106 107 - if holdEndpoint == "" { 108 // This should never happen if middleware is configured correctly 109 - panic("storage endpoint not set in RoutingRepository - ensure default_storage_endpoint is configured in middleware") 110 } 111 112 // Create and cache proxy blob store with authorization 113 - r.blobStore = NewProxyBlobStore(holdEndpoint, r.did, r.database, r.repositoryName, r.authorizer) 114 return r.blobStore 115 } 116
··· 20 // The registry (AppView) is stateless and NEVER stores blobs locally 21 type RoutingRepository struct { 22 distribution.Repository 23 + atprotoClient *atproto.Client 24 + repositoryName string 25 + holdDID string // Hold service DID for blobs (from discovery for push), e.g., "did:web:hold01.atcr.io" 26 + did string // User's DID for authorization 27 + manifestStore *atproto.ManifestStore // Cached manifest store instance 28 + blobStore *ProxyBlobStore // Cached blob store instance 29 + database DatabaseMetrics // Database for metrics tracking 30 + authorizer auth.HoldAuthorizer // Authorization for hold access 31 } 32 33 // NewRoutingRepository creates a new routing repository ··· 35 baseRepo distribution.Repository, 36 atprotoClient *atproto.Client, 37 repoName string, 38 + holdDID string, 39 did string, 40 database DatabaseMetrics, 41 authorizer auth.HoldAuthorizer, 42 ) *RoutingRepository { 43 return &RoutingRepository{ 44 + Repository: baseRepo, 45 + atprotoClient: atprotoClient, 46 + repositoryName: repoName, 47 + holdDID: holdDID, 48 + did: did, 49 + database: database, 50 + authorizer: authorizer, 51 } 52 } 53 ··· 58 // Ensure blob store is created first (needed for label extraction during push) 59 blobStore := r.Blobs(ctx) 60 61 + // ManifestStore needs both DID and URL for backward compat (legacy holdEndpoint field) 62 + // For now, pass holdDID twice (will be cleaned up in manifest_store.go later) 63 + r.manifestStore = atproto.NewManifestStore(r.atprotoClient, r.repositoryName, r.holdDID, r.holdDID, r.did, blobStore, r.database) 64 } 65 66 + // After any manifest operation, cache the hold DID for blob fetches 67 // We use a goroutine to avoid blocking, and check after a short delay to allow the operation to complete 68 go func() { 69 time.Sleep(100 * time.Millisecond) // Brief delay to let manifest fetch complete 70 + if holdDID := r.manifestStore.GetLastFetchedHoldDID(); holdDID != "" { 71 // Cache for 10 minutes - should cover typical pull operations 72 + GetGlobalHoldCache().Set(r.did, r.repositoryName, holdDID, 10*time.Minute) 73 + fmt.Printf("DEBUG [storage/routing]: Cached hold DID: did=%s, repo=%s, hold=%s\n", 74 + r.did, r.repositoryName, holdDID) 75 } 76 }() 77 ··· 88 return r.blobStore 89 } 90 91 + // For pull operations, check if we have a cached hold DID from a recent manifest fetch 92 // This ensures blobs are fetched from the hold recorded in the manifest, not re-discovered 93 + holdDID := r.holdDID // Default to discovery-based DID 94 95 + if cachedHoldDID, ok := GetGlobalHoldCache().Get(r.did, r.repositoryName); ok { 96 + // Use cached hold DID from manifest 97 + holdDID = cachedHoldDID 98 fmt.Printf("DEBUG [storage/blobs]: Using cached hold from manifest: did=%s, repo=%s, hold=%s\n", 99 + r.did, r.repositoryName, cachedHoldDID) 100 } else { 101 + // No cached hold, use discovery-based DID (for push or first pull) 102 fmt.Printf("DEBUG [storage/blobs]: Using discovery-based hold: did=%s, repo=%s, hold=%s\n", 103 + r.did, r.repositoryName, holdDID) 104 } 105 106 + if holdDID == "" { 107 // This should never happen if middleware is configured correctly 108 + panic("hold DID not set in RoutingRepository - ensure default_hold_did is configured in middleware") 109 } 110 111 // Create and cache proxy blob store with authorization 112 + r.blobStore = NewProxyBlobStore(holdDID, r.did, r.database, r.repositoryName, r.authorizer) 113 return r.blobStore 114 } 115
+16 -35
pkg/atproto/manifest_store.go
··· 21 // ManifestStore implements distribution.ManifestService 22 // It stores manifests in ATProto as records 23 type ManifestStore struct { 24 - client *Client 25 - repository string 26 - holdEndpoint string // Hold service endpoint URL (for legacy, to be deprecated) 27 - holdDID string // Hold service DID (primary reference) 28 - did string // User's DID for cache key 29 - lastFetchedHoldEndpoint string // Hold endpoint from most recently fetched manifest (for pull) 30 - blobStore distribution.BlobStore // Blob store for fetching config during push 31 - database DatabaseMetrics // Database for metrics tracking 32 } 33 34 // NewManifestStore creates a new ATProto-backed manifest store ··· 74 return nil, fmt.Errorf("failed to unmarshal manifest record: %w", err) 75 } 76 77 - // Store the hold endpoint for subsequent blob requests during pull 78 // Prefer HoldDID (new format) with fallback to HoldEndpoint (legacy URL format) 79 // The routing repository will cache this for concurrent blob fetches 80 if manifestRecord.HoldDID != "" { 81 - // New format: DID reference 82 - // Convert did:web back to URL for blob fetching 83 - // TODO: Routing repository should handle DID→URL conversion 84 - // For now, fall back to HoldEndpoint if available 85 - if manifestRecord.HoldEndpoint != "" { 86 - s.lastFetchedHoldEndpoint = manifestRecord.HoldEndpoint 87 - } else { 88 - // Convert did:web:hold.example.com → https://hold.example.com 89 - s.lastFetchedHoldEndpoint = didToURL(manifestRecord.HoldDID) 90 - } 91 } else if manifestRecord.HoldEndpoint != "" { 92 - // Legacy format: URL reference 93 - s.lastFetchedHoldEndpoint = manifestRecord.HoldEndpoint 94 } 95 96 var ociManifest []byte ··· 246 return repository, tag 247 } 248 249 - // GetLastFetchedHoldEndpoint returns the hold endpoint from the most recently fetched manifest 250 // This is used by the routing repository to cache the hold for blob requests 251 - func (s *ManifestStore) GetLastFetchedHoldEndpoint() string { 252 - return s.lastFetchedHoldEndpoint 253 } 254 255 // rawManifest is a simple implementation of distribution.Manifest ··· 294 295 return configJSON.Config.Labels, nil 296 } 297 - 298 - // didToURL converts a did:web DID to an HTTPS URL 299 - // e.g., did:web:hold.example.com → https://hold.example.com 300 - func didToURL(didWeb string) string { 301 - if !strings.HasPrefix(didWeb, "did:web:") { 302 - return didWeb // Not a did:web, return as-is 303 - } 304 - 305 - hostname := strings.TrimPrefix(didWeb, "did:web:") 306 - return "https://" + hostname 307 - }
··· 21 // ManifestStore implements distribution.ManifestService 22 // It stores manifests in ATProto as records 23 type ManifestStore struct { 24 + client *Client 25 + repository string 26 + holdEndpoint string // Hold service endpoint URL (for legacy, to be deprecated) 27 + holdDID string // Hold service DID (primary reference) 28 + did string // User's DID for cache key 29 + lastFetchedHoldDID string // Hold DID from most recently fetched manifest (for pull) 30 + blobStore distribution.BlobStore // Blob store for fetching config during push 31 + database DatabaseMetrics // Database for metrics tracking 32 } 33 34 // NewManifestStore creates a new ATProto-backed manifest store ··· 74 return nil, fmt.Errorf("failed to unmarshal manifest record: %w", err) 75 } 76 77 + // Store the hold DID for subsequent blob requests during pull 78 // Prefer HoldDID (new format) with fallback to HoldEndpoint (legacy URL format) 79 // The routing repository will cache this for concurrent blob fetches 80 if manifestRecord.HoldDID != "" { 81 + // New format: DID reference (preferred) 82 + s.lastFetchedHoldDID = manifestRecord.HoldDID 83 } else if manifestRecord.HoldEndpoint != "" { 84 + // Legacy format: URL reference - convert to DID 85 + s.lastFetchedHoldDID = ResolveHoldDIDFromURL(manifestRecord.HoldEndpoint) 86 } 87 88 var ociManifest []byte ··· 238 return repository, tag 239 } 240 241 + // GetLastFetchedHoldDID returns the hold DID from the most recently fetched manifest 242 // This is used by the routing repository to cache the hold for blob requests 243 + func (s *ManifestStore) GetLastFetchedHoldDID() string { 244 + return s.lastFetchedHoldDID 245 } 246 247 // rawManifest is a simple implementation of distribution.Manifest ··· 286 287 return configJSON.Config.Labels, nil 288 }
+1 -18
pkg/auth/oauth/server.go
··· 342 fmt.Printf("DEBUG [oauth/server]: Migrating hold URL to DID for %s: %s\n", did, profile.DefaultHold) 343 344 // Resolve URL to DID 345 - holdDID = resolveHoldDIDFromURL(profile.DefaultHold) 346 347 // Update profile with DID 348 profile.DefaultHold = holdDID ··· 362 // For now, crew registration will happen on first push when appview validates access 363 fmt.Printf("DEBUG [oauth/server]: Skipping crew registration for now - will happen on first push. Hold DID: %s\n", holdDID) 364 _ = session // TODO: use session for crew registration 365 - } 366 - 367 - // resolveHoldDIDFromURL converts a hold endpoint URL to a DID 368 - // For did:web holds: https://hold01.atcr.io → did:web:hold01.atcr.io 369 - func resolveHoldDIDFromURL(holdURL string) string { 370 - // Parse URL to get hostname 371 - holdURL = strings.TrimPrefix(holdURL, "http://") 372 - holdURL = strings.TrimPrefix(holdURL, "https://") 373 - holdURL = strings.TrimSuffix(holdURL, "/") 374 - 375 - // Extract hostname (remove path if present) 376 - parts := strings.Split(holdURL, "/") 377 - hostname := parts[0] 378 - 379 - // Convert to did:web 380 - // did:web uses hostname directly (port included if non-standard) 381 - return "did:web:" + hostname 382 } 383 384 // HTML templates
··· 342 fmt.Printf("DEBUG [oauth/server]: Migrating hold URL to DID for %s: %s\n", did, profile.DefaultHold) 343 344 // Resolve URL to DID 345 + holdDID = atproto.ResolveHoldDIDFromURL(profile.DefaultHold) 346 347 // Update profile with DID 348 profile.DefaultHold = holdDID ··· 362 // For now, crew registration will happen on first push when appview validates access 363 fmt.Printf("DEBUG [oauth/server]: Skipping crew registration for now - will happen on first push. Hold DID: %s\n", holdDID) 364 _ = session // TODO: use session for crew registration 365 } 366 367 // HTML templates