A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
# Analysis: AppView SQL Database Usage

## Overview

The AppView uses SQLite with 19 tables. The key finding: **most data is a cache of ATProto records** that could theoretically be rebuilt from users' PDS instances.

## Data Categories

### 1. MUST PERSIST (Local State Only)

These tables contain data that **cannot be reconstructed** from external sources:

| Table | Purpose | Why It Must Persist |
|-------|---------|---------------------|
| `oauth_sessions` | OAuth tokens | Refresh tokens are stateful; losing them = users must re-auth |
| `ui_sessions` | Web browser sessions | Session continuity for logged-in users |
| `devices` | Approved devices + bcrypt secrets | User authorization decisions; secrets are one-way hashed |
| `pending_device_auth` | In-flight auth flows | Short-lived (10 min) but critical during auth |
| `oauth_auth_requests` | OAuth flow state | Short-lived but required for auth completion |
| `repository_stats` | Pull/push counts | **Locally tracked metrics** - not stored in ATProto |

### 2. CACHED FROM PDS (Rebuildable)

These tables are essentially a **read-through cache** of ATProto data:

| Table | Source | ATProto Collection |
|-------|--------|-------------------|
| `users` | User's PDS profile | `app.bsky.actor.profile` + DID document |
| `manifests` | User's PDS | `io.atcr.manifest` records |
| `tags` | User's PDS | `io.atcr.tag` records |
| `layers` | Derived from manifests | Parsed from manifest content |
| `manifest_references` | Derived from manifest lists | Parsed from multi-arch manifests |
| `repository_annotations` | Manifest config blob | OCI annotations from config |
| `repo_pages` | User's PDS | `io.atcr.repo.page` records |
| `stars` | User's PDS | `io.atcr.sailor.star` records (synced via Jetstream) |
| `hold_captain_records` | Hold's embedded PDS | `io.atcr.hold.captain` records |
| `hold_crew_approvals` | Hold's embedded PDS | `io.atcr.hold.crew` records |
| `hold_crew_denials` | Local authorization cache | Could re-check on demand |

### 3. OPERATIONAL

| Table | Purpose |
|-------|---------|
| `schema_migrations` | Migration tracking |
| `firehose_cursor` | Jetstream position (can restart from 0) |

## Key Insights

### What's Actually Unique to AppView?

1. **Authentication state** - OAuth sessions, devices, UI sessions
2. **Engagement metrics** - Pull/push counts (locally tracked, not in ATProto)

### What Could Be Eliminated?

If ATCR fully embraced the ATProto model:

1. **`users`** - Query PDS on demand (with caching)
2. **`manifests`, `tags`, `layers`** - Query PDS on demand (with caching)
3. **`repository_annotations`** - Fetch manifest config on demand
4. **`repo_pages`** - Query PDS on demand
5. **`hold_*` tables** - Query hold's PDS on demand

### Trade-offs

**Current approach (heavy caching):**
- ✓ Fast queries for UI (search, browse, stats)
- ✓ Offline resilience (PDS down doesn't break UI)
- ✗ Complex sync logic (Jetstream consumer, backfill)
- ✗ State can diverge from source of truth

**Lighter approach (query on demand):**
- ✓ Always fresh data
- ✓ Simpler codebase (no sync)
- ✗ Slower queries (network round-trips)
- ✗ Depends on PDS availability

## Current Limitation: No Cache-Miss Queries

**Finding:** There's no "query PDS on cache miss" logic. Users/manifests only enter the DB via:
1. OAuth login (user authenticates)
2. Jetstream events (firehose activity)

**Problem:** If someone visits `atcr.io/alice/myapp` before alice is indexed → 404

**Where this happens:**
- `pkg/appview/handlers/repository.go:50-53`: If `db.GetUserByDID()` returns nil → 404
- No fallback to `atproto.Client.ListRecords()` or similar

**This matters for Valkey migration:** If the cache is ephemeral and restarts clear it, you need cache-miss logic to repopulate on demand. Otherwise:
- Restart Valkey → all users/manifests gone
- Wait for Jetstream to re-index OR implement cache-miss queries

**Cache-miss implementation design:**

Existing code to reuse: `pkg/appview/jetstream/processor.go:43-97` (`EnsureUser`)

```go
// New: pkg/appview/cache/loader.go

type Loader struct {
	cache  Cache // Valkey interface
	client *atproto.Client
}

// GetUser with cache-miss fallback
func (l *Loader) GetUser(ctx context.Context, did string) (*User, error) {
	// 1. Try cache
	if user := l.cache.GetUser(did); user != nil {
		return user, nil
	}

	// 2. Cache miss - resolve identity (already queries network)
	_, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did)
	if err != nil {
		return nil, err // User doesn't exist in network
	}

	// 3. Fetch profile for avatar
	client := atproto.NewClient(pdsEndpoint, "", "")
	profile, _ := client.GetProfileRecord(ctx, did)
	avatarURL := ""
	if profile != nil && profile.Avatar != nil {
		avatarURL = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link)
	}

	// 4. Cache and return
	user := &User{DID: did, Handle: handle, PDSEndpoint: pdsEndpoint, Avatar: avatarURL}
	l.cache.SetUser(user, 1*time.Hour)
	return user, nil
}

// GetManifestsForRepo with cache-miss fallback
func (l *Loader) GetManifestsForRepo(ctx context.Context, did, repo string) ([]Manifest, error) {
	cacheKey := fmt.Sprintf("manifests:%s:%s", did, repo)

	// 1. Try cache
	if cached := l.cache.Get(cacheKey); cached != nil {
		return cached.([]Manifest), nil
	}

	// 2. Cache miss - get user's PDS endpoint
	user, err := l.GetUser(ctx, did)
	if err != nil {
		return nil, err
	}

	// 3. Query PDS for manifests (first page only; follow the returned
	// cursor if a user can have more than 100 manifest records)
	client := atproto.NewClient(user.PDSEndpoint, "", "")
	records, _, err := client.ListRecordsForRepo(ctx, did, atproto.ManifestCollection, 100, "")
	if err != nil {
		return nil, err
	}

	// 4. Filter by repository and parse
	var manifests []Manifest
	for _, rec := range records {
		var m atproto.ManifestRecord
		if err := json.Unmarshal(rec.Value, &m); err != nil {
			continue
		}
		if m.Repository == repo {
			manifests = append(manifests, convertManifest(m))
		}
	}

	// 5. Cache and return
	l.cache.Set(cacheKey, manifests, 10*time.Minute)
	return manifests, nil
}
```

**Handler changes:**
```go
// Before (repository.go:45-53):
owner, err := db.GetUserByDID(h.DB, did)
if owner == nil {
	RenderNotFound(w, r, h.Templates, h.RegistryURL)
	return
}

// After:
owner, err := h.Loader.GetUser(r.Context(), did)
if err != nil {
	RenderNotFound(w, r, h.Templates, h.RegistryURL)
	return
}
```

**Performance considerations:**
- Cache hit: ~1ms (Valkey lookup)
- Cache miss: ~200-500ms (PDS round-trip)
- First request after restart: slower but correct
- Jetstream still useful for proactive warming

---

## Proposed Architecture: Valkey + ATProto

### Goal
Replace SQLite with Valkey (Redis-compatible) for ephemeral state, and push the remaining persistent data to ATProto.

### What goes to Valkey (ephemeral, TTL-based)

| Current Table | Valkey Key Pattern | TTL | Notes |
|---------------|-------------------|-----|-------|
| `oauth_sessions` | `oauth:{did}:{session_id}` | 90 days | Lost on restart = re-auth |
| `ui_sessions` | `ui:{session_id}` | Session duration | Lost on restart = re-login |
| `oauth_auth_requests` | `authreq:{state}` | 10 min | In-flight flows |
| `pending_device_auth` | `pending:{device_code}` | 10 min | In-flight flows |
| `firehose_cursor` | `cursor:jetstream` | None | Can restart from 0 |
| All PDS cache tables | `cache:{collection}:{did}:{rkey}` | 10-60 min | Query PDS on miss |
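
The key patterns and TTLs in this table could be centralized in a small helper package. A minimal sketch, where all names (`oauthSessionKey`, `cacheKey`, the TTL constants) are illustrative rather than taken from the ATCR codebase:

```go
package main

import (
	"fmt"
	"time"
)

// TTLs mirroring the table above; values are the proposal's, names are hypothetical.
const (
	oauthSessionTTL = 90 * 24 * time.Hour
	authRequestTTL  = 10 * time.Minute
	pdsCacheTTL     = 10 * time.Minute
)

// oauthSessionKey builds "oauth:{did}:{session_id}".
func oauthSessionKey(did, sessionID string) string {
	return fmt.Sprintf("oauth:%s:%s", did, sessionID)
}

// uiSessionKey builds "ui:{session_id}".
func uiSessionKey(sessionID string) string {
	return fmt.Sprintf("ui:%s", sessionID)
}

// authRequestKey builds "authreq:{state}".
func authRequestKey(state string) string {
	return fmt.Sprintf("authreq:%s", state)
}

// cacheKey builds "cache:{collection}:{did}:{rkey}" for the PDS cache tables.
func cacheKey(collection, did, rkey string) string {
	return fmt.Sprintf("cache:%s:%s:%s", collection, did, rkey)
}

func main() {
	fmt.Println(oauthSessionKey("did:plc:alice123", "sess-1"))
	fmt.Println(cacheKey("io.atcr.manifest", "did:plc:alice123", "3kabc"))
}
```

Centralizing key construction keeps the patterns greppable and avoids drift between the code that writes cache entries and the code that reads them.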

**Benefits:**
- Multi-instance ready (shared Valkey)
- No schema migrations
- Natural TTL expiry
- Simpler code (no SQL)

### What could become ATProto records

| Current Table | Proposed Collection | Where Stored | Open Questions |
|---------------|---------------------|--------------|----------------|
| `devices` | `io.atcr.sailor.device` | User's PDS | Privacy: are IP and user-agent too sensitive? |
| `repository_stats` | `io.atcr.repo.stats` | Hold's PDS or User's PDS | Who owns the stats? |

**Devices → Valkey:**
- Move the current device table to Valkey
- Key: `device:{did}:{device_id}` → `{name, secret_hash, ip, user_agent, created_at, last_used}`
- TTL: long (1 year?) or no expiry
- Device list: `devices:{did}` → Set of device IDs
- Secret validation works the same, just with a different backend
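
The bullets above map naturally onto a Valkey hash per device plus a set per user. A sketch under those assumptions, where `Device`, `deviceKey`, `deviceSetKey`, and `toHash` are hypothetical names and the flattened map is what an HSET call would receive:

```go
package main

import (
	"fmt"
	"time"
)

// Device mirrors the fields listed above; a hypothetical shape, not the
// actual ATCR struct.
type Device struct {
	Name       string
	SecretHash string // bcrypt hash, stored as-is and compared on validation
	IP         string
	UserAgent  string
	CreatedAt  time.Time
	LastUsed   time.Time
}

// deviceKey builds "device:{did}:{device_id}".
func deviceKey(did, deviceID string) string {
	return fmt.Sprintf("device:%s:%s", did, deviceID)
}

// deviceSetKey builds "devices:{did}", the set of a user's device IDs.
func deviceSetKey(did string) string {
	return fmt.Sprintf("devices:%s", did)
}

// toHash flattens a Device into field/value pairs suitable for a hash write.
func toHash(d Device) map[string]string {
	return map[string]string{
		"name":        d.Name,
		"secret_hash": d.SecretHash,
		"ip":          d.IP,
		"user_agent":  d.UserAgent,
		"created_at":  d.CreatedAt.UTC().Format(time.RFC3339),
		"last_used":   d.LastUsed.UTC().Format(time.RFC3339),
	}
}

func main() {
	fmt.Println(deviceKey("did:plc:alice123", "dev1"))
}
```

Keeping the hash fields identical to the current SQLite columns means only the storage calls change, not the validation logic.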

**Service auth exploration (future):**
The challenge with pure ATProto service auth is that the AppView still needs the user's OAuth session to write manifests to their PDS. The current flow:
1. User authenticates via OAuth → AppView gets OAuth tokens
2. AppView issues registry JWT to credential helper
3. Credential helper presents JWT on each push/pull
4. AppView uses OAuth session to write to user's PDS

Service auth could work for the hold side (AppView → Hold), but not for the user's OAuth session.

**Repository stats → Hold's PDS:**

**Challenge discovered:** The hold's `getBlob` endpoint only receives `did` + `cid`, not the repository name.

Current flow (`proxy_blob_store.go:358-362`):
```go
xrpcURL := fmt.Sprintf("%s%s?did=%s&cid=%s&method=%s",
	p.holdURL, atproto.SyncGetBlob, p.ctx.DID, dgst.String(), operation)
```

**Implementation options:**

**Option A: Add repository parameter to getBlob (recommended)**
```go
// Modified AppView call:
xrpcURL := fmt.Sprintf("%s%s?did=%s&cid=%s&method=%s&repo=%s",
	p.holdURL, atproto.SyncGetBlob, p.ctx.DID, dgst.String(), operation, p.ctx.Repository)
```

```go
// Modified hold handler (xrpc.go:969):
func (h *XRPCHandler) HandleGetBlob(w http.ResponseWriter, r *http.Request) {
	did := r.URL.Query().Get("did")
	cidOrDigest := r.URL.Query().Get("cid")
	repo := r.URL.Query().Get("repo") // NEW

	// ... existing blob handling ...

	// Increment stats if repo provided. Use a fresh context: the request
	// context is cancelled as soon as the handler returns.
	if repo != "" {
		go h.pds.IncrementPullCount(context.Background(), did, repo) // Async, non-blocking
	}
}
```

**Stats record structure:**
```
Collection: io.atcr.hold.stats
Rkey: base64url(did:repository) // Deterministic, unique; URL-safe alphabet, since ATProto rkeys disallow '+', '/', '='

{
  "$type": "io.atcr.hold.stats",
  "did": "did:plc:alice123",
  "repository": "myapp",
  "pullCount": 1542,
  "pushCount": 47,
  "lastPull": "2025-01-15T...",
  "lastPush": "2025-01-10T...",
  "createdAt": "2025-01-01T..."
}
```

**Hold-side implementation:**
```go
// New file: pkg/hold/pds/stats.go

func (p *HoldPDS) IncrementPullCount(ctx context.Context, did, repo string) error {
	rkey := statsRecordKey(did, repo)

	// Get or create stats record
	stats, err := p.GetStatsRecord(ctx, rkey)
	if err != nil || stats == nil {
		stats = &atproto.StatsRecord{
			Type:       atproto.StatsCollection,
			DID:        did,
			Repository: repo,
			PullCount:  0,
			PushCount:  0,
			CreatedAt:  time.Now(),
		}
	}

	// Increment and update
	stats.PullCount++
	stats.LastPull = time.Now()

	_, err = p.repomgr.UpdateRecord(ctx, p.uid, atproto.StatsCollection, rkey, stats)
	return err
}
```

**Query endpoint (new XRPC):**
```
GET /xrpc/io.atcr.hold.getStats?did={userDID}&repo={repository}
→ Returns JSON: { pullCount, pushCount, lastPull, lastPush }

GET /xrpc/io.atcr.hold.listStats?did={userDID}
→ Returns all stats for a user across all repos on this hold
```

**AppView aggregation:**
```go
func (l *Loader) GetAggregatedStats(ctx context.Context, did, repo string) (*Stats, error) {
	// 1. Get all holds that have served this repo
	holdDIDs, _ := l.cache.GetHoldDIDsForRepo(did, repo)

	// 2. Query each hold for stats, skipping unreachable holds instead of
	// failing the whole aggregate (or dereferencing a nil result)
	var total Stats
	for _, holdDID := range holdDIDs {
		holdURL := resolveHoldDID(holdDID)
		stats, err := queryHoldStats(ctx, holdURL, did, repo)
		if err != nil || stats == nil {
			continue
		}
		total.PullCount += stats.PullCount
		total.PushCount += stats.PushCount
	}

	return &total, nil
}
```

**Files to modify:**
- `pkg/atproto/lexicon.go` - Add `StatsCollection` + `StatsRecord`
- `pkg/hold/pds/stats.go` - New file for stats operations
- `pkg/hold/pds/xrpc.go` - Add `repo` param to getBlob, add stats endpoints
- `pkg/appview/storage/proxy_blob_store.go` - Pass repository to getBlob
- `pkg/appview/cache/loader.go` - Aggregation logic

### Migration Path

**Phase 1: Add Valkey infrastructure**
- Add Valkey client to AppView
- Create store interfaces that abstract SQLite vs Valkey
- Dual-write OAuth sessions to both

**Phase 2: Migrate sessions to Valkey**
- OAuth sessions, UI sessions, auth requests, pending device auth
- Remove SQLite session tables
- Test: restart AppView, users get logged out (acceptable)

**Phase 3: Migrate devices to Valkey**
- Move device store to Valkey
- Same data structure, different backend
- Consider device expiry policy

**Phase 4: Implement hold-side stats**
- Add `io.atcr.hold.stats` collection to hold's embedded PDS
- Hold increments stats on blob access
- Add XRPC endpoint: `io.atcr.hold.getStats`

**Phase 5: AppView stats aggregation**
- Track hold DIDs per repo in Valkey cache
- Query holds for stats, aggregate
- Cache aggregated stats with TTL

**Phase 6: Remove SQLite (optional)**
- Keep SQLite as optional cache layer for UI queries
- Or: query PDS on demand with Valkey caching
- Jetstream still useful for real-time updates

## Summary Table

| Category | Tables | % of Schema | Truly Persistent? |
|----------|--------|-------------|-------------------|
| Auth & Sessions + Metrics | 6 | 32% | Yes |
| PDS Cache | 11 | 58% | No (rebuildable) |
| Operational | 2 | 10% | No |

**~58% of the database is cached ATProto data that could be rebuilt from PDSes.**