A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory

ws bundle streaming

+115 -117
+115 -117
cmd/plcbundle/server.go
··· 1 1 package main 2 2 3 3 import ( 4 + "bufio" 4 5 "context" 5 6 "fmt" 6 7 "io" ··· 360 361 // streamLive streams all historical data then continues with live updates 361 362 func streamLive(ctx context.Context, conn *websocket.Conn, mgr *bundle.Manager, startCursor int, done chan struct{}) error { 362 363 index := mgr.GetIndex() 364 + bundles := index.GetBundles() 363 365 currentRecord := startCursor 364 366 365 - // Step 1: Stream all historical bundles 366 - bundles := index.GetBundles() 367 + // Step 1: Stream historical bundles 367 368 if len(bundles) > 0 { 368 369 startBundleIdx := startCursor / bundle.BUNDLE_SIZE 369 370 startPosition := startCursor % bundle.BUNDLE_SIZE 370 371 371 372 if startBundleIdx < len(bundles) { 373 + // Stream from startBundleIdx to end 372 374 for i := startBundleIdx; i < len(bundles); i++ { 373 - select { 374 - case <-done: 375 - return nil // Client disconnected 376 - default: 377 - } 378 - 379 - meta := bundles[i] 380 - b, err := mgr.LoadBundle(ctx, meta.BundleNumber) 381 - if err != nil { 382 - fmt.Fprintf(os.Stderr, "Failed to load bundle %d: %v\n", meta.BundleNumber, err) 383 - continue 384 - } 385 - 386 - startPos := 0 375 + skipUntil := 0 387 376 if i == startBundleIdx { 388 - startPos = startPosition 377 + skipUntil = startPosition 389 378 } 390 379 391 - for j := startPos; j < len(b.Operations); j++ { 392 - select { 393 - case <-done: 394 - return nil 395 - default: 396 - } 397 - 398 - if err := sendOperation(conn, b.Operations[j]); err != nil { 399 - return err 400 - } 401 - currentRecord++ 402 - 403 - if currentRecord%1000 == 0 { 404 - if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 405 - return err 406 - } 407 - } 380 + newRecordCount, err := streamBundle(ctx, conn, mgr, bundles[i].BundleNumber, skipUntil, done) 381 + if err != nil { 382 + return err 408 383 } 384 + currentRecord += newRecordCount 409 385 } 410 386 } 411 387 } 412 388 413 389 // Step 2: Stream current mempool 414 390 lastSeenMempoolCount := 0 415 - mempoolOps, err := mgr.GetMempoolOperations() 416 - if err == nil { 417 - bundleRecordBase := len(bundles) * bundle.BUNDLE_SIZE 418 - 419 - for i, op := range mempoolOps { 420 - select { 421 - case <-done: 422 - return nil 423 - default: 424 - } 425 - 426 - recordNum := bundleRecordBase + i 427 - if recordNum < startCursor { 428 - continue 429 - } 430 - 431 - if err := sendOperation(conn, op); err != nil { 432 - return err 433 - } 434 - currentRecord++ 435 - lastSeenMempoolCount = i + 1 436 - } 391 + if err := streamMempool(conn, mgr, startCursor, len(bundles)*bundle.BUNDLE_SIZE, &currentRecord, &lastSeenMempoolCount, done); err != nil { 392 + return err 437 393 } 438 394 439 - // Step 3: Enter live streaming loop 440 - // Poll for new operations in mempool and new bundles 441 - ticker := time.NewTicker(2 * time.Second) 395 + // Step 3: Live streaming loop 396 + ticker := time.NewTicker(500 * time.Millisecond) 442 397 defer ticker.Stop() 443 398 444 399 lastBundleCount := len(bundles) 445 - 446 400 fmt.Fprintf(os.Stderr, "WebSocket: entering live mode at cursor %d\n", currentRecord) 447 401 448 402 for { ··· 452 406 return nil 453 407 454 408 case <-ticker.C: 455 - // Refresh index to check for new bundles 409 + // Check for new bundles 456 410 index = mgr.GetIndex() 457 411 bundles = index.GetBundles() 458 412 459 - // Check if new bundles were created 460 413 if len(bundles) > lastBundleCount { 461 414 fmt.Fprintf(os.Stderr, "WebSocket: detected %d new bundle(s)\n", len(bundles)-lastBundleCount) 462 415 463 416 // Stream new bundles 464 417 for i := lastBundleCount; i < len(bundles); i++ { 465 - select { 466 - case <-done: 467 - return nil 468 - default: 469 - } 470 - 471 - meta := bundles[i] 472 - b, err := mgr.LoadBundle(ctx, meta.BundleNumber) 418 + newRecordCount, err := streamBundle(ctx, conn, mgr, bundles[i].BundleNumber, 0, done) 473 419 if err != nil { 474 - fmt.Fprintf(os.Stderr, "Failed to load bundle %d: %v\n", meta.BundleNumber, err) 475 - continue 420 + return err 476 421 } 477 - 478 - for _, op := range b.Operations { 479 - select { 480 - case <-done: 481 - return nil 482 - default: 483 - } 484 - 485 - if err := sendOperation(conn, op); err != nil { 486 - return err 487 - } 488 - currentRecord++ 489 - 490 - if currentRecord%1000 == 0 { 491 - if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 492 - return err 493 - } 494 - } 495 - } 422 + currentRecord += newRecordCount 496 423 } 497 424 498 425 lastBundleCount = len(bundles) 499 - lastSeenMempoolCount = 0 // Reset mempool count after bundle creation 426 + lastSeenMempoolCount = 0 // Reset after bundle creation 500 427 } 501 428 502 - // Check for new operations in mempool 503 - mempoolOps, err := mgr.GetMempoolOperations() 504 - if err != nil { 505 - continue 429 + // Check for new mempool operations 430 + if err := streamMempool(conn, mgr, startCursor, len(bundles)*bundle.BUNDLE_SIZE, &currentRecord, &lastSeenMempoolCount, done); err != nil { 431 + return err 432 + } 433 + 434 + // Keep-alive ping 435 + if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 436 + return err 506 437 } 438 + } 439 + } 440 + } 507 441 508 - if len(mempoolOps) > lastSeenMempoolCount { 509 - fmt.Fprintf(os.Stderr, "WebSocket: streaming %d new mempool operation(s)\n", 510 - len(mempoolOps)-lastSeenMempoolCount) 442 + // streamBundle streams a single bundle's operations without parsing 443 + // Returns number of records streamed 444 + func streamBundle(ctx context.Context, conn *websocket.Conn, mgr *bundle.Manager, bundleNumber int, skipUntil int, done chan struct{}) (int, error) { 445 + reader, err := mgr.StreamBundleDecompressed(ctx, bundleNumber) 446 + if err != nil { 447 + fmt.Fprintf(os.Stderr, "Failed to stream bundle %d: %v\n", bundleNumber, err) 448 + return 0, nil // Continue with next bundle 449 + } 450 + defer reader.Close() 451 + 452 + scanner := bufio.NewScanner(reader) 453 + buf := make([]byte, 0, 64*1024) 454 + scanner.Buffer(buf, 1024*1024) 455 + 456 + position := 0 457 + streamed := 0 458 + 459 + for scanner.Scan() { 460 + line := scanner.Bytes() 461 + if len(line) == 0 { 462 + continue 463 + } 464 + 465 + // Skip until start position 466 + if position < skipUntil { 467 + position++ 468 + continue 469 + } 470 + 471 + select { 472 + case <-done: 473 + return streamed, nil 474 + default: 475 + } 476 + 477 + // Send raw JSON line 478 + if err := conn.WriteMessage(websocket.TextMessage, line); err != nil { 479 + return streamed, err 480 + } 481 + 482 + position++ 483 + streamed++ 484 + 485 + if streamed%1000 == 0 { 486 + conn.WriteMessage(websocket.PingMessage, nil) 487 + } 488 + } 489 + 490 + if err := scanner.Err(); err != nil { 491 + return streamed, fmt.Errorf("scanner error on bundle %d: %w", bundleNumber, err) 492 + } 493 + 494 + return streamed, nil 495 + } 496 + 497 + // streamMempool streams new operations from mempool 498 + func streamMempool(conn *websocket.Conn, mgr *bundle.Manager, startCursor int, bundleRecordBase int, currentRecord *int, lastSeenCount *int, done chan struct{}) error { 499 + mempoolOps, err := mgr.GetMempoolOperations() 500 + if err != nil { 501 + return nil // Not fatal 502 + } 503 + 504 + if len(mempoolOps) <= *lastSeenCount { 505 + return nil // No new operations 506 + } 511 507 512 - // Stream new mempool operations 513 - for i := lastSeenMempoolCount; i < len(mempoolOps); i++ { 514 - select { 515 - case <-done: 516 - return nil 517 - default: 518 - } 508 + newOps := len(mempoolOps) - *lastSeenCount 509 + if newOps > 0 { 510 + fmt.Fprintf(os.Stderr, "WebSocket: streaming %d new mempool operation(s)\n", newOps) 511 + } 519 512 520 - if err := sendOperation(conn, mempoolOps[i]); err != nil { 521 - return err 522 - } 523 - currentRecord++ 524 - } 513 + for i := *lastSeenCount; i < len(mempoolOps); i++ { 514 + recordNum := bundleRecordBase + i 515 + if recordNum < startCursor { 516 + continue 517 + } 525 518 526 - lastSeenMempoolCount = len(mempoolOps) 527 - } 519 + select { 520 + case <-done: 521 + return nil 522 + default: 523 + } 528 524 529 - // Send periodic ping to keep connection alive 530 - if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 531 - return err 532 - } 525 + if err := sendOperation(conn, mempoolOps[i]); err != nil { 526 + return err 533 527 } 528 + *currentRecord++ 534 529 } 530 + 531 + *lastSeenCount = len(mempoolOps) 532 + return nil 535 533 } 536 534 537 535 // sendOperation sends a single operation over WebSocket as raw JSON