A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
at main · 421 lines · 14 kB
package hold

import (
	"context"
	"fmt"
	"log/slog"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"atcr.io/pkg/atproto"
	"atcr.io/pkg/hold/admin"
	holddb "atcr.io/pkg/hold/db"
	"atcr.io/pkg/hold/gc"
	"atcr.io/pkg/hold/oci"
	"atcr.io/pkg/hold/pds"
	"atcr.io/pkg/hold/quota"
	"atcr.io/pkg/logging"
	"atcr.io/pkg/s3"

	"github.com/go-chi/chi/v5"
	"github.com/go-chi/chi/v5/middleware"
)

// HoldServer is the hold service with an exposed router for extensibility.
// Consumers can add routes to Router before calling Serve().
type HoldServer struct {
	// Router is the chi router. Add routes before calling Serve().
	Router chi.Router

	// PDS is the embedded ATProto PDS. Nil if the database path is not configured.
	PDS *pds.HoldPDS

	// QuotaManager manages storage quotas per tier.
	QuotaManager *quota.Manager

	// Config is the hold service configuration.
	Config *Config

	// internal fields for shutdown
	httpServer       *http.Server
	broadcaster      *pds.EventBroadcaster
	scanBroadcaster  *pds.ScanBroadcaster
	garbageCollector *gc.GarbageCollector
	adminUI          *admin.AdminUI
	holdDB           *holddb.HoldDB // shared database connection (nil for :memory:)
}

// NewHoldServer initializes the PDS, storage, quota manager, and XRPC handlers,
// and returns before starting. Consumers can add routes to Router before calling Serve().
func NewHoldServer(cfg *Config) (*HoldServer, error) {
	// Initialize structured logging with optional remote shipping
	logging.InitLoggerWithShipper(cfg.LogLevel, logging.ShipperConfig{
		Backend:       cfg.LogShipper.Backend,
		URL:           cfg.LogShipper.URL,
		BatchSize:     cfg.LogShipper.BatchSize,
		FlushInterval: cfg.LogShipper.FlushInterval,
		Service:       "hold",
		Username:      cfg.LogShipper.Username,
		Password:      cfg.LogShipper.Password,
	})

	s := &HoldServer{
		Config: cfg,
	}

	if cfg.Server.TestMode {
		atproto.SetTestMode(true)
	}

	// Initialize embedded PDS if database path is configured
	var xrpcHandler *pds.XRPCHandler
	var s3Service *s3.S3Service
	if cfg.Database.Path != "" {
		ctx := context.Background()

		holdDID, err := pds.LoadOrCreateDID(ctx, pds.DIDConfig{
			DID:             cfg.Database.DID,
			DIDMethod:       cfg.Database.DIDMethod,
			PublicURL:       cfg.Server.PublicURL,
			DBPath:          cfg.Database.Path,
			SigningKeyPath:  cfg.Database.KeyPath,
			RotationKey:     cfg.Database.RotationKey,
			PLCDirectoryURL: cfg.Database.PLCDirectoryURL,
		})
		if err != nil {
			return nil, fmt.Errorf("failed to resolve hold DID: %w", err)
		}
		slog.Info("Initializing embedded PDS", "did", holdDID)

		if cfg.Database.Path != ":memory:" {
			// File mode: open centralized shared DB (supports embedded replica sync)
			dbFilePath := cfg.Database.Path + "/db.sqlite3"
			libsqlCfg := holddb.LibsqlConfig{
				SyncURL:      cfg.Database.LibsqlSyncURL,
				AuthToken:    cfg.Database.LibsqlAuthToken,
				SyncInterval: cfg.Database.LibsqlSyncInterval,
			}
			s.holdDB, err = holddb.OpenHoldDB(dbFilePath, libsqlCfg)
			if err != nil {
				return nil, fmt.Errorf("failed to open hold database: %w", err)
			}

			// Use shared DB for all subsystems
			s.PDS, err = pds.NewHoldPDSWithDB(ctx, holdDID, cfg.Server.PublicURL, cfg.Server.AppviewURL(), cfg.Database.Path, cfg.Database.KeyPath, cfg.Registration.EnableBlueskyPosts, s.holdDB.DB)
			if err != nil {
				return nil, fmt.Errorf("failed to initialize embedded PDS: %w", err)
			}

			s.broadcaster = pds.NewEventBroadcasterWithDB(holdDID, 100, s.holdDB.DB)
		} else {
			// In-memory mode (tests): each subsystem opens its own connection
			s.PDS, err = pds.NewHoldPDS(ctx, holdDID, cfg.Server.PublicURL, cfg.Server.AppviewURL(), cfg.Database.Path, cfg.Database.KeyPath, cfg.Registration.EnableBlueskyPosts)
			if err != nil {
				return nil, fmt.Errorf("failed to initialize embedded PDS: %w", err)
			}

			s.broadcaster = pds.NewEventBroadcaster(holdDID, 100, ":memory:")
		}

		// Create S3 service (used for bootstrap, handlers, GC, etc.)
		s3Service, err = s3.NewS3Service(cfg.Storage.S3Params())
		if err != nil {
			return nil, fmt.Errorf("failed to create S3 service: %w", err)
		}

		// Bootstrap events from existing repo records (one-time migration).
		// Must run BEFORE the live event handler is wired, so it captures
		// the full historical state without interference from new writes.
		if err := s.broadcaster.BootstrapFromRepo(s.PDS); err != nil {
			slog.Warn("Failed to bootstrap events from repo", "error", err)
		}

		// Backfill records index from existing MST data (one-time on startup)
		if err := s.PDS.BackfillRecordsIndex(ctx); err != nil {
			slog.Warn("Failed to backfill records index", "error", err)
		}

		// Wire up repo event handler with records indexing + broadcaster.
		// Must be BEFORE Bootstrap so that record creates/updates during
		// bootstrap (captain, crew, profile) emit to the firehose.
		indexingHandler := s.PDS.CreateRecordsIndexEventHandler(s.broadcaster.SetRepoEventHandler())
		s.PDS.RepomgrRef().SetEventHandler(indexingHandler, true)

		// Bootstrap PDS with captain record, hold owner as first crew member, and profile.
		// Now that the event handler is wired, any changes here emit to the firehose.
		if err := s.PDS.Bootstrap(ctx, s3Service, pds.BootstrapConfig{
			OwnerDID:           cfg.Registration.OwnerDID,
			Public:             cfg.Server.Public,
			AllowAllCrew:       cfg.Registration.AllowAllCrew,
			ProfileAvatarURL:   cfg.Registration.ProfileAvatarURL,
			ProfileDisplayName: cfg.Registration.ProfileDisplayName,
			ProfileDescription: cfg.Registration.ProfileDescription,
			Region:             cfg.Registration.Region,
		}); err != nil {
			return nil, fmt.Errorf("failed to bootstrap PDS: %w", err)
		}

		// Sync successor from config (if set); kept separate from Bootstrap to avoid changing its signature
		if cfg.Server.Successor != "" {
			if _, captain, err := s.PDS.GetCaptainRecord(ctx); err == nil && captain.Successor != cfg.Server.Successor {
				captain.Successor = cfg.Server.Successor
				if _, err := s.PDS.UpdateCaptainRecord(ctx, captain); err != nil {
					slog.Warn("Failed to sync successor from config", "error", err)
				} else {
					slog.Info("Synced successor from config", "successor", cfg.Server.Successor)
				}
			}
		}

		slog.Info("Embedded PDS initialized successfully with firehose and records index enabled")
	} else {
		return nil, fmt.Errorf("database path is required for embedded PDS authorization")
	}

	// Initialize quota manager from config
	var err error
	s.QuotaManager, err = quota.NewManagerFromConfig(&cfg.Quota)
	if err != nil {
		return nil, fmt.Errorf("failed to load quota config: %w", err)
	}
	if s.QuotaManager.IsEnabled() {
		slog.Info("Quota enforcement enabled", "tiers", s.QuotaManager.TierCount(), "defaultTier", s.QuotaManager.GetDefaultTier())
	} else {
		slog.Info("Quota enforcement disabled (no quota tiers configured)")
	}

	// Create XRPC handlers
	var ociHandler *oci.XRPCHandler
	if s.PDS != nil {
		xrpcHandler = pds.NewXRPCHandler(s.PDS, *s3Service, s.broadcaster, nil, s.QuotaManager)
		if cfg.Server.AppviewDID != "" {
			xrpcHandler.SetAppviewDID(cfg.Server.AppviewDID)
		}
		ociHandler = oci.NewXRPCHandler(s.PDS, *s3Service, cfg.Registration.EnableBlueskyPosts, nil, s.QuotaManager)

		// Initialize scan broadcaster if scanner secret is configured
		if cfg.Scanner.Secret != "" {
			holdDID := s.PDS.DID()
			rescanInterval := cfg.Scanner.RescanInterval
			var sb *pds.ScanBroadcaster
			if s.holdDB != nil {
				sb, err = pds.NewScanBroadcasterWithDB(holdDID, cfg.Server.PublicURL, cfg.Scanner.Secret, cfg.Server.RelayEndpoint, s.holdDB.DB, s3Service, s.PDS, rescanInterval)
			} else {
				scanDBPath := cfg.Database.Path + "/db.sqlite3"
				sb, err = pds.NewScanBroadcaster(holdDID, cfg.Server.PublicURL, cfg.Scanner.Secret, cfg.Server.RelayEndpoint, scanDBPath, s3Service, s.PDS, rescanInterval)
			}
			if err != nil {
				return nil, fmt.Errorf("failed to initialize scan broadcaster: %w", err)
			}
			s.scanBroadcaster = sb
			xrpcHandler.SetScanBroadcaster(sb)
			ociHandler.SetScanBroadcaster(sb)
			slog.Info("Scan broadcaster initialized (scanner WebSocket enabled)", "rescanInterval", rescanInterval)
		}

		// Initialize garbage collector
		s.garbageCollector = gc.NewGarbageCollector(s.PDS, s3Service, cfg.GC)
		slog.Info("Garbage collector initialized", "enabled", cfg.GC.Enabled)
	}

	// Set up HTTP routes with chi router
	r := chi.NewRouter()
	r.Use(middleware.RealIP)
	r.Use(middleware.Maybe(middleware.Logger, func(r *http.Request) bool {
		return r.URL.Path != "/xrpc/_health"
	}))

	if xrpcHandler != nil {
		r.Use(xrpcHandler.CORSMiddleware())
	}

	// Root page
	r.Get("/", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/plain")
		fmt.Fprint(w, "This is a hold server. More info at https://atcr.io")
	})

	// Robots.txt: allow crawling in general, but disallow the XRPC and admin endpoints
	r.Get("/robots.txt", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/plain")
		fmt.Fprint(w, "User-agent: *\nAllow: /\nDisallow: /xrpc/\nDisallow: /admin/\n")
	})

	// Register XRPC/ATProto PDS endpoints
	if xrpcHandler != nil {
		slog.Info("Registering ATProto PDS endpoints")
		xrpcHandler.RegisterHandlers(r)
	}

	// Register OCI multipart upload endpoints
	if ociHandler != nil {
		slog.Info("Registering OCI multipart upload endpoints")
		ociHandler.RegisterHandlers(r)
	}

	// Initialize and register admin panel if enabled
	if cfg.Admin.Enabled && s.PDS != nil {
		adminCfg := admin.AdminConfig{
			Enabled:    true,
			PublicURL:  cfg.Server.PublicURL,
			ConfigPath: cfg.ConfigPath(),
		}

		s.adminUI, err = admin.NewAdminUI(context.Background(), s.PDS, s.QuotaManager, s.garbageCollector, adminCfg)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize admin panel: %w", err)
		}

		if s.adminUI != nil {
			slog.Info("Registering admin panel routes")
			s.adminUI.RegisterRoutes(r)
		}
	}

	s.Router = r

	return s, nil
}

// Serve starts the HTTP server and blocks until a shutdown signal is received.
func (s *HoldServer) Serve() error {
	s.httpServer = &http.Server{
		Addr:         s.Config.Server.Addr,
		Handler:      s.Router,
		ReadTimeout:  s.Config.Server.ReadTimeout,
		WriteTimeout: s.Config.Server.WriteTimeout,
	}

	// Set up signal handling for graceful shutdown
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

	// Start server in goroutine
	serverErr := make(chan error, 1)
	go func() {
		slog.Info("Starting hold service", "addr", s.Config.Server.Addr)
		if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			serverErr <- err
		}
	}()

	// Update status post to "online" after server starts
	if s.PDS != nil {
		ctx := context.Background()
		if err := s.PDS.SetStatus(ctx, "online"); err != nil {
			slog.Warn("Failed to set status post to online", "error", err)
		} else {
			slog.Info("Status post set to online")
		}
	}

	// Fetch appview metadata for branding (Bluesky posts)
	if s.Config.Server.AppviewURL() != "" {
		meta, err := atproto.FetchAppviewMetadata(context.Background(), s.Config.Server.AppviewURL())
		if err != nil {
			slog.Warn("Failed to fetch appview metadata, using defaults", "appview_url", s.Config.Server.AppviewURL(), "error", err)
		} else {
			s.PDS.SetAppviewMeta(meta)
			slog.Info("Fetched appview metadata", "clientName", meta.ClientName, "clientShortName", meta.ClientShortName)
		}
	}

	// Request crawl from relay to make PDS discoverable
	if s.Config.Server.RelayEndpoint != "" {
		slog.Info("Requesting crawl from relay", "relay", s.Config.Server.RelayEndpoint)
		if err := atproto.RequestCrawl(s.Config.Server.RelayEndpoint, s.Config.Server.PublicURL); err != nil {
			slog.Warn("Failed to request crawl from relay", "error", err)
		} else {
			slog.Info("Crawl requested successfully")
		}
	}

	// Start garbage collector (runs on startup + nightly)
	if s.garbageCollector != nil {
		s.garbageCollector.Start(context.Background())
	}

	// Wait for signal or server error
	select {
	case err := <-serverErr:
		slog.Error("Server failed", "error", err)
		logging.Shutdown()
		return err
	case sig := <-sigChan:
		slog.Info("Received signal, shutting down gracefully", "signal", sig)
		s.shutdown()
	}

	return nil
}

// shutdown performs graceful teardown: it flips the status post to offline,
// stops background workers, closes database connections, and drains the HTTP server.
func (s *HoldServer) shutdown() {
	// Update status post to "offline" before shutdown
	if s.PDS != nil {
		ctx := context.Background()
		if err := s.PDS.SetStatus(ctx, "offline"); err != nil {
			slog.Warn("Failed to set status post to offline", "error", err)
		} else {
			slog.Info("Status post set to offline")
		}
	}

	// Stop garbage collector
	if s.garbageCollector != nil {
		s.garbageCollector.Stop()
		slog.Info("Garbage collector stopped")
	}

	// Close scan broadcaster database connection
	if s.scanBroadcaster != nil {
		if err := s.scanBroadcaster.Close(); err != nil {
			slog.Warn("Failed to close scan broadcaster database", "error", err)
		} else {
			slog.Info("Scan broadcaster database closed")
		}
	}

	// Close broadcaster database connection
	if s.broadcaster != nil {
		if err := s.broadcaster.Close(); err != nil {
			slog.Warn("Failed to close broadcaster database", "error", err)
		} else {
			slog.Info("Broadcaster database closed")
		}
	}

	// Close admin panel
	if s.adminUI != nil {
		if err := s.adminUI.Close(); err != nil {
			slog.Warn("Failed to close admin panel", "error", err)
		} else {
			slog.Info("Admin panel closed")
		}
	}

	// Close shared database connection and connector (after all subsystems)
	if s.holdDB != nil {
		if err := s.holdDB.Close(); err != nil {
			slog.Warn("Failed to close hold database", "error", err)
		} else {
			slog.Info("Hold database closed")
		}
	}

	// Graceful shutdown with 10 second timeout
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	if err := s.httpServer.Shutdown(shutdownCtx); err != nil {
		slog.Error("Server shutdown error", "error", err)
	} else {
		slog.Info("Server shutdown complete")
	}

	logging.Shutdown()
}
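Because Router is exported and Serve() blocks until SIGINT/SIGTERM, the package is designed to be embedded. A minimal sketch of a consumer binary, assuming the hold.Config value is populated elsewhere (this file does not show how the config is loaded); the /healthz route is purely illustrative:

package main

import (
	"fmt"
	"log"
	"net/http"

	"atcr.io/pkg/hold"
)

func main() {
	// Hypothetical: populate the config however your deployment loads it.
	// NewHoldServer requires at least a database path to be set.
	cfg := &hold.Config{ /* Server, Database, Storage, ... */ }

	srv, err := hold.NewHoldServer(cfg)
	if err != nil {
		log.Fatalf("initializing hold server: %v", err)
	}

	// Per the HoldServer doc comment, custom routes must be added
	// to Router before Serve() is called.
	srv.Router.Get("/healthz", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "ok")
	})

	// Serve blocks until SIGINT/SIGTERM, then shuts down gracefully.
	if err := srv.Serve(); err != nil {
		log.Fatal(err)
	}
}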