// Package hold implements the hold service: an embedded ATProto PDS with
// OCI upload endpoints, quota enforcement, garbage collection, and an
// optional admin panel, all served over one chi router.
package hold

import (
	"context"
	"fmt"
	"log/slog"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"atcr.io/pkg/atproto"
	"atcr.io/pkg/hold/admin"
	holddb "atcr.io/pkg/hold/db"
	"atcr.io/pkg/hold/gc"
	"atcr.io/pkg/hold/oci"
	"atcr.io/pkg/hold/pds"
	"atcr.io/pkg/hold/quota"
	"atcr.io/pkg/logging"
	"atcr.io/pkg/s3"

	"github.com/go-chi/chi/v5"
	"github.com/go-chi/chi/v5/middleware"
)

// HoldServer is the hold service with an exposed router for extensibility.
// Consumers can add routes to Router before calling Serve().
type HoldServer struct {
	// Router is the chi router. Add routes before calling Serve().
	Router chi.Router
	// PDS is the embedded ATProto PDS. Nil if database path is not configured.
	// NOTE(review): NewHoldServer currently returns an error when the database
	// path is unset, so after a successful NewHoldServer PDS is non-nil —
	// confirm whether the nil case is still reachable for external embedders.
	PDS *pds.HoldPDS
	// QuotaManager manages storage quotas per tier.
	QuotaManager *quota.Manager
	// Config is the hold service configuration.
	Config *Config

	// internal fields for shutdown
	httpServer       *http.Server
	broadcaster      *pds.EventBroadcaster
	scanBroadcaster  *pds.ScanBroadcaster
	garbageCollector *gc.GarbageCollector
	adminUI          *admin.AdminUI
	holdDB           *holddb.HoldDB // shared database connection (nil for :memory:)
}

// NewHoldServer initializes PDS, storage, quota, XRPC handlers, and returns
// before starting. Consumer can add routes to Router before calling Serve().
func NewHoldServer(cfg *Config) (*HoldServer, error) {
	// Initialize structured logging with optional remote shipping
	logging.InitLoggerWithShipper(cfg.LogLevel, logging.ShipperConfig{
		Backend:       cfg.LogShipper.Backend,
		URL:           cfg.LogShipper.URL,
		BatchSize:     cfg.LogShipper.BatchSize,
		FlushInterval: cfg.LogShipper.FlushInterval,
		Service:       "hold",
		Username:      cfg.LogShipper.Username,
		Password:      cfg.LogShipper.Password,
	})

	s := &HoldServer{
		Config: cfg,
	}

	if cfg.Server.TestMode {
		atproto.SetTestMode(true)
	}

	// Initialize embedded PDS if database path is configured.
	// s3Service is created inside this branch; since the else-branch below
	// returns an error, s3Service is always non-nil whenever s.PDS is non-nil.
	var xrpcHandler *pds.XRPCHandler
	var s3Service *s3.S3Service
	if cfg.Database.Path != "" {
		ctx := context.Background()
		holdDID, err := pds.LoadOrCreateDID(ctx, pds.DIDConfig{
			DID:             cfg.Database.DID,
			DIDMethod:       cfg.Database.DIDMethod,
			PublicURL:       cfg.Server.PublicURL,
			DBPath:          cfg.Database.Path,
			SigningKeyPath:  cfg.Database.KeyPath,
			RotationKey:     cfg.Database.RotationKey,
			PLCDirectoryURL: cfg.Database.PLCDirectoryURL,
		})
		if err != nil {
			return nil, fmt.Errorf("failed to resolve hold DID: %w", err)
		}
		slog.Info("Initializing embedded PDS", "did", holdDID)

		if cfg.Database.Path != ":memory:" {
			// File mode: open centralized shared DB (supports embedded replica sync)
			dbFilePath := cfg.Database.Path + "/db.sqlite3"
			libsqlCfg := holddb.LibsqlConfig{
				SyncURL:      cfg.Database.LibsqlSyncURL,
				AuthToken:    cfg.Database.LibsqlAuthToken,
				SyncInterval: cfg.Database.LibsqlSyncInterval,
			}
			s.holdDB, err = holddb.OpenHoldDB(dbFilePath, libsqlCfg)
			if err != nil {
				return nil, fmt.Errorf("failed to open hold database: %w", err)
			}
			// Use shared DB for all subsystems
			s.PDS, err = pds.NewHoldPDSWithDB(ctx, holdDID, cfg.Server.PublicURL, cfg.Server.AppviewURL(), cfg.Database.Path, cfg.Database.KeyPath, cfg.Registration.EnableBlueskyPosts, s.holdDB.DB)
			if err != nil {
				return nil, fmt.Errorf("failed to initialize embedded PDS: %w", err)
			}
			s.broadcaster = pds.NewEventBroadcasterWithDB(holdDID, 100, s.holdDB.DB)
		} else {
			// In-memory mode (tests): each subsystem opens its own connection
			s.PDS, err = pds.NewHoldPDS(ctx, holdDID, cfg.Server.PublicURL, cfg.Server.AppviewURL(), cfg.Database.Path, cfg.Database.KeyPath, cfg.Registration.EnableBlueskyPosts)
			if err != nil {
				return nil, fmt.Errorf("failed to initialize embedded PDS: %w", err)
			}
			s.broadcaster = pds.NewEventBroadcaster(holdDID, 100, ":memory:")
		}

		// Create S3 service (used for bootstrap, handlers, GC, etc.)
		s3Service, err = s3.NewS3Service(cfg.Storage.S3Params())
		if err != nil {
			return nil, fmt.Errorf("failed to create S3 service: %w", err)
		}

		// Bootstrap events from existing repo records (one-time migration).
		// Must run BEFORE the live event handler is wired, so it captures
		// the full historical state without interference from new writes.
		// Best-effort: a failure is logged, not fatal.
		if err := s.broadcaster.BootstrapFromRepo(s.PDS); err != nil {
			slog.Warn("Failed to bootstrap events from repo", "error", err)
		}

		// Backfill records index from existing MST data (one-time on startup).
		// Also best-effort.
		if err := s.PDS.BackfillRecordsIndex(ctx); err != nil {
			slog.Warn("Failed to backfill records index", "error", err)
		}

		// Wire up repo event handler with records indexing + broadcaster.
		// Must be BEFORE Bootstrap so that record creates/updates during
		// bootstrap (captain, crew, profile) emit to the firehose.
		indexingHandler := s.PDS.CreateRecordsIndexEventHandler(s.broadcaster.SetRepoEventHandler())
		s.PDS.RepomgrRef().SetEventHandler(indexingHandler, true)

		// Bootstrap PDS with captain record, hold owner as first crew member, and profile.
		// Now that the event handler is wired, any changes here emit to the firehose.
		if err := s.PDS.Bootstrap(ctx, s3Service, pds.BootstrapConfig{
			OwnerDID:           cfg.Registration.OwnerDID,
			Public:             cfg.Server.Public,
			AllowAllCrew:       cfg.Registration.AllowAllCrew,
			ProfileAvatarURL:   cfg.Registration.ProfileAvatarURL,
			ProfileDisplayName: cfg.Registration.ProfileDisplayName,
			ProfileDescription: cfg.Registration.ProfileDescription,
			Region:             cfg.Registration.Region,
		}); err != nil {
			return nil, fmt.Errorf("failed to bootstrap PDS: %w", err)
		}

		// Sync successor from config (if set) — separate from Bootstrap to avoid changing its signature
		if cfg.Server.Successor != "" {
			if _, captain, err := s.PDS.GetCaptainRecord(ctx); err == nil && captain.Successor != cfg.Server.Successor {
				captain.Successor = cfg.Server.Successor
				if _, err := s.PDS.UpdateCaptainRecord(ctx, captain); err != nil {
					slog.Warn("Failed to sync successor from config", "error", err)
				} else {
					slog.Info("Synced successor from config", "successor", cfg.Server.Successor)
				}
			}
		}

		slog.Info("Embedded PDS initialized successfully with firehose and records index enabled")
	} else {
		return nil, fmt.Errorf("database path is required for embedded PDS authorization")
	}

	// Initialize quota manager from config
	var err error
	s.QuotaManager, err = quota.NewManagerFromConfig(&cfg.Quota)
	if err != nil {
		return nil, fmt.Errorf("failed to load quota config: %w", err)
	}
	if s.QuotaManager.IsEnabled() {
		slog.Info("Quota enforcement enabled", "tiers", s.QuotaManager.TierCount(), "defaultTier", s.QuotaManager.GetDefaultTier())
	} else {
		slog.Info("Quota enforcement disabled (no quota tiers configured)")
	}

	// Create XRPC handlers
	var ociHandler *oci.XRPCHandler
	if s.PDS != nil {
		xrpcHandler = pds.NewXRPCHandler(s.PDS, *s3Service, s.broadcaster, nil, s.QuotaManager)
		if cfg.Server.AppviewDID != "" {
			xrpcHandler.SetAppviewDID(cfg.Server.AppviewDID)
		}
		ociHandler = oci.NewXRPCHandler(s.PDS, *s3Service, cfg.Registration.EnableBlueskyPosts, nil, s.QuotaManager)

		// Initialize scan broadcaster if scanner secret is configured
		if cfg.Scanner.Secret != "" {
			holdDID := s.PDS.DID()
			rescanInterval := cfg.Scanner.RescanInterval
			var sb *pds.ScanBroadcaster
			if s.holdDB != nil {
				sb, err = pds.NewScanBroadcasterWithDB(holdDID, cfg.Server.PublicURL, cfg.Scanner.Secret, cfg.Server.RelayEndpoint, s.holdDB.DB, s3Service, s.PDS, rescanInterval)
			} else {
				// NOTE(review): holdDB is nil only when Database.Path is
				// ":memory:", so this builds ":memory:/db.sqlite3" — verify
				// NewScanBroadcaster treats that path as intended.
				scanDBPath := cfg.Database.Path + "/db.sqlite3"
				sb, err = pds.NewScanBroadcaster(holdDID, cfg.Server.PublicURL, cfg.Scanner.Secret, cfg.Server.RelayEndpoint, scanDBPath, s3Service, s.PDS, rescanInterval)
			}
			if err != nil {
				return nil, fmt.Errorf("failed to initialize scan broadcaster: %w", err)
			}
			s.scanBroadcaster = sb
			xrpcHandler.SetScanBroadcaster(sb)
			ociHandler.SetScanBroadcaster(sb)
			slog.Info("Scan broadcaster initialized (scanner WebSocket enabled)", "rescanInterval", rescanInterval)
		}

		// Initialize garbage collector (started later by Serve)
		s.garbageCollector = gc.NewGarbageCollector(s.PDS, s3Service, cfg.GC)
		slog.Info("Garbage collector initialized", "enabled", cfg.GC.Enabled)
	}

	// Setup HTTP routes with chi router. Health checks are excluded from
	// request logging to keep probe noise out of the logs.
	r := chi.NewRouter()
	r.Use(middleware.RealIP)
	r.Use(middleware.Maybe(middleware.Logger, func(r *http.Request) bool {
		return r.URL.Path != "/xrpc/_health"
	}))
	if xrpcHandler != nil {
		r.Use(xrpcHandler.CORSMiddleware())
	}

	// Root page
	r.Get("/", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/plain")
		fmt.Fprintf(w, "This is a hold server. More info at https://atcr.io")
	})

	// Robots.txt - disallow crawling of all endpoints except root
	r.Get("/robots.txt", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/plain")
		fmt.Fprint(w, "User-agent: *\nAllow: /\nDisallow: /xrpc/\nDisallow: /admin/\n")
	})

	// Register XRPC/ATProto PDS endpoints
	if xrpcHandler != nil {
		slog.Info("Registering ATProto PDS endpoints")
		xrpcHandler.RegisterHandlers(r)
	}

	// Register OCI multipart upload endpoints
	if ociHandler != nil {
		slog.Info("Registering OCI multipart upload endpoints")
		ociHandler.RegisterHandlers(r)
	}

	// Initialize and register admin panel if enabled
	if cfg.Admin.Enabled && s.PDS != nil {
		adminCfg := admin.AdminConfig{
			Enabled:    true,
			PublicURL:  cfg.Server.PublicURL,
			ConfigPath: cfg.ConfigPath(),
		}
		s.adminUI, err = admin.NewAdminUI(context.Background(), s.PDS, s.QuotaManager, s.garbageCollector, adminCfg)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize admin panel: %w", err)
		}
		if s.adminUI != nil {
			slog.Info("Registering admin panel routes")
			s.adminUI.RegisterRoutes(r)
		}
	}

	s.Router = r
	return s, nil
}

// Serve starts the HTTP server and blocks until shutdown signal.
func (s *HoldServer) Serve() error { s.httpServer = &http.Server{ Addr: s.Config.Server.Addr, Handler: s.Router, ReadTimeout: s.Config.Server.ReadTimeout, WriteTimeout: s.Config.Server.WriteTimeout, } // Set up signal handling for graceful shutdown sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) // Start server in goroutine serverErr := make(chan error, 1) go func() { slog.Info("Starting hold service", "addr", s.Config.Server.Addr) if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { serverErr <- err } }() // Update status post to "online" after server starts if s.PDS != nil { ctx := context.Background() if err := s.PDS.SetStatus(ctx, "online"); err != nil { slog.Warn("Failed to set status post to online", "error", err) } else { slog.Info("Status post set to online") } } // Fetch appview metadata for branding (Bluesky posts) if s.Config.Server.AppviewURL() != "" { meta, err := atproto.FetchAppviewMetadata(context.Background(), s.Config.Server.AppviewURL()) if err != nil { slog.Warn("Failed to fetch appview metadata, using defaults", "appview_url", s.Config.Server.AppviewURL(), "error", err) } else { s.PDS.SetAppviewMeta(meta) slog.Info("Fetched appview metadata", "clientName", meta.ClientName, "clientShortName", meta.ClientShortName) } } // Request crawl from relay to make PDS discoverable if s.Config.Server.RelayEndpoint != "" { slog.Info("Requesting crawl from relay", "relay", s.Config.Server.RelayEndpoint) if err := atproto.RequestCrawl(s.Config.Server.RelayEndpoint, s.Config.Server.PublicURL); err != nil { slog.Warn("Failed to request crawl from relay", "error", err) } else { slog.Info("Crawl requested successfully") } } // Start garbage collector (runs on startup + nightly) if s.garbageCollector != nil { s.garbageCollector.Start(context.Background()) } // Wait for signal or server error select { case err := <-serverErr: slog.Error("Server failed", "error", err) logging.Shutdown() return 
err case sig := <-sigChan: slog.Info("Received signal, shutting down gracefully", "signal", sig) s.shutdown() } return nil } func (s *HoldServer) shutdown() { // Update status post to "offline" before shutdown if s.PDS != nil { ctx := context.Background() if err := s.PDS.SetStatus(ctx, "offline"); err != nil { slog.Warn("Failed to set status post to offline", "error", err) } else { slog.Info("Status post set to offline") } } // Stop garbage collector if s.garbageCollector != nil { s.garbageCollector.Stop() slog.Info("Garbage collector stopped") } // Close scan broadcaster database connection if s.scanBroadcaster != nil { if err := s.scanBroadcaster.Close(); err != nil { slog.Warn("Failed to close scan broadcaster database", "error", err) } else { slog.Info("Scan broadcaster database closed") } } // Close broadcaster database connection if s.broadcaster != nil { if err := s.broadcaster.Close(); err != nil { slog.Warn("Failed to close broadcaster database", "error", err) } else { slog.Info("Broadcaster database closed") } } // Close admin panel if s.adminUI != nil { if err := s.adminUI.Close(); err != nil { slog.Warn("Failed to close admin panel", "error", err) } else { slog.Info("Admin panel closed") } } // Close shared database connection and connector (after all subsystems) if s.holdDB != nil { if err := s.holdDB.Close(); err != nil { slog.Warn("Failed to close hold database", "error", err) } else { slog.Info("Hold database closed") } } // Graceful shutdown with 10 second timeout shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if err := s.httpServer.Shutdown(shutdownCtx); err != nil { slog.Error("Server shutdown error", "error", err) } else { slog.Info("Server shutdown complete") } logging.Shutdown() }