tangled
alpha
login
or
join now
atscan.net
/
plcbundle-ref
5
fork
atom
PLC Bundle V1 Example Implementations
5
fork
atom
overview
issues
pulls
pipelines
simplify
tree.fail
4 months ago
9b98b1b3
63208368
+225
-334
2 changed files
expand all
collapse all
unified
split
.gitignore
plcbundle.ts
+2
-1
.gitignore
···
1
1
node_modules
2
2
.DS_Store
3
3
-
plc_bundles
3
3
+
plc_bundles
4
4
+
plc_bundles.json
+223
-333
plcbundle.ts
···
1
1
#!/usr/bin/env node
2
2
3
3
/**
4
4
-
* plcbundle.ts - Fetch from PLC Directory and create verifiable bundles
4
4
+
* plcbundle.ts - A reference implementation for fetching PLC directory
5
5
+
* operations and creating verifiable, chained bundles according to the plcbundle V1 spec.
6
6
+
*
7
7
+
* This script fetches operations, validates their order, de-duplicates them,
8
8
+
* and groups them into 10,000-operation bundles. Each bundle is compressed,
9
9
+
* hashed, and cryptographically linked to the previous one, creating a verifiable
10
10
+
* chain of data.
5
11
*/
6
12
7
13
import fs from 'fs/promises';
8
14
import path from 'path';
9
15
import crypto from 'crypto';
10
10
-
import { fileURLToPath } from 'url';
11
16
import { init, compress, decompress } from '@bokuweb/zstd-wasm';
12
17
import axios from 'axios';
13
18
14
14
-
const __dirname = path.dirname(fileURLToPath(import.meta.url));
15
15
-
19
19
+
// --- Configuration ---
16
20
const BUNDLE_SIZE = 10000;
17
21
const INDEX_FILE = 'plc_bundles.json';
22
22
+
const DEFAULT_DIR = './plc_bundles';
18
23
const PLC_URL = 'https://plc.directory';
19
24
20
20
-
// ============================================================================
21
21
-
// Types
22
22
-
// ============================================================================
23
23
-
25
25
+
// --- Types (as per spec) ---
24
26
interface PLCOperation {
25
25
-
did: string;
26
27
cid: string;
27
28
createdAt: string;
28
28
-
operation: Record<string, any>;
29
29
-
nullified?: boolean | string;
30
30
-
_raw?: string;
29
29
+
_raw: string; // Holds the original raw JSON string for reproducibility.
31
30
}
32
31
33
32
interface BundleMetadata {
···
36
35
end_time: string;
37
36
operation_count: number;
38
37
did_count: number;
39
39
-
hash: string;
38
38
+
hash: string; // The chain hash.
40
39
content_hash: string;
41
40
parent: string;
42
41
compressed_hash: string;
43
42
compressed_size: number;
44
43
uncompressed_size: number;
44
44
+
cursor: string;
45
45
created_at: string;
46
46
}
47
47
···
53
53
bundles: BundleMetadata[];
54
54
}
55
55
56
56
-
// Initialize zstd
56
56
+
// --- ZSTD Initialization ---
57
57
await init();
58
58
59
59
-
// ============================================================================
60
60
-
// Index Management
61
61
-
// ============================================================================
59
59
+
/**
60
60
+
* Manages the state and process of fetching, validating, and creating PLC bundles.
61
61
+
*/
62
62
+
class PlcBundleManager {
63
63
+
private bundleDir: string;
64
64
+
private index!: Index;
65
65
+
private mempool: PLCOperation[] = [];
66
66
+
// This set is used to de-duplicate operations. It tracks CIDs from the
67
67
+
// previous bundle's boundary and all CIDs from the current mempool.
68
68
+
private seenCIDs = new Set<string>();
62
69
63
63
-
const loadIndex = async (dir: string): Promise<Index> => {
64
64
-
try {
65
65
-
const data = await fs.readFile(path.join(dir, INDEX_FILE), 'utf8');
66
66
-
return JSON.parse(data);
67
67
-
} catch (err) {
68
68
-
return {
69
69
-
version: '1.0',
70
70
-
last_bundle: 0,
71
71
-
updated_at: new Date().toISOString(),
72
72
-
total_size_bytes: 0,
73
73
-
bundles: []
74
74
-
};
70
70
+
constructor(bundleDir: string) {
71
71
+
this.bundleDir = bundleDir;
75
72
}
76
76
-
};
77
73
78
78
-
const saveIndex = async (dir: string, index: Index): Promise<void> => {
79
79
-
index.updated_at = new Date().toISOString();
80
80
-
const indexPath = path.join(dir, INDEX_FILE);
81
81
-
const tempPath = indexPath + '.tmp';
82
82
-
await fs.writeFile(tempPath, JSON.stringify(index, null, 2));
83
83
-
await fs.rename(tempPath, indexPath);
84
84
-
};
74
74
+
/**
75
75
+
* Initializes the manager by loading the index and seeding the `seenCIDs`
76
76
+
* set with the CIDs from the last saved bundle's boundary.
77
77
+
*/
78
78
+
async init() {
79
79
+
await fs.mkdir(this.bundleDir, { recursive: true });
80
80
+
this.index = await this._loadIndex();
81
81
+
console.log(`plcbundle Reference Implementation\nDirectory: ${this.bundleDir}\n`);
85
82
86
86
-
// ============================================================================
87
87
-
// Bundle Loading
88
88
-
// ============================================================================
89
89
-
90
90
-
const loadBundle = async (dir: string, bundleNumber: number): Promise<PLCOperation[]> => {
91
91
-
const filename = `${String(bundleNumber).padStart(6, '0')}.jsonl.zst`;
92
92
-
const filepath = path.join(dir, filename);
93
93
-
94
94
-
const compressed = await fs.readFile(filepath);
95
95
-
const decompressed = decompress(compressed);
96
96
-
const jsonl = Buffer.from(decompressed).toString('utf8');
97
97
-
98
98
-
const lines = jsonl.trim().split('\n').filter(l => l);
99
99
-
return lines.map(line => {
100
100
-
const op = JSON.parse(line) as PLCOperation;
101
101
-
op._raw = line;
102
102
-
return op;
103
103
-
});
104
104
-
};
105
105
-
106
106
-
// ============================================================================
107
107
-
// Boundary Handling
108
108
-
// ============================================================================
109
109
-
110
110
-
const getBoundaryCIDs = (operations: PLCOperation[]): Set<string> => {
111
111
-
if (operations.length === 0) return new Set();
112
112
-
113
113
-
const lastOp = operations[operations.length - 1];
114
114
-
const boundaryTime = lastOp.createdAt;
115
115
-
const cidSet = new Set<string>();
116
116
-
117
117
-
// Walk backwards from the end to find all operations with the same timestamp
118
118
-
for (let i = operations.length - 1; i >= 0; i--) {
119
119
-
if (operations[i].createdAt === boundaryTime) {
120
120
-
cidSet.add(operations[i].cid);
83
83
+
const lastBundle = this.index.bundles.at(-1);
84
84
+
if (lastBundle) {
85
85
+
console.log(`Resuming from bundle ${lastBundle.bundle_number + 1}. Last op time: ${lastBundle.end_time}`);
86
86
+
try {
87
87
+
// Pre-seed the de-duplication set with CIDs from the previous bundle's boundary.
88
88
+
// This is crucial for preventing duplicates between two adjacent bundles.
89
89
+
const prevOps = await this._loadBundleOps(lastBundle.bundle_number);
90
90
+
this.seenCIDs = this._getBoundaryCIDs(prevOps);
91
91
+
console.log(` Seeded de-duplication set with ${this.seenCIDs.size} boundary CIDs.`);
92
92
+
} catch (e) {
93
93
+
console.warn(` Warning: Could not load previous bundle file. Boundary deduplication may be incomplete.`);
94
94
+
}
121
95
} else {
122
122
-
break;
96
96
+
console.log('Starting from the beginning (genesis bundle).');
123
97
}
124
98
}
125
125
-
126
126
-
return cidSet;
127
127
-
};
128
99
129
129
-
const stripBoundaryDuplicates = (
130
130
-
operations: PLCOperation[],
131
131
-
prevBoundaryCIDs: Set<string>
132
132
-
): PLCOperation[] => {
133
133
-
if (prevBoundaryCIDs.size === 0) return operations;
134
134
-
if (operations.length === 0) return operations;
135
135
-
136
136
-
const boundaryTime = operations[0].createdAt;
137
137
-
let startIdx = 0;
138
138
-
139
139
-
// Skip operations that are in the previous bundle's boundary
140
140
-
for (let i = 0; i < operations.length; i++) {
141
141
-
const op = operations[i];
142
142
-
143
143
-
// Stop if we've moved past the boundary timestamp
144
144
-
if (op.createdAt > boundaryTime) {
145
145
-
break;
146
146
-
}
147
147
-
148
148
-
// Skip if this CID was in the previous boundary
149
149
-
if (op.createdAt === boundaryTime && prevBoundaryCIDs.has(op.cid)) {
150
150
-
startIdx = i + 1;
151
151
-
continue;
100
100
+
/**
101
101
+
* The main execution loop. It continuously fetches operations, validates and
102
102
+
* de-duplicates them, fills the mempool, and creates bundles when ready.
103
103
+
*/
104
104
+
async run() {
105
105
+
let cursor = this.index.bundles.at(-1)?.end_time || null;
106
106
+
107
107
+
while (true) {
108
108
+
try {
109
109
+
console.log(`\nFetching operations from cursor: ${cursor || 'start'}...`);
110
110
+
const fetchedOps = await this._fetchOperations(cursor);
111
111
+
112
112
+
if (fetchedOps.length === 0) {
113
113
+
console.log('No more operations available from PLC directory.');
114
114
+
break;
115
115
+
}
116
116
+
117
117
+
// The core ingestion logic: de-duplicate and validate operations before adding to the mempool.
118
118
+
this._processAndValidateOps(fetchedOps);
119
119
+
120
120
+
// The cursor for the next fetch is always the timestamp of the last operation received in the current batch.
121
121
+
cursor = fetchedOps[fetchedOps.length - 1].createdAt;
122
122
+
123
123
+
// If the mempool is full enough, create bundles. This can run multiple times per fetch.
124
124
+
while (this.mempool.length >= BUNDLE_SIZE) {
125
125
+
await this._createAndSaveBundle();
126
126
+
}
127
127
+
128
128
+
await new Promise(resolve => setTimeout(resolve, 200)); // Be nice to the server.
129
129
+
} catch (err: any) {
130
130
+
console.error(`\nError: ${err.message}`);
131
131
+
if (err.response) console.error(`HTTP Status: ${err.response.status}`);
132
132
+
if (['ECONNRESET', 'ECONNABORTED'].includes(err.code)) {
133
133
+
console.log('Connection error, retrying in 5 seconds...');
134
134
+
await new Promise(resolve => setTimeout(resolve, 5000));
135
135
+
continue;
136
136
+
}
137
137
+
break;
138
138
+
}
152
139
}
153
153
-
154
154
-
break;
140
140
+
141
141
+
await this._saveIndex();
142
142
+
console.log(`\n---`);
143
143
+
console.log('Process complete.');
144
144
+
console.log(`Total bundles in index: ${this.index.bundles.length}`);
145
145
+
console.log(`Operations in mempool: ${this.mempool.length}`);
146
146
+
console.log(`Total size: ${(this.index.total_size_bytes / 1024 / 1024).toFixed(2)} MB`);
155
147
}
156
148
157
157
-
const stripped = operations.slice(startIdx);
158
158
-
if (startIdx > 0) {
159
159
-
console.log(` Stripped ${startIdx} boundary duplicates`);
160
160
-
}
161
161
-
return stripped;
162
162
-
};
149
149
+
// ==========================================================================
150
150
+
// Private Helper Methods
151
151
+
// ==========================================================================
163
152
164
164
-
// ============================================================================
165
165
-
// PLC Directory Client
166
166
-
// ============================================================================
167
167
-
168
168
-
const fetchOperations = async (after: string | null, count: number = 1000): Promise<PLCOperation[]> => {
169
169
-
const params: Record<string, any> = { count };
170
170
-
if (after) {
171
171
-
params.after = after;
153
153
+
private async _fetchOperations(after: string | null): Promise<PLCOperation[]> {
154
154
+
const params = { count: 1000, ...(after && { after }) };
155
155
+
const response = await axios.get<string>(`${PLC_URL}/export`, { params, responseType: 'text' });
156
156
+
const lines = response.data.trimEnd().split('\n');
157
157
+
if (lines.length === 1 && lines[0] === '') return [];
158
158
+
// Important: The `_raw` property is added here to preserve the original JSON string,
159
159
+
// ensuring byte-for-byte reproducibility as required by Spec 4.2.
160
160
+
return lines.map(line => ({ ...JSON.parse(line), _raw: line }));
172
161
}
173
173
-
174
174
-
const response = await axios.get<string>(`${PLC_URL}/export`, {
175
175
-
params,
176
176
-
responseType: 'text'
177
177
-
});
178
178
-
179
179
-
const lines = response.data.trim().split('\n').filter(l => l);
180
180
-
181
181
-
return lines.map(line => {
182
182
-
const op = JSON.parse(line) as PLCOperation;
183
183
-
op._raw = line; // Preserve exact JSON
184
184
-
return op;
185
185
-
});
186
186
-
};
187
162
188
188
-
// ============================================================================
189
189
-
// Bundle Operations
190
190
-
// ============================================================================
163
163
+
/**
164
164
+
* Processes a batch of fetched operations. It ensures each operation is unique
165
165
+
* (both within the batch and across bundle boundaries) and that it maintains
166
166
+
* chronological order before adding it to the mempool.
167
167
+
*/
168
168
+
private _processAndValidateOps(ops: PLCOperation[]) {
169
169
+
// The timestamp to validate against is the last operation in the mempool, or if empty,
170
170
+
// the end time of the last bundle. This prevents chronological gaps.
171
171
+
let lastTimestamp = this.mempool.at(-1)?.createdAt || this.index.bundles.at(-1)?.end_time || '';
172
172
+
let newOpsCount = 0;
191
173
192
192
-
const serializeJSONL = (operations: PLCOperation[]): string => {
193
193
-
const lines = operations.map(op => {
194
194
-
const json = op._raw || JSON.stringify(op);
195
195
-
return json + '\n';
196
196
-
});
197
197
-
return lines.join('');
198
198
-
};
199
199
-
200
200
-
const sha256 = (data: Buffer | string): string => {
201
201
-
return crypto.createHash('sha256').update(data).digest('hex');
202
202
-
};
174
174
+
for (const op of ops) {
175
175
+
// The `seenCIDs` set efficiently handles duplicates from the previous bundle's
176
176
+
// boundary as well as any duplicates within the current fetched batch.
177
177
+
if (this.seenCIDs.has(op.cid)) {
178
178
+
continue;
179
179
+
}
203
180
204
204
-
const calculateChainHash = (parent: string, contentHash: string): string => {
205
205
-
let data: string;
206
206
-
if (!parent || parent === '') {
207
207
-
data = `plcbundle:genesis:${contentHash}`;
208
208
-
} else {
209
209
-
data = `${parent}:${contentHash}`;
181
181
+
// Spec 3: Validate that the stream is chronological. This is a critical sanity check.
182
182
+
if (op.createdAt < lastTimestamp) {
183
183
+
throw new Error(`Chronological validation failed: op ${op.cid} at ${op.createdAt} is older than last op at ${lastTimestamp}`);
184
184
+
}
185
185
+
186
186
+
this.mempool.push(op);
187
187
+
this.seenCIDs.add(op.cid); // Add the CID to the set only after it's confirmed valid.
188
188
+
lastTimestamp = op.createdAt;
189
189
+
newOpsCount++;
190
190
+
}
191
191
+
console.log(` Added ${newOpsCount} new operations to mempool.`);
210
192
}
211
211
-
return sha256(data);
212
212
-
};
213
193
214
214
-
const extractUniqueDIDs = (operations: PLCOperation[]): number => {
215
215
-
const dids = new Set<string>();
216
216
-
operations.forEach(op => dids.add(op.did));
217
217
-
return dids.size;
218
218
-
};
194
194
+
/**
195
195
+
* Takes 10,000 operations from the mempool, creates a bundle file, generates
196
196
+
* its metadata according to the spec, and updates the index.
197
197
+
*/
198
198
+
private async _createAndSaveBundle() {
199
199
+
const currentBundleNumber = this.index.last_bundle + 1;
200
200
+
const bundleOps = this.mempool.splice(0, BUNDLE_SIZE);
219
201
220
220
-
const saveBundle = async (
221
221
-
dir: string,
222
222
-
bundleNumber: number,
223
223
-
operations: PLCOperation[],
224
224
-
parentHash: string
225
225
-
): Promise<BundleMetadata> => {
226
226
-
const filename = `${String(bundleNumber).padStart(6, '0')}.jsonl.zst`;
227
227
-
const filepath = path.join(dir, filename);
228
228
-
229
229
-
const jsonl = serializeJSONL(operations);
230
230
-
const uncompressedBuffer = Buffer.from(jsonl, 'utf8');
231
231
-
232
232
-
const contentHash = sha256(uncompressedBuffer);
233
233
-
const uncompressedSize = uncompressedBuffer.length;
234
234
-
235
235
-
const chainHash = calculateChainHash(parentHash, contentHash);
236
236
-
237
237
-
const compressed = compress(uncompressedBuffer, 3);
238
238
-
const compressedBuffer = Buffer.from(compressed);
239
239
-
const compressedHash = sha256(compressedBuffer);
240
240
-
const compressedSize = compressedBuffer.length;
241
241
-
242
242
-
await fs.writeFile(filepath, compressedBuffer);
243
243
-
244
244
-
const startTime = operations[0].createdAt;
245
245
-
const endTime = operations[operations.length - 1].createdAt;
246
246
-
const didCount = extractUniqueDIDs(operations);
247
247
-
248
248
-
return {
249
249
-
bundle_number: bundleNumber,
250
250
-
start_time: startTime,
251
251
-
end_time: endTime,
252
252
-
operation_count: operations.length,
253
253
-
did_count: didCount,
254
254
-
hash: chainHash,
255
255
-
content_hash: contentHash,
256
256
-
parent: parentHash || '',
257
257
-
compressed_hash: compressedHash,
258
258
-
compressed_size: compressedSize,
259
259
-
uncompressed_size: uncompressedSize,
260
260
-
created_at: new Date().toISOString()
261
261
-
};
262
262
-
};
202
202
+
const parentHash = this.index.bundles.at(-1)?.hash || '';
203
203
+
const previousCursor = this.index.bundles.at(-1)?.end_time || '';
204
204
+
205
205
+
// The hashing and serialization process follows the spec exactly to ensure compatibility.
206
206
+
const jsonl = PlcBundleManager._serializeJSONL(bundleOps);
207
207
+
const uncompressedBuffer = Buffer.from(jsonl, 'utf8');
208
208
+
const contentHash = PlcBundleManager._sha256(uncompressedBuffer);
209
209
+
const chainHash = PlcBundleManager._calculateChainHash(parentHash, contentHash);
210
210
+
const compressedBuffer = Buffer.from(compress(uncompressedBuffer, 3));
211
211
+
212
212
+
const filename = `${String(currentBundleNumber).padStart(6, '0')}.jsonl.zst`;
213
213
+
await fs.writeFile(path.join(this.bundleDir, filename), compressedBuffer);
263
214
264
264
-
// ============================================================================
265
265
-
// Main Logic
266
266
-
// ============================================================================
215
215
+
const dids = new Set(bundleOps.map(op => JSON.parse(op._raw).did));
216
216
+
const metadata: BundleMetadata = {
217
217
+
bundle_number: currentBundleNumber,
218
218
+
start_time: bundleOps[0].createdAt,
219
219
+
end_time: bundleOps[bundleOps.length - 1].createdAt,
220
220
+
operation_count: bundleOps.length,
221
221
+
did_count: dids.size,
222
222
+
hash: chainHash,
223
223
+
content_hash: contentHash,
224
224
+
parent: parentHash,
225
225
+
compressed_hash: PlcBundleManager._sha256(compressedBuffer),
226
226
+
compressed_size: compressedBuffer.length,
227
227
+
uncompressed_size: uncompressedBuffer.length,
228
228
+
cursor: previousCursor,
229
229
+
created_at: new Date().toISOString()
230
230
+
};
267
231
268
268
-
const run = async (): Promise<void> => {
269
269
-
const dir = process.argv[2] || './plc_bundles';
270
270
-
271
271
-
console.log('PLC Bundle Fetcher');
272
272
-
console.log('==================');
273
273
-
console.log();
274
274
-
console.log(`Directory: ${dir}`);
275
275
-
console.log(`Source: ${PLC_URL}`);
276
276
-
console.log();
277
277
-
278
278
-
await fs.mkdir(dir, { recursive: true });
279
279
-
280
280
-
const index = await loadIndex(dir);
281
281
-
282
282
-
let currentBundle = index.last_bundle + 1;
283
283
-
let cursor: string | null = null;
284
284
-
let parentHash = '';
285
285
-
let prevBoundaryCIDs = new Set<string>();
286
286
-
287
287
-
if (index.bundles.length > 0) {
288
288
-
const lastBundle = index.bundles[index.bundles.length - 1];
289
289
-
cursor = lastBundle.end_time;
290
290
-
parentHash = lastBundle.hash;
232
232
+
this.index.bundles.push(metadata);
233
233
+
this.index.last_bundle = currentBundleNumber;
234
234
+
this.index.total_size_bytes += metadata.compressed_size;
291
235
292
292
-
try {
293
293
-
const prevOps = await loadBundle(dir, lastBundle.bundle_number);
294
294
-
prevBoundaryCIDs = getBoundaryCIDs(prevOps);
295
295
-
console.log(`Loaded previous bundle boundary: ${prevBoundaryCIDs.size} CIDs`);
296
296
-
} catch (err) {
297
297
-
console.log(`Could not load previous bundle for boundary detection`);
298
298
-
}
299
299
-
300
300
-
console.log(`Resuming from bundle ${currentBundle}`);
301
301
-
console.log(`Last operation: ${cursor}`);
302
302
-
} else {
303
303
-
console.log('Starting from the beginning (genesis)');
236
236
+
// Prune the `seenCIDs` set to keep it memory-efficient. It only needs to hold CIDs
237
237
+
// from the new boundary and the remaining mempool, not all CIDs ever seen.
238
238
+
const newBoundaryCIDs = this._getBoundaryCIDs(bundleOps);
239
239
+
const mempoolCIDs = new Set(this.mempool.map(op => op.cid));
240
240
+
this.seenCIDs = new Set([...newBoundaryCIDs, ...mempoolCIDs]);
241
241
+
242
242
+
await this._saveIndex();
243
243
+
console.log(`\nCreating bundle ${filename}...`);
244
244
+
console.log(` ✓ Saved. Hash: ${metadata.hash.substring(0, 16)}...`);
245
245
+
console.log(` Set new boundary with ${newBoundaryCIDs.size} CIDs. Pruned de-duplication set to ${this.seenCIDs.size} CIDs.`);
304
246
}
305
247
306
306
-
console.log();
307
307
-
308
308
-
let mempool: PLCOperation[] = [];
309
309
-
const seenCIDs = new Set<string>(prevBoundaryCIDs);
310
310
-
let totalFetched = 0;
311
311
-
let totalBundles = 0;
312
312
-
313
313
-
while (true) {
248
248
+
private async _loadIndex(): Promise<Index> {
314
249
try {
315
315
-
console.log(`Fetching operations (cursor: ${cursor || 'start'})...`);
316
316
-
const operations = await fetchOperations(cursor, 1000);
317
317
-
318
318
-
if (operations.length === 0) {
319
319
-
console.log('No more operations available');
320
320
-
break;
321
321
-
}
322
322
-
323
323
-
// Deduplicate
324
324
-
const uniqueOps = operations.filter(op => {
325
325
-
if (seenCIDs.has(op.cid)) {
326
326
-
return false;
327
327
-
}
328
328
-
seenCIDs.add(op.cid);
329
329
-
return true;
330
330
-
});
331
331
-
332
332
-
console.log(` Fetched ${operations.length} operations (${uniqueOps.length} unique)`);
333
333
-
totalFetched += uniqueOps.length;
334
334
-
335
335
-
mempool.push(...uniqueOps);
336
336
-
cursor = operations[operations.length - 1].createdAt;
337
337
-
338
338
-
while (mempool.length >= BUNDLE_SIZE) {
339
339
-
const bundleOps = mempool.splice(0, BUNDLE_SIZE);
340
340
-
341
341
-
console.log(`\nCreating bundle ${String(currentBundle).padStart(6, '0')}...`);
342
342
-
343
343
-
const metadata = await saveBundle(dir, currentBundle, bundleOps, parentHash);
344
344
-
345
345
-
index.bundles.push(metadata);
346
346
-
index.last_bundle = currentBundle;
347
347
-
index.total_size_bytes += metadata.compressed_size;
348
348
-
349
349
-
await saveIndex(dir, index);
350
350
-
351
351
-
console.log(` ✓ Bundle ${String(currentBundle).padStart(6, '0')}: ${metadata.operation_count} ops, ${metadata.did_count} DIDs`);
352
352
-
console.log(` Chain Hash: ${metadata.hash}`);
353
353
-
console.log(` Content Hash: ${metadata.content_hash}`);
354
354
-
console.log(` Size: ${(metadata.compressed_size / 1024).toFixed(1)} KB`);
355
355
-
356
356
-
// Get boundary CIDs for next bundle
357
357
-
prevBoundaryCIDs = getBoundaryCIDs(bundleOps);
358
358
-
console.log(` Boundary CIDs: ${prevBoundaryCIDs.size}`);
359
359
-
console.log();
360
360
-
361
361
-
parentHash = metadata.hash;
362
362
-
currentBundle++;
363
363
-
totalBundles++;
364
364
-
}
365
365
-
366
366
-
await new Promise(resolve => setTimeout(resolve, 100));
367
367
-
368
368
-
} catch (err: any) {
369
369
-
console.error(`Error: ${err.message}`);
370
370
-
371
371
-
if (err.response) {
372
372
-
console.error(`HTTP Status: ${err.response.status}`);
373
373
-
}
374
374
-
375
375
-
if (err.code === 'ECONNRESET' || err.code === 'ECONNABORTED') {
376
376
-
console.log('Connection error, retrying in 5 seconds...');
377
377
-
await new Promise(resolve => setTimeout(resolve, 5000));
378
378
-
continue;
379
379
-
}
380
380
-
381
381
-
break;
250
250
+
const data = await fs.readFile(path.join(this.bundleDir, INDEX_FILE), 'utf8');
251
251
+
return JSON.parse(data);
252
252
+
} catch (err) {
253
253
+
return { version: '1.0', last_bundle: 0, updated_at: '', total_size_bytes: 0, bundles: [] };
382
254
}
383
255
}
256
256
+
257
257
+
private async _saveIndex(): Promise<void> {
258
258
+
this.index.updated_at = new Date().toISOString();
259
259
+
const tempPath = path.join(this.bundleDir, INDEX_FILE + '.tmp');
260
260
+
await fs.writeFile(tempPath, JSON.stringify(this.index, null, 2));
261
261
+
await fs.rename(tempPath, path.join(this.bundleDir, INDEX_FILE));
262
262
+
}
384
263
385
385
-
await saveIndex(dir, index);
386
386
-
387
387
-
console.log();
388
388
-
console.log('================');
389
389
-
console.log('Complete!');
390
390
-
console.log('================');
391
391
-
console.log(`Total operations fetched: ${totalFetched}`);
392
392
-
console.log(`Bundles created: ${totalBundles}`);
393
393
-
console.log(`Total bundles: ${index.bundles.length}`);
394
394
-
console.log(`Mempool: ${mempool.length} operations`);
395
395
-
console.log(`Total size: ${(index.total_size_bytes / 1024 / 1024).toFixed(1)} MB`);
396
396
-
397
397
-
if (mempool.length > 0) {
398
398
-
console.log();
399
399
-
console.log(`Note: ${mempool.length} operations in mempool`);
264
264
+
private async _loadBundleOps(bundleNumber: number): Promise<PLCOperation[]> {
265
265
+
const filename = `${String(bundleNumber).padStart(6, '0')}.jsonl.zst`;
266
266
+
const filepath = path.join(this.bundleDir, filename);
267
267
+
const compressed = await fs.readFile(filepath);
268
268
+
const decompressed = Buffer.from(decompress(compressed)).toString('utf8');
269
269
+
return decompressed.trimEnd().split('\n').map(line => ({...JSON.parse(line), _raw: line}));
400
270
}
401
401
-
};
402
271
403
403
-
// ============================================================================
404
404
-
// Entry Point
405
405
-
// ============================================================================
272
272
+
/** Returns CIDs from the last timestamp of a bundle, used for boundary de-duplication. */
273
273
+
private _getBoundaryCIDs(ops: PLCOperation[]): Set<string> {
274
274
+
if (!ops.length) return new Set();
275
275
+
const lastTime = ops.at(-1)!.createdAt;
276
276
+
const cids = new Set<string>();
277
277
+
for (let i = ops.length - 1; i >= 0 && ops[i].createdAt === lastTime; i--) {
278
278
+
cids.add(ops[i].cid);
279
279
+
}
280
280
+
return cids;
281
281
+
}
282
282
+
283
283
+
// --- Static Utilities ---
284
284
+
private static _sha256 = (data: string | Buffer): string => crypto.createHash('sha256').update(data).digest('hex');
285
285
+
private static _serializeJSONL = (ops: PLCOperation[]): string => ops.map(op => op._raw + '\n').join('');
286
286
+
private static _calculateChainHash = (parent: string, contentHash: string): string => {
287
287
+
return PlcBundleManager._sha256(parent ? `${parent}:${contentHash}` : `plcbundle:genesis:${contentHash}`);
288
288
+
};
289
289
+
}
406
290
407
407
-
run().catch(err => {
408
408
-
console.error('Fatal error:', err.message);
291
291
+
// --- Entry Point ---
292
292
+
(async () => {
293
293
+
const dir = process.argv[2] || DEFAULT_DIR;
294
294
+
const manager = new PlcBundleManager(dir);
295
295
+
await manager.init();
296
296
+
await manager.run();
297
297
+
})().catch(err => {
298
298
+
console.error('\nFATAL ERROR:', err.message);
409
299
console.error(err.stack);
410
300
process.exit(1);
411
301
});