A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go

add a verify check on the relay-compare

evan.jarrett.net e6c2099a 5249c9ea

verified
+193 -23
+193 -23
cmd/relay-compare/main.go
··· 56 56 counts []int 57 57 status string // "sync", "diff", "error" 58 58 diffCount int 59 + realGaps int // verified: record exists on PDS but relay is missing it 60 + ghosts int // verified: record doesn't exist on PDS, relay has stale entry 61 + } 62 + 63 + // verifyResult holds the PDS verification result for a (DID, collection) pair. 64 + type verifyResult struct { 65 + exists bool 66 + err error 67 + } 68 + 69 + // key identifies a (collection, relay-or-DID) pair for result lookups. 70 + type key struct{ col, relay string } 71 + 72 + // diffEntry represents a DID missing from a specific relay for a collection. 73 + type diffEntry struct { 74 + did string 75 + collection string 76 + relayIdx int 59 77 } 60 78 61 79 func main() { 62 80 noColor := flag.Bool("no-color", false, "disable colored output") 81 + verify := flag.Bool("verify", false, "verify diffs against PDS to distinguish real gaps from ghost entries") 63 82 collection := flag.String("collection", "", "compare only this collection") 64 83 timeout := flag.Duration("timeout", 2*time.Minute, "timeout for all relay queries") 65 84 flag.Usage = func() { ··· 107 126 fmt.Printf("%sFetching %d collections from %d relays...%s\n", cDim, len(cols), len(relays), cReset) 108 127 109 128 // Fetch all data in parallel: every (collection, relay) pair concurrently 110 - type key struct{ col, relay string } 111 129 type fetchResult struct { 112 130 dids map[string]struct{} 113 131 err error ··· 130 148 } 131 149 wg.Wait() 132 150 133 - // Display per-collection diffs and collect summary 134 - var summary []summaryRow 135 - totalMissing := 0 151 + // Collect all diffs across collections (for optional verification) 152 + var allDiffs []diffEntry 136 153 137 - for _, col := range cols { 138 - fmt.Printf("\n%s%s━━━ %s ━━━%s\n", cBold, cCyan, col, cReset) 154 + // First pass: compute diffs per collection 155 + type colDiffs struct { 156 + hasError bool 157 + counts []int 158 + // per-relay missing DIDs (sorted) 159 + missing [][]string 160 + } 161 + colResults := make(map[string]*colDiffs) 139 162 140 - row := summaryRow{collection: col, counts: make([]int, len(relays))} 141 - hasError := false 163 + for _, col := range cols { 164 + cd := &colDiffs{counts: make([]int, len(relays)), missing: make([][]string, len(relays))} 165 + colResults[col] = cd 142 166 143 - // Show counts per relay 144 167 for ri, relay := range relays { 145 168 r := allResults[key{col, relay}] 146 169 if r.err != nil { 147 - hasError = true 148 - fmt.Printf(" %-*s %s%serror%s: %v\n", maxNameLen, names[ri], cBold, cRed, cReset, r.err) 170 + cd.hasError = true 149 171 } else { 150 - row.counts[ri] = len(r.dids) 151 - fmt.Printf(" %-*s %s%d%s DIDs\n", maxNameLen, names[ri], cBold, len(r.dids), cReset) 172 + cd.counts[ri] = len(r.dids) 152 173 } 153 174 } 154 175 155 - if hasError { 156 - row.status = "error" 157 - summary = append(summary, row) 176 + if cd.hasError { 158 177 continue 159 178 } 160 179 ··· 166 185 } 167 186 } 168 187 169 - // For each relay, show what it's missing 170 - inSync := true 171 188 for ri, relay := range relays { 172 189 var missing []string 173 190 for did := range union { ··· 175 192 missing = append(missing, did) 176 193 } 177 194 } 195 + sort.Strings(missing) 196 + cd.missing[ri] = missing 197 + for _, did := range missing { 198 + allDiffs = append(allDiffs, diffEntry{did: did, collection: col, relayIdx: ri}) 199 + } 200 + } 201 + } 202 + 203 + // Optionally verify diffs against PDS 204 + verified := make(map[key]verifyResult) 205 + if *verify && len(allDiffs) > 0 { 206 + verified = verifyDiffs(ctx, allDiffs) 207 + } 208 + 209 + // Display per-collection diffs and collect summary 210 + var summary []summaryRow 211 + totalMissing := 0 212 + totalRealGaps := 0 213 + totalGhosts := 0 214 + 215 + for _, col := range cols { 216 + fmt.Printf("\n%s%s━━━ %s ━━━%s\n", cBold, cCyan, col, cReset) 217 + 218 + cd := colResults[col] 219 + row := summaryRow{collection: col, counts: cd.counts} 220 + 221 + if cd.hasError { 222 + for ri, relay := range relays { 223 + r := allResults[key{col, relay}] 224 + if r.err != nil { 225 + fmt.Printf(" %-*s %s%serror%s: %v\n", maxNameLen, names[ri], cBold, cRed, cReset, r.err) 226 + } else { 227 + fmt.Printf(" %-*s %s%d%s DIDs\n", maxNameLen, names[ri], cBold, len(r.dids), cReset) 228 + } 229 + } 230 + row.status = "error" 231 + summary = append(summary, row) 232 + continue 233 + } 234 + 235 + // Show counts per relay 236 + for ri := range relays { 237 + fmt.Printf(" %-*s %s%d%s DIDs\n", maxNameLen, names[ri], cBold, cd.counts[ri], cReset) 238 + } 239 + 240 + // Show missing DIDs per relay 241 + inSync := true 242 + for ri := range relays { 243 + missing := cd.missing[ri] 178 244 if len(missing) == 0 { 179 245 continue 180 246 } ··· 182 248 inSync = false 183 249 totalMissing += len(missing) 184 250 row.diffCount += len(missing) 185 - sort.Strings(missing) 186 251 187 252 fmt.Printf("\n %sMissing from %s (%d):%s\n", cRed, names[ri], len(missing), cReset) 188 253 for _, did := range missing { 189 - fmt.Printf(" %s- %s%s\n", cRed, did, cReset) 254 + suffix := "" 255 + if *verify { 256 + vr, ok := verified[key{col, did}] 257 + if !ok { 258 + suffix = fmt.Sprintf(" %s(verify: unknown)%s", cDim, cReset) 259 + } else if vr.err != nil { 260 + suffix = fmt.Sprintf(" %s(verify: %s)%s", cDim, vr.err, cReset) 261 + } else if vr.exists { 262 + suffix = fmt.Sprintf(" %s← real gap%s", cRed, cReset) 263 + row.realGaps++ 264 + totalRealGaps++ 265 + } else { 266 + suffix = fmt.Sprintf(" %s← ghost (not on PDS)%s", cDim, cReset) 267 + row.ghosts++ 268 + totalGhosts++ 269 + } 270 + } 271 + fmt.Printf(" %s- %s%s%s\n", cRed, did, cReset, suffix) 190 272 } 191 273 } 192 274 ··· 200 282 } 201 283 202 284 // Summary table 203 - printSummary(summary, names, maxNameLen, totalMissing) 285 + printSummary(summary, names, maxNameLen, totalMissing, *verify, totalRealGaps, totalGhosts) 204 286 } 205 287 206 - func printSummary(rows []summaryRow, names []string, maxNameLen, totalMissing int) { 288 + func printSummary(rows []summaryRow, names []string, maxNameLen, totalMissing int, showVerify bool, totalRealGaps, totalGhosts int) { 207 289 fmt.Printf("\n%s%s━━━ Summary ━━━%s\n\n", cBold, cCyan, cReset) 208 290 209 291 colW := 28 ··· 241 323 case "sync": 242 324 fmt.Printf(" %s✓ in sync%s", cGreen, cReset) 243 325 case "diff": 244 - fmt.Printf(" %s≠ %d missing%s", cYellow, row.diffCount, cReset) 326 + if showVerify { 327 + fmt.Printf(" %s≠ %d missing%s %s(%d real, %d ghost)%s", 328 + cYellow, row.diffCount, cReset, cDim, row.realGaps, row.ghosts, cReset) 329 + } else { 330 + fmt.Printf(" %s≠ %d missing%s", cYellow, row.diffCount, cReset) 331 + } 245 332 case "error": 246 333 fmt.Printf(" %s✗ error%s", cRed, cReset) 247 334 } ··· 252 339 fmt.Println() 253 340 if totalMissing > 0 { 254 341 fmt.Printf("%s%d total missing DID-collection pairs across relays%s\n", cYellow, totalMissing, cReset) 342 + if showVerify { 343 + fmt.Printf(" %s%d real gaps%s (record exists on PDS), %s%d ghosts%s (record deleted from PDS)\n", 344 + cRed, totalRealGaps, cReset, cDim, totalGhosts, cReset) 345 + } 255 346 } else { 256 347 fmt.Printf("%s✓ All relays fully in sync%s\n", cGreen, cReset) 257 348 } 349 + } 350 + 351 + // verifyDiffs resolves each diff DID to its PDS and checks if records actually exist. 352 + func verifyDiffs(ctx context.Context, diffs []diffEntry) map[key]verifyResult { 353 + // Collect unique (DID, collection) pairs to verify 354 + type didCol struct{ did, col string } 355 + unique := make(map[didCol]struct{}) 356 + for _, d := range diffs { 357 + unique[didCol{d.did, d.collection}] = struct{}{} 358 + } 359 + 360 + // Resolve unique DIDs to PDS endpoints (deduplicate across collections) 361 + uniqueDIDs := make(map[string]struct{}) 362 + for dc := range unique { 363 + uniqueDIDs[dc.did] = struct{}{} 364 + } 365 + 366 + fmt.Printf("\n%sVerifying %d DID-collection pairs (%d unique DIDs)...%s\n", cDim, len(unique), len(uniqueDIDs), cReset) 367 + 368 + pdsEndpoints := make(map[string]string) // DID → PDS URL 369 + pdsErrors := make(map[string]error) // DID → resolution error 370 + var mu sync.Mutex 371 + var wg sync.WaitGroup 372 + sem := make(chan struct{}, 10) // concurrency limit 373 + 374 + for did := range uniqueDIDs { 375 + wg.Add(1) 376 + go func(did string) { 377 + defer wg.Done() 378 + sem <- struct{}{} 379 + defer func() { <-sem }() 380 + 381 + pds, err := atproto.ResolveDIDToPDS(ctx, did) 382 + mu.Lock() 383 + if err != nil { 384 + pdsErrors[did] = err 385 + } else { 386 + pdsEndpoints[did] = pds 387 + } 388 + mu.Unlock() 389 + }(did) 390 + } 391 + wg.Wait() 392 + 393 + // Check each (DID, collection) pair against the resolved PDS 394 + results := make(map[key]verifyResult) 395 + 396 + for dc := range unique { 397 + wg.Add(1) 398 + go func(dc didCol) { 399 + defer wg.Done() 400 + sem <- struct{}{} 401 + defer func() { <-sem }() 402 + 403 + k := key{dc.col, dc.did} 404 + 405 + // Check if DID resolution failed 406 + if err, ok := pdsErrors[dc.did]; ok { 407 + mu.Lock() 408 + results[k] = verifyResult{err: fmt.Errorf("DID resolution failed: %w", err)} 409 + mu.Unlock() 410 + return 411 + } 412 + 413 + pds := pdsEndpoints[dc.did] 414 + client := atproto.NewClient(pds, "", "") 415 + records, _, err := client.ListRecordsForRepo(ctx, dc.did, dc.col, 1, "") 416 + mu.Lock() 417 + if err != nil { 418 + results[k] = verifyResult{err: err} 419 + } else { 420 + results[k] = verifyResult{exists: len(records) > 0} 421 + } 422 + mu.Unlock() 423 + }(dc) 424 + } 425 + wg.Wait() 426 + 427 + return results 258 428 } 259 429 260 430 // fetchAllDIDs paginates through listReposByCollection to collect all DIDs.