A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
atcr.io
docker
container
atproto
go
1package hold
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "net/http"
8 "os"
9 "os/signal"
10 "syscall"
11 "time"
12
13 "atcr.io/pkg/atproto"
14 "atcr.io/pkg/hold/admin"
15 holddb "atcr.io/pkg/hold/db"
16 "atcr.io/pkg/hold/gc"
17 "atcr.io/pkg/hold/oci"
18 "atcr.io/pkg/hold/pds"
19 "atcr.io/pkg/hold/quota"
20 "atcr.io/pkg/logging"
21 "atcr.io/pkg/s3"
22
23 "github.com/go-chi/chi/v5"
24 "github.com/go-chi/chi/v5/middleware"
25)
26
// HoldServer is the hold service with an exposed router for extensibility.
// Consumers can add routes to Router before calling Serve().
type HoldServer struct {
	// Router is the chi router. Add routes before calling Serve().
	Router chi.Router

	// PDS is the embedded ATProto PDS. Nil if database path is not configured.
	PDS *pds.HoldPDS

	// QuotaManager manages storage quotas per tier.
	QuotaManager *quota.Manager

	// Config is the hold service configuration.
	Config *Config

	// internal fields for shutdown

	// httpServer is created in Serve() and stopped in shutdown().
	httpServer *http.Server
	// broadcaster emits repo events for the firehose; closed during shutdown.
	broadcaster *pds.EventBroadcaster
	// scanBroadcaster serves the scanner WebSocket; nil unless a scanner
	// secret is configured.
	scanBroadcaster *pds.ScanBroadcaster
	// garbageCollector runs on startup and on a schedule; stopped in shutdown().
	garbageCollector *gc.GarbageCollector
	// adminUI is the admin panel; nil unless enabled in config.
	adminUI *admin.AdminUI
	holdDB  *holddb.HoldDB // shared database connection (nil for :memory:)
}
50
51// NewHoldServer initializes PDS, storage, quota, XRPC handlers, and returns
52// before starting. Consumer can add routes to Router before calling Serve().
53func NewHoldServer(cfg *Config) (*HoldServer, error) {
54 // Initialize structured logging with optional remote shipping
55 logging.InitLoggerWithShipper(cfg.LogLevel, logging.ShipperConfig{
56 Backend: cfg.LogShipper.Backend,
57 URL: cfg.LogShipper.URL,
58 BatchSize: cfg.LogShipper.BatchSize,
59 FlushInterval: cfg.LogShipper.FlushInterval,
60 Service: "hold",
61 Username: cfg.LogShipper.Username,
62 Password: cfg.LogShipper.Password,
63 })
64
65 s := &HoldServer{
66 Config: cfg,
67 }
68
69 if cfg.Server.TestMode {
70 atproto.SetTestMode(true)
71 }
72
73 // Initialize embedded PDS if database path is configured
74 var xrpcHandler *pds.XRPCHandler
75 var s3Service *s3.S3Service
76 if cfg.Database.Path != "" {
77 ctx := context.Background()
78
79 holdDID, err := pds.LoadOrCreateDID(ctx, pds.DIDConfig{
80 DID: cfg.Database.DID,
81 DIDMethod: cfg.Database.DIDMethod,
82 PublicURL: cfg.Server.PublicURL,
83 DBPath: cfg.Database.Path,
84 SigningKeyPath: cfg.Database.KeyPath,
85 RotationKey: cfg.Database.RotationKey,
86 PLCDirectoryURL: cfg.Database.PLCDirectoryURL,
87 })
88 if err != nil {
89 return nil, fmt.Errorf("failed to resolve hold DID: %w", err)
90 }
91 slog.Info("Initializing embedded PDS", "did", holdDID)
92
93 if cfg.Database.Path != ":memory:" {
94 // File mode: open centralized shared DB (supports embedded replica sync)
95 dbFilePath := cfg.Database.Path + "/db.sqlite3"
96 libsqlCfg := holddb.LibsqlConfig{
97 SyncURL: cfg.Database.LibsqlSyncURL,
98 AuthToken: cfg.Database.LibsqlAuthToken,
99 SyncInterval: cfg.Database.LibsqlSyncInterval,
100 }
101 s.holdDB, err = holddb.OpenHoldDB(dbFilePath, libsqlCfg)
102 if err != nil {
103 return nil, fmt.Errorf("failed to open hold database: %w", err)
104 }
105
106 // Use shared DB for all subsystems
107 s.PDS, err = pds.NewHoldPDSWithDB(ctx, holdDID, cfg.Server.PublicURL, cfg.Server.AppviewURL(), cfg.Database.Path, cfg.Database.KeyPath, cfg.Registration.EnableBlueskyPosts, s.holdDB.DB)
108 if err != nil {
109 return nil, fmt.Errorf("failed to initialize embedded PDS: %w", err)
110 }
111
112 s.broadcaster = pds.NewEventBroadcasterWithDB(holdDID, 100, s.holdDB.DB)
113 } else {
114 // In-memory mode (tests): each subsystem opens its own connection
115 s.PDS, err = pds.NewHoldPDS(ctx, holdDID, cfg.Server.PublicURL, cfg.Server.AppviewURL(), cfg.Database.Path, cfg.Database.KeyPath, cfg.Registration.EnableBlueskyPosts)
116 if err != nil {
117 return nil, fmt.Errorf("failed to initialize embedded PDS: %w", err)
118 }
119
120 s.broadcaster = pds.NewEventBroadcaster(holdDID, 100, ":memory:")
121 }
122
123 // Create S3 service (used for bootstrap, handlers, GC, etc.)
124 s3Service, err = s3.NewS3Service(cfg.Storage.S3Params())
125 if err != nil {
126 return nil, fmt.Errorf("failed to create S3 service: %w", err)
127 }
128
129 // Bootstrap events from existing repo records (one-time migration).
130 // Must run BEFORE the live event handler is wired, so it captures
131 // the full historical state without interference from new writes.
132 if err := s.broadcaster.BootstrapFromRepo(s.PDS); err != nil {
133 slog.Warn("Failed to bootstrap events from repo", "error", err)
134 }
135
136 // Backfill records index from existing MST data (one-time on startup)
137 if err := s.PDS.BackfillRecordsIndex(ctx); err != nil {
138 slog.Warn("Failed to backfill records index", "error", err)
139 }
140
141 // Wire up repo event handler with records indexing + broadcaster.
142 // Must be BEFORE Bootstrap so that record creates/updates during
143 // bootstrap (captain, crew, profile) emit to the firehose.
144 indexingHandler := s.PDS.CreateRecordsIndexEventHandler(s.broadcaster.SetRepoEventHandler())
145 s.PDS.RepomgrRef().SetEventHandler(indexingHandler, true)
146
147 // Bootstrap PDS with captain record, hold owner as first crew member, and profile.
148 // Now that the event handler is wired, any changes here emit to the firehose.
149 if err := s.PDS.Bootstrap(ctx, s3Service, pds.BootstrapConfig{
150 OwnerDID: cfg.Registration.OwnerDID,
151 Public: cfg.Server.Public,
152 AllowAllCrew: cfg.Registration.AllowAllCrew,
153 ProfileAvatarURL: cfg.Registration.ProfileAvatarURL,
154 ProfileDisplayName: cfg.Registration.ProfileDisplayName,
155 ProfileDescription: cfg.Registration.ProfileDescription,
156 Region: cfg.Registration.Region,
157 }); err != nil {
158 return nil, fmt.Errorf("failed to bootstrap PDS: %w", err)
159 }
160
161 // Sync successor from config (if set) — separate from Bootstrap to avoid changing its signature
162 if cfg.Server.Successor != "" {
163 if _, captain, err := s.PDS.GetCaptainRecord(ctx); err == nil && captain.Successor != cfg.Server.Successor {
164 captain.Successor = cfg.Server.Successor
165 if _, err := s.PDS.UpdateCaptainRecord(ctx, captain); err != nil {
166 slog.Warn("Failed to sync successor from config", "error", err)
167 } else {
168 slog.Info("Synced successor from config", "successor", cfg.Server.Successor)
169 }
170 }
171 }
172
173 slog.Info("Embedded PDS initialized successfully with firehose and records index enabled")
174 } else {
175 return nil, fmt.Errorf("database path is required for embedded PDS authorization")
176 }
177
178 // Initialize quota manager from config
179 var err error
180 s.QuotaManager, err = quota.NewManagerFromConfig(&cfg.Quota)
181 if err != nil {
182 return nil, fmt.Errorf("failed to load quota config: %w", err)
183 }
184 if s.QuotaManager.IsEnabled() {
185 slog.Info("Quota enforcement enabled", "tiers", s.QuotaManager.TierCount(), "defaultTier", s.QuotaManager.GetDefaultTier())
186 } else {
187 slog.Info("Quota enforcement disabled (no quota tiers configured)")
188 }
189
190 // Create XRPC handlers
191 var ociHandler *oci.XRPCHandler
192 if s.PDS != nil {
193 xrpcHandler = pds.NewXRPCHandler(s.PDS, *s3Service, s.broadcaster, nil, s.QuotaManager)
194 if cfg.Server.AppviewDID != "" {
195 xrpcHandler.SetAppviewDID(cfg.Server.AppviewDID)
196 }
197 ociHandler = oci.NewXRPCHandler(s.PDS, *s3Service, cfg.Registration.EnableBlueskyPosts, nil, s.QuotaManager)
198
199 // Initialize scan broadcaster if scanner secret is configured
200 if cfg.Scanner.Secret != "" {
201 holdDID := s.PDS.DID()
202 rescanInterval := cfg.Scanner.RescanInterval
203 var sb *pds.ScanBroadcaster
204 if s.holdDB != nil {
205 sb, err = pds.NewScanBroadcasterWithDB(holdDID, cfg.Server.PublicURL, cfg.Scanner.Secret, cfg.Server.RelayEndpoint, s.holdDB.DB, s3Service, s.PDS, rescanInterval)
206 } else {
207 scanDBPath := cfg.Database.Path + "/db.sqlite3"
208 sb, err = pds.NewScanBroadcaster(holdDID, cfg.Server.PublicURL, cfg.Scanner.Secret, cfg.Server.RelayEndpoint, scanDBPath, s3Service, s.PDS, rescanInterval)
209 }
210 if err != nil {
211 return nil, fmt.Errorf("failed to initialize scan broadcaster: %w", err)
212 }
213 s.scanBroadcaster = sb
214 xrpcHandler.SetScanBroadcaster(sb)
215 ociHandler.SetScanBroadcaster(sb)
216 slog.Info("Scan broadcaster initialized (scanner WebSocket enabled)",
217 "rescanInterval", rescanInterval)
218 }
219
220 // Initialize garbage collector
221 s.garbageCollector = gc.NewGarbageCollector(s.PDS, s3Service, cfg.GC)
222 slog.Info("Garbage collector initialized",
223 "enabled", cfg.GC.Enabled)
224 }
225
226 // Setup HTTP routes with chi router
227 r := chi.NewRouter()
228 r.Use(middleware.RealIP)
229 r.Use(middleware.Maybe(middleware.Logger, func(r *http.Request) bool {
230 return r.URL.Path != "/xrpc/_health"
231 }))
232
233 if xrpcHandler != nil {
234 r.Use(xrpcHandler.CORSMiddleware())
235 }
236
237 // Root page
238 r.Get("/", func(w http.ResponseWriter, r *http.Request) {
239 w.Header().Set("Content-Type", "text/plain")
240 fmt.Fprintf(w, "This is a hold server. More info at https://atcr.io")
241 })
242
243 // Robots.txt - disallow crawling of all endpoints except root
244 r.Get("/robots.txt", func(w http.ResponseWriter, r *http.Request) {
245 w.Header().Set("Content-Type", "text/plain")
246 fmt.Fprint(w, "User-agent: *\nAllow: /\nDisallow: /xrpc/\nDisallow: /admin/\n")
247 })
248
249 // Register XRPC/ATProto PDS endpoints
250 if xrpcHandler != nil {
251 slog.Info("Registering ATProto PDS endpoints")
252 xrpcHandler.RegisterHandlers(r)
253 }
254
255 // Register OCI multipart upload endpoints
256 if ociHandler != nil {
257 slog.Info("Registering OCI multipart upload endpoints")
258 ociHandler.RegisterHandlers(r)
259 }
260
261 // Initialize and register admin panel if enabled
262 if cfg.Admin.Enabled && s.PDS != nil {
263 adminCfg := admin.AdminConfig{
264 Enabled: true,
265 PublicURL: cfg.Server.PublicURL,
266 ConfigPath: cfg.ConfigPath(),
267 }
268
269 s.adminUI, err = admin.NewAdminUI(context.Background(), s.PDS, s.QuotaManager, s.garbageCollector, adminCfg)
270 if err != nil {
271 return nil, fmt.Errorf("failed to initialize admin panel: %w", err)
272 }
273
274 if s.adminUI != nil {
275 slog.Info("Registering admin panel routes")
276 s.adminUI.RegisterRoutes(r)
277 }
278 }
279
280 s.Router = r
281
282 return s, nil
283}
284
285// Serve starts the HTTP server and blocks until shutdown signal.
286func (s *HoldServer) Serve() error {
287 s.httpServer = &http.Server{
288 Addr: s.Config.Server.Addr,
289 Handler: s.Router,
290 ReadTimeout: s.Config.Server.ReadTimeout,
291 WriteTimeout: s.Config.Server.WriteTimeout,
292 }
293
294 // Set up signal handling for graceful shutdown
295 sigChan := make(chan os.Signal, 1)
296 signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
297
298 // Start server in goroutine
299 serverErr := make(chan error, 1)
300 go func() {
301 slog.Info("Starting hold service", "addr", s.Config.Server.Addr)
302 if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
303 serverErr <- err
304 }
305 }()
306
307 // Update status post to "online" after server starts
308 if s.PDS != nil {
309 ctx := context.Background()
310 if err := s.PDS.SetStatus(ctx, "online"); err != nil {
311 slog.Warn("Failed to set status post to online", "error", err)
312 } else {
313 slog.Info("Status post set to online")
314 }
315 }
316
317 // Fetch appview metadata for branding (Bluesky posts)
318 if s.Config.Server.AppviewURL() != "" {
319 meta, err := atproto.FetchAppviewMetadata(context.Background(), s.Config.Server.AppviewURL())
320 if err != nil {
321 slog.Warn("Failed to fetch appview metadata, using defaults", "appview_url", s.Config.Server.AppviewURL(), "error", err)
322 } else {
323 s.PDS.SetAppviewMeta(meta)
324 slog.Info("Fetched appview metadata", "clientName", meta.ClientName, "clientShortName", meta.ClientShortName)
325 }
326 }
327
328 // Request crawl from relay to make PDS discoverable
329 if s.Config.Server.RelayEndpoint != "" {
330 slog.Info("Requesting crawl from relay", "relay", s.Config.Server.RelayEndpoint)
331 if err := atproto.RequestCrawl(s.Config.Server.RelayEndpoint, s.Config.Server.PublicURL); err != nil {
332 slog.Warn("Failed to request crawl from relay", "error", err)
333 } else {
334 slog.Info("Crawl requested successfully")
335 }
336 }
337
338 // Start garbage collector (runs on startup + nightly)
339 if s.garbageCollector != nil {
340 s.garbageCollector.Start(context.Background())
341 }
342
343 // Wait for signal or server error
344 select {
345 case err := <-serverErr:
346 slog.Error("Server failed", "error", err)
347 logging.Shutdown()
348 return err
349 case sig := <-sigChan:
350 slog.Info("Received signal, shutting down gracefully", "signal", sig)
351 s.shutdown()
352 }
353
354 return nil
355}
356
357func (s *HoldServer) shutdown() {
358 // Update status post to "offline" before shutdown
359 if s.PDS != nil {
360 ctx := context.Background()
361 if err := s.PDS.SetStatus(ctx, "offline"); err != nil {
362 slog.Warn("Failed to set status post to offline", "error", err)
363 } else {
364 slog.Info("Status post set to offline")
365 }
366 }
367
368 // Stop garbage collector
369 if s.garbageCollector != nil {
370 s.garbageCollector.Stop()
371 slog.Info("Garbage collector stopped")
372 }
373
374 // Close scan broadcaster database connection
375 if s.scanBroadcaster != nil {
376 if err := s.scanBroadcaster.Close(); err != nil {
377 slog.Warn("Failed to close scan broadcaster database", "error", err)
378 } else {
379 slog.Info("Scan broadcaster database closed")
380 }
381 }
382
383 // Close broadcaster database connection
384 if s.broadcaster != nil {
385 if err := s.broadcaster.Close(); err != nil {
386 slog.Warn("Failed to close broadcaster database", "error", err)
387 } else {
388 slog.Info("Broadcaster database closed")
389 }
390 }
391
392 // Close admin panel
393 if s.adminUI != nil {
394 if err := s.adminUI.Close(); err != nil {
395 slog.Warn("Failed to close admin panel", "error", err)
396 } else {
397 slog.Info("Admin panel closed")
398 }
399 }
400
401 // Close shared database connection and connector (after all subsystems)
402 if s.holdDB != nil {
403 if err := s.holdDB.Close(); err != nil {
404 slog.Warn("Failed to close hold database", "error", err)
405 } else {
406 slog.Info("Hold database closed")
407 }
408 }
409
410 // Graceful shutdown with 10 second timeout
411 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
412 defer cancel()
413
414 if err := s.httpServer.Shutdown(shutdownCtx); err != nil {
415 slog.Error("Server shutdown error", "error", err)
416 } else {
417 slog.Info("Server shutdown complete")
418 }
419
420 logging.Shutdown()
421}