A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory

update ws

+179 -95
+179 -95
cmd/plcbundle/server.go
··· 119 119 } 120 120 121 121 // handleWebSocket streams all records via WebSocket starting from cursor 122 + // Keeps connection alive and streams new records as they arrive 122 123 func handleWebSocket(w http.ResponseWriter, r *http.Request, mgr *bundle.Manager) { 123 124 // Parse cursor from query parameter (defaults to 0) 124 125 cursorStr := r.URL.Query().Get("cursor") ··· 132 133 } 133 134 } 134 135 135 - // Check if client wants to keep connection alive 136 - keepAlive := r.URL.Query().Get("keepalive") == "true" 137 - 138 136 // Upgrade to WebSocket 139 137 conn, err := upgrader.Upgrade(w, r, nil) 140 138 if err != nil { ··· 143 141 } 144 142 defer conn.Close() 145 143 146 - // Set up ping/pong handlers for keepalive 144 + // Set up handlers for connection management 147 145 conn.SetPongHandler(func(string) error { 148 146 conn.SetReadDeadline(time.Now().Add(60 * time.Second)) 149 147 return nil 150 148 }) 151 149 152 - ctx := context.Background() 153 - index := mgr.GetIndex() 154 - bundles := index.GetBundles() 150 + // Channel to signal client disconnect 151 + done := make(chan struct{}) 155 152 156 - if len(bundles) == 0 { 157 - if !keepAlive { 158 - closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "no bundles available") 159 - conn.WriteMessage(websocket.CloseMessage, closeMsg) 153 + // Start goroutine to detect client disconnect 154 + go func() { 155 + defer close(done) 156 + for { 157 + _, _, err := conn.ReadMessage() 158 + if err != nil { 159 + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { 160 + fmt.Fprintf(os.Stderr, "WebSocket: client closed connection\n") 161 + } 162 + return 163 + } 160 164 } 161 - return 162 - } 165 + }() 163 166 164 - // Calculate starting bundle and position from cursor 165 - startBundleIdx := cursor / bundle.BUNDLE_SIZE 166 - startPosition := cursor % bundle.BUNDLE_SIZE 167 + ctx := context.Background() 167 168 168 - // Validate starting bundle exists 169 - if startBundleIdx >= len(bundles) { 170 - currentRecord := len(bundles) * bundle.BUNDLE_SIZE 171 - if err := streamMempool(conn, mgr, cursor, currentRecord); err != nil { 172 - return 173 - } 174 - if !keepAlive { 175 - closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "stream complete") 176 - conn.WriteMessage(websocket.CloseMessage, closeMsg) 177 - } 178 - return 169 + // Stream all data and keep connection alive 170 + if err := streamLive(ctx, conn, mgr, cursor, done); err != nil { 171 + fmt.Fprintf(os.Stderr, "WebSocket stream error: %v\n", err) 179 172 } 173 + } 180 174 181 - currentRecord := cursor 175 + // streamLive streams all historical data then continues with live updates 176 + func streamLive(ctx context.Context, conn *websocket.Conn, mgr *bundle.Manager, startCursor int, done chan struct{}) error { 177 + index := mgr.GetIndex() 178 + currentRecord := startCursor 182 179 183 - // Stream bundles starting from the calculated bundle 184 - for i := startBundleIdx; i < len(bundles); i++ { 185 - meta := bundles[i] 180 + // Step 1: Stream all historical bundles 181 + bundles := index.GetBundles() 182 + if len(bundles) > 0 { 183 + startBundleIdx := startCursor / bundle.BUNDLE_SIZE 184 + startPosition := startCursor % bundle.BUNDLE_SIZE 186 185 187 - b, err := mgr.LoadBundle(ctx, meta.BundleNumber) 188 - if err != nil { 189 - fmt.Fprintf(os.Stderr, "Failed to load bundle %d: %v\n", meta.BundleNumber, err) 190 - continue 191 - } 186 + if startBundleIdx < len(bundles) { 187 + for i := startBundleIdx; i < len(bundles); i++ { 188 + select { 189 + case <-done: 190 + return nil // Client disconnected 191 + default: 192 + } 192 193 193 - startPos := 0 194 - if i == startBundleIdx { 195 - startPos = startPosition 196 - } 194 + meta := bundles[i] 195 + b, err := mgr.LoadBundle(ctx, meta.BundleNumber) 196 + if err != nil { 197 + fmt.Fprintf(os.Stderr, "Failed to load bundle %d: %v\n", meta.BundleNumber, err) 198 + continue 199 + } 197 200 198 - for j := startPos; j < len(b.Operations); j++ { 199 - op := b.Operations[j] 201 + startPos := 0 202 + if i == startBundleIdx { 203 + startPos = startPosition 204 + } 200 205 201 - if err := sendOperation(conn, op); err != nil { 202 - return 203 - } 206 + for j := startPos; j < len(b.Operations); j++ { 207 + select { 208 + case <-done: 209 + return nil 210 + default: 211 + } 204 212 205 - currentRecord++ 213 + if err := sendOperation(conn, b.Operations[j]); err != nil { 214 + return err 215 + } 216 + currentRecord++ 206 217 207 - if currentRecord%1000 == 0 { 208 - if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 209 - return 218 + if currentRecord%1000 == 0 { 219 + if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 220 + return err 221 + } 222 + } 210 223 } 211 224 } 212 225 } 213 226 } 214 227 215 - // Stream mempool 216 - if err := streamMempool(conn, mgr, cursor, currentRecord); err != nil { 217 - return 218 - } 228 + // Step 2: Stream current mempool 229 + lastSeenMempoolCount := 0 230 + mempoolOps, err := mgr.GetMempoolOperations() 231 + if err == nil { 232 + bundleRecordBase := len(bundles) * bundle.BUNDLE_SIZE 219 233 220 - if keepAlive { 221 - // Keep connection open and wait for client to close 222 - fmt.Fprintf(os.Stderr, "WebSocket: stream complete, keeping connection alive\n") 234 + for i, op := range mempoolOps { 235 + select { 236 + case <-done: 237 + return nil 238 + default: 239 + } 223 240 224 - // Read messages from client (to detect close) 225 - conn.SetReadDeadline(time.Now().Add(60 * time.Second)) 226 - for { 227 - _, _, err := conn.ReadMessage() 228 - if err != nil { 229 - if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { 230 - fmt.Fprintf(os.Stderr, "WebSocket: client closed connection\n") 231 - } 232 - break 241 + recordNum := bundleRecordBase + i 242 + if recordNum < startCursor { 243 + continue 244 + } 245 + 246 + if err := sendOperation(conn, op); err != nil { 247 + return err 233 248 } 249 + currentRecord++ 250 + lastSeenMempoolCount = i + 1 234 251 } 235 - } else { 236 - // Close gracefully 237 - closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "stream complete") 238 - conn.WriteMessage(websocket.CloseMessage, closeMsg) 239 - time.Sleep(100 * time.Millisecond) 240 252 } 241 - } 253 + 254 + // Step 3: Enter live streaming loop 255 + // Poll for new operations in mempool and new bundles 256 + ticker := time.NewTicker(2 * time.Second) 257 + defer ticker.Stop() 258 + 259 + lastBundleCount := len(bundles) 260 + 261 + fmt.Fprintf(os.Stderr, "WebSocket: entering live mode at cursor %d\n", currentRecord) 262 + 263 + for { 264 + select { 265 + case <-done: 266 + fmt.Fprintf(os.Stderr, "WebSocket: client disconnected, stopping stream\n") 267 + return nil 268 + 269 + case <-ticker.C: 270 + // Refresh index to check for new bundles 271 + index = mgr.GetIndex() 272 + bundles = index.GetBundles() 273 + 274 + // Check if new bundles were created 275 + if len(bundles) > lastBundleCount { 276 + fmt.Fprintf(os.Stderr, "WebSocket: detected %d new bundle(s)\n", len(bundles)-lastBundleCount) 277 + 278 + // Stream new bundles 279 + for i := lastBundleCount; i < len(bundles); i++ { 280 + select { 281 + case <-done: 282 + return nil 283 + default: 284 + } 285 + 286 + meta := bundles[i] 287 + b, err := mgr.LoadBundle(ctx, meta.BundleNumber) 288 + if err != nil { 289 + fmt.Fprintf(os.Stderr, "Failed to load bundle %d: %v\n", meta.BundleNumber, err) 290 + continue 291 + } 292 + 293 + for _, op := range b.Operations { 294 + select { 295 + case <-done: 296 + return nil 297 + default: 298 + } 299 + 300 + if err := sendOperation(conn, op); err != nil { 301 + return err 302 + } 303 + currentRecord++ 304 + 305 + if currentRecord%1000 == 0 { 306 + if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 307 + return err 308 + } 309 + } 310 + } 311 + } 242 312 243 - // streamMempool streams mempool operations if cursor is in range 244 - func streamMempool(conn *websocket.Conn, mgr *bundle.Manager, cursor int, currentRecord int) { 245 - mempoolOps, err := mgr.GetMempoolOperations() 246 - if err != nil { 247 - fmt.Fprintf(os.Stderr, "Failed to get mempool operations: %v\n", err) 248 - return 249 - } 313 + lastBundleCount = len(bundles) 314 + lastSeenMempoolCount = 0 // Reset mempool count after bundle creation 315 + } 250 316 251 - for _, op := range mempoolOps { 252 - // Skip records before cursor 253 - if currentRecord < cursor { 254 - currentRecord++ 255 - continue 256 - } 317 + // Check for new operations in mempool 318 + mempoolOps, err := mgr.GetMempoolOperations() 319 + if err != nil { 320 + continue 321 + } 257 322 258 - // Send raw JSON 259 - if err := sendOperation(conn, op); err != nil { 260 - return 261 - } 323 + if len(mempoolOps) > lastSeenMempoolCount { 324 + fmt.Fprintf(os.Stderr, "WebSocket: streaming %d new mempool operation(s)\n", 325 + len(mempoolOps)-lastSeenMempoolCount) 262 326 263 - currentRecord++ 327 + // Stream new mempool operations 328 + for i := lastSeenMempoolCount; i < len(mempoolOps); i++ { 329 + select { 330 + case <-done: 331 + return nil 332 + default: 333 + } 264 334 265 - // Send ping periodically 266 - if currentRecord%1000 == 0 { 335 + if err := sendOperation(conn, mempoolOps[i]); err != nil { 336 + return err 337 + } 338 + currentRecord++ 339 + } 340 + 341 + lastSeenMempoolCount = len(mempoolOps) 342 + } 343 + 344 + // Send periodic ping to keep connection alive 267 345 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 268 - return 346 + return err 269 347 } 270 348 } 271 349 } ··· 289 367 290 368 // Send as text message 291 369 if err := conn.WriteMessage(websocket.TextMessage, data); err != nil { 292 - fmt.Fprintf(os.Stderr, "WebSocket write error: %v\n", err) 370 + if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { 371 + fmt.Fprintf(os.Stderr, "WebSocket write error: %v\n", err) 372 + } 293 373 return err 294 374 } 295 375 ··· 423 503 if wsEnabled { 424 504 fmt.Fprintf(w, "\nWebSocket Endpoints\n") 425 505 fmt.Fprintf(w, "━━━━━━━━━━━━━━━━━━━\n") 426 - fmt.Fprintf(w, " WS /ws?cursor=N Stream all records from cursor N\n") 427 - fmt.Fprintf(w, " (cursor defaults to 0)\n") 506 + fmt.Fprintf(w, " WS /ws?cursor=N Live stream all records from cursor N\n") 507 + fmt.Fprintf(w, " Streams all bundles, then mempool\n") 508 + fmt.Fprintf(w, " Continues streaming new operations live\n") 509 + fmt.Fprintf(w, " Connection stays open until client closes\n") 510 + fmt.Fprintf(w, " Cursor: global record number (0-based)\n") 511 + fmt.Fprintf(w, " Example: 88410345 = bundle 8841, pos 345\n") 428 512 } 429 513 430 514 if syncMode {