PLC Bundle V1 Example Implementations

Now it's multi-language.

+285 -8
.gitignore typescript/.gitignore
+21 -8
README.md
··· 1 - # PLC Bundle V1 Reference Implementation in TypeScript 1 + # PLC Bundle V1 Reference Implementations 2 2 3 - This script ([plcbundle.ts](plcbundle.ts)) is a compact, readable reference implementation for creating [PLC Bundle](https://github.com/atscan/plcbundle) v1 archives. It fetches operations from the PLC directory and generates a complete, verifiable repository of data bundles. 3 + This set of scripts represents a compact, readable reference implementations for creating [PLC Bundle](https://github.com/atscan/plcbundle) v1 archives. It fetches operations from the PLC directory and generates a complete, verifiable repository of data bundles. 4 4 5 5 It is fully compliant with the [PLC Bundle v1 Specification](https://github.com/atscan/plcbundle/blob/main/SPECIFICATION.md). 6 6 7 7 ## Features 8 8 9 9 - **Spec Compliant:** Correctly implements hashing, chaining, serialization, and boundary de-duplication. 10 - - **Reproducible:** Generates byte-for-byte identical bundles to the official Go implementation. 11 - - **Efficient:** Uses a memory-efficient method to handle duplicates between bundle boundaries. 10 + - **Reproducible:** Generates byte-for-byte identical bundles to the other implementations. 12 11 - **Standalone:** Single-file script with clear dependencies. 13 12 14 - ## Usage 13 + ## Implementations 14 + 15 + | Language | File | 16 + | --- | --- | 17 + | [TypeScript](#typescript) | [`typescript/plcbundle.ts`](typescript/plcbundle.ts) | 18 + | [Python](#python) | [`python/plcbundle.py`](python/plcbundle.py) | 19 + 20 + ## TypeScript 21 + 22 + File: [plcbundle.ts](plcbundle.ts) 23 + 24 + ### Usage 15 25 16 26 This script should run well with **[Bun](https://bun.com/) (recommended)**, [Deno](https://deno.com/), or [Node.js](https://nodejs.org/en). 17 27 18 28 The script accepts one optional argument: the path to the output directory where bundles will be stored. If omitted, it defaults to `./plc_bundles`. 
19 29 20 - ### Bun (Recommended) 30 + #### Bun (Recommended) 21 31 22 32 Bun is the fastest and easiest way to run this script, as it handles TypeScript and dependencies automatically. 23 33 ··· 29 39 bun run plcbundle.ts ./my_plc_bundles 30 40 ``` 31 41 32 - ### Deno 42 + #### Deno 33 43 34 44 Deno can also run the script directly. You will need to provide permissions for network access and file system I/O. 35 45 ··· 38 48 deno run --allow-net --allow-read --allow-write plcbundle.ts ./my_plc_bundles 39 49 ``` 40 50 41 - ### Node.js (with TypeScript) 51 + #### Node.js (with TypeScript) 42 52 43 53 If using Node.js, you must first install dependencies and compile the TypeScript file to JavaScript. 44 54 ··· 53 63 node dist/plcbundle.js ./my_plc_bundles 54 64 ``` 55 65 66 + ## Python 67 + 68 + TODO
package.json typescript/package.json
plcbundle.ts typescript/plcbundle.ts
+264
python/plcbundle.py
#!/usr/bin/env python3

"""
plcbundle.py - A compact, readable reference implementation for creating
plcbundle V1 compliant archives. This script demonstrates all critical spec
requirements, including hashing, serialization, ordering, and boundary handling.

PLC Bundle v1 Specification:
https://github.com/atscan/plcbundle/blob/main/SPECIFICATION.md
"""

import asyncio
import hashlib
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import TypedDict, Self

import httpx
import zstd

# --- Configuration ---
BUNDLE_SIZE = 10000
INDEX_FILE = 'plc_bundles.json'
DEFAULT_DIR = './plc_bundles_py'
PLC_URL = 'https://plc.directory'


# --- Types (as per spec) ---
class PLCOperation(TypedDict):
    did: str
    cid: str
    createdAt: str
    operation: dict
    nullified: bool | str | None
    _raw: str  # Holds the original raw JSON string for reproducibility


class BundleMetadata(TypedDict):
    bundle_number: int
    start_time: str
    end_time: str
    operation_count: int
    did_count: int
    hash: str  # The chain hash
    content_hash: str
    parent: str
    compressed_hash: str
    compressed_size: int
    uncompressed_size: int
    cursor: str
    created_at: str


class Index(TypedDict):
    version: str
    last_bundle: int
    updated_at: str
    total_size_bytes: int
    bundles: list[BundleMetadata]


class PlcBundleManager:
    """
    Manages the state and process of fetching, validating, and creating PLC bundles.
    """

    # Populated by _init() from disk (or a fresh default index).
    _index: Index

    def __init__(self, bundle_dir: str):
        self._bundle_dir = Path(bundle_dir)
        self._http_client = httpx.AsyncClient(timeout=30)
        # Per-instance mutable state. (These were previously *class*
        # attributes, which Python shares across all instances -- a classic
        # mutable-class-attribute bug.)
        self._mempool: list[PLCOperation] = []
        # This set de-duplicates operations, both from the previous bundle's
        # boundary and within new batches, and is pruned after each bundle to
        # stay memory-efficient.
        self._seen_cids: set[str] = set()

    @classmethod
    async def create(cls, bundle_dir: str) -> Self:
        """Factory to create and asynchronously initialize a PlcBundleManager instance."""
        manager = cls(bundle_dir)
        await manager._init()
        return manager

    async def _init(self):
        """
        Initializes the manager by loading the index and seeding the `seen_cids`
        set with the CIDs from the last saved bundle's boundary.
        """
        # parents=True so a nested output path (e.g. ./out/bundles) works too.
        self._bundle_dir.mkdir(parents=True, exist_ok=True)
        self._index = await self._load_index()
        print(f"plcbundle Reference Implementation\nDirectory: {self._bundle_dir}\n")

        last_bundle = self._index['bundles'][-1] if self._index['bundles'] else None
        if last_bundle:
            print(f"Resuming from bundle {last_bundle['bundle_number'] + 1}. Last op time: {last_bundle['end_time']}")
            try:
                prev_ops = await self._load_bundle_ops(last_bundle['bundle_number'])
                self._seen_cids = self._get_boundary_cids(prev_ops)
                print(f"  Seeded de-duplication set with {len(self._seen_cids)} boundary CIDs.")
            except FileNotFoundError:
                print("  Warning: Could not load previous bundle file. Boundary deduplication may be incomplete.")
        else:
            print('Starting from the beginning (genesis bundle).')

    async def run(self):
        """
        The main execution loop. It continuously fetches operations, validates and
        de-duplicates them, fills the mempool, and creates bundles when ready.
        """
        last_bundle = self._index['bundles'][-1] if self._index['bundles'] else None
        cursor = last_bundle['end_time'] if last_bundle else None

        while True:
            try:
                print(f"\nFetching operations from cursor: {cursor or 'start'}...")
                fetched_ops = await self._fetch_operations(cursor)
                if not fetched_ops:
                    print('No more operations available from PLC directory.')
                    break

                self._process_and_validate_ops(fetched_ops)
                cursor = fetched_ops[-1]['createdAt']

                while len(self._mempool) >= BUNDLE_SIZE:
                    await self._create_and_save_bundle()

                await asyncio.sleep(0.2)  # Be nice to the server
            except httpx.HTTPStatusError as e:
                print(f"\nError: HTTP {e.response.status_code} - {e.response.text}")
                break
            except Exception as e:
                # Top-level boundary: report and stop so partial state is saved below.
                print(f"\nAn unexpected error occurred: {e}")
                break

        await self._save_index()
        print("\n---\nProcess complete.")
        print(f"Total bundles in index: {len(self._index['bundles'])}")
        print(f"Operations in mempool: {len(self._mempool)}")
        total_mb = self._index['total_size_bytes'] / 1024 / 1024
        print(f"Total size: {total_mb:.2f} MB")

    # --- Private Helper Methods ---

    async def _fetch_operations(self, after: str | None) -> list[PLCOperation]:
        """Fetches one page of export operations from the PLC directory."""
        params: dict[str, str | int] = {'count': 1000}
        if after:
            params['after'] = after

        response = await self._http_client.get(f"{PLC_URL}/export", params=params)
        response.raise_for_status()

        lines = response.text.strip().split('\n')
        if not lines or not lines[0]:
            return []

        # Important: The `_raw` key is added here to preserve the original JSON string,
        # ensuring byte-for-byte reproducibility as required by Spec 4.2.
        return [{**json.loads(line), '_raw': line} for line in lines]

    def _process_and_validate_ops(self, ops: list[PLCOperation]):
        """Appends new, chronologically valid, unseen operations to the mempool."""
        last_op = self._mempool[-1] if self._mempool else None
        last_bundle = self._index['bundles'][-1] if self._index['bundles'] else None
        last_timestamp = last_op['createdAt'] if last_op else (last_bundle['end_time'] if last_bundle else '')

        new_ops_count = 0
        for op in ops:
            if op['cid'] in self._seen_cids:
                continue

            # Spec requires strictly non-decreasing timestamps; lexicographic
            # comparison is valid for ISO-8601 strings in the same timezone.
            if op['createdAt'] < last_timestamp:
                raise ValueError(f"Chronological validation failed: op {op['cid']} at {op['createdAt']} is older than last op at {last_timestamp}")

            self._mempool.append(op)
            self._seen_cids.add(op['cid'])
            last_timestamp = op['createdAt']
            new_ops_count += 1
        print(f"  Added {new_ops_count} new operations to mempool.")

    async def _create_and_save_bundle(self):
        """Drains BUNDLE_SIZE operations from the mempool into a new compressed bundle."""
        bundle_ops = self._mempool[:BUNDLE_SIZE]
        self._mempool = self._mempool[BUNDLE_SIZE:]

        last_bundle = self._index['bundles'][-1] if self._index['bundles'] else None
        parent_hash = last_bundle['hash'] if last_bundle else ''

        # Spec 4.2 & 6.3: Hashing and serialization must be exact.
        jsonl_data = "".join([op['_raw'] + '\n' for op in bundle_ops]).encode('utf-8')
        content_hash = hashlib.sha256(jsonl_data).hexdigest()
        chain_hash = self._calculate_chain_hash(parent_hash, content_hash)
        compressed_data = zstd.compress(jsonl_data, 3)

        bundle_number = self._index['last_bundle'] + 1
        # Announce the bundle (with its number) *before* doing the work.
        print(f"\nCreating bundle {bundle_number:06d}...")
        filename = f"{bundle_number:06d}.jsonl.zst"
        (self._bundle_dir / filename).write_bytes(compressed_data)

        self._index['bundles'].append({
            'bundle_number': bundle_number,
            'start_time': bundle_ops[0]['createdAt'],
            'end_time': bundle_ops[-1]['createdAt'],
            'operation_count': len(bundle_ops),
            'did_count': len({op['did'] for op in bundle_ops}),
            'hash': chain_hash, 'content_hash': content_hash, 'parent': parent_hash,
            'compressed_hash': hashlib.sha256(compressed_data).hexdigest(),
            'compressed_size': len(compressed_data),
            'uncompressed_size': len(jsonl_data),
            'cursor': last_bundle['end_time'] if last_bundle else '',
            'created_at': datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
        })
        self._index['last_bundle'] = bundle_number
        self._index['total_size_bytes'] += len(compressed_data)

        # Prune `seen_cids` to keep it memory-efficient: only the new bundle's
        # boundary CIDs and the remaining mempool CIDs can recur.
        new_boundary_cids = self._get_boundary_cids(bundle_ops)
        mempool_cids = {op['cid'] for op in self._mempool}
        self._seen_cids = new_boundary_cids.union(mempool_cids)

        await self._save_index()
        print(f"  ✓ Saved. Hash: {chain_hash[:16]}...")
        print(f"  Pruned de-duplication set to {len(self._seen_cids)} CIDs.")

    async def _load_index(self) -> Index:
        """Loads the repository index from disk, or returns a fresh default."""
        try:
            return json.loads((self._bundle_dir / INDEX_FILE).read_text(encoding='utf-8'))
        except FileNotFoundError:
            return {'version': '1.0', 'last_bundle': 0, 'updated_at': '', 'total_size_bytes': 0, 'bundles': []}

    async def _save_index(self):
        """Atomically persists the index via write-to-temp-then-rename."""
        self._index['updated_at'] = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
        temp_path = self._bundle_dir / f"{INDEX_FILE}.tmp"
        temp_path.write_text(json.dumps(self._index, indent=2), encoding='utf-8')
        temp_path.rename(self._bundle_dir / INDEX_FILE)

    async def _load_bundle_ops(self, bundle_number: int) -> list[PLCOperation]:
        """Loads and decompresses a previously saved bundle's operations."""
        filename = f"{bundle_number:06d}.jsonl.zst"
        compressed = (self._bundle_dir / filename).read_bytes()
        decompressed = zstd.decompress(compressed).decode('utf-8')
        return [{**json.loads(line), '_raw': line} for line in decompressed.strip().split('\n')]

    # --- Static Utilities ---

    @staticmethod
    def _calculate_chain_hash(parent: str, content_hash: str) -> str:
        """Computes the chain hash; the genesis bundle uses a fixed prefix."""
        data = f"{parent}:{content_hash}" if parent else f"plcbundle:genesis:{content_hash}"
        return hashlib.sha256(data.encode('utf-8')).hexdigest()

    @staticmethod
    def _get_boundary_cids(ops: list[PLCOperation]) -> set[str]:
        """Returns the CIDs of all trailing ops sharing the final timestamp."""
        if not ops:
            return set()
        last_time = ops[-1]['createdAt']
        return {op['cid'] for op in reversed(ops) if op['createdAt'] == last_time}


async def main():
    """Entry point for the script."""
    dir_path = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_DIR
    manager = await PlcBundleManager.create(dir_path)
    await manager.run()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nProcess interrupted by user.")
    except Exception as e:
        print(f"\nFATAL ERROR: {e}", file=sys.stderr)
        sys.exit(1)