PLC Bundle V1 Example Implementations

Changes: operation deduplication and bundle-boundary handling

+112 -43
plcbundle.ts
··· 8 8 import path from 'path'; 9 9 import crypto from 'crypto'; 10 10 import { fileURLToPath } from 'url'; 11 - import { init, compress } from '@bokuweb/zstd-wasm'; 11 + import { init, compress, decompress } from '@bokuweb/zstd-wasm'; 12 12 import axios from 'axios'; 13 13 14 14 const __dirname = path.dirname(fileURLToPath(import.meta.url)); ··· 84 84 }; 85 85 86 86 // ============================================================================ 87 + // Bundle Loading 88 + // ============================================================================ 89 + 90 + const loadBundle = async (dir: string, bundleNumber: number): Promise<PLCOperation[]> => { 91 + const filename = `${String(bundleNumber).padStart(6, '0')}.jsonl.zst`; 92 + const filepath = path.join(dir, filename); 93 + 94 + const compressed = await fs.readFile(filepath); 95 + const decompressed = decompress(compressed); 96 + const jsonl = Buffer.from(decompressed).toString('utf8'); 97 + 98 + const lines = jsonl.trim().split('\n').filter(l => l); 99 + return lines.map(line => { 100 + const op = JSON.parse(line) as PLCOperation; 101 + op._raw = line; 102 + return op; 103 + }); 104 + }; 105 + 106 + // ============================================================================ 107 + // Boundary Handling 108 + // ============================================================================ 109 + 110 + const getBoundaryCIDs = (operations: PLCOperation[]): Set<string> => { 111 + if (operations.length === 0) return new Set(); 112 + 113 + const lastOp = operations[operations.length - 1]; 114 + const boundaryTime = lastOp.createdAt; 115 + const cidSet = new Set<string>(); 116 + 117 + // Walk backwards from the end to find all operations with the same timestamp 118 + for (let i = operations.length - 1; i >= 0; i--) { 119 + if (operations[i].createdAt === boundaryTime) { 120 + cidSet.add(operations[i].cid); 121 + } else { 122 + break; 123 + } 124 + } 125 + 126 + return cidSet; 127 + }; 128 + 129 + const 
stripBoundaryDuplicates = ( 130 + operations: PLCOperation[], 131 + prevBoundaryCIDs: Set<string> 132 + ): PLCOperation[] => { 133 + if (prevBoundaryCIDs.size === 0) return operations; 134 + if (operations.length === 0) return operations; 135 + 136 + const boundaryTime = operations[0].createdAt; 137 + let startIdx = 0; 138 + 139 + // Skip operations that are in the previous bundle's boundary 140 + for (let i = 0; i < operations.length; i++) { 141 + const op = operations[i]; 142 + 143 + // Stop if we've moved past the boundary timestamp 144 + if (op.createdAt > boundaryTime) { 145 + break; 146 + } 147 + 148 + // Skip if this CID was in the previous boundary 149 + if (op.createdAt === boundaryTime && prevBoundaryCIDs.has(op.cid)) { 150 + startIdx = i + 1; 151 + continue; 152 + } 153 + 154 + break; 155 + } 156 + 157 + const stripped = operations.slice(startIdx); 158 + if (startIdx > 0) { 159 + console.log(` Stripped ${startIdx} boundary duplicates`); 160 + } 161 + return stripped; 162 + }; 163 + 164 + // ============================================================================ 87 165 // PLC Directory Client 88 166 // ============================================================================ 89 167 ··· 112 190 // ============================================================================ 113 191 114 192 const serializeJSONL = (operations: PLCOperation[]): string => { 115 - // Each operation followed by \n, but NO trailing newline at the end 116 - // This matches the Go implementation exactly 117 193 const lines = operations.map(op => { 118 194 const json = op._raw || JSON.stringify(op); 119 195 return json + '\n'; ··· 128 204 const calculateChainHash = (parent: string, contentHash: string): string => { 129 205 let data: string; 130 206 if (!parent || parent === '') { 131 - // Genesis bundle (first bundle) 132 207 data = `plcbundle:genesis:${contentHash}`; 133 208 } else { 134 - // Subsequent bundles - chain parent hash with current content 135 209 data = 
`${parent}:${contentHash}`; 136 210 } 137 211 return sha256(data); ··· 152 226 const filename = `${String(bundleNumber).padStart(6, '0')}.jsonl.zst`; 153 227 const filepath = path.join(dir, filename); 154 228 155 - // Serialize to JSONL (exact format: each line ends with \n, no trailing newline) 156 229 const jsonl = serializeJSONL(operations); 157 230 const uncompressedBuffer = Buffer.from(jsonl, 'utf8'); 158 231 159 - console.log(` JSONL size: ${uncompressedBuffer.length} bytes`); 160 - console.log(` First 100 chars: ${jsonl.substring(0, 100)}`); 161 - console.log(` Last 100 chars: ${jsonl.substring(jsonl.length - 100)}`); 162 - 163 - // Calculate content hash 164 232 const contentHash = sha256(uncompressedBuffer); 165 233 const uncompressedSize = uncompressedBuffer.length; 166 234 167 - // Calculate chain hash 168 235 const chainHash = calculateChainHash(parentHash, contentHash); 169 236 170 - // Compress with zstd level 3 (same as Go SpeedDefault) 171 237 const compressed = compress(uncompressedBuffer, 3); 172 238 const compressedBuffer = Buffer.from(compressed); 173 239 const compressedHash = sha256(compressedBuffer); 174 240 const compressedSize = compressedBuffer.length; 175 241 176 - // Write file 177 242 await fs.writeFile(filepath, compressedBuffer); 178 243 179 - // Extract metadata 180 244 const startTime = operations[0].createdAt; 181 245 const endTime = operations[operations.length - 1].createdAt; 182 246 const didCount = extractUniqueDIDs(operations); ··· 187 251 end_time: endTime, 188 252 operation_count: operations.length, 189 253 did_count: didCount, 190 - hash: chainHash, // Chain hash (primary) 191 - content_hash: contentHash, // Content hash 192 - parent: parentHash || '', // Parent chain hash 254 + hash: chainHash, 255 + content_hash: contentHash, 256 + parent: parentHash || '', 193 257 compressed_hash: compressedHash, 194 258 compressed_size: compressedSize, 195 259 uncompressed_size: uncompressedSize, ··· 213 277 214 278 await fs.mkdir(dir, 
{ recursive: true }); 215 279 216 - // Load existing index 217 280 const index = await loadIndex(dir); 218 281 219 282 let currentBundle = index.last_bundle + 1; 220 283 let cursor: string | null = null; 221 284 let parentHash = ''; 285 + let prevBoundaryCIDs = new Set<string>(); 222 286 223 - // If resuming, get cursor and parent from last bundle 224 287 if (index.bundles.length > 0) { 225 288 const lastBundle = index.bundles[index.bundles.length - 1]; 226 289 cursor = lastBundle.end_time; 227 - parentHash = lastBundle.hash; // Chain hash from previous bundle 290 + parentHash = lastBundle.hash; 291 + 292 + try { 293 + const prevOps = await loadBundle(dir, lastBundle.bundle_number); 294 + prevBoundaryCIDs = getBoundaryCIDs(prevOps); 295 + console.log(`Loaded previous bundle boundary: ${prevBoundaryCIDs.size} CIDs`); 296 + } catch (err) { 297 + console.log(`Could not load previous bundle for boundary detection`); 298 + } 299 + 228 300 console.log(`Resuming from bundle ${currentBundle}`); 229 301 console.log(`Last operation: ${cursor}`); 230 - console.log(`Parent hash: ${parentHash}`); 231 302 } else { 232 303 console.log('Starting from the beginning (genesis)'); 233 304 } ··· 235 306 console.log(); 236 307 237 308 let mempool: PLCOperation[] = []; 309 + const seenCIDs = new Set<string>(prevBoundaryCIDs); 238 310 let totalFetched = 0; 239 311 let totalBundles = 0; 240 312 241 313 while (true) { 242 314 try { 243 - // Fetch operations 244 315 console.log(`Fetching operations (cursor: ${cursor || 'start'})...`); 245 316 const operations = await fetchOperations(cursor, 1000); 246 317 ··· 249 320 break; 250 321 } 251 322 252 - console.log(` Fetched ${operations.length} operations`); 253 - totalFetched += operations.length; 323 + // Deduplicate 324 + const uniqueOps = operations.filter(op => { 325 + if (seenCIDs.has(op.cid)) { 326 + return false; 327 + } 328 + seenCIDs.add(op.cid); 329 + return true; 330 + }); 254 331 255 - // Add to mempool 256 - 
mempool.push(...operations); 332 + console.log(` Fetched ${operations.length} operations (${uniqueOps.length} unique)`); 333 + totalFetched += uniqueOps.length; 257 334 258 - // Update cursor 335 + mempool.push(...uniqueOps); 259 336 cursor = operations[operations.length - 1].createdAt; 260 337 261 - // Create bundles while we have enough operations 262 338 while (mempool.length >= BUNDLE_SIZE) { 263 339 const bundleOps = mempool.splice(0, BUNDLE_SIZE); 264 340 265 - console.log(`Creating bundle ${String(currentBundle).padStart(6, '0')}...`); 341 + console.log(`\nCreating bundle ${String(currentBundle).padStart(6, '0')}...`); 266 342 267 343 const metadata = await saveBundle(dir, currentBundle, bundleOps, parentHash); 268 344 269 - // Add to index 270 345 index.bundles.push(metadata); 271 346 index.last_bundle = currentBundle; 272 347 index.total_size_bytes += metadata.compressed_size; ··· 276 351 console.log(` ✓ Bundle ${String(currentBundle).padStart(6, '0')}: ${metadata.operation_count} ops, ${metadata.did_count} DIDs`); 277 352 console.log(` Chain Hash: ${metadata.hash}`); 278 353 console.log(` Content Hash: ${metadata.content_hash}`); 279 - if (metadata.parent) { 280 - console.log(` Parent Hash: ${metadata.parent}`); 281 - } else { 282 - console.log(` Parent Hash: (genesis)`); 283 - } 284 - console.log(` Compressed: ${metadata.compressed_hash}`); 285 - console.log(` Size: ${(metadata.compressed_size / 1024).toFixed(1)} KB (${(metadata.compressed_size / metadata.uncompressed_size * 100).toFixed(1)}% of original)`); 354 + console.log(` Size: ${(metadata.compressed_size / 1024).toFixed(1)} KB`); 355 + 356 + // Get boundary CIDs for next bundle 357 + prevBoundaryCIDs = getBoundaryCIDs(bundleOps); 358 + console.log(` Boundary CIDs: ${prevBoundaryCIDs.size}`); 286 359 console.log(); 287 360 288 - // Update parent hash for next bundle 289 361 parentHash = metadata.hash; 290 - 291 362 currentBundle++; 292 363 totalBundles++; 293 364 } 294 365 295 - // Small delay to 
be nice to the server 296 366 await new Promise(resolve => setTimeout(resolve, 100)); 297 367 298 368 } catch (err: any) { ··· 312 382 } 313 383 } 314 384 315 - // Save final index 316 385 await saveIndex(dir, index); 317 386 318 387 console.log(); ··· 327 396 328 397 if (mempool.length > 0) { 329 398 console.log(); 330 - console.log(`Note: ${mempool.length} operations in mempool (need ${BUNDLE_SIZE - mempool.length} more for next bundle)`); 399 + console.log(`Note: ${mempool.length} operations in mempool`); 331 400 } 332 401 }; 333 402