tangled
alpha
login
or
join now
evan.jarrett.net
/
at-container-registry
66
fork
atom
A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
atcr.io
docker
container
atproto
go
66
fork
atom
overview
issues
1
pulls
pipelines
add relay-compare tool
evan.jarrett.net
2 weeks ago
5249c9ea
2b9ea997
verified
This commit was signed with the committer's
known signature
.
evan.jarrett.net
SSH Key Fingerprint:
SHA256:bznk0uVPp7XFOl67P0uTM1pCjf2A4ojeP/lsUE7uauQ=
0/2
lint.yaml
failed
5min 2s
tests.yml
failed
5min 2s
+292
1 changed file
expand all
collapse all
unified
split
cmd
relay-compare
main.go
+292
cmd/relay-compare/main.go
···
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
···
1
+
// relay-compare compares ATProto relays by querying listReposByCollection
2
+
// for all io.atcr.* record types and showing what's missing from each relay.
3
+
//
4
+
// Usage:
5
+
//
6
+
// go run ./cmd/relay-compare https://relay1.us-east.bsky.network https://relay1.us-west.bsky.network
7
+
package main
8
+
9
+
import (
10
+
"context"
11
+
"flag"
12
+
"fmt"
13
+
"net/url"
14
+
"os"
15
+
"sort"
16
+
"strings"
17
+
"sync"
18
+
"time"
19
+
20
+
"atcr.io/pkg/atproto"
21
+
)
22
+
23
+
// ANSI color codes (disabled via --no-color or NO_COLOR env)
24
+
var (
25
+
cRed = "\033[31m"
26
+
cGreen = "\033[32m"
27
+
cYellow = "\033[33m"
28
+
cCyan = "\033[36m"
29
+
cBold = "\033[1m"
30
+
cDim = "\033[2m"
31
+
cReset = "\033[0m"
32
+
)
33
+
34
+
func disableColors() {
35
+
cRed, cGreen, cYellow, cCyan, cBold, cDim, cReset = "", "", "", "", "", "", ""
36
+
}
37
+
38
+
// All io.atcr.* collections to compare
39
+
var allCollections = []string{
40
+
atproto.ManifestCollection, // io.atcr.manifest
41
+
atproto.TagCollection, // io.atcr.tag
42
+
atproto.SailorProfileCollection, // io.atcr.sailor.profile
43
+
atproto.StarCollection, // io.atcr.sailor.star
44
+
atproto.SailorWebhookCollection, // io.atcr.sailor.webhook
45
+
atproto.RepoPageCollection, // io.atcr.repo.page
46
+
atproto.CaptainCollection, // io.atcr.hold.captain
47
+
atproto.CrewCollection, // io.atcr.hold.crew
48
+
atproto.LayerCollection, // io.atcr.hold.layer
49
+
atproto.StatsCollection, // io.atcr.hold.stats
50
+
atproto.ScanCollection, // io.atcr.hold.scan
51
+
atproto.WebhookCollection, // io.atcr.hold.webhook
52
+
}
53
+
54
+
type summaryRow struct {
55
+
collection string
56
+
counts []int
57
+
status string // "sync", "diff", "error"
58
+
diffCount int
59
+
}
60
+
61
+
func main() {
62
+
noColor := flag.Bool("no-color", false, "disable colored output")
63
+
collection := flag.String("collection", "", "compare only this collection")
64
+
timeout := flag.Duration("timeout", 2*time.Minute, "timeout for all relay queries")
65
+
flag.Usage = func() {
66
+
fmt.Fprintf(os.Stderr, "Compare ATProto relays by querying listReposByCollection for io.atcr.* records.\n\n")
67
+
fmt.Fprintf(os.Stderr, "Usage:\n relay-compare [flags] <relay-url> <relay-url> [relay-url...]\n\n")
68
+
fmt.Fprintf(os.Stderr, "Example:\n")
69
+
fmt.Fprintf(os.Stderr, " go run ./cmd/relay-compare https://relay1.us-east.bsky.network https://relay1.us-west.bsky.network\n\n")
70
+
fmt.Fprintf(os.Stderr, "Flags:\n")
71
+
flag.PrintDefaults()
72
+
}
73
+
flag.Parse()
74
+
75
+
if *noColor || os.Getenv("NO_COLOR") != "" {
76
+
disableColors()
77
+
}
78
+
79
+
relays := flag.Args()
80
+
if len(relays) < 2 {
81
+
flag.Usage()
82
+
os.Exit(1)
83
+
}
84
+
85
+
for i, r := range relays {
86
+
relays[i] = strings.TrimRight(r, "/")
87
+
}
88
+
89
+
cols := allCollections
90
+
if *collection != "" {
91
+
cols = []string{*collection}
92
+
}
93
+
94
+
ctx, cancel := context.WithTimeout(context.Background(), *timeout)
95
+
defer cancel()
96
+
97
+
// Short display names for each relay
98
+
names := make([]string, len(relays))
99
+
maxNameLen := 0
100
+
for i, r := range relays {
101
+
names[i] = shortName(r)
102
+
if len(names[i]) > maxNameLen {
103
+
maxNameLen = len(names[i])
104
+
}
105
+
}
106
+
107
+
fmt.Printf("%sFetching %d collections from %d relays...%s\n", cDim, len(cols), len(relays), cReset)
108
+
109
+
// Fetch all data in parallel: every (collection, relay) pair concurrently
110
+
type key struct{ col, relay string }
111
+
type fetchResult struct {
112
+
dids map[string]struct{}
113
+
err error
114
+
}
115
+
allResults := make(map[key]fetchResult)
116
+
var mu sync.Mutex
117
+
var wg sync.WaitGroup
118
+
119
+
for _, col := range cols {
120
+
for _, relay := range relays {
121
+
wg.Add(1)
122
+
go func(col, relay string) {
123
+
defer wg.Done()
124
+
dids, err := fetchAllDIDs(ctx, relay, col)
125
+
mu.Lock()
126
+
allResults[key{col, relay}] = fetchResult{dids, err}
127
+
mu.Unlock()
128
+
}(col, relay)
129
+
}
130
+
}
131
+
wg.Wait()
132
+
133
+
// Display per-collection diffs and collect summary
134
+
var summary []summaryRow
135
+
totalMissing := 0
136
+
137
+
for _, col := range cols {
138
+
fmt.Printf("\n%s%s━━━ %s ━━━%s\n", cBold, cCyan, col, cReset)
139
+
140
+
row := summaryRow{collection: col, counts: make([]int, len(relays))}
141
+
hasError := false
142
+
143
+
// Show counts per relay
144
+
for ri, relay := range relays {
145
+
r := allResults[key{col, relay}]
146
+
if r.err != nil {
147
+
hasError = true
148
+
fmt.Printf(" %-*s %s%serror%s: %v\n", maxNameLen, names[ri], cBold, cRed, cReset, r.err)
149
+
} else {
150
+
row.counts[ri] = len(r.dids)
151
+
fmt.Printf(" %-*s %s%d%s DIDs\n", maxNameLen, names[ri], cBold, len(r.dids), cReset)
152
+
}
153
+
}
154
+
155
+
if hasError {
156
+
row.status = "error"
157
+
summary = append(summary, row)
158
+
continue
159
+
}
160
+
161
+
// Build union of all DIDs across relays
162
+
union := make(map[string]struct{})
163
+
for _, relay := range relays {
164
+
for did := range allResults[key{col, relay}].dids {
165
+
union[did] = struct{}{}
166
+
}
167
+
}
168
+
169
+
// For each relay, show what it's missing
170
+
inSync := true
171
+
for ri, relay := range relays {
172
+
var missing []string
173
+
for did := range union {
174
+
if _, ok := allResults[key{col, relay}].dids[did]; !ok {
175
+
missing = append(missing, did)
176
+
}
177
+
}
178
+
if len(missing) == 0 {
179
+
continue
180
+
}
181
+
182
+
inSync = false
183
+
totalMissing += len(missing)
184
+
row.diffCount += len(missing)
185
+
sort.Strings(missing)
186
+
187
+
fmt.Printf("\n %sMissing from %s (%d):%s\n", cRed, names[ri], len(missing), cReset)
188
+
for _, did := range missing {
189
+
fmt.Printf(" %s- %s%s\n", cRed, did, cReset)
190
+
}
191
+
}
192
+
193
+
if inSync {
194
+
fmt.Printf(" %s✓ in sync%s\n", cGreen, cReset)
195
+
row.status = "sync"
196
+
} else {
197
+
row.status = "diff"
198
+
}
199
+
summary = append(summary, row)
200
+
}
201
+
202
+
// Summary table
203
+
printSummary(summary, names, maxNameLen, totalMissing)
204
+
}
205
+
206
+
func printSummary(rows []summaryRow, names []string, maxNameLen, totalMissing int) {
207
+
fmt.Printf("\n%s%s━━━ Summary ━━━%s\n\n", cBold, cCyan, cReset)
208
+
209
+
colW := 28
210
+
relayW := maxNameLen + 2
211
+
if relayW < 8 {
212
+
relayW = 8
213
+
}
214
+
215
+
// Header
216
+
fmt.Printf(" %-*s", colW, "Collection")
217
+
for _, name := range names {
218
+
fmt.Printf(" %*s", relayW, name)
219
+
}
220
+
fmt.Printf(" Status\n")
221
+
222
+
// Separator
223
+
fmt.Printf(" %s", strings.Repeat("─", colW))
224
+
for range names {
225
+
fmt.Printf(" %s", strings.Repeat("─", relayW))
226
+
}
227
+
fmt.Printf(" %s\n", strings.Repeat("─", 14))
228
+
229
+
// Data rows
230
+
for _, row := range rows {
231
+
fmt.Printf(" %-*s", colW, row.collection)
232
+
for _, c := range row.counts {
233
+
switch row.status {
234
+
case "error":
235
+
fmt.Printf(" %*s", relayW, fmt.Sprintf("%s—%s", cDim, cReset))
236
+
default:
237
+
fmt.Printf(" %*d", relayW, c)
238
+
}
239
+
}
240
+
switch row.status {
241
+
case "sync":
242
+
fmt.Printf(" %s✓ in sync%s", cGreen, cReset)
243
+
case "diff":
244
+
fmt.Printf(" %s≠ %d missing%s", cYellow, row.diffCount, cReset)
245
+
case "error":
246
+
fmt.Printf(" %s✗ error%s", cRed, cReset)
247
+
}
248
+
fmt.Println()
249
+
}
250
+
251
+
// Footer
252
+
fmt.Println()
253
+
if totalMissing > 0 {
254
+
fmt.Printf("%s%d total missing DID-collection pairs across relays%s\n", cYellow, totalMissing, cReset)
255
+
} else {
256
+
fmt.Printf("%s✓ All relays fully in sync%s\n", cGreen, cReset)
257
+
}
258
+
}
259
+
260
+
// fetchAllDIDs paginates through listReposByCollection to collect all DIDs.
261
+
func fetchAllDIDs(ctx context.Context, relay, collection string) (map[string]struct{}, error) {
262
+
client := atproto.NewClient(relay, "", "")
263
+
dids := make(map[string]struct{})
264
+
var cursor string
265
+
266
+
for {
267
+
result, err := client.ListReposByCollection(ctx, collection, 1000, cursor)
268
+
if err != nil {
269
+
return dids, err
270
+
}
271
+
272
+
for _, repo := range result.Repos {
273
+
dids[repo.DID] = struct{}{}
274
+
}
275
+
276
+
if result.Cursor == "" {
277
+
break
278
+
}
279
+
cursor = result.Cursor
280
+
}
281
+
282
+
return dids, nil
283
+
}
284
+
285
+
// shortName extracts the hostname from a relay URL for display.
286
+
func shortName(relayURL string) string {
287
+
u, err := url.Parse(relayURL)
288
+
if err != nil {
289
+
return relayURL
290
+
}
291
+
return u.Hostname()
292
+
}