A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
Branch `test-validate` · 356 lines · 12 kB
#!/usr/bin/env python3
"""
detect-invalid-ops.py - Detect invalid PLC operations (optimized for scale)

Validates PLC operations according to did:plc specification v0.2.1
Optimized for processing millions of operations efficiently.

Reads one JSON-encoded operation per line from stdin and writes every
operation that fails a check to stdout as CSV (columns: bundle, position,
reason, opRaw). Progress and a final summary are written to stderr.
"""

import sys
import os
import json
import csv
import base64
import time
from typing import List, Optional, Dict, Any
from collections import defaultdict

# Optional dependency. No check below needs it (the High-S check is plain
# integer arithmetic); HAS_CRYPTO is kept as a module-level flag for
# backward compatibility only.
try:
    from cryptography.hazmat.primitives.asymmetric import ec
    from cryptography.hazmat.backends import default_backend
    HAS_CRYPTO = True
except ImportError:
    HAS_CRYPTO = False

# Performance: Pre-compile valid field sets
VALID_TOP_FIELDS = frozenset({'did', 'operation', 'cid', 'nullified', 'createdAt'})
VALID_PLC_OP_FIELDS = frozenset({'type', 'rotationKeys', 'verificationMethods',
                                 'alsoKnownAs', 'services', 'prev', 'sig'})
VALID_LEGACY_FIELDS = frozenset({'type', 'signingKey', 'recoveryKey',
                                 'handle', 'service', 'prev', 'sig'})
VALID_TOMBSTONE_FIELDS = frozenset({'type', 'prev', 'sig'})
VALID_BASE64URL_CHARS = frozenset('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_')

# EC curve orders for High-S check
P256_ORDER = 0xFFFFFFFF00000000FFFFFFFFFFFFFFFFBCE6FAADA7179E84F3B9CAC2FC632551
K256_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
P256_HALF_ORDER = P256_ORDER // 2
K256_HALF_ORDER = K256_ORDER // 2

# Input lines are reported as (bundle, position) with this many lines per bundle.
OPS_PER_BUNDLE = 10000


class OperationValidator:
    """Validate PLC operation records and report the invalid ones.

    Each ``check_*`` method inspects one decoded record (the JSON object from
    one input line) and returns a short reason string when the record breaks
    a rule, or ``None`` when it passes that rule.
    """

    def __init__(self, skip_high_s=False):
        self.op_count = 0           # non-blank input lines seen
        self.invalid_count = 0      # records with at least one reason
        self.parse_error_count = 0  # lines that were not valid JSON (skipped)
        # The High-S check needs no third-party library, so only the caller's
        # flag decides whether it runs.
        self.skip_high_s = skip_high_s
        # Monotonic clock: durations are immune to wall-clock adjustments.
        self.start_time = time.monotonic()
        self.last_progress_time = self.start_time
        self.reason_counts = defaultdict(int)

    def log_progress(self, message: str):
        """Write *message* to stderr with a ``[PROGRESS]`` prefix."""
        print(f"[PROGRESS] {message}", file=sys.stderr, flush=True)

    def check_extra_fields(self, operation: Dict[str, Any]) -> Optional[str]:
        """Report keys that are not allowed for the record or its operation type.

        Returns the first violation found: unexpected top-level keys,
        unexpected keys for the operation's ``type`` (``plc_operation``,
        legacy ``create`` or ``plc_tombstone``), or, for ``plc_operation``,
        a ``verificationMethods`` value that is not a string. Other operation
        types are not checked here.
        """
        extra_top = operation.keys() - VALID_TOP_FIELDS
        if extra_top:
            return f"extra-top-level:{','.join(sorted(extra_top))}"

        op_data = operation.get('operation', {})
        op_type = op_data.get('type')
        op_keys = op_data.keys()

        if op_type == 'plc_operation':
            extra = op_keys - VALID_PLC_OP_FIELDS
            if extra:
                return f"extra-plc-operation:{','.join(sorted(extra))}"

            vm = op_data.get('verificationMethods')
            if vm:
                for key, value in vm.items():
                    # The spec's values are did:key strings; a nested object
                    # or any other non-string value is invalid.
                    if not isinstance(value, str):
                        return f"invalid-verification-method:{key}"

        elif op_type == 'create':
            extra = op_keys - VALID_LEGACY_FIELDS
            if extra:
                return f"extra-legacy-create:{','.join(sorted(extra))}"

        elif op_type == 'plc_tombstone':
            extra = op_keys - VALID_TOMBSTONE_FIELDS
            if extra:
                return f"extra-tombstone:{','.join(sorted(extra))}"

        return None

    def check_signature_encoding(self, operation: Dict[str, Any]) -> Optional[str]:
        """Report a ``sig`` that is not 86 unpadded base64url characters.

        A 64-byte signature encodes to exactly 86 base64url characters without
        padding. An absent or empty ``sig`` is not reported by this check.
        """
        sig = operation.get('operation', {}).get('sig')
        if not sig:
            return None

        sig_len = len(sig)
        if sig_len != 86:
            return f"sig-wrong-length:{sig_len}"
        if '=' in sig:
            return "sig-has-padding"
        if not VALID_BASE64URL_CHARS.issuperset(sig):
            return "sig-invalid-chars"
        return None

    def check_high_s_signature(self, operation: Dict[str, Any]) -> Optional[str]:
        """Report a signature whose ``s`` component is in the upper half of the curve order.

        Returns ``None`` when High-S checking is disabled, when there is no
        ``sig``, or when it does not decode to 64 bytes (encoding problems are
        reported by :meth:`check_signature_encoding`).
        """
        if self.skip_high_s:
            return None

        sig = operation.get('operation', {}).get('sig')
        if not sig:
            return None

        try:
            sig_bytes = base64.urlsafe_b64decode(sig + '==')
        except ValueError:  # binascii.Error is a ValueError subclass
            return None
        if len(sig_bytes) != 64:
            return None

        # The signature is r || s; s is the second 32 bytes, big-endian.
        s = int.from_bytes(sig_bytes[32:], 'big')

        # The signing key's curve is not known from the record alone, so
        # compare against the smaller half-order (P-256's; it is below
        # K-256's). This is exactly "above either half-order". NOTE(review):
        # it also flags the tiny band (P256_HALF_ORDER, K256_HALF_ORDER] that
        # is valid low-S on secp256k1; resolving the signer's curve would make
        # the check exact.
        if s > P256_HALF_ORDER:
            return "high-s-signature"
        return None

    def check_duplicate_rotation_keys(self, operation: Dict[str, Any]) -> Optional[str]:
        """Report a ``plc_operation`` whose ``rotationKeys`` list repeats a key."""
        op_data = operation.get('operation', {})
        if op_data.get('type') != 'plc_operation':
            return None

        keys = op_data.get('rotationKeys')
        if not keys:
            return None

        keys_len = len(keys)
        unique_len = len(set(keys))
        if unique_len < keys_len:
            return f"duplicate-rotation-keys:{keys_len}-total-{unique_len}-unique"
        return None

    def check_rotation_keys_count(self, operation: Dict[str, Any]) -> Optional[str]:
        """Report a ``plc_operation`` with fewer than 1 or more than 5 rotation keys.

        An empty ``rotationKeys`` list is reported (count 0). A record that has
        no ``rotationKeys`` field at all is not reported by this check.
        """
        op_data = operation.get('operation', {})
        if op_data.get('type') != 'plc_operation':
            return None

        keys = op_data.get('rotationKeys')
        if keys is None:
            return None

        count = len(keys)
        if not 1 <= count <= 5:
            return f"invalid-rotation-keys-count:{count}"
        return None

    def check_verification_methods_count(self, operation: Dict[str, Any]) -> Optional[str]:
        """Report a ``plc_operation`` with more than 10 verification methods."""
        op_data = operation.get('operation', {})
        if op_data.get('type') != 'plc_operation':
            return None

        vm = op_data.get('verificationMethods')
        if not vm:
            return None

        count = len(vm)
        if count > 10:
            return f"too-many-verification-methods:{count}"
        return None

    def validate_operation(self, operation: Dict[str, Any]) -> List[str]:
        """Run every enabled check and return the reasons *operation* is invalid.

        An empty list means the record passed. A record whose structure
        makes a check itself fail (for example ``operation`` is not an object)
        gets a ``malformed-operation:<ExceptionName>`` reason instead of
        aborting the whole stream. Checks after the failing one are skipped
        for that record.
        """
        checks = [self.check_extra_fields, self.check_signature_encoding]
        if not self.skip_high_s:
            checks.append(self.check_high_s_signature)
        checks.extend((
            self.check_duplicate_rotation_keys,
            self.check_rotation_keys_count,
            self.check_verification_methods_count,
        ))

        reasons: List[str] = []
        try:
            for check in checks:
                result = check(operation)
                if result:
                    reasons.append(result)
        except (AttributeError, TypeError, ValueError) as exc:
            reasons.append(f"malformed-operation:{type(exc).__name__}")
        return reasons

    def process_stream(self):
        """Validate each stdin line and write invalid operations to stdout as CSV.

        Every non-blank line occupies one slot. Its 1-based bundle number and
        0-based position inside the bundle come from its index among non-blank
        lines (OPS_PER_BUNDLE per bundle). Lines that are not valid JSON still
        use a slot but are skipped and only counted. Progress is logged at most
        once per second, checked every OPS_PER_BUNDLE lines.
        """
        writer = csv.writer(sys.stdout, lineterminator='\n')
        writer.writerow(['bundle', 'position', 'reason', 'opRaw'])

        for line in sys.stdin:
            line = line.strip()
            if not line:
                continue

            self.op_count += 1
            bundle_index, position = divmod(self.op_count - 1, OPS_PER_BUNDLE)
            bundle = bundle_index + 1

            try:
                operation = json.loads(line)
            except json.JSONDecodeError:
                self.parse_error_count += 1
                continue

            reasons = self.validate_operation(operation)
            if reasons:
                self.invalid_count += 1
                for reason in reasons:
                    self.reason_counts[reason] += 1
                # sys.stdout is already buffered, so rows are written directly
                # rather than held in a local list that an interrupt would drop.
                writer.writerow([
                    bundle,
                    position,
                    '|'.join(reasons),
                    json.dumps(operation, separators=(',', ':')),  # compact JSON
                ])

            if self.op_count % OPS_PER_BUNDLE == 0:
                self._maybe_log_progress()

        self._log_summary(time.monotonic() - self.start_time)

    def _maybe_log_progress(self):
        """Log throughput and the invalid count if a second has passed since the last report."""
        now = time.monotonic()
        if now - self.last_progress_time < 1.0:
            return
        elapsed = now - self.start_time
        rate = self.op_count / elapsed if elapsed > 0 else 0.0
        self.log_progress(
            f"Processed {self.op_count:,} ops "
            f"({rate:,.0f} ops/sec) | "
            f"Invalid: {self.invalid_count:,}"
        )
        self.last_progress_time = now

    def _log_summary(self, elapsed: float):
        """Log totals, throughput and the five most frequent reasons to stderr."""
        if self.op_count == 0:
            self.log_progress("No operations processed")
            return

        percentage = (self.invalid_count / self.op_count) * 100
        # elapsed can be 0.0 on coarse clocks when the input is tiny.
        rate = self.op_count / elapsed if elapsed > 0 else 0.0
        self.log_progress("=" * 60)
        self.log_progress(f"Complete: {self.op_count:,} operations in {elapsed:.1f}s")
        self.log_progress(f"Rate: {rate:,.0f} ops/sec")
        self.log_progress(f"Invalid: {self.invalid_count:,} ({percentage:.4f}%)")
        if self.parse_error_count:
            self.log_progress(f"Skipped (invalid JSON): {self.parse_error_count:,}")

        if self.reason_counts:
            self.log_progress("\nTop invalid reasons:")
            top_reasons = sorted(
                self.reason_counts.items(),
                key=lambda item: item[1],
                reverse=True,
            )[:5]
            for reason, count in top_reasons:
                self.log_progress(f"  {reason}: {count:,}")


def main():
    """Parse command-line options and validate the operations on stdin."""
    import argparse

    # NOTE(review): the two performance figures below disagree (10M ops at
    # 50k-100k ops/sec is ~2-4 minutes). The hours figure presumably includes
    # upstream fetch time from `plcbundle backfill`; confirm and reconcile.
    parser = argparse.ArgumentParser(
        description='Detect invalid PLC operations',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  plcbundle export -count 10000 | %(prog)s > invalid.csv
  plcbundle backfill | %(prog)s > invalid.csv

  # Skip High-S check for speed
  plcbundle backfill | %(prog)s --skip-high-s > invalid.csv

Performance:
  ~50,000-100,000 ops/sec on modern hardware
  ~1-2 hours for 10 million operations
  Memory usage: <100MB
'''
    )

    parser.add_argument(
        '--skip-high-s',
        action='store_true',
        help='Skip High-S validation (faster)'
    )

    args = parser.parse_args()

    validator = OperationValidator(skip_high_s=args.skip_high_s)

    try:
        validator.process_stream()
    except KeyboardInterrupt:
        print("\n[INTERRUPTED]", file=sys.stderr)
        sys.exit(1)
    except BrokenPipeError:
        # The reader closed the pipe (e.g. `| head`). Point stdout at devnull
        # so the interpreter's final flush does not raise a second time.
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, sys.stdout.fileno())
        sys.exit(0)


if __name__ == '__main__':
    main()