PLC Bundle V1 Example Implementations

update

+44 -113
+44 -113
plcbundle.ts
··· 8 8 import path from 'path'; 9 9 import crypto from 'crypto'; 10 10 import { fileURLToPath } from 'url'; 11 - import { init, compress, decompress } from '@bokuweb/zstd-wasm'; 11 + import { init, compress } from '@bokuweb/zstd-wasm'; 12 12 import axios from 'axios'; 13 13 14 14 const __dirname = path.dirname(fileURLToPath(import.meta.url)); ··· 84 84 }; 85 85 86 86 // ============================================================================ 87 - // Bundle Loading 88 - // ============================================================================ 89 - 90 - const loadBundle = async (dir: string, bundleNumber: number): Promise<PLCOperation[]> => { 91 - const filename = `${String(bundleNumber).padStart(6, '0')}.jsonl.zst`; 92 - const filepath = path.join(dir, filename); 93 - 94 - const compressed = await fs.readFile(filepath); 95 - const decompressed = decompress(compressed); 96 - const jsonl = Buffer.from(decompressed).toString('utf8'); 97 - 98 - const lines = jsonl.trim().split('\n').filter(l => l); 99 - return lines.map(line => { 100 - const op = JSON.parse(line) as PLCOperation; 101 - op._raw = line; 102 - return op; 103 - }); 104 - }; 105 - 106 - // ============================================================================ 107 - // Boundary Handling 108 - // ============================================================================ 109 - 110 - const getBoundaryCIDs = (operations: PLCOperation[]): Set<string> => { 111 - if (operations.length === 0) return new Set(); 112 - 113 - const lastOp = operations[operations.length - 1]; 114 - const boundaryTime = lastOp.createdAt; 115 - const cidSet = new Set<string>(); 116 - 117 - // Walk backwards from the end to find all operations with the same timestamp 118 - for (let i = operations.length - 1; i >= 0; i--) { 119 - if (operations[i].createdAt === boundaryTime) { 120 - cidSet.add(operations[i].cid); 121 - } else { 122 - break; 123 - } 124 - } 125 - 126 - return cidSet; 127 - }; 128 - 129 - const stripBoundaryDuplicates = ( 130 - operations: PLCOperation[], 131 - prevBoundaryCIDs: Set<string> 132 - ): PLCOperation[] => { 133 - if (prevBoundaryCIDs.size === 0) return operations; 134 - if (operations.length === 0) return operations; 135 - 136 - const boundaryTime = operations[0].createdAt; 137 - let startIdx = 0; 138 - 139 - // Skip operations that are in the previous bundle's boundary 140 - for (let i = 0; i < operations.length; i++) { 141 - const op = operations[i]; 142 - 143 - // Stop if we've moved past the boundary timestamp 144 - if (op.createdAt > boundaryTime) { 145 - break; 146 - } 147 - 148 - // Skip if this CID was in the previous boundary 149 - if (op.createdAt === boundaryTime && prevBoundaryCIDs.has(op.cid)) { 150 - startIdx = i + 1; 151 - continue; 152 - } 153 - 154 - break; 155 - } 156 - 157 - const stripped = operations.slice(startIdx); 158 - if (startIdx > 0) { 159 - console.log(` Stripped ${startIdx} boundary duplicates`); 160 - } 161 - return stripped; 162 - }; 163 - 164 - // ============================================================================ 165 87 // PLC Directory Client 166 88 // ============================================================================ 167 89 ··· 190 112 // ============================================================================ 191 113 192 114 const serializeJSONL = (operations: PLCOperation[]): string => { 115 + // Each operation followed by \n, but NO trailing newline at the end 116 + // This matches the Go implementation exactly 193 117 const lines = operations.map(op => { 194 118 const json = op._raw || JSON.stringify(op); 195 119 return json + '\n'; ··· 204 128 const calculateChainHash = (parent: string, contentHash: string): string => { 205 129 let data: string; 206 130 if (!parent || parent === '') { 131 + // Genesis bundle (first bundle) 207 132 data = `plcbundle:genesis:${contentHash}`; 208 133 } else { 134 + // Subsequent bundles - chain parent hash with current content 209 135 data = `${parent}:${contentHash}`; 210 136 } 211 137 return sha256(data); ··· 226 152 const filename = `${String(bundleNumber).padStart(6, '0')}.jsonl.zst`; 227 153 const filepath = path.join(dir, filename); 228 154 155 + // Serialize to JSONL (exact format: each line ends with \n, no trailing newline) 229 156 const jsonl = serializeJSONL(operations); 230 157 const uncompressedBuffer = Buffer.from(jsonl, 'utf8'); 231 158 159 + console.log(` JSONL size: ${uncompressedBuffer.length} bytes`); 160 + console.log(` First 100 chars: ${jsonl.substring(0, 100)}`); 161 + console.log(` Last 100 chars: ${jsonl.substring(jsonl.length - 100)}`); 162 + 163 + // Calculate content hash 232 164 const contentHash = sha256(uncompressedBuffer); 233 165 const uncompressedSize = uncompressedBuffer.length; 234 166 167 + // Calculate chain hash 235 168 const chainHash = calculateChainHash(parentHash, contentHash); 236 169 170 + // Compress with zstd level 3 (same as Go SpeedDefault) 237 171 const compressed = compress(uncompressedBuffer, 3); 238 172 const compressedBuffer = Buffer.from(compressed); 239 173 const compressedHash = sha256(compressedBuffer); 240 174 const compressedSize = compressedBuffer.length; 241 175 176 + // Write file 242 177 await fs.writeFile(filepath, compressedBuffer); 243 178 179 + // Extract metadata 244 180 const startTime = operations[0].createdAt; 245 181 const endTime = operations[operations.length - 1].createdAt; 246 182 const didCount = extractUniqueDIDs(operations); ··· 251 187 end_time: endTime, 252 188 operation_count: operations.length, 253 189 did_count: didCount, 254 - hash: chainHash, 255 - content_hash: contentHash, 256 - parent: parentHash || '', 190 + hash: chainHash, // Chain hash (primary) 191 + content_hash: contentHash, // Content hash 192 + parent: parentHash || '', // Parent chain hash 257 193 compressed_hash: compressedHash, 258 194 compressed_size: compressedSize, 259 195 uncompressed_size: uncompressedSize, ··· 266 202 // ============================================================================ 267 203 268 204 const run = async (): Promise<void> => { 269 - const dir = process.argv[2] || './plc_bundles'; 205 + const dir = process.argv[2] || './bundles'; 270 206 271 207 console.log('PLC Bundle Fetcher'); 272 208 console.log('=================='); ··· 277 213 278 214 await fs.mkdir(dir, { recursive: true }); 279 215 216 + // Load existing index 280 217 const index = await loadIndex(dir); 281 218 282 219 let currentBundle = index.last_bundle + 1; 283 220 let cursor: string | null = null; 284 221 let parentHash = ''; 285 - let prevBoundaryCIDs = new Set<string>(); 286 222 223 + // If resuming, get cursor and parent from last bundle 287 224 if (index.bundles.length > 0) { 288 225 const lastBundle = index.bundles[index.bundles.length - 1]; 289 226 cursor = lastBundle.end_time; 290 - parentHash = lastBundle.hash; 291 - 292 - try { 293 - const prevOps = await loadBundle(dir, lastBundle.bundle_number); 294 - prevBoundaryCIDs = getBoundaryCIDs(prevOps); 295 - console.log(`Loaded previous bundle boundary: ${prevBoundaryCIDs.size} CIDs`); 296 - } catch (err) { 297 - console.log(`Could not load previous bundle for boundary detection`); 298 - } 299 - 227 + parentHash = lastBundle.hash; // Chain hash from previous bundle 300 228 console.log(`Resuming from bundle ${currentBundle}`); 301 229 console.log(`Last operation: ${cursor}`); 230 + console.log(`Parent hash: ${parentHash}`); 302 231 } else { 303 232 console.log('Starting from the beginning (genesis)'); 304 233 } ··· 306 235 console.log(); 307 236 308 237 let mempool: PLCOperation[] = []; 309 - const seenCIDs = new Set<string>(prevBoundaryCIDs); 310 238 let totalFetched = 0; 311 239 let totalBundles = 0; 312 240 313 241 while (true) { 314 242 try { 243 + // Fetch operations 315 244 console.log(`Fetching operations (cursor: ${cursor || 'start'})...`); 316 245 const operations = await fetchOperations(cursor, 1000); 317 246 ··· 320 249 break; 321 250 } 322 251 323 - // Deduplicate 324 - const uniqueOps = operations.filter(op => { 325 - if (seenCIDs.has(op.cid)) { 326 - return false; 327 - } 328 - seenCIDs.add(op.cid); 329 - return true; 330 - }); 252 + console.log(` Fetched ${operations.length} operations`); 253 + totalFetched += operations.length; 331 254 332 - console.log(` Fetched ${operations.length} operations (${uniqueOps.length} unique)`); 333 - totalFetched += uniqueOps.length; 255 + // Add to mempool 256 + mempool.push(...operations); 334 257 335 - mempool.push(...uniqueOps); 258 + // Update cursor 336 259 cursor = operations[operations.length - 1].createdAt; 337 260 261 + // Create bundles while we have enough operations 338 262 while (mempool.length >= BUNDLE_SIZE) { 339 263 const bundleOps = mempool.splice(0, BUNDLE_SIZE); 340 264 341 - console.log(`\nCreating bundle ${String(currentBundle).padStart(6, '0')}...`); 265 + console.log(`Creating bundle ${String(currentBundle).padStart(6, '0')}...`); 342 266 343 267 const metadata = await saveBundle(dir, currentBundle, bundleOps, parentHash); 344 268 269 + // Add to index 345 270 index.bundles.push(metadata); 346 271 index.last_bundle = currentBundle; 347 272 index.total_size_bytes += metadata.compressed_size; ··· 351 276 console.log(` ✓ Bundle ${String(currentBundle).padStart(6, '0')}: ${metadata.operation_count} ops, ${metadata.did_count} DIDs`); 352 277 console.log(` Chain Hash: ${metadata.hash}`); 353 278 console.log(` Content Hash: ${metadata.content_hash}`); 354 - console.log(` Size: ${(metadata.compressed_size / 1024).toFixed(1)} KB`); 355 - 356 - // Get boundary CIDs for next bundle 357 - prevBoundaryCIDs = getBoundaryCIDs(bundleOps); 358 - console.log(` Boundary CIDs: ${prevBoundaryCIDs.size}`); 279 + if (metadata.parent) { 280 + console.log(` Parent Hash: ${metadata.parent}`); 281 + } else { 282 + console.log(` Parent Hash: (genesis)`); 283 + } 284 + console.log(` Compressed: ${metadata.compressed_hash}`); 285 + console.log(` Size: ${(metadata.compressed_size / 1024).toFixed(1)} KB (${(metadata.compressed_size / metadata.uncompressed_size * 100).toFixed(1)}% of original)`); 359 286 console.log(); 360 287 288 + // Update parent hash for next bundle 361 289 parentHash = metadata.hash; 290 + 362 291 currentBundle++; 363 292 totalBundles++; 364 293 } 365 294 295 + // Small delay to be nice to the server 366 296 await new Promise(resolve => setTimeout(resolve, 100)); 367 297 368 298 } catch (err: any) { ··· 382 312 } 383 313 } 384 314 315 + // Save final index 385 316 await saveIndex(dir, index); 386 317 387 318 console.log(); ··· 396 327 397 328 if (mempool.length > 0) { 398 329 console.log(); 399 - console.log(`Note: ${mempool.length} operations in mempool`); 330 + console.log(`Note: ${mempool.length} operations in mempool (need ${BUNDLE_SIZE - mempool.length} more for next bundle)`); 400 331 } 401 332 }; 402 333