[DEPRECATED] Go implementation of plcbundle

back to net/http

+549 -549
+7 -6
cmd/plcbundle/main.go
··· 4 "context" 5 "flag" 6 "fmt" 7 "os" 8 "os/signal" 9 "path/filepath" ··· 1446 go runSync(ctx, mgr, syncInterval, *verbose, *enableResolver) 1447 } 1448 1449 - // Create and run server 1450 - server := newServerHandler(mgr, *sync, *enableWebSocket, *enableResolver) 1451 1452 - // Run server (blocks until error or shutdown) 1453 - if err := server.Run(addr); err != nil { 1454 fmt.Fprintf(os.Stderr, "Server error: %v\n", err) 1455 - 1456 - // Ensure cleanup on error 1457 mgr.SaveMempool() 1458 mgr.Close() 1459 os.Exit(1)
··· 4 "context" 5 "flag" 6 "fmt" 7 + "net/http" 8 "os" 9 "os/signal" 10 "path/filepath" ··· 1447 go runSync(ctx, mgr, syncInterval, *verbose, *enableResolver) 1448 } 1449 1450 + handler := newServerHandler(mgr, *sync, *enableWebSocket, *enableResolver) 1451 + server := &http.Server{ 1452 + Addr: addr, 1453 + Handler: handler, 1454 + } 1455 1456 + if err := server.ListenAndServe(); err != nil { 1457 fmt.Fprintf(os.Stderr, "Server error: %v\n", err) 1458 mgr.SaveMempool() 1459 mgr.Close() 1460 os.Exit(1)
+542 -534
cmd/plcbundle/server.go
··· 12 "strings" 13 "time" 14 15 - "git.urbach.dev/go/web" 16 "github.com/goccy/go-json" 17 "github.com/gorilla/websocket" 18 ··· 33 var verboseMode bool 34 var resolverEnabled bool 35 36 - func newServerHandler(mgr *bundle.Manager, syncMode bool, wsEnabled bool, resolverEnabled bool) web.Server { 37 - s := web.NewServer() 38 39 - // CORS middleware 40 - s.Use(corsMiddleware) 41 42 - // Root endpoint 43 - s.Get("/", func(ctx web.Context) error { 44 - return handleRoot(ctx, mgr, syncMode, wsEnabled, resolverEnabled) 45 - }) 46 47 - // Bundle endpoints 48 - s.Get("/index.json", func(ctx web.Context) error { 49 - return handleIndexJSON(ctx, mgr) 50 - }) 51 52 - s.Get("/bundle/:number", func(ctx web.Context) error { 53 - return handleBundle(ctx, mgr) 54 - }) 55 56 - s.Get("/data/:number", func(ctx web.Context) error { 57 - return handleBundleData(ctx, mgr) 58 - }) 59 60 - s.Get("/jsonl/:number", func(ctx web.Context) error { 61 - return handleBundleJSONL(ctx, mgr) 62 - }) 63 64 - s.Get("/status", func(ctx web.Context) error { 65 - return handleStatus(ctx, mgr, syncMode, wsEnabled) 66 }) 67 68 - s.Get("/debug/memory", func(ctx web.Context) error { 69 - return handleDebugMemory(ctx, mgr) 70 - }) 71 72 - // WebSocket endpoint - needs special handling 73 - if wsEnabled { 74 - s.Get("/ws", func(ctx web.Context) error { 75 - // WebSocket needs raw ResponseWriter, get it from underlying request 76 - handleWebSocketRaw(ctx, mgr) 77 - return nil 78 - }) 79 } 80 81 - // Sync mode endpoints 82 - if syncMode { 83 - s.Get("/mempool", func(ctx web.Context) error { 84 - return handleMempool(ctx, mgr) 85 - }) 86 } 87 88 - // DID resolution endpoints (must be LAST to avoid conflicts) 89 - if resolverEnabled { 90 - // Single catch-all handler for DID routes 91 - s.Get("/*path", func(ctx web.Context) error { 92 - path := ctx.Request().Param("path") 93 94 - // Remove leading slash 95 - path = strings.TrimPrefix(path, "/") 96 97 - // Parse DID and sub-path 98 - parts := strings.SplitN(path, "/", 2) 99 - did := parts[0] 100 101 - // Validate it's a DID 102 - if !strings.HasPrefix(did, "did:plc:") { 103 - return sendJSON(ctx, 404, map[string]string{"error": "not found"}) 104 - } 105 106 - // Route based on sub-path 107 - if len(parts) == 1 { 108 - // /did:plc:xxx -> DID document 109 - return handleDIDDocumentLatest(ctx, mgr, did) 110 - } else if parts[1] == "data" { 111 - // /did:plc:xxx/data -> PLC state 112 - return handleDIDData(ctx, mgr, did) 113 - } else if parts[1] == "log/audit" { 114 - // /did:plc:xxx/log/audit -> Audit log 115 - return handleDIDAuditLog(ctx, mgr, did) 116 - } 117 118 - return sendJSON(ctx, 404, map[string]string{"error": "not found"}) 119 - }) 120 - } 121 - 122 - return s 123 } 124 125 - // Helper to send JSON responses using goccy/go-json 126 - func sendJSON(ctx web.Context, statusCode int, data interface{}) error { 127 - ctx.Response().SetHeader("Content-Type", "application/json") 128 - 129 - if statusCode != 200 { 130 - ctx.Status(statusCode) 131 - } 132 133 jsonData, err := json.Marshal(data) 134 if err != nil { 135 - return err 136 } 137 138 - _, err = ctx.Response().Write(jsonData) 139 - return err 140 } 141 142 - // CORS middleware 143 - func corsMiddleware(ctx web.Context) error { 144 - ctx.Response().SetHeader("Access-Control-Allow-Origin", "*") 145 - ctx.Response().SetHeader("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") 146 147 - if requestedHeaders := ctx.Request().Header("Access-Control-Request-Headers"); requestedHeaders != "" { 148 - ctx.Response().SetHeader("Access-Control-Allow-Headers", requestedHeaders) 149 - } else { 150 - ctx.Response().SetHeader("Access-Control-Allow-Headers", "*") 151 - } 152 153 - ctx.Response().SetHeader("Access-Control-Max-Age", "86400") 154 - 155 - if ctx.Request().Method() == "OPTIONS" { 156 - return ctx.Status(204).String("") 157 - } 158 - 159 - return ctx.Next(ctx) 160 - } 161 - 162 - func handleRoot(ctx web.Context, mgr *bundle.Manager, syncMode bool, wsEnabled bool, resolverEnabled bool) error { 163 - ctx.Response().SetHeader("Content-Type", "text/plain; charset=utf-8") 164 - 165 - index := mgr.GetIndex() 166 - stats := index.GetStats() 167 - bundleCount := stats["bundle_count"].(int) 168 169 - baseURL := getBaseURLFromContext(ctx) 170 - wsURL := getWSURLFromContext(ctx) 171 172 - var sb strings.Builder 173 174 - sb.WriteString(` 175 176 ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠄⠀⡀⠀⠀⠀⠀⠀⠀⢀⠀⠀⡀⠀⢀⠀⢀⡀⣤⡢⣤⡤⡀⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 177 ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⡄⡄⠐⡀⠈⣀⠀⡠⡠⠀⣢⣆⢌⡾⢙⠺⢽⠾⡋⣻⡷⡫⢵⣭⢦⣴⠦⠀⢠⠀⠀⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ ··· 210 211 `) 212 213 - sb.WriteString("\nplcbundle server\n\n") 214 - sb.WriteString("What is PLC Bundle?\n") 215 - sb.WriteString("━━━━━━━━━━━━━━━━━━━━\n") 216 - sb.WriteString("plcbundle archives AT Protocol's DID PLC Directory operations into\n") 217 - sb.WriteString("immutable, cryptographically-chained bundles of 10,000 operations.\n\n") 218 - sb.WriteString("More info: https://tangled.org/@atscan.net/plcbundle\n\n") 219 220 - if bundleCount > 0 { 221 - sb.WriteString("Bundles\n") 222 - sb.WriteString("━━━━━━━\n") 223 - sb.WriteString(fmt.Sprintf(" Bundle count: %d\n", bundleCount)) 224 225 - firstBundle := stats["first_bundle"].(int) 226 - lastBundle := stats["last_bundle"].(int) 227 - totalSize := stats["total_size"].(int64) 228 - totalUncompressed := stats["total_uncompressed_size"].(int64) 229 230 - sb.WriteString(fmt.Sprintf(" Last bundle: %d (%s)\n", lastBundle, 231 - stats["updated_at"].(time.Time).Format("2006-01-02 15:04:05"))) 232 - sb.WriteString(fmt.Sprintf(" Range: %06d - %06d\n", firstBundle, lastBundle)) 233 - sb.WriteString(fmt.Sprintf(" Total size: %.2f MB\n", float64(totalSize)/(1000*1000))) 234 - sb.WriteString(fmt.Sprintf(" Uncompressed: %.2f MB (%.2fx)\n", 235 - float64(totalUncompressed)/(1000*1000), 236 - float64(totalUncompressed)/float64(totalSize))) 237 238 - if gaps, ok := stats["gaps"].(int); ok && gaps > 0 { 239 - sb.WriteString(fmt.Sprintf(" ⚠ Gaps: %d missing bundles\n", gaps)) 240 - } 241 242 - firstMeta, err := index.GetBundle(firstBundle) 243 - if err == nil { 244 - sb.WriteString(fmt.Sprintf("\n Root: %s\n", firstMeta.Hash)) 245 - } 246 247 - lastMeta, err := index.GetBundle(lastBundle) 248 - if err == nil { 249 - sb.WriteString(fmt.Sprintf(" Head: %s\n", lastMeta.Hash)) 250 } 251 - } 252 253 - if syncMode { 254 - mempoolStats := mgr.GetMempoolStats() 255 - count := mempoolStats["count"].(int) 256 - targetBundle := mempoolStats["target_bundle"].(int) 257 - canCreate := mempoolStats["can_create_bundle"].(bool) 258 259 - sb.WriteString("\nMempool Stats\n") 260 - sb.WriteString("━━━━━━━━━━━━━\n") 261 - sb.WriteString(fmt.Sprintf(" Target bundle: %d\n", targetBundle)) 262 - sb.WriteString(fmt.Sprintf(" Operations: %d / %d\n", count, bundle.BUNDLE_SIZE)) 263 - sb.WriteString(fmt.Sprintf(" Can create bundle: %v\n", canCreate)) 264 265 - if count > 0 { 266 - progress := float64(count) / float64(bundle.BUNDLE_SIZE) * 100 267 - sb.WriteString(fmt.Sprintf(" Progress: %.1f%%\n", progress)) 268 269 - barWidth := 50 270 - filled := int(float64(barWidth) * float64(count) / float64(bundle.BUNDLE_SIZE)) 271 - if filled > barWidth { 272 - filled = barWidth 273 - } 274 - bar := strings.Repeat("█", filled) + strings.Repeat("░", barWidth-filled) 275 - sb.WriteString(fmt.Sprintf(" [%s]\n", bar)) 276 277 - if firstTime, ok := mempoolStats["first_time"].(time.Time); ok { 278 - sb.WriteString(fmt.Sprintf(" First op: %s\n", firstTime.Format("2006-01-02 15:04:05"))) 279 - } 280 - if lastTime, ok := mempoolStats["last_time"].(time.Time); ok { 281 - sb.WriteString(fmt.Sprintf(" Last op: %s\n", lastTime.Format("2006-01-02 15:04:05"))) 282 } 283 - } else { 284 - sb.WriteString(" (empty)\n") 285 } 286 - } 287 288 - if didStats := mgr.GetDIDIndexStats(); didStats["exists"].(bool) { 289 - sb.WriteString("\nDID Index\n") 290 - sb.WriteString("━━━━━━━━━\n") 291 - sb.WriteString(" Status: enabled\n") 292 293 - indexedDIDs := didStats["indexed_dids"].(int64) 294 - mempoolDIDs := didStats["mempool_dids"].(int64) 295 - totalDIDs := didStats["total_dids"].(int64) 296 297 - if mempoolDIDs > 0 { 298 - sb.WriteString(fmt.Sprintf(" Total DIDs: %s (%s indexed + %s mempool)\n", 299 - formatNumber(int(totalDIDs)), 300 - formatNumber(int(indexedDIDs)), 301 - formatNumber(int(mempoolDIDs)))) 302 - } else { 303 - sb.WriteString(fmt.Sprintf(" Total DIDs: %s\n", formatNumber(int(totalDIDs)))) 304 } 305 306 - sb.WriteString(fmt.Sprintf(" Cached shards: %d / %d\n", 307 - didStats["cached_shards"], didStats["cache_limit"])) 308 - sb.WriteString("\n") 309 - } 310 311 - sb.WriteString("Server Stats\n") 312 - sb.WriteString("━━━━━━━━━━━━\n") 313 - sb.WriteString(fmt.Sprintf(" Version: %s\n", version)) 314 - if origin := mgr.GetPLCOrigin(); origin != "" { 315 - sb.WriteString(fmt.Sprintf(" Origin: %s\n", origin)) 316 - } 317 - sb.WriteString(fmt.Sprintf(" Sync mode: %v\n", syncMode)) 318 - sb.WriteString(fmt.Sprintf(" WebSocket: %v\n", wsEnabled)) 319 - sb.WriteString(fmt.Sprintf(" Resolver: %v\n", resolverEnabled)) 320 - sb.WriteString(fmt.Sprintf(" Uptime: %s\n", time.Since(serverStartTime).Round(time.Second))) 321 322 - sb.WriteString("\n\nAPI Endpoints\n") 323 - sb.WriteString("━━━━━━━━━━━━━\n") 324 - sb.WriteString(" GET / This info page\n") 325 - sb.WriteString(" GET /index.json Full bundle index\n") 326 - sb.WriteString(" GET /bundle/:number Bundle metadata (JSON)\n") 327 - sb.WriteString(" GET /data/:number Raw bundle (zstd compressed)\n") 328 - sb.WriteString(" GET /jsonl/:number Decompressed JSONL stream\n") 329 - sb.WriteString(" GET /status Server status\n") 330 - sb.WriteString(" GET /mempool Mempool operations (JSONL)\n") 331 332 - if resolverEnabled { 333 - sb.WriteString("\nDID Resolution\n") 334 - sb.WriteString("━━━━━━━━━━━━━━\n") 335 - sb.WriteString(" GET /:did DID Document (W3C format)\n") 336 - sb.WriteString(" GET /:did/data PLC State (raw format)\n") 337 - sb.WriteString(" GET /:did/log/audit Operation history\n") 338 - 339 - didStats := mgr.GetDIDIndexStats() 340 - if didStats["exists"].(bool) { 341 - sb.WriteString(fmt.Sprintf("\n Index: %s DIDs indexed\n", 342 - formatNumber(int(didStats["total_dids"].(int64))))) 343 - } else { 344 - sb.WriteString("\n ⚠️ Index: not built (will use slow scan)\n") 345 } 346 - sb.WriteString("\n") 347 - } 348 349 - if wsEnabled { 350 - sb.WriteString("\nWebSocket Endpoints\n") 351 - sb.WriteString("━━━━━━━━━━━━━━━━━━━\n") 352 - sb.WriteString(" WS /ws Live stream (new operations only)\n") 353 - sb.WriteString(" WS /ws?cursor=0 Stream all from beginning\n") 354 - sb.WriteString(" WS /ws?cursor=N Stream from cursor N\n\n") 355 - sb.WriteString("Cursor Format:\n") 356 - sb.WriteString(" Global record number: (bundleNumber × 10,000) + position\n") 357 - sb.WriteString(" Example: 88410345 = bundle 8841, position 345\n") 358 - sb.WriteString(" Default: starts from latest (skips all historical data)\n") 359 360 - latestCursor := mgr.GetCurrentCursor() 361 - bundledOps := len(index.GetBundles()) * bundle.BUNDLE_SIZE 362 - mempoolOps := latestCursor - bundledOps 363 364 - if syncMode && mempoolOps > 0 { 365 - sb.WriteString(fmt.Sprintf(" Current latest: %d (%d bundled + %d mempool)\n", 366 - latestCursor, bundledOps, mempoolOps)) 367 - } else { 368 - sb.WriteString(fmt.Sprintf(" Current latest: %d (%d bundles)\n", 369 - latestCursor, len(index.GetBundles()))) 370 } 371 - } 372 373 - sb.WriteString("\nExamples\n") 374 - sb.WriteString("━━━━━━━━\n") 375 - sb.WriteString(fmt.Sprintf(" curl %s/bundle/1\n", baseURL)) 376 - sb.WriteString(fmt.Sprintf(" curl %s/data/42 -o 000042.jsonl.zst\n", baseURL)) 377 - sb.WriteString(fmt.Sprintf(" curl %s/jsonl/1\n", baseURL)) 378 379 - if wsEnabled { 380 - sb.WriteString(fmt.Sprintf(" websocat %s/ws\n", wsURL)) 381 - sb.WriteString(fmt.Sprintf(" websocat '%s/ws?cursor=0'\n", wsURL)) 382 - } 383 384 - if syncMode { 385 - sb.WriteString(fmt.Sprintf(" curl %s/status\n", baseURL)) 386 - sb.WriteString(fmt.Sprintf(" curl %s/mempool\n", baseURL)) 387 - } 388 389 - sb.WriteString("\n────────────────────────────────────────────────────────────────\n") 390 - sb.WriteString("https://tangled.org/@atscan.net/plcbundle\n") 391 392 - return ctx.String(sb.String()) 393 } 394 395 - func handleIndexJSON(ctx web.Context, mgr *bundle.Manager) error { 396 - index := mgr.GetIndex() 397 - return sendJSON(ctx, 200, index) 398 } 399 400 - func handleBundle(ctx web.Context, mgr *bundle.Manager) error { 401 - bundleNum, err := strconv.Atoi(ctx.Request().Param("number")) 402 - if err != nil { 403 - return sendJSON(ctx, 400, map[string]string{"error": "Invalid bundle number"}) 404 - } 405 406 - meta, err := mgr.GetIndex().GetBundle(bundleNum) 407 - if err != nil { 408 - return sendJSON(ctx, 404, map[string]string{"error": "Bundle not found"}) 409 - } 410 411 - return sendJSON(ctx, 200, meta) 412 } 413 414 - func handleBundleData(ctx web.Context, mgr *bundle.Manager) error { 415 - bundleNum, err := strconv.Atoi(ctx.Request().Param("number")) 416 - if err != nil { 417 - return sendJSON(ctx, 400, map[string]string{"error": "Invalid bundle number"}) 418 - } 419 420 - reader, err := mgr.StreamBundleRaw(context.Background(), bundleNum) 421 - if err != nil { 422 - if strings.Contains(err.Error(), "not in index") || strings.Contains(err.Error(), "not found") { 423 - return sendJSON(ctx, 400, map[string]string{"error": "Bundle not found"}) 424 } 425 - return sendJSON(ctx, 500, map[string]string{"error": err.Error()}) 426 - } 427 - defer reader.Close() 428 429 - ctx.Response().SetHeader("Content-Type", "application/zstd") 430 - ctx.Response().SetHeader("Content-Disposition", fmt.Sprintf("attachment; filename=%06d.jsonl.zst", bundleNum)) 431 432 - _, err = io.Copy(ctx.Response(), reader) 433 - return err 434 } 435 436 - func handleBundleJSONL(ctx web.Context, mgr *bundle.Manager) error { 437 - bundleNum, err := strconv.Atoi(ctx.Request().Param("number")) 438 - if err != nil { 439 - return sendJSON(ctx, 400, map[string]string{"error": "Invalid bundle number"}) 440 - } 441 442 - reader, err := mgr.StreamBundleDecompressed(context.Background(), bundleNum) 443 - if err != nil { 444 - if strings.Contains(err.Error(), "not in index") || strings.Contains(err.Error(), "not found") { 445 - return sendJSON(ctx, 404, map[string]string{"error": "Bundle not found"}) 446 } 447 - return sendJSON(ctx, 500, map[string]string{"error": err.Error()}) 448 - } 449 - defer reader.Close() 450 451 - ctx.Response().SetHeader("Content-Type", "application/x-ndjson") 452 - ctx.Response().SetHeader("Content-Disposition", fmt.Sprintf("attachment; filename=%06d.jsonl", bundleNum)) 453 454 - _, err = io.Copy(ctx.Response(), reader) 455 - return err 456 } 457 458 - func handleStatus(ctx web.Context, mgr *bundle.Manager, syncMode bool, wsEnabled bool) error { 459 - index := mgr.GetIndex() 460 - indexStats := index.GetStats() 461 462 - response := StatusResponse{ 463 - Server: ServerStatus{ 464 - Version: version, 465 - UptimeSeconds: int(time.Since(serverStartTime).Seconds()), 466 - SyncMode: syncMode, 467 - WebSocketEnabled: wsEnabled, 468 - Origin: mgr.GetPLCOrigin(), 469 - }, 470 - Bundles: BundleStatus{ 471 - Count: indexStats["bundle_count"].(int), 472 - TotalSize: indexStats["total_size"].(int64), 473 - UncompressedSize: indexStats["total_uncompressed_size"].(int64), 474 - }, 475 - } 476 477 - if syncMode && syncInterval > 0 { 478 - response.Server.SyncIntervalSeconds = int(syncInterval.Seconds()) 479 - } 480 481 - if bundleCount := response.Bundles.Count; bundleCount > 0 { 482 - firstBundle := indexStats["first_bundle"].(int) 483 - lastBundle := indexStats["last_bundle"].(int) 484 485 - response.Bundles.FirstBundle = firstBundle 486 - response.Bundles.LastBundle = lastBundle 487 - response.Bundles.StartTime = indexStats["start_time"].(time.Time) 488 - response.Bundles.EndTime = indexStats["end_time"].(time.Time) 489 490 - if firstMeta, err := index.GetBundle(firstBundle); err == nil { 491 - response.Bundles.RootHash = firstMeta.Hash 492 - } 493 494 - if lastMeta, err := index.GetBundle(lastBundle); err == nil { 495 - response.Bundles.HeadHash = lastMeta.Hash 496 - response.Bundles.HeadAgeSeconds = int(time.Since(lastMeta.EndTime).Seconds()) 497 - } 498 499 - if gaps, ok := indexStats["gaps"].(int); ok { 500 - response.Bundles.Gaps = gaps 501 - response.Bundles.HasGaps = gaps > 0 502 - if gaps > 0 { 503 - response.Bundles.GapNumbers = index.FindGaps() 504 } 505 - } 506 507 - totalOps := bundleCount * bundle.BUNDLE_SIZE 508 - response.Bundles.TotalOperations = totalOps 509 510 - duration := response.Bundles.EndTime.Sub(response.Bundles.StartTime) 511 - if duration.Hours() > 0 { 512 - response.Bundles.AvgOpsPerHour = int(float64(totalOps) / duration.Hours()) 513 } 514 - } 515 516 - if syncMode { 517 - mempoolStats := mgr.GetMempoolStats() 518 519 - if count, ok := mempoolStats["count"].(int); ok { 520 - mempool := &MempoolStatus{ 521 - Count: count, 522 - TargetBundle: mempoolStats["target_bundle"].(int), 523 - CanCreateBundle: mempoolStats["can_create_bundle"].(bool), 524 - MinTimestamp: mempoolStats["min_timestamp"].(time.Time), 525 - Validated: mempoolStats["validated"].(bool), 526 - ProgressPercent: float64(count) / float64(bundle.BUNDLE_SIZE) * 100, 527 - BundleSize: bundle.BUNDLE_SIZE, 528 - OperationsNeeded: bundle.BUNDLE_SIZE - count, 529 - } 530 531 - if firstTime, ok := mempoolStats["first_time"].(time.Time); ok { 532 - mempool.FirstTime = firstTime 533 - mempool.TimespanSeconds = int(time.Since(firstTime).Seconds()) 534 - } 535 - if lastTime, ok := mempoolStats["last_time"].(time.Time); ok { 536 - mempool.LastTime = lastTime 537 - mempool.LastOpAgeSeconds = int(time.Since(lastTime).Seconds()) 538 - } 539 540 - if count > 100 && count < bundle.BUNDLE_SIZE { 541 - if !mempool.FirstTime.IsZero() && !mempool.LastTime.IsZero() { 542 - timespan := mempool.LastTime.Sub(mempool.FirstTime) 543 - if timespan.Seconds() > 0 { 544 - opsPerSec := float64(count) / timespan.Seconds() 545 - remaining := bundle.BUNDLE_SIZE - count 546 - mempool.EtaNextBundleSeconds = int(float64(remaining) / opsPerSec) 547 } 548 } 549 } 550 551 - response.Mempool = mempool 552 - } 553 } 554 - 555 - return sendJSON(ctx, 200, response) 556 } 557 558 - func handleMempool(ctx web.Context, mgr *bundle.Manager) error { 559 - ops, err := mgr.GetMempoolOperations() 560 - if err != nil { 561 - return sendJSON(ctx, 500, map[string]string{"error": err.Error()}) 562 - } 563 564 - ctx.Response().SetHeader("Content-Type", "application/x-ndjson") 565 566 - if len(ops) == 0 { 567 - return nil 568 - } 569 570 - for _, op := range ops { 571 - if len(op.RawJSON) > 0 { 572 - ctx.Response().Write(op.RawJSON) 573 - } else { 574 - data, _ := json.Marshal(op) 575 - ctx.Response().Write(data) 576 } 577 - ctx.Response().Write([]byte("\n")) 578 } 579 - 580 - return nil 581 } 582 583 - func handleDebugMemory(ctx web.Context, mgr *bundle.Manager) error { 584 - var m runtime.MemStats 585 - runtime.ReadMemStats(&m) 586 587 - didStats := mgr.GetDIDIndexStats() 588 589 - beforeAlloc := m.Alloc / 1024 / 1024 590 591 - runtime.GC() 592 - runtime.ReadMemStats(&m) 593 - afterAlloc := m.Alloc / 1024 / 1024 594 595 - return ctx.String(fmt.Sprintf(`Memory Stats: 596 Alloc: %d MB 597 TotalAlloc: %d MB 598 Sys: %d MB ··· 604 After GC: 605 Alloc: %d MB 606 `, 607 - beforeAlloc, 608 - m.TotalAlloc/1024/1024, 609 - m.Sys/1024/1024, 610 - m.NumGC, 611 - didStats["cached_shards"], 612 - didStats["cache_limit"], 613 - afterAlloc)) 614 - } 615 - 616 - func handleDIDDocumentLatest(ctx web.Context, mgr *bundle.Manager, did string) error { 617 - op, err := mgr.GetLatestDIDOperation(context.Background(), did) 618 - if err != nil { 619 - return sendJSON(ctx, 500, map[string]string{"error": err.Error()}) 620 - } 621 622 - doc, err := plc.ResolveDIDDocument(did, []plc.PLCOperation{*op}) 623 - if err != nil { 624 - if strings.Contains(err.Error(), "deactivated") { 625 - return sendJSON(ctx, 410, map[string]string{"error": "DID has been deactivated"}) 626 - } 627 - return sendJSON(ctx, 500, map[string]string{"error": fmt.Sprintf("Resolution failed: %v", err)}) 628 } 629 - ctx.Response().SetHeader("Content-Type", "application/did+ld+json") 630 - return sendJSON(ctx, 200, doc) 631 } 632 633 - func handleDIDData(ctx web.Context, mgr *bundle.Manager, did string) error { 634 - if err := plc.ValidateDIDFormat(did); err != nil { 635 - return sendJSON(ctx, 400, map[string]string{"error": "Invalid DID format"}) 636 - } 637 638 - operations, err := mgr.GetDIDOperations(context.Background(), did, false) 639 - if err != nil { 640 - return sendJSON(ctx, 500, map[string]string{"error": err.Error()}) 641 - } 642 643 - if len(operations) == 0 { 644 - return sendJSON(ctx, 404, map[string]string{"error": "DID not found"}) 645 - } 646 - 647 - state, err := plc.BuildDIDState(did, operations) 648 - if err != nil { 649 - if strings.Contains(err.Error(), "deactivated") { 650 - return sendJSON(ctx, 410, map[string]string{"error": "DID has been deactivated"}) 651 } 652 - return sendJSON(ctx, 500, map[string]string{"error": err.Error()}) 653 - } 654 655 - return sendJSON(ctx, 200, state) 656 - } 657 658 - func handleDIDAuditLog(ctx web.Context, mgr *bundle.Manager, did string) error { 659 - if err := plc.ValidateDIDFormat(did); err != nil { 660 - return sendJSON(ctx, 400, map[string]string{"error": "Invalid DID format"}) 661 - } 662 663 - operations, err := mgr.GetDIDOperations(context.Background(), did, false) 664 - if err != nil { 665 - return sendJSON(ctx, 500, map[string]string{"error": err.Error()}) 666 - } 667 668 - if len(operations) == 0 { 669 - return sendJSON(ctx, 404, map[string]string{"error": "DID not found"}) 670 } 671 - 672 - auditLog := plc.FormatAuditLog(operations) 673 - return sendJSON(ctx, 200, auditLog) 674 } 675 676 - // WebSocket handler wrapper for web framework 677 - func handleWebSocketRaw(ctx web.Context, mgr *bundle.Manager) { 678 - // The web framework doesn't expose ResponseWriter directly 679 - // We need to use reflection or type assertion to get it 680 - // For now, we'll implement a workaround by getting the underlying HTTP objects 681 682 - cursorStr := ctx.Request().Query().Param("cursor") 683 - var cursor int 684 - 685 - if cursorStr == "" { 686 - cursor = mgr.GetCurrentCursor() 687 - } else { 688 - var err error 689 - cursor, err = strconv.Atoi(cursorStr) 690 - if err != nil || cursor < 0 { 691 - ctx.Status(400).String("Invalid cursor: must be non-negative integer") 692 return 693 } 694 - } 695 696 - // Access underlying ResponseWriter through interface assertion 697 - type ResponseWriterGetter interface { 698 - ResponseWriter() http.ResponseWriter 699 } 700 701 - type RequestGetter interface { 702 - HTTPRequest() *http.Request 703 - } 704 - 705 - var w http.ResponseWriter 706 - var r *http.Request 707 708 - // Try to get ResponseWriter (framework-specific) 709 - if rwg, ok := ctx.(ResponseWriterGetter); ok { 710 - w = rwg.ResponseWriter() 711 - } 712 713 - if rg, ok := ctx.(RequestGetter); ok { 714 - r = rg.HTTPRequest() 715 - } 716 717 - // If we can't get them, we need to upgrade manually 718 - // This is a limitation - WebSocket needs direct access 719 - if w == nil || r == nil { 720 - ctx.Status(500).String("WebSocket not supported") 721 - return 722 - } 723 724 - conn, err := upgrader.Upgrade(w, r, nil) 725 - if err != nil { 726 - fmt.Fprintf(os.Stderr, "WebSocket upgrade failed: %v\n", err) 727 - return 728 } 729 - defer conn.Close() 730 731 - conn.SetPongHandler(func(string) error { 732 - conn.SetReadDeadline(time.Now().Add(60 * time.Second)) 733 - return nil 734 - }) 735 736 - done := make(chan struct{}) 737 - 738 - go func() { 739 - defer close(done) 740 - for { 741 - _, _, err := conn.ReadMessage() 742 - if err != nil { 743 - if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { 744 - fmt.Fprintf(os.Stderr, "WebSocket: client closed connection\n") 745 - } 746 - return 747 - } 748 } 749 - }() 750 751 - bgCtx := context.Background() 752 753 - if err := streamLive(bgCtx, conn, mgr, cursor, done); err != nil { 754 - fmt.Fprintf(os.Stderr, "WebSocket stream error: %v\n", err) 755 } 756 } 757 758 - // streamLive and other WebSocket functions remain unchanged 759 func streamLive(ctx context.Context, conn *websocket.Conn, mgr *bundle.Manager, startCursor int, done chan struct{}) error { 760 index := mgr.GetIndex() 761 bundles := index.GetBundles() ··· 966 return "ws" 967 } 968 969 - func getBaseURLFromContext(ctx web.Context) string { 970 - // Get host from request 971 - host := ctx.Request().Header("Host") 972 - // Assume http since we're behind reverse proxy 973 - return fmt.Sprintf("http://%s", host) 974 } 975 976 - func getWSURLFromContext(ctx web.Context) string { 977 - host := ctx.Request().Header("Host") 978 - return fmt.Sprintf("ws://%s", host) 979 } 980 981 - // Response types 982 983 type StatusResponse struct { 984 Bundles BundleStatus `json:"bundles"` ··· 1031 EtaNextBundleSeconds int `json:"eta_next_bundle_seconds,omitempty"` 1032 } 1033 1034 - // Background sync 1035 1036 func runSync(ctx context.Context, mgr *bundle.Manager, interval time.Duration, verbose bool, resolverEnabled bool) { 1037 syncBundles(ctx, mgr, verbose, resolverEnabled)
··· 12 "strings" 13 "time" 14 15 "github.com/goccy/go-json" 16 "github.com/gorilla/websocket" 17 ··· 32 var verboseMode bool 33 var resolverEnabled bool 34 35 + // newServerHandler creates HTTP handler with all routes 36 + func newServerHandler(mgr *bundle.Manager, syncMode bool, wsEnabled bool, resolverEnabled bool) http.Handler { 37 + mux := http.NewServeMux() 38 39 + // Specific routes first (highest priority) 40 + mux.HandleFunc("GET /index.json", handleIndexJSONNative(mgr)) 41 + mux.HandleFunc("GET /bundle/{number}", handleBundleNative(mgr)) 42 + mux.HandleFunc("GET /data/{number}", handleBundleDataNative(mgr)) 43 + mux.HandleFunc("GET /jsonl/{number}", handleBundleJSONLNative(mgr)) 44 + mux.HandleFunc("GET /status", handleStatusNative(mgr, syncMode, wsEnabled)) 45 + mux.HandleFunc("GET /debug/memory", handleDebugMemoryNative(mgr)) 46 47 + // WebSocket endpoint 48 + if wsEnabled { 49 + mux.HandleFunc("GET /ws", handleWebSocketNative(mgr)) 50 + } 51 52 + // Sync mode endpoints 53 + if syncMode { 54 + mux.HandleFunc("GET /mempool", handleMempoolNative(mgr)) 55 + } 56 57 + // Combined root and DID resolver handler 58 + mux.HandleFunc("GET /", func(w http.ResponseWriter, r *http.Request) { 59 + path := r.URL.Path 60 61 + // Handle exact root 62 + if path == "/" { 63 + handleRootNative(mgr, syncMode, wsEnabled, resolverEnabled)(w, r) 64 + return 65 + } 66 67 + // Handle DID routes if enabled 68 + if resolverEnabled { 69 + handleDIDRouting(w, r, mgr) 70 + return 71 + } 72 73 + // 404 for everything else 74 + sendJSON(w, 404, map[string]string{"error": "not found"}) 75 }) 76 77 + // Wrap with CORS middleware 78 + return corsMiddleware(mux) 79 + } 80 + 81 + // handleDIDRouting routes DID-related requests 82 + func handleDIDRouting(w http.ResponseWriter, r *http.Request, mgr *bundle.Manager) { 83 + path := strings.TrimPrefix(r.URL.Path, "/") 84 + 85 + // Parse DID and sub-path 86 + parts := strings.SplitN(path, "/", 2) 87 + did := parts[0] 88 89 + // Validate it's a DID 90 + if !strings.HasPrefix(did, "did:plc:") { 91 + sendJSON(w, 404, map[string]string{"error": "not found"}) 92 + return 93 } 94 95 + // Route based on sub-path 96 + if len(parts) == 1 { 97 + // /did:plc:xxx -> DID document 98 + handleDIDDocumentLatestNative(mgr, did)(w, r) 99 + } else if parts[1] == "data" { 100 + // /did:plc:xxx/data -> PLC state 101 + handleDIDDataNative(mgr, did)(w, r) 102 + } else if parts[1] == "log/audit" { 103 + // /did:plc:xxx/log/audit -> Audit log 104 + handleDIDAuditLogNative(mgr, did)(w, r) 105 + } else { 106 + sendJSON(w, 404, map[string]string{"error": "not found"}) 107 } 108 + } 109 110 + // corsMiddleware adds CORS headers (skips WebSocket upgrade requests) 111 + func corsMiddleware(next http.Handler) http.Handler { 112 + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 113 + // Check if this is a WebSocket upgrade request 114 + if r.Header.Get("Upgrade") == "websocket" { 115 + // Skip CORS for WebSocket - pass through directly 116 + next.ServeHTTP(w, r) 117 + return 118 + } 119 120 + // Normal CORS handling for non-WebSocket requests 121 + w.Header().Set("Access-Control-Allow-Origin", "*") 122 + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") 123 124 + if requestedHeaders := r.Header.Get("Access-Control-Request-Headers"); requestedHeaders != "" { 125 + w.Header().Set("Access-Control-Allow-Headers", requestedHeaders) 126 + } else { 127 + w.Header().Set("Access-Control-Allow-Headers", "*") 128 + } 129 130 + w.Header().Set("Access-Control-Max-Age", "86400") 131 132 + if r.Method == "OPTIONS" { 133 + w.WriteHeader(204) 134 + return 135 + } 136 137 + next.ServeHTTP(w, r) 138 + }) 139 } 140 141 + // sendJSON sends JSON response 142 + func sendJSON(w http.ResponseWriter, statusCode int, data interface{}) { 143 + w.Header().Set("Content-Type", "application/json") 144 145 jsonData, err := json.Marshal(data) 146 if err != nil { 147 + w.WriteHeader(500) 148 + w.Write([]byte(`{"error":"failed to marshal JSON"}`)) 149 + return 150 } 151 152 + w.WriteHeader(statusCode) 153 + w.Write(jsonData) 154 } 155 156 + // Handler implementations 157 158 + func handleRootNative(mgr *bundle.Manager, syncMode bool, wsEnabled bool, resolverEnabled bool) http.HandlerFunc { 159 + return func(w http.ResponseWriter, r *http.Request) { 160 + w.Header().Set("Content-Type", "text/plain; charset=utf-8") 161 162 + index := mgr.GetIndex() 163 + stats := index.GetStats() 164 + bundleCount := stats["bundle_count"].(int) 165 166 + baseURL := getBaseURL(r) 167 + wsURL := getWSURL(r) 168 169 + var sb strings.Builder 170 171 + sb.WriteString(` 172 173 ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠄⠀⡀⠀⠀⠀⠀⠀⠀⢀⠀⠀⡀⠀⢀⠀⢀⡀⣤⡢⣤⡤⡀⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 174 ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⡄⡄⠐⡀⠈⣀⠀⡠⡠⠀⣢⣆⢌⡾⢙⠺⢽⠾⡋⣻⡷⡫⢵⣭⢦⣴⠦⠀⢠⠀⠀⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ ··· 207 208 `) 209 210 + sb.WriteString("\nplcbundle server\n\n") 211 + sb.WriteString("What is PLC Bundle?\n") 212 + sb.WriteString("━━━━━━━━━━━━━━━━━━━━\n") 213 + sb.WriteString("plcbundle archives AT Protocol's DID PLC Directory operations into\n") 214 + sb.WriteString("immutable, cryptographically-chained bundles of 10,000 operations.\n\n") 215 + sb.WriteString("More info: https://tangled.org/@atscan.net/plcbundle\n\n") 216 217 + if bundleCount > 0 { 218 + sb.WriteString("Bundles\n") 219 + sb.WriteString("━━━━━━━\n") 220 + sb.WriteString(fmt.Sprintf(" Bundle count: %d\n", bundleCount)) 221 222 + firstBundle := stats["first_bundle"].(int) 223 + lastBundle := stats["last_bundle"].(int) 224 + totalSize := stats["total_size"].(int64) 225 + totalUncompressed := stats["total_uncompressed_size"].(int64) 226 227 + sb.WriteString(fmt.Sprintf(" Last bundle: %d (%s)\n", lastBundle, 228 + stats["updated_at"].(time.Time).Format("2006-01-02 15:04:05"))) 229 + sb.WriteString(fmt.Sprintf(" Range: %06d - %06d\n", firstBundle, lastBundle)) 230 + sb.WriteString(fmt.Sprintf(" Total size: %.2f MB\n", float64(totalSize)/(1000*1000))) 231 + sb.WriteString(fmt.Sprintf(" Uncompressed: %.2f MB (%.2fx)\n", 232 + float64(totalUncompressed)/(1000*1000), 233 + float64(totalUncompressed)/float64(totalSize))) 234 235 + if gaps, ok := stats["gaps"].(int); ok && gaps > 0 { 236 + sb.WriteString(fmt.Sprintf(" ⚠ Gaps: %d missing bundles\n", gaps)) 237 + } 238 239 + firstMeta, err := index.GetBundle(firstBundle) 240 + if err == nil { 241 + sb.WriteString(fmt.Sprintf("\n Root: %s\n", firstMeta.Hash)) 242 + } 243 244 + lastMeta, err := index.GetBundle(lastBundle) 245 + if err == nil { 246 + sb.WriteString(fmt.Sprintf(" Head: %s\n", lastMeta.Hash)) 247 + } 248 } 249 250 + if syncMode { 251 + mempoolStats := mgr.GetMempoolStats() 252 + count := mempoolStats["count"].(int) 253 + targetBundle := mempoolStats["target_bundle"].(int) 254 + canCreate := mempoolStats["can_create_bundle"].(bool) 255 256 + sb.WriteString("\nMempool Stats\n") 257 + sb.WriteString("━━━━━━━━━━━━━\n") 258 + sb.WriteString(fmt.Sprintf(" Target bundle: %d\n", targetBundle)) 259 + sb.WriteString(fmt.Sprintf(" Operations: %d / %d\n", count, bundle.BUNDLE_SIZE)) 260 + sb.WriteString(fmt.Sprintf(" Can create bundle: %v\n", canCreate)) 261 262 + if count > 0 { 263 + progress := float64(count) / float64(bundle.BUNDLE_SIZE) * 100 264 + sb.WriteString(fmt.Sprintf(" Progress: %.1f%%\n", progress)) 265 266 + barWidth := 50 267 + filled := int(float64(barWidth) * float64(count) / float64(bundle.BUNDLE_SIZE)) 268 + if filled > barWidth { 269 + filled = barWidth 270 + } 271 + bar := strings.Repeat("█", filled) + strings.Repeat("░", barWidth-filled) 272 + sb.WriteString(fmt.Sprintf(" [%s]\n", bar)) 273 274 + if firstTime, ok := mempoolStats["first_time"].(time.Time); ok { 275 + sb.WriteString(fmt.Sprintf(" First op: %s\n", firstTime.Format("2006-01-02 15:04:05"))) 276 + } 277 + if lastTime, ok := mempoolStats["last_time"].(time.Time); ok { 278 + sb.WriteString(fmt.Sprintf(" Last op: %s\n", lastTime.Format("2006-01-02 15:04:05"))) 279 + } 280 + } else { 281 + sb.WriteString(" (empty)\n") 282 } 283 } 284 285 + if didStats := mgr.GetDIDIndexStats(); didStats["exists"].(bool) { 286 + sb.WriteString("\nDID Index\n") 287 + sb.WriteString("━━━━━━━━━\n") 288 + sb.WriteString(" Status: enabled\n") 289 + 290 + indexedDIDs := didStats["indexed_dids"].(int64) 291 + mempoolDIDs := didStats["mempool_dids"].(int64) 292 + totalDIDs := didStats["total_dids"].(int64) 293 294 + if mempoolDIDs > 0 { 295 + sb.WriteString(fmt.Sprintf(" Total DIDs: %s (%s indexed + %s mempool)\n", 296 + formatNumber(int(totalDIDs)), 297 + formatNumber(int(indexedDIDs)), 298 + formatNumber(int(mempoolDIDs)))) 299 + } else { 300 + sb.WriteString(fmt.Sprintf(" Total DIDs: %s\n", formatNumber(int(totalDIDs)))) 301 + } 302 303 + sb.WriteString(fmt.Sprintf(" Cached shards: %d / %d\n", 304 + didStats["cached_shards"], didStats["cache_limit"])) 305 + sb.WriteString("\n") 306 } 307 308 + sb.WriteString("Server Stats\n") 309 + sb.WriteString("━━━━━━━━━━━━\n") 310 + sb.WriteString(fmt.Sprintf(" Version: %s\n", version)) 311 + if origin := mgr.GetPLCOrigin(); origin != "" { 312 + sb.WriteString(fmt.Sprintf(" Origin: %s\n", origin)) 313 + } 314 + sb.WriteString(fmt.Sprintf(" Sync mode: %v\n", syncMode)) 315 + sb.WriteString(fmt.Sprintf(" WebSocket: %v\n", wsEnabled)) 316 + sb.WriteString(fmt.Sprintf(" Resolver: %v\n", resolverEnabled)) 317 + sb.WriteString(fmt.Sprintf(" Uptime: %s\n", time.Since(serverStartTime).Round(time.Second))) 318 319 + sb.WriteString("\n\nAPI Endpoints\n") 320 + sb.WriteString("━━━━━━━━━━━━━\n") 321 + sb.WriteString(" GET / This info page\n") 322 + sb.WriteString(" GET /index.json Full bundle index\n") 323 + sb.WriteString(" GET /bundle/:number Bundle metadata (JSON)\n") 324 + sb.WriteString(" GET /data/:number Raw bundle (zstd compressed)\n") 325 + sb.WriteString(" GET /jsonl/:number Decompressed JSONL stream\n") 326 + sb.WriteString(" GET /status Server status\n") 327 + sb.WriteString(" GET /mempool Mempool operations (JSONL)\n") 328 329 + if resolverEnabled { 330 + sb.WriteString("\nDID Resolution\n") 331 + sb.WriteString("━━━━━━━━━━━━━━\n") 332 + sb.WriteString(" GET /:did DID Document (W3C format)\n") 333 + sb.WriteString(" GET /:did/data PLC State (raw format)\n") 334 + sb.WriteString(" GET /:did/log/audit Operation history\n") 335 336 + didStats := mgr.GetDIDIndexStats() 337 + if didStats["exists"].(bool) { 338 + sb.WriteString(fmt.Sprintf("\n Index: %s DIDs indexed\n", 339 + formatNumber(int(didStats["total_dids"].(int64))))) 340 + } else { 341 + sb.WriteString("\n ⚠️ Index: not built (will use slow scan)\n") 342 + } 343 + sb.WriteString("\n") 344 } 345 346 + if wsEnabled { 347 + sb.WriteString("\nWebSocket Endpoints\n") 348 + sb.WriteString("━━━━━━━━━━━━━━━━━━━\n") 349 + sb.WriteString(" WS /ws Live stream (new operations only)\n") 350 + sb.WriteString(" WS /ws?cursor=0 Stream all from beginning\n") 351 + sb.WriteString(" WS /ws?cursor=N Stream from cursor N\n\n") 352 + sb.WriteString("Cursor Format:\n") 353 + sb.WriteString(" Global record number: (bundleNumber × 10,000) + position\n") 354 + sb.WriteString(" Example: 88410345 = bundle 8841, position 345\n") 355 + sb.WriteString(" Default: starts from latest (skips all historical data)\n") 356 357 + latestCursor := mgr.GetCurrentCursor() 358 + bundledOps := len(index.GetBundles()) * bundle.BUNDLE_SIZE 359 + mempoolOps := latestCursor - bundledOps 360 361 + if syncMode && mempoolOps > 0 { 362 + sb.WriteString(fmt.Sprintf(" Current latest: %d (%d bundled + %d mempool)\n", 363 + latestCursor, bundledOps, mempoolOps)) 364 + } else { 365 + sb.WriteString(fmt.Sprintf(" Current latest: %d (%d bundles)\n", 366 + latestCursor, len(index.GetBundles()))) 367 + } 368 } 369 370 + sb.WriteString("\nExamples\n") 371 + sb.WriteString("━━━━━━━━\n") 372 + sb.WriteString(fmt.Sprintf(" curl %s/bundle/1\n", baseURL)) 373 + sb.WriteString(fmt.Sprintf(" curl %s/data/42 -o 000042.jsonl.zst\n", baseURL)) 374 + sb.WriteString(fmt.Sprintf(" curl %s/jsonl/1\n", baseURL)) 375 376 + if wsEnabled { 377 + sb.WriteString(fmt.Sprintf(" websocat %s/ws\n", wsURL)) 378 + sb.WriteString(fmt.Sprintf(" websocat '%s/ws?cursor=0'\n", wsURL)) 379 + } 380 381 + if syncMode { 382 + sb.WriteString(fmt.Sprintf(" curl %s/status\n", baseURL)) 383 + sb.WriteString(fmt.Sprintf(" curl %s/mempool\n", baseURL)) 384 + } 385 386 + sb.WriteString("\n────────────────────────────────────────────────────────────────\n") 387 + sb.WriteString("https://tangled.org/@atscan.net/plcbundle\n") 388 389 + w.Write([]byte(sb.String())) 390 + } 391 } 392 393 + func handleIndexJSONNative(mgr *bundle.Manager) http.HandlerFunc { 394 + return func(w http.ResponseWriter, r *http.Request) { 395 + index := mgr.GetIndex() 396 + sendJSON(w, 200, index) 397 + } 398 } 399 400 + func handleBundleNative(mgr *bundle.Manager) http.HandlerFunc { 401 + return func(w http.ResponseWriter, r *http.Request) { 402 + bundleNum, err := strconv.Atoi(r.PathValue("number")) 403 + if err != nil { 404 + sendJSON(w, 400, map[string]string{"error": "Invalid bundle number"}) 405 + return 406 + } 407 408 + meta, err := mgr.GetIndex().GetBundle(bundleNum) 409 + if err != nil { 410 + sendJSON(w, 404, map[string]string{"error": "Bundle not found"}) 411 + return 412 + } 413 414 + sendJSON(w, 200, meta) 415 + } 416 } 417 418 + func handleBundleDataNative(mgr *bundle.Manager) http.HandlerFunc { 419 + return func(w http.ResponseWriter, r *http.Request) { 420 + bundleNum, err := strconv.Atoi(r.PathValue("number")) 421 + if err != nil { 422 + sendJSON(w, 400, map[string]string{"error": "Invalid bundle number"}) 423 + return 424 + } 425 426 + reader, err := mgr.StreamBundleRaw(context.Background(), bundleNum) 427 + if err != nil { 428 + if strings.Contains(err.Error(), "not in index") || strings.Contains(err.Error(), "not found") { 429 + sendJSON(w, 404, map[string]string{"error": "Bundle not found"}) 430 + } else { 431 + sendJSON(w, 500, map[string]string{"error": err.Error()}) 432 + } 433 + return 434 } 435 + defer reader.Close() 436 437 + w.Header().Set("Content-Type", "application/zstd") 438 + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%06d.jsonl.zst", bundleNum)) 439 440 + io.Copy(w, reader) 441 + } 442 } 443 444 + func handleBundleJSONLNative(mgr *bundle.Manager) http.HandlerFunc { 445 + return func(w http.ResponseWriter, r *http.Request) { 446 + bundleNum, err := strconv.Atoi(r.PathValue("number")) 447 + if err != nil { 448 + sendJSON(w, 400, map[string]string{"error": "Invalid bundle number"}) 449 + return 450 + } 451 452 + reader, err := mgr.StreamBundleDecompressed(context.Background(), bundleNum) 453 + if err != nil { 454 + if strings.Contains(err.Error(), "not in index") || strings.Contains(err.Error(), "not found") { 455 + sendJSON(w, 404, map[string]string{"error": "Bundle not found"}) 456 + } else { 457 + sendJSON(w, 500, map[string]string{"error": err.Error()}) 458 + } 459 + return 460 } 461 + defer reader.Close() 462 463 + w.Header().Set("Content-Type", "application/x-ndjson") 464 + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%06d.jsonl", bundleNum)) 465 466 + io.Copy(w, reader) 467 + } 468 } 469 470 + func handleStatusNative(mgr *bundle.Manager, syncMode bool, wsEnabled bool) http.HandlerFunc { 471 + return func(w http.ResponseWriter, r *http.Request) { 472 + index := mgr.GetIndex() 473 + indexStats := index.GetStats() 474 475 + response := StatusResponse{ 476 + Server: ServerStatus{ 477 + Version: version, 478 + UptimeSeconds: int(time.Since(serverStartTime).Seconds()), 479 + SyncMode: syncMode, 480 + WebSocketEnabled: wsEnabled, 481 + Origin: mgr.GetPLCOrigin(), 482 + }, 483 + Bundles: BundleStatus{ 484 + Count: indexStats["bundle_count"].(int), 485 + TotalSize: indexStats["total_size"].(int64), 486 + UncompressedSize: indexStats["total_uncompressed_size"].(int64), 487 + }, 488 + } 489 490 + if syncMode && syncInterval > 0 { 491 + response.Server.SyncIntervalSeconds = int(syncInterval.Seconds()) 492 + } 493 494 + if bundleCount := response.Bundles.Count; bundleCount > 0 { 495 + firstBundle := indexStats["first_bundle"].(int) 496 + lastBundle := indexStats["last_bundle"].(int) 497 498 + response.Bundles.FirstBundle = firstBundle 499 + response.Bundles.LastBundle = lastBundle 500 + response.Bundles.StartTime = indexStats["start_time"].(time.Time) 501 + response.Bundles.EndTime = indexStats["end_time"].(time.Time) 502 503 + if firstMeta, err := index.GetBundle(firstBundle); err == nil { 504 + response.Bundles.RootHash = firstMeta.Hash 505 + } 506 507 + if lastMeta, err := index.GetBundle(lastBundle); err == nil { 508 + response.Bundles.HeadHash = lastMeta.Hash 509 + response.Bundles.HeadAgeSeconds = int(time.Since(lastMeta.EndTime).Seconds()) 510 + } 511 512 + if gaps, ok := indexStats["gaps"].(int); ok { 513 + response.Bundles.Gaps = gaps 514 + response.Bundles.HasGaps = gaps > 0 515 + if gaps > 0 { 516 + response.Bundles.GapNumbers = index.FindGaps() 517 + } 518 } 519 520 + totalOps := bundleCount * bundle.BUNDLE_SIZE 521 + response.Bundles.TotalOperations = totalOps 522 523 + duration := response.Bundles.EndTime.Sub(response.Bundles.StartTime) 524 + if duration.Hours() > 0 { 525 + response.Bundles.AvgOpsPerHour = int(float64(totalOps) / duration.Hours()) 526 + } 527 } 528 529 + if syncMode { 530 + mempoolStats := mgr.GetMempoolStats() 531 532 + if count, ok := mempoolStats["count"].(int); ok { 533 + mempool := &MempoolStatus{ 534 + Count: count, 535 + TargetBundle: mempoolStats["target_bundle"].(int), 536 + CanCreateBundle: mempoolStats["can_create_bundle"].(bool), 537 + MinTimestamp: mempoolStats["min_timestamp"].(time.Time), 538 + Validated: mempoolStats["validated"].(bool), 539 + ProgressPercent: float64(count) / float64(bundle.BUNDLE_SIZE) * 100, 540 + BundleSize: bundle.BUNDLE_SIZE, 541 + OperationsNeeded: bundle.BUNDLE_SIZE - count, 542 + } 543 544 + if firstTime, ok := mempoolStats["first_time"].(time.Time); ok { 545 + mempool.FirstTime = firstTime 546 + mempool.TimespanSeconds = int(time.Since(firstTime).Seconds()) 547 + } 548 + if lastTime, ok := mempoolStats["last_time"].(time.Time); ok { 549 + mempool.LastTime = lastTime 550 + mempool.LastOpAgeSeconds = int(time.Since(lastTime).Seconds()) 551 + } 552 553 + if count > 100 && count < bundle.BUNDLE_SIZE { 554 + if !mempool.FirstTime.IsZero() && !mempool.LastTime.IsZero() { 555 + timespan := mempool.LastTime.Sub(mempool.FirstTime) 556 + if timespan.Seconds() > 0 { 557 + opsPerSec := float64(count) / timespan.Seconds() 558 + remaining := bundle.BUNDLE_SIZE - count 559 + mempool.EtaNextBundleSeconds = int(float64(remaining) / opsPerSec) 560 + } 561 } 562 } 563 + 564 + response.Mempool = mempool 565 } 566 + } 567 568 + sendJSON(w, 200, response) 569 } 570 } 571 572 + func handleMempoolNative(mgr *bundle.Manager) http.HandlerFunc { 573 + return func(w http.ResponseWriter, r *http.Request) { 574 + ops, err := mgr.GetMempoolOperations() 575 + if err != nil { 576 + sendJSON(w, 500, map[string]string{"error": err.Error()}) 577 + return 578 + } 579 580 + w.Header().Set("Content-Type", "application/x-ndjson") 581 582 + if len(ops) == 0 { 583 + return 584 + } 585 586 + for _, op := range ops { 587 + if len(op.RawJSON) > 0 { 588 + w.Write(op.RawJSON) 589 + } else { 590 + data, _ := json.Marshal(op) 591 + w.Write(data) 592 + } 593 + w.Write([]byte("\n")) 594 } 595 } 596 } 597 598 + func handleDebugMemoryNative(mgr *bundle.Manager) http.HandlerFunc { 599 + return func(w http.ResponseWriter, r *http.Request) { 600 + var m runtime.MemStats 601 + runtime.ReadMemStats(&m) 602 603 + didStats := mgr.GetDIDIndexStats() 604 605 + beforeAlloc := m.Alloc / 1024 / 1024 606 607 + runtime.GC() 608 + runtime.ReadMemStats(&m) 609 + afterAlloc := m.Alloc / 1024 / 1024 610 611 + response := fmt.Sprintf(`Memory Stats: 612 Alloc: %d MB 613 TotalAlloc: %d MB 614 Sys: %d MB ··· 620 After GC: 621 Alloc: %d MB 622 `, 623 + beforeAlloc, 624 + m.TotalAlloc/1024/1024, 625 + m.Sys/1024/1024, 626 + m.NumGC, 627 + didStats["cached_shards"], 628 + didStats["cache_limit"], 629 + afterAlloc) 630 631 + w.Header().Set("Content-Type", "text/plain") 632 + w.Write([]byte(response)) 633 } 634 } 635 636 + func handleWebSocketNative(mgr *bundle.Manager) http.HandlerFunc { 637 + return func(w http.ResponseWriter, r *http.Request) { 638 + cursorStr := r.URL.Query().Get("cursor") 639 + var cursor int 640 641 + if cursorStr == "" { 642 + cursor = mgr.GetCurrentCursor() 643 + } else { 644 + var err error 645 + cursor, err = strconv.Atoi(cursorStr) 646 + if err != nil || cursor < 0 { 647 + http.Error(w, "Invalid cursor: must be non-negative integer", 400) 648 + return 649 + } 650 + } 651 652 + conn, err := upgrader.Upgrade(w, r, nil) 653 + if err != nil { 654 + fmt.Fprintf(os.Stderr, "WebSocket upgrade failed: %v\n", err) 655 + return 656 } 657 + defer conn.Close() 658 + 659 + conn.SetPongHandler(func(string) error { 660 + conn.SetReadDeadline(time.Now().Add(60 * time.Second)) 661 + return nil 662 + }) 663 664 + done := make(chan struct{}) 665 666 + go func() { 667 + defer close(done) 668 + for { 669 + _, _, err := conn.ReadMessage() 670 + if err != nil { 671 + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { 672 + fmt.Fprintf(os.Stderr, "WebSocket: client closed connection\n") 673 + } 674 + return 675 + } 676 + } 677 + }() 678 679 + bgCtx := context.Background() 680 681 + if err := streamLive(bgCtx, conn, mgr, cursor, done); err != nil { 682 + fmt.Fprintf(os.Stderr, "WebSocket stream error: %v\n", err) 683 + } 684 } 685 } 686 687 + func handleDIDDocumentLatestNative(mgr *bundle.Manager, did string) http.HandlerFunc { 688 + return func(w http.ResponseWriter, r *http.Request) { 689 + op, err := mgr.GetLatestDIDOperation(context.Background(), did) 690 + if err != nil { 691 + sendJSON(w, 500, map[string]string{"error": err.Error()}) 692 + return 693 + } 694 695 + doc, err := plc.ResolveDIDDocument(did, []plc.PLCOperation{*op}) 696 + if err != nil { 697 + if strings.Contains(err.Error(), "deactivated") { 698 + sendJSON(w, 410, map[string]string{"error": "DID has been deactivated"}) 699 + } else { 700 + sendJSON(w, 500, map[string]string{"error": fmt.Sprintf("Resolution failed: %v", err)}) 701 + } 702 return 703 } 704 705 + w.Header().Set("Content-Type", "application/did+ld+json") 706 + sendJSON(w, 200, doc) 707 } 708 + } 709 710 + func handleDIDDataNative(mgr *bundle.Manager, did string) http.HandlerFunc { 711 + return func(w http.ResponseWriter, r *http.Request) { 712 + if err := plc.ValidateDIDFormat(did); err != nil { 713 + sendJSON(w, 400, map[string]string{"error": "Invalid DID format"}) 714 + return 715 + } 716 717 + operations, err := mgr.GetDIDOperations(context.Background(), did, false) 718 + if err != nil { 719 + sendJSON(w, 500, map[string]string{"error": err.Error()}) 720 + return 721 + } 722 723 + if len(operations) == 0 { 724 + sendJSON(w, 404, map[string]string{"error": "DID not found"}) 725 + return 726 + } 727 728 + state, err := plc.BuildDIDState(did, operations) 729 + if err != nil { 730 + if strings.Contains(err.Error(), "deactivated") { 731 + sendJSON(w, 410, map[string]string{"error": "DID has been deactivated"}) 732 + } else { 733 + sendJSON(w, 500, map[string]string{"error": err.Error()}) 734 + } 735 + return 736 + } 737 738 + sendJSON(w, 200, state) 739 } 740 + } 741 742 + func handleDIDAuditLogNative(mgr *bundle.Manager, did string) http.HandlerFunc { 743 + return func(w http.ResponseWriter, r *http.Request) { 744 + if err := plc.ValidateDIDFormat(did); err != nil { 745 + sendJSON(w, 400, map[string]string{"error": "Invalid DID format"}) 746 + return 747 + } 748 749 + operations, err := mgr.GetDIDOperations(context.Background(), did, false) 750 + if err != nil { 751 + sendJSON(w, 500, map[string]string{"error": err.Error()}) 752 + return 753 } 754 755 + if len(operations) == 0 { 756 + sendJSON(w, 404, map[string]string{"error": "DID not found"}) 757 + return 758 + } 759 760 + auditLog := plc.FormatAuditLog(operations) 761 + sendJSON(w, 200, auditLog) 762 } 763 } 764 765 + // WebSocket streaming functions (unchanged from your original) 766 + 767 func streamLive(ctx context.Context, conn *websocket.Conn, mgr *bundle.Manager, startCursor int, done chan struct{}) error { 768 index := mgr.GetIndex() 769 bundles := index.GetBundles() ··· 974 return "ws" 975 } 976 977 + func getBaseURL(r *http.Request) string { 978 + scheme := getScheme(r) 979 + host := r.Host 980 + return fmt.Sprintf("%s://%s", scheme, host) 981 } 982 983 + func getWSURL(r *http.Request) string { 984 + scheme := getWSScheme(r) 985 + host := r.Host 986 + return fmt.Sprintf("%s://%s", scheme, host) 987 } 988 989 + // Response types (unchanged) 990 991 type StatusResponse struct { 992 Bundles BundleStatus `json:"bundles"` ··· 1039 EtaNextBundleSeconds int `json:"eta_next_bundle_seconds,omitempty"` 1040 } 1041 1042 + // Background sync (unchanged) 1043 1044 func runSync(ctx context.Context, mgr *bundle.Manager, interval time.Duration, verbose bool, resolverEnabled bool) { 1045 syncBundles(ctx, mgr, verbose, resolverEnabled)
-3
go.mod
··· 3 go 1.25 4 5 require ( 6 - git.urbach.dev/go/web v0.0.0-20250827103423-e50f220853ff 7 github.com/DataDog/zstd v1.5.7 8 github.com/goccy/go-json v0.10.5 9 github.com/gorilla/websocket v1.5.3 10 ) 11 - 12 - require git.urbach.dev/go/router v0.0.0-20250721083733-8d04266bc544 // indirect
··· 3 go 1.25 4 5 require ( 6 github.com/DataDog/zstd v1.5.7 7 github.com/goccy/go-json v0.10.5 8 github.com/gorilla/websocket v1.5.3 9 )
-6
go.sum
··· 1 - git.urbach.dev/go/assert v0.0.0-20250606150337-559d3d3afcda h1:VN6ZQwtwLOm2xTms+v8IIeeNjvs55qyEBNArv3dPq9g= 2 - git.urbach.dev/go/assert v0.0.0-20250606150337-559d3d3afcda/go.mod h1:PNI/NSBOqvoeU58/7eBsIR09Yoq2S/qtSRiTrctkiq0= 3 - git.urbach.dev/go/router v0.0.0-20250721083733-8d04266bc544 h1:ChhCFmPVTDzj5rdzYsbQFJrDwzUldnZQMt2Bgc0gcwM= 4 - git.urbach.dev/go/router v0.0.0-20250721083733-8d04266bc544/go.mod h1:seUQ5raGaj6fDeZP6d7JdgnWQys8oTrtFdvBhAp1IZA= 5 - git.urbach.dev/go/web v0.0.0-20250827103423-e50f220853ff h1:bB+YedSwmEjgAFe9W6WMwKFi1T/b78z7072HyVPAcCw= 6 - git.urbach.dev/go/web v0.0.0-20250827103423-e50f220853ff/go.mod h1:ON84DswRfsjIgAloDGjbt9PWWhDcMVEek3LsCHuYnOg= 7 github.com/DataDog/zstd v1.5.7 h1:ybO8RBeh29qrxIhCA9E8gKY6xfONU9T6G6aP9DTKfLE= 8 github.com/DataDog/zstd v1.5.7/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= 9 github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
··· 1 github.com/DataDog/zstd v1.5.7 h1:ybO8RBeh29qrxIhCA9E8gKY6xfONU9T6G6aP9DTKfLE= 2 github.com/DataDog/zstd v1.5.7/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= 3 github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=