A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
# Analysis: AppView SQL Database Usage

## Overview

The AppView uses SQLite with 19 tables. The key finding: **most data is a cache of ATProto records** that could theoretically be rebuilt from users' PDS instances.

## Data Categories

### 1. MUST PERSIST (Local State Only)

These tables contain data that **cannot be reconstructed** from external sources:

| Table | Purpose | Why It Must Persist |
|-------|---------|---------------------|
| `oauth_sessions` | OAuth tokens | Refresh tokens are stateful; losing them = users must re-auth |
| `ui_sessions` | Web browser sessions | Session continuity for logged-in users |
| `devices` | Approved devices + bcrypt secrets | User authorization decisions; secrets are one-way hashed |
| `pending_device_auth` | In-flight auth flows | Short-lived (10 min) but critical during auth |
| `oauth_auth_requests` | OAuth flow state | Short-lived but required for auth completion |
| `repository_stats` | Pull/push counts | **Locally tracked metrics** - not stored in ATProto |

### 2. CACHED FROM PDS (Rebuildable)

These tables are essentially a **read-through cache** of ATProto data:

| Table | Source | ATProto Collection |
|-------|--------|--------------------|
| `users` | User's PDS profile | `app.bsky.actor.profile` + DID document |
| `manifests` | User's PDS | `io.atcr.manifest` records |
| `tags` | User's PDS | `io.atcr.tag` records |
| `layers` | Derived from manifests | Parsed from manifest content |
| `manifest_references` | Derived from manifest lists | Parsed from multi-arch manifests |
| `repository_annotations` | Manifest config blob | OCI annotations from config |
| `repo_pages` | User's PDS | `io.atcr.repo.page` records |
| `stars` | User's PDS | `io.atcr.sailor.star` records (synced via Jetstream) |
| `hold_captain_records` | Hold's embedded PDS | `io.atcr.hold.captain` records |
| `hold_crew_approvals` | Hold's embedded PDS | `io.atcr.hold.crew` records |
| `hold_crew_denials` | Local authorization cache | Could re-check on demand |

### 3. OPERATIONAL

| Table | Purpose |
|-------|---------|
| `schema_migrations` | Migration tracking |
| `firehose_cursor` | Jetstream position (can restart from 0) |

## Key Insights

### What's Actually Unique to AppView?

1. **Authentication state** - OAuth sessions, devices, UI sessions
2. **Engagement metrics** - Pull/push counts (locally tracked, not in ATProto)

### What Could Be Eliminated?

If ATCR fully embraced the ATProto model:

1. **`users`** - Query PDS on demand (with caching)
2. **`manifests`, `tags`, `layers`** - Query PDS on demand (with caching)
3. **`repository_annotations`** - Fetch manifest config on demand
4. **`repo_pages`** - Query PDS on demand
5. **`hold_*` tables** - Query hold's PDS on demand
### Trade-offs

**Current approach (heavy caching):**
- Fast queries for UI (search, browse, stats)
- Offline resilience (PDS down doesn't break UI)
- Complex sync logic (Jetstream consumer, backfill)
- State can diverge from source of truth

**Lighter approach (query on demand):**
- Always fresh data
- Simpler codebase (no sync)
- Slower queries (network round-trips)
- Depends on PDS availability

## Current Limitation: No Cache-Miss Queries

**Finding:** There's no "query PDS on cache miss" logic. Users/manifests only enter the DB via:
1. OAuth login (user authenticates)
2. Jetstream events (firehose activity)

**Problem:** If someone visits `atcr.io/alice/myapp` before alice is indexed → 404

**Where this happens:**
- `pkg/appview/handlers/repository.go:50-53`: If `db.GetUserByDID()` returns nil → 404
- No fallback to `atproto.Client.ListRecords()` or similar

**This matters for the Valkey migration:** If the cache is ephemeral and restarts clear it, you need cache-miss logic to repopulate on demand. Otherwise:
- Restart Valkey → all users/manifests gone
- Wait for Jetstream to re-index OR implement cache-miss queries

**Cache-miss implementation design:**

Existing code to reuse: `pkg/appview/jetstream/processor.go:43-97` (`EnsureUser`)

```go
// New: pkg/appview/cache/loader.go

type Loader struct {
	cache  Cache // Valkey interface
	client *atproto.Client
}

// GetUser with cache-miss fallback
func (l *Loader) GetUser(ctx context.Context, did string) (*User, error) {
	// 1. Try cache
	if user := l.cache.GetUser(did); user != nil {
		return user, nil
	}

	// 2. Cache miss - resolve identity (already queries network)
	_, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did)
	if err != nil {
		return nil, err // User doesn't exist in network
	}

	// 3. Fetch profile for avatar
	client := atproto.NewClient(pdsEndpoint, "", "")
	profile, _ := client.GetProfileRecord(ctx, did)
	avatarURL := ""
	if profile != nil && profile.Avatar != nil {
		avatarURL = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link)
	}

	// 4. Cache and return
	user := &User{DID: did, Handle: handle, PDSEndpoint: pdsEndpoint, Avatar: avatarURL}
	l.cache.SetUser(user, 1*time.Hour)
	return user, nil
}

// GetManifestsForRepo with cache-miss fallback
func (l *Loader) GetManifestsForRepo(ctx context.Context, did, repo string) ([]Manifest, error) {
	cacheKey := fmt.Sprintf("manifests:%s:%s", did, repo)

	// 1. Try cache
	if cached := l.cache.Get(cacheKey); cached != nil {
		return cached.([]Manifest), nil
	}

	// 2. Cache miss - get user's PDS endpoint
	user, err := l.GetUser(ctx, did)
	if err != nil {
		return nil, err
	}

	// 3. Query PDS for manifests
	client := atproto.NewClient(user.PDSEndpoint, "", "")
	records, _, err := client.ListRecordsForRepo(ctx, did, atproto.ManifestCollection, 100, "")
	if err != nil {
		return nil, err
	}

	// 4. Filter by repository and parse
	var manifests []Manifest
	for _, rec := range records {
		var m atproto.ManifestRecord
		if err := json.Unmarshal(rec.Value, &m); err != nil {
			continue
		}
		if m.Repository == repo {
			manifests = append(manifests, convertManifest(m))
		}
	}

	// 5. Cache and return
	l.cache.Set(cacheKey, manifests, 10*time.Minute)
	return manifests, nil
}
```

**Handler changes:**
```go
// Before (repository.go:45-53):
owner, err := db.GetUserByDID(h.DB, did)
if owner == nil {
	RenderNotFound(w, r, h.Templates, h.RegistryURL)
	return
}

// After:
owner, err := h.Loader.GetUser(r.Context(), did)
if err != nil {
	RenderNotFound(w, r, h.Templates, h.RegistryURL)
	return
}
```

**Performance considerations:**
- Cache hit: ~1 ms (Valkey lookup)
- Cache miss: ~200-500 ms (PDS round-trip)
- First request after restart: slower but correct
- Jetstream still useful for proactive warming

---

## Proposed Architecture: Valkey + ATProto

### Goal
Replace SQLite with Valkey (Redis-compatible) for ephemeral state, and push the remaining persistent data to ATProto.

### What goes to Valkey (ephemeral, TTL-based)

| Current Table | Valkey Key Pattern | TTL | Notes |
|---------------|--------------------|-----|-------|
| `oauth_sessions` | `oauth:{did}:{session_id}` | 90 days | Lost on restart = re-auth |
| `ui_sessions` | `ui:{session_id}` | Session duration | Lost on restart = re-login |
| `oauth_auth_requests` | `authreq:{state}` | 10 min | In-flight flows |
| `pending_device_auth` | `pending:{device_code}` | 10 min | In-flight flows |
| `firehose_cursor` | `cursor:jetstream` | None | Can restart from 0 |
| All PDS cache tables | `cache:{collection}:{did}:{rkey}` | 10-60 min | Query PDS on miss |

**Benefits:**
- Multi-instance ready (shared Valkey)
- No schema migrations
- Natural TTL expiry
- Simpler code (no SQL)

### What could become ATProto records

| Current Table | Proposed Collection | Where Stored | Open Questions |
|---------------|---------------------|--------------|----------------|
| `devices` | `io.atcr.sailor.device` | User's PDS | Privacy: IP, user-agent sensitive? |
| 225| `repository_stats` | `io.atcr.repo.stats` | Hold's PDS or User's PDS | Who owns the stats? | 226 227**Devices → Valkey:** 228- Move current device table to Valkey 229- Key: `device:{did}:{device_id}``{name, secret_hash, ip, user_agent, created_at, last_used}` 230- TTL: Long (1 year?) or no expiry 231- Device list: `devices:{did}` → Set of device IDs 232- Secret validation works the same, just different backend 233 234**Service auth exploration (future):** 235The challenge with pure ATProto service auth is the AppView still needs the user's OAuth session to write manifests to their PDS. The current flow: 2361. User authenticates via OAuth → AppView gets OAuth tokens 2372. AppView issues registry JWT to credential helper 2383. Credential helper presents JWT on each push/pull 2394. AppView uses OAuth session to write to user's PDS 240 241Service auth could work for the hold side (AppView → Hold), but not for the user's OAuth session. 242 243**Repository stats → Hold's PDS:** 244 245**Challenge discovered:** The hold's `getBlob` endpoint only receives `did` + `cid`, not the repository name. 246 247Current flow (`proxy_blob_store.go:358-362`): 248```go 249xrpcURL := fmt.Sprintf("%s%s?did=%s&cid=%s&method=%s", 250 p.holdURL, atproto.SyncGetBlob, p.ctx.DID, dgst.String(), operation) 251``` 252 253**Implementation options:** 254 255**Option A: Add repository parameter to getBlob (recommended)** 256```go 257// Modified AppView call: 258xrpcURL := fmt.Sprintf("%s%s?did=%s&cid=%s&method=%s&repo=%s", 259 p.holdURL, atproto.SyncGetBlob, p.ctx.DID, dgst.String(), operation, p.ctx.Repository) 260``` 261 262```go 263// Modified hold handler (xrpc.go:969): 264func (h *XRPCHandler) HandleGetBlob(w http.ResponseWriter, r *http.Request) { 265 did := r.URL.Query().Get("did") 266 cidOrDigest := r.URL.Query().Get("cid") 267 repo := r.URL.Query().Get("repo") // NEW 268 269 // ... existing blob handling ... 
270 271 // Increment stats if repo provided 272 if repo != "" { 273 go h.pds.IncrementPullCount(did, repo) // Async, non-blocking 274 } 275} 276``` 277 278**Stats record structure:** 279``` 280Collection: io.atcr.hold.stats 281Rkey: base64(did:repository) // Deterministic, unique 282 283{ 284 "$type": "io.atcr.hold.stats", 285 "did": "did:plc:alice123", 286 "repository": "myapp", 287 "pullCount": 1542, 288 "pushCount": 47, 289 "lastPull": "2025-01-15T...", 290 "lastPush": "2025-01-10T...", 291 "createdAt": "2025-01-01T..." 292} 293``` 294 295**Hold-side implementation:** 296```go 297// New file: pkg/hold/pds/stats.go 298 299func (p *HoldPDS) IncrementPullCount(ctx context.Context, did, repo string) error { 300 rkey := statsRecordKey(did, repo) 301 302 // Get or create stats record 303 stats, err := p.GetStatsRecord(ctx, rkey) 304 if err != nil || stats == nil { 305 stats = &atproto.StatsRecord{ 306 Type: atproto.StatsCollection, 307 DID: did, 308 Repository: repo, 309 PullCount: 0, 310 PushCount: 0, 311 CreatedAt: time.Now(), 312 } 313 } 314 315 // Increment and update 316 stats.PullCount++ 317 stats.LastPull = time.Now() 318 319 _, err = p.repomgr.UpdateRecord(ctx, p.uid, atproto.StatsCollection, rkey, stats) 320 return err 321} 322``` 323 324**Query endpoint (new XRPC):** 325``` 326GET /xrpc/io.atcr.hold.getStats?did={userDID}&repo={repository} 327→ Returns JSON: { pullCount, pushCount, lastPull, lastPush } 328 329GET /xrpc/io.atcr.hold.listStats?did={userDID} 330→ Returns all stats for a user across all repos on this hold 331``` 332 333**AppView aggregation:** 334```go 335func (l *Loader) GetAggregatedStats(ctx context.Context, did, repo string) (*Stats, error) { 336 // 1. Get all holds that have served this repo 337 holdDIDs, _ := l.cache.GetHoldDIDsForRepo(did, repo) 338 339 // 2. 
Query each hold for stats 340 var total Stats 341 for _, holdDID := range holdDIDs { 342 holdURL := resolveHoldDID(holdDID) 343 stats, _ := queryHoldStats(ctx, holdURL, did, repo) 344 total.PullCount += stats.PullCount 345 total.PushCount += stats.PushCount 346 } 347 348 return &total, nil 349} 350``` 351 352**Files to modify:** 353- `pkg/atproto/lexicon.go` - Add `StatsCollection` + `StatsRecord` 354- `pkg/hold/pds/stats.go` - New file for stats operations 355- `pkg/hold/pds/xrpc.go` - Add `repo` param to getBlob, add stats endpoints 356- `pkg/appview/storage/proxy_blob_store.go` - Pass repository to getBlob 357- `pkg/appview/cache/loader.go` - Aggregation logic 358 359### Migration Path 360 361**Phase 1: Add Valkey infrastructure** 362- Add Valkey client to AppView 363- Create store interfaces that abstract SQLite vs Valkey 364- Dual-write OAuth sessions to both 365 366**Phase 2: Migrate sessions to Valkey** 367- OAuth sessions, UI sessions, auth requests, pending device auth 368- Remove SQLite session tables 369- Test: restart AppView, users get logged out (acceptable) 370 371**Phase 3: Migrate devices to Valkey** 372- Move device store to Valkey 373- Same data structure, different backend 374- Consider device expiry policy 375 376**Phase 4: Implement hold-side stats** 377- Add `io.atcr.hold.stats` collection to hold's embedded PDS 378- Hold increments stats on blob access 379- Add XRPC endpoint: `io.atcr.hold.getStats` 380 381**Phase 5: AppView stats aggregation** 382- Track holdDids per repo in Valkey cache 383- Query holds for stats, aggregate 384- Cache aggregated stats with TTL 385 386**Phase 6: Remove SQLite (optional)** 387- Keep SQLite as optional cache layer for UI queries 388- Or: Query PDS on demand with Valkey caching 389- Jetstream still useful for real-time updates 390 391## Summary Table 392 393| Category | Tables | % of Schema | Truly Persistent? 
| 394|----------|--------|-------------|-------------------| 395| Auth & Sessions + Metrics | 6 | 32% | Yes | 396| PDS Cache | 11 | 58% | No (rebuildable) | 397| Operational | 2 | 10% | No | 398 399**~58% of the database is cached ATProto data that could be rebuilt from PDSes.**