tangled
alpha
login
or
join now
angrydutchman.peedee.es
/
plcbundle
forked from
atscan.net/plcbundle
0
fork
atom
A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
0
fork
atom
overview
issues
pulls
pipelines
resolve metrics
tree.fail
4 months ago
c571e982
21f8aea9
+199
-64
5 changed files
expand all
collapse all
unified
split
bundle
manager.go
types.go
cmd
plcbundle
commands
index.go
internal
mempool
mempool.go
server
handlers.go
+101
-22
bundle/manager.go
···
288
288
mempool: mempool,
289
289
didIndex: didIndex, // Updated type
290
290
bundleCache: make(map[int]*Bundle),
291
291
-
maxCacheSize: 2,
291
291
+
maxCacheSize: 10,
292
292
syncer: fetcher,
293
293
cloner: cloner,
294
294
}, nil
···
773
773
774
774
// LoadOperation loads a single operation from a bundle efficiently
775
775
func (m *Manager) LoadOperation(ctx context.Context, bundleNumber int, position int) (*plcclient.PLCOperation, error) {
776
776
+
// Validate position
777
777
+
if position < 0 || position >= types.BUNDLE_SIZE {
778
778
+
return nil, fmt.Errorf("invalid position: %d (must be 0-%d)", position, types.BUNDLE_SIZE-1)
779
779
+
}
780
780
+
776
781
// Validate bundle exists in index
777
782
_, err := m.index.GetBundle(bundleNumber)
778
783
if err != nil {
779
784
return nil, fmt.Errorf("bundle not in index: %w", err)
780
785
}
781
786
782
782
-
// Validate position
783
783
-
if position < 0 || position >= types.BUNDLE_SIZE {
784
784
-
return nil, fmt.Errorf("invalid position: %d (must be 0-%d)", position, types.BUNDLE_SIZE-1)
785
785
-
}
786
786
-
787
787
// Build file path
788
788
path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
789
789
if !m.operations.FileExists(path) {
790
790
return nil, fmt.Errorf("bundle file not found: %s", path)
791
791
}
792
792
793
793
-
// Load just the one operation
793
793
+
// Load just the one operation (optimized - decompresses only until position)
794
794
return m.operations.LoadOperationAtPosition(path, position)
795
795
}
796
796
···
966
966
return allOps, nil
967
967
}
968
968
969
969
-
// getDIDOperationsFromMempool retrieves operations for a DID from mempool only
969
969
+
// GetDIDOperationsFromMempool retrieves operations for a DID from mempool only
970
970
func (m *Manager) GetDIDOperationsFromMempool(did string) ([]plcclient.PLCOperation, error) {
971
971
if m.mempool == nil {
972
972
return []plcclient.PLCOperation{}, nil
973
973
}
974
974
975
975
-
allMempoolOps, err := m.GetMempoolOperations()
976
976
-
if err != nil {
977
977
-
return nil, err
978
978
-
}
979
979
-
980
980
-
matchingOps := make([]plcclient.PLCOperation, 0, 16)
981
981
-
982
982
-
for _, op := range allMempoolOps {
983
983
-
if op.DID == did {
984
984
-
matchingOps = append(matchingOps, op)
985
985
-
}
986
986
-
}
987
987
-
988
988
-
return matchingOps, nil
975
975
+
// Use direct search - only copies matching operations
976
976
+
return m.mempool.FindDIDOperations(did), nil
989
977
}
990
978
991
979
// GetLatestDIDOperation returns only the most recent non-nullified operation
···
1221
1209
// Delegate to cloner with inline callback
1222
1210
return m.cloner.Clone(ctx, opts, m.index, updateIndexCallback)
1223
1211
}
1212
1212
+
1213
1213
+
// ResolveDID resolves a DID to its current document with detailed timing metrics
1214
1214
+
func (m *Manager) ResolveDID(ctx context.Context, did string) (*ResolveDIDResult, error) {
1215
1215
+
if err := plcclient.ValidateDIDFormat(did); err != nil {
1216
1216
+
return nil, err
1217
1217
+
}
1218
1218
+
1219
1219
+
result := &ResolveDIDResult{}
1220
1220
+
totalStart := time.Now()
1221
1221
+
1222
1222
+
// STEP 1: Check mempool first (most recent data) - OPTIMIZED
1223
1223
+
mempoolStart := time.Now()
1224
1224
+
1225
1225
+
var latestMempoolOp *plcclient.PLCOperation
1226
1226
+
if m.mempool != nil {
1227
1227
+
// Fast backwards search with early exit
1228
1228
+
latestMempoolOp = m.mempool.FindLatestDIDOperation(did)
1229
1229
+
}
1230
1230
+
result.MempoolTime = time.Since(mempoolStart)
1231
1231
+
1232
1232
+
// Early return if found in mempool
1233
1233
+
if latestMempoolOp != nil {
1234
1234
+
doc, err := plcclient.ResolveDIDDocument(did, []plcclient.PLCOperation{*latestMempoolOp})
1235
1235
+
if err != nil {
1236
1236
+
return nil, fmt.Errorf("resolution failed: %w", err)
1237
1237
+
}
1238
1238
+
1239
1239
+
result.Document = doc
1240
1240
+
result.Source = "mempool"
1241
1241
+
result.TotalTime = time.Since(totalStart)
1242
1242
+
return result, nil
1243
1243
+
}
1244
1244
+
1245
1245
+
// STEP 2: Index lookup
1246
1246
+
if m.didIndex == nil || !m.didIndex.Exists() {
1247
1247
+
return nil, fmt.Errorf("DID index not available - run 'plcbundle index build' to enable DID resolution")
1248
1248
+
}
1249
1249
+
1250
1250
+
indexStart := time.Now()
1251
1251
+
locations, err := m.didIndex.GetDIDLocations(did)
1252
1252
+
result.IndexTime = time.Since(indexStart)
1253
1253
+
1254
1254
+
if err != nil {
1255
1255
+
return nil, err
1256
1256
+
}
1257
1257
+
1258
1258
+
if len(locations) == 0 {
1259
1259
+
return nil, fmt.Errorf("DID not found")
1260
1260
+
}
1261
1261
+
1262
1262
+
// Find latest non-nullified location
1263
1263
+
var latestLoc *didindex.OpLocation
1264
1264
+
for i := range locations {
1265
1265
+
if locations[i].Nullified {
1266
1266
+
continue
1267
1267
+
}
1268
1268
+
if latestLoc == nil ||
1269
1269
+
locations[i].Bundle > latestLoc.Bundle ||
1270
1270
+
(locations[i].Bundle == latestLoc.Bundle && locations[i].Position > latestLoc.Position) {
1271
1271
+
latestLoc = &locations[i]
1272
1272
+
}
1273
1273
+
}
1274
1274
+
1275
1275
+
if latestLoc == nil {
1276
1276
+
return nil, fmt.Errorf("no valid operations (all nullified)")
1277
1277
+
}
1278
1278
+
1279
1279
+
// STEP 3: Load operation
1280
1280
+
opStart := time.Now()
1281
1281
+
op, err := m.LoadOperation(ctx, int(latestLoc.Bundle), int(latestLoc.Position))
1282
1282
+
result.LoadOpTime = time.Since(opStart)
1283
1283
+
1284
1284
+
if err != nil {
1285
1285
+
return nil, fmt.Errorf("failed to load operation: %w", err)
1286
1286
+
}
1287
1287
+
1288
1288
+
result.BundleNumber = int(latestLoc.Bundle)
1289
1289
+
result.Position = int(latestLoc.Position)
1290
1290
+
1291
1291
+
// STEP 4: Resolve document
1292
1292
+
doc, err := plcclient.ResolveDIDDocument(did, []plcclient.PLCOperation{*op})
1293
1293
+
if err != nil {
1294
1294
+
return nil, fmt.Errorf("resolution failed: %w", err)
1295
1295
+
}
1296
1296
+
1297
1297
+
result.Document = doc
1298
1298
+
result.Source = "bundle"
1299
1299
+
result.TotalTime = time.Since(totalStart)
1300
1300
+
1301
1301
+
return result, nil
1302
1302
+
}
+12
bundle/types.go
···
186
186
CreatedAt: b.CreatedAt,
187
187
}
188
188
}
189
189
+
190
190
+
// ResolveDIDResult contains DID resolution with timing metrics
191
191
+
type ResolveDIDResult struct {
192
192
+
Document *plcclient.DIDDocument
193
193
+
MempoolTime time.Duration
194
194
+
IndexTime time.Duration
195
195
+
LoadOpTime time.Duration
196
196
+
TotalTime time.Duration
197
197
+
Source string // "mempool" or "bundle"
198
198
+
BundleNumber int // if from bundle
199
199
+
Position int // if from bundle
200
200
+
}
+29
-33
cmd/plcbundle/commands/index.go
···
242
242
243
243
func indexResolve(args []string) error {
244
244
fs := flag.NewFlagSet("index resolve", flag.ExitOnError)
245
245
+
verbose := fs.Bool("v", false, "verbose timing breakdown")
245
246
246
247
if err := fs.Parse(args); err != nil {
247
248
return err
248
249
}
249
250
250
251
if fs.NArg() < 1 {
251
251
-
return fmt.Errorf("usage: plcbundle index resolve <did>")
252
252
+
return fmt.Errorf("usage: plcbundle index resolve <did> [-v]")
252
253
}
253
254
254
255
did := fs.Arg(0)
···
262
263
ctx := context.Background()
263
264
fmt.Fprintf(os.Stderr, "Resolving: %s\n", did)
264
265
265
265
-
start := time.Now()
266
266
-
267
267
-
// Check mempool first
268
268
-
mempoolOps, _ := mgr.GetDIDOperationsFromMempool(did)
269
269
-
if len(mempoolOps) > 0 {
270
270
-
for i := len(mempoolOps) - 1; i >= 0; i-- {
271
271
-
if !mempoolOps[i].IsNullified() {
272
272
-
doc, err := plcclient.ResolveDIDDocument(did, []plcclient.PLCOperation{mempoolOps[i]})
273
273
-
if err != nil {
274
274
-
return fmt.Errorf("resolution failed: %w", err)
275
275
-
}
276
276
-
277
277
-
totalTime := time.Since(start)
278
278
-
fmt.Fprintf(os.Stderr, "Total: %s (resolved from mempool)\n\n", totalTime)
279
279
-
280
280
-
data, _ := json.MarshalIndent(doc, "", " ")
281
281
-
fmt.Println(string(data))
282
282
-
return nil
283
283
-
}
284
284
-
}
285
285
-
}
286
286
-
287
287
-
// Use index
288
288
-
op, err := mgr.GetLatestDIDOperation(ctx, did)
266
266
+
// Use unified resolution method with metrics
267
267
+
result, err := mgr.ResolveDID(ctx, did)
289
268
if err != nil {
290
290
-
return fmt.Errorf("failed to get latest operation: %w", err)
269
269
+
return err
291
270
}
292
271
293
293
-
doc, err := plcclient.ResolveDIDDocument(did, []plcclient.PLCOperation{*op})
294
294
-
if err != nil {
295
295
-
return fmt.Errorf("resolution failed: %w", err)
272
272
+
// Display timing metrics
273
273
+
if result.Source == "mempool" {
274
274
+
fmt.Fprintf(os.Stderr, "Mempool check: %s (✓ found in mempool)\n", result.MempoolTime)
275
275
+
fmt.Fprintf(os.Stderr, "Total: %s (resolved from mempool)\n\n", result.TotalTime)
276
276
+
} else {
277
277
+
fmt.Fprintf(os.Stderr, "Mempool check: %s (not found)\n", result.MempoolTime)
278
278
+
fmt.Fprintf(os.Stderr, "Index lookup: %s (shard access)\n", result.IndexTime)
279
279
+
fmt.Fprintf(os.Stderr, "Operation load: %s (bundle %d, pos %d)\n",
280
280
+
result.LoadOpTime, result.BundleNumber, result.Position)
281
281
+
fmt.Fprintf(os.Stderr, "Total: %s\n", result.TotalTime)
282
282
+
283
283
+
// Verbose timing breakdown
284
284
+
if *verbose {
285
285
+
fmt.Fprintf(os.Stderr, "\nTiming breakdown:\n")
286
286
+
fmt.Fprintf(os.Stderr, " Mempool: %6s (%5.1f%%)\n",
287
287
+
result.MempoolTime, float64(result.MempoolTime)/float64(result.TotalTime)*100)
288
288
+
fmt.Fprintf(os.Stderr, " Index: %6s (%5.1f%%)\n",
289
289
+
result.IndexTime, float64(result.IndexTime)/float64(result.TotalTime)*100)
290
290
+
fmt.Fprintf(os.Stderr, " Load op: %6s (%5.1f%%)\n",
291
291
+
result.LoadOpTime, float64(result.LoadOpTime)/float64(result.TotalTime)*100)
292
292
+
}
293
293
+
fmt.Fprintf(os.Stderr, "\n")
296
294
}
297
295
298
298
-
totalTime := time.Since(start)
299
299
-
fmt.Fprintf(os.Stderr, "Total: %s\n\n", totalTime)
300
300
-
301
301
-
data, _ := json.MarshalIndent(doc, "", " ")
296
296
+
// Output document to stdout
297
297
+
data, _ := json.MarshalIndent(result.Document, "", " ")
302
298
fmt.Println(string(data))
303
299
304
300
return nil
+40
internal/mempool/mempool.go
···
417
417
func (m *Mempool) GetFilename() string {
418
418
return filepath.Base(m.file)
419
419
}
420
420
+
421
421
+
// FindDIDOperations searches for operations matching a DID (no full copy)
422
422
+
func (m *Mempool) FindDIDOperations(did string) []plcclient.PLCOperation {
423
423
+
m.mu.RLock()
424
424
+
defer m.mu.RUnlock()
425
425
+
426
426
+
if len(m.operations) == 0 {
427
427
+
return []plcclient.PLCOperation{}
428
428
+
}
429
429
+
430
430
+
// Pre-allocate with small capacity (most DIDs have 1-5 ops)
431
431
+
matching := make([]plcclient.PLCOperation, 0, 4)
432
432
+
433
433
+
for _, op := range m.operations {
434
434
+
if op.DID == did {
435
435
+
matching = append(matching, op)
436
436
+
}
437
437
+
}
438
438
+
439
439
+
return matching
440
440
+
}
441
441
+
442
442
+
// FindLatestDIDOperation finds the most recent non-nullified operation for a DID
443
443
+
// Returns nil if not found. Searches backwards for early exit.
444
444
+
func (m *Mempool) FindLatestDIDOperation(did string) *plcclient.PLCOperation {
445
445
+
m.mu.RLock()
446
446
+
defer m.mu.RUnlock()
447
447
+
448
448
+
// Search backwards from most recent
449
449
+
for i := len(m.operations) - 1; i >= 0; i-- {
450
450
+
if m.operations[i].DID == did {
451
451
+
if !m.operations[i].IsNullified() {
452
452
+
// Return pointer to avoid copy
453
453
+
return &m.operations[i]
454
454
+
}
455
455
+
}
456
456
+
}
457
457
+
458
458
+
return nil
459
459
+
}
+17
-9
server/handlers.go
···
518
518
519
519
func (s *Server) handleDIDDocument(did string) http.HandlerFunc {
520
520
return func(w http.ResponseWriter, r *http.Request) {
521
521
-
op, err := s.manager.GetLatestDIDOperation(context.Background(), did)
522
522
-
if err != nil {
523
523
-
sendJSON(w, 500, map[string]string{"error": err.Error()})
524
524
-
return
525
525
-
}
526
526
-
527
527
-
doc, err := plcclient.ResolveDIDDocument(did, []plcclient.PLCOperation{*op})
521
521
+
result, err := s.manager.ResolveDID(r.Context(), did)
528
522
if err != nil {
529
523
if strings.Contains(err.Error(), "deactivated") {
530
524
sendJSON(w, 410, map[string]string{"error": "DID has been deactivated"})
525
525
+
} else if strings.Contains(err.Error(), "not found") {
526
526
+
sendJSON(w, 404, map[string]string{"error": "DID not found"})
531
527
} else {
532
532
-
sendJSON(w, 500, map[string]string{"error": fmt.Sprintf("Resolution failed: %v", err)})
528
528
+
sendJSON(w, 500, map[string]string{"error": err.Error()})
533
529
}
534
530
return
535
531
}
536
532
533
533
+
// Add timing headers in MILLISECONDS (float for precision)
534
534
+
w.Header().Set("X-Resolution-Time-Ms", fmt.Sprintf("%.3f", float64(result.TotalTime.Microseconds())/1000.0))
535
535
+
w.Header().Set("X-Resolution-Source", result.Source)
536
536
+
w.Header().Set("X-Mempool-Time-Ms", fmt.Sprintf("%.3f", float64(result.MempoolTime.Microseconds())/1000.0))
537
537
+
538
538
+
if result.Source == "bundle" {
539
539
+
w.Header().Set("X-Bundle-Number", fmt.Sprintf("%d", result.BundleNumber))
540
540
+
w.Header().Set("X-Bundle-Position", fmt.Sprintf("%d", result.Position))
541
541
+
w.Header().Set("X-Index-Time-Ms", fmt.Sprintf("%.3f", float64(result.IndexTime.Microseconds())/1000.0))
542
542
+
w.Header().Set("X-Load-Time-Ms", fmt.Sprintf("%.3f", float64(result.LoadOpTime.Microseconds())/1000.0))
543
543
+
}
544
544
+
537
545
w.Header().Set("Content-Type", "application/did+ld+json")
538
538
-
sendJSON(w, 200, doc)
546
546
+
sendJSON(w, 200, result.Document)
539
547
}
540
548
}
541
549