···11 "maps"
12 "net/http"
13 "strings"
014 "time"
1516 "atcr.io/pkg/atproto"
···22// It stores manifests in ATProto as records
23type ManifestStore struct {
24 ctx *RegistryContext // Context with user/hold info
025 lastFetchedHoldDID string // Hold DID from most recently fetched manifest (for pull)
26 blobStore distribution.BlobStore // Blob store for fetching config during push
27}
···67 // Store the hold DID for subsequent blob requests during pull
68 // Prefer HoldDID (new format) with fallback to HoldEndpoint (legacy URL format)
69 // The routing repository will cache this for concurrent blob fetches
070 if manifestRecord.HoldDID != "" {
71 // New format: DID reference (preferred)
72 s.lastFetchedHoldDID = manifestRecord.HoldDID
···74 // Legacy format: URL reference - convert to DID
75 s.lastFetchedHoldDID = atproto.ResolveHoldDIDFromURL(manifestRecord.HoldEndpoint)
76 }
07778 var ociManifest []byte
79···232// GetLastFetchedHoldDID returns the hold DID from the most recently fetched manifest
233// This is used by the routing repository to cache the hold for blob requests
234func (s *ManifestStore) GetLastFetchedHoldDID() string {
00235 return s.lastFetchedHoldDID
236}
237
···11 "maps"
12 "net/http"
13 "strings"
14+ "sync"
15 "time"
1617 "atcr.io/pkg/atproto"
···23// It stores manifests in ATProto as records
24type ManifestStore struct {
25 ctx *RegistryContext // Context with user/hold info
26+ mu sync.RWMutex // Protects lastFetchedHoldDID
27 lastFetchedHoldDID string // Hold DID from most recently fetched manifest (for pull)
28 blobStore distribution.BlobStore // Blob store for fetching config during push
29}
···69 // Store the hold DID for subsequent blob requests during pull
70 // Prefer HoldDID (new format) with fallback to HoldEndpoint (legacy URL format)
71 // The routing repository will cache this for concurrent blob fetches
72+ s.mu.Lock()
73 if manifestRecord.HoldDID != "" {
74 // New format: DID reference (preferred)
75 s.lastFetchedHoldDID = manifestRecord.HoldDID
···77 // Legacy format: URL reference - convert to DID
78 s.lastFetchedHoldDID = atproto.ResolveHoldDIDFromURL(manifestRecord.HoldEndpoint)
79 }
80+ s.mu.Unlock()
8182 var ociManifest []byte
83···236// GetLastFetchedHoldDID returns the hold DID from the most recently fetched manifest
237// This is used by the routing repository to cache the hold for blob requests
238func (s *ManifestStore) GetLastFetchedHoldDID() string {
239+ s.mu.RLock()
240+ defer s.mu.RUnlock()
241 return s.lastFetchedHoldDID
242}
243
+3-3
pkg/appview/storage/manifest_store_test.go
···669670 if tt.expectPullIncrement {
671 // Check that IncrementPullCount was called
672- if mockDB.pullCount == 0 {
673 t.Error("Expected pull count to be incremented for GET request, but it wasn't")
674 }
675 } else {
676 // Check that IncrementPullCount was NOT called
677- if mockDB.pullCount > 0 {
678- t.Errorf("Expected pull count NOT to be incremented for %s request, but it was (count=%d)", tt.httpMethod, mockDB.pullCount)
679 }
680 }
681 })
···669670 if tt.expectPullIncrement {
671 // Check that IncrementPullCount was called
672+ if mockDB.getPullCount() == 0 {
673 t.Error("Expected pull count to be incremented for GET request, but it wasn't")
674 }
675 } else {
676 // Check that IncrementPullCount was NOT called
677+ if mockDB.getPullCount() > 0 {
678+ t.Errorf("Expected pull count NOT to be incremented for %s request, but it was (count=%d)", tt.httpMethod, mockDB.getPullCount())
679 }
680 }
681 })
+11-3
pkg/appview/storage/profile_test.go
···219 // Clear migration locks before each test
220 migrationLocks = sync.Map{}
2210222 putRecordCalled := false
223 var migrationRequest map[string]any
224···232233 // PutRecord (migration)
234 if r.Method == "POST" && strings.Contains(r.URL.Path, "putRecord") {
0235 putRecordCalled = true
236 json.NewDecoder(r.Body).Decode(&migrationRequest)
0237 w.WriteHeader(http.StatusOK)
238 w.Write([]byte(`{"uri":"at://did:plc:test123/io.atcr.sailor.profile/self","cid":"bafytest"}`))
239 return
···270 // Give goroutine time to execute
271 time.Sleep(50 * time.Millisecond)
272273- if !putRecordCalled {
00000274 t.Error("Expected migration PutRecord to be called")
275 }
276277- if migrationRequest != nil {
278- recordData := migrationRequest["record"].(map[string]any)
279 migratedHold := recordData["defaultHold"]
280 if migratedHold != tt.expectedHoldDID {
281 t.Errorf("Migrated defaultHold = %v, want %v", migratedHold, tt.expectedHoldDID)
···219 // Clear migration locks before each test
220 migrationLocks = sync.Map{}
221222+ var mu sync.Mutex
223 putRecordCalled := false
224 var migrationRequest map[string]any
225···233234 // PutRecord (migration)
235 if r.Method == "POST" && strings.Contains(r.URL.Path, "putRecord") {
236+ mu.Lock()
237 putRecordCalled = true
238 json.NewDecoder(r.Body).Decode(&migrationRequest)
239+ mu.Unlock()
240 w.WriteHeader(http.StatusOK)
241 w.Write([]byte(`{"uri":"at://did:plc:test123/io.atcr.sailor.profile/self","cid":"bafytest"}`))
242 return
···273 // Give goroutine time to execute
274 time.Sleep(50 * time.Millisecond)
275276+ mu.Lock()
277+ called := putRecordCalled
278+ request := migrationRequest
279+ mu.Unlock()
280+281+ if !called {
282 t.Error("Expected migration PutRecord to be called")
283 }
284285+ if request != nil {
286+ recordData := request["record"].(map[string]any)
287 migratedHold := recordData["defaultHold"]
288 if migratedHold != tt.expectedHoldDID {
289 t.Errorf("Migrated defaultHold = %v, want %v", migratedHold, tt.expectedHoldDID)
+21-5
pkg/appview/storage/routing_repository.go
···7import (
8 "context"
9 "log/slog"
010 "time"
1112 "github.com/distribution/distribution/v3"
···17type RoutingRepository struct {
18 distribution.Repository
19 Ctx *RegistryContext // All context and services (exported for token updates)
020 manifestStore *ManifestStore // Cached manifest store instance
21 blobStore *ProxyBlobStore // Cached blob store instance
22}
···3132// Manifests returns the ATProto-backed manifest service
33func (r *RoutingRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
034 // Create or return cached manifest store
35 if r.manifestStore == nil {
36 // Ensure blob store is created first (needed for label extraction during push)
0037 blobStore := r.Blobs(ctx)
03839- r.manifestStore = NewManifestStore(r.Ctx, blobStore)
00040 }
004142 // After any manifest operation, cache the hold DID for blob fetches
43 // We use a goroutine to avoid blocking, and check after a short delay to allow the operation to complete
44 go func() {
45 time.Sleep(100 * time.Millisecond) // Brief delay to let manifest fetch complete
46- if holdDID := r.manifestStore.GetLastFetchedHoldDID(); holdDID != "" {
47 // Cache for 10 minutes - should cover typical pull operations
48 GetGlobalHoldCache().Set(r.Ctx.DID, r.Ctx.Repository, holdDID, 10*time.Minute)
49 slog.Debug("Cached hold DID", "component", "storage/routing", "did", r.Ctx.DID, "repo", r.Ctx.Repository, "hold", holdDID)
50 }
51 }()
5253- return r.manifestStore, nil
54}
5556// Blobs returns a proxy blob store that routes to external hold service
57// The registry (AppView) NEVER stores blobs locally - all blobs go through hold service
58func (r *RoutingRepository) Blobs(ctx context.Context) distribution.BlobStore {
059 // Return cached blob store if available
60 if r.blobStore != nil {
0061 slog.Debug("Returning cached blob store", "component", "storage/blobs", "did", r.Ctx.DID, "repo", r.Ctx.Repository)
62- return r.blobStore
63 }
6465 // For pull operations, check if we have a cached hold DID from a recent manifest fetch
···8586 // Create and cache proxy blob store
87 r.blobStore = NewProxyBlobStore(r.Ctx)
88- return r.blobStore
0089}
9091// Tags returns the tag service
···7import (
8 "context"
9 "log/slog"
10+ "sync"
11 "time"
1213 "github.com/distribution/distribution/v3"
···18type RoutingRepository struct {
19 distribution.Repository
20 Ctx *RegistryContext // All context and services (exported for token updates)
21+ mu sync.Mutex // Protects manifestStore and blobStore
22 manifestStore *ManifestStore // Cached manifest store instance
23 blobStore *ProxyBlobStore // Cached blob store instance
24}
···3334// Manifests returns the ATProto-backed manifest service
35func (r *RoutingRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
36+ r.mu.Lock()
37 // Create or return cached manifest store
38 if r.manifestStore == nil {
39 // Ensure blob store is created first (needed for label extraction during push)
40+ // Release lock while calling Blobs to avoid deadlock
41+ r.mu.Unlock()
42 blobStore := r.Blobs(ctx)
43+ r.mu.Lock()
4445+ // Double-check after reacquiring lock (another goroutine might have set it)
46+ if r.manifestStore == nil {
47+ r.manifestStore = NewManifestStore(r.Ctx, blobStore)
48+ }
49 }
50+ manifestStore := r.manifestStore
51+ r.mu.Unlock()
5253 // After any manifest operation, cache the hold DID for blob fetches
54 // We use a goroutine to avoid blocking, and check after a short delay to allow the operation to complete
55 go func() {
56 time.Sleep(100 * time.Millisecond) // Brief delay to let manifest fetch complete
57+ if holdDID := manifestStore.GetLastFetchedHoldDID(); holdDID != "" {
58 // Cache for 10 minutes - should cover typical pull operations
59 GetGlobalHoldCache().Set(r.Ctx.DID, r.Ctx.Repository, holdDID, 10*time.Minute)
60 slog.Debug("Cached hold DID", "component", "storage/routing", "did", r.Ctx.DID, "repo", r.Ctx.Repository, "hold", holdDID)
61 }
62 }()
6364+ return manifestStore, nil
65}
6667// Blobs returns a proxy blob store that routes to external hold service
68// The registry (AppView) NEVER stores blobs locally - all blobs go through hold service
69func (r *RoutingRepository) Blobs(ctx context.Context) distribution.BlobStore {
70+ r.mu.Lock()
71 // Return cached blob store if available
72 if r.blobStore != nil {
73+ blobStore := r.blobStore
74+ r.mu.Unlock()
75 slog.Debug("Returning cached blob store", "component", "storage/blobs", "did", r.Ctx.DID, "repo", r.Ctx.Repository)
76+ return blobStore
77 }
7879 // For pull operations, check if we have a cached hold DID from a recent manifest fetch
···99100 // Create and cache proxy blob store
101 r.blobStore = NewProxyBlobStore(r.Ctx)
102+ blobStore := r.blobStore
103+ r.mu.Unlock()
104+ return blobStore
105}
106107// Tags returns the tag service