Forked from atscan.net/plcbundle.
A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
1#!/usr/bin/env python3
2"""
3detect-invalid-ops.py - Detect invalid PLC operations (optimized for scale)
4
5Validates PLC operations according to did:plc specification v0.2.1
6Optimized for processing millions of operations efficiently.
7"""
8
9import sys
10import json
11import csv
12import base64
13import time
14from typing import List, Optional, Dict, Any
15from collections import defaultdict
16
# Optional dependency probe. Only the HAS_CRYPTO flag is consumed by the
# rest of this module; the imported names themselves are not referenced.
try:
    from cryptography.hazmat.primitives.asymmetric import ec  # noqa: F401
    from cryptography.hazmat.backends import default_backend  # noqa: F401
except ImportError:
    HAS_CRYPTO = False
else:
    HAS_CRYPTO = True

# Allowed keys of an exported record, and of each operation payload type.
# Built once at import time so per-operation checks are plain set differences.
VALID_TOP_FIELDS = frozenset(('did', 'operation', 'cid', 'nullified', 'createdAt'))
VALID_PLC_OP_FIELDS = frozenset((
    'type', 'rotationKeys', 'verificationMethods',
    'alsoKnownAs', 'services', 'prev', 'sig',
))
VALID_LEGACY_FIELDS = frozenset((
    'type', 'signingKey', 'recoveryKey',
    'handle', 'service', 'prev', 'sig',
))
VALID_TOMBSTONE_FIELDS = frozenset(('type', 'prev', 'sig'))

# The 64-symbol URL-safe base64 alphabet (no padding character).
VALID_BASE64URL_CHARS = frozenset(
    'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    'abcdefghijklmnopqrstuvwxyz'
    '0123456789'
    '-_'
)

# Group orders of NIST P-256 and secp256k1. The validator treats a signature
# whose s value exceeds half the order as "high-S".
P256_ORDER = 0xFFFFFFFF00000000FFFFFFFFFFFFFFFFBCE6FAADA7179E84F3B9CAC2FC632551
K256_ORDER = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
P256_HALF_ORDER = P256_ORDER >> 1
K256_HALF_ORDER = K256_ORDER >> 1
38
39
class OperationValidator:
    """Validate exported PLC operations and report the invalid ones as CSV.

    Input is one JSON record per line. Each ``check_*`` method inspects one
    record and returns a short reason string when it finds a problem, or
    ``None`` when the check passes or does not apply to the record.
    """

    # Operations per bundle, used to derive (bundle, position) from the
    # running line counter.
    OPS_PER_BUNDLE = 10000

    def __init__(self, skip_high_s=False):
        """Create a validator.

        Args:
            skip_high_s: disable ``check_high_s_signature``. The check is also
                disabled when the optional ``cryptography`` import failed
                (``HAS_CRYPTO`` is false).
        """
        self.op_count = 0        # non-blank input lines seen, parseable or not
        self.invalid_count = 0   # records with at least one failing check
        self.skipped_count = 0   # lines that were not a JSON object
        # NOTE(review): check_high_s_signature only decodes base64 and compares
        # integers, so gating it on `cryptography` looks unnecessary -- confirm
        # before dropping the HAS_CRYPTO dependency.
        self.skip_high_s = skip_high_s or not HAS_CRYPTO
        self.start_time = time.time()
        self.last_progress_time = time.time()
        self.reason_counts = defaultdict(int)  # reason string -> occurrences

    def log_progress(self, message: str):
        """Write a ``[PROGRESS]``-prefixed line to stderr and flush it."""
        print(f"[PROGRESS] {message}", file=sys.stderr, flush=True)

    @staticmethod
    def _op_data(operation: Any) -> Dict[str, Any]:
        """Return the record's nested ``operation`` object, or ``{}`` if absent or not an object."""
        if isinstance(operation, dict):
            op_data = operation.get('operation')
            if isinstance(op_data, dict):
                return op_data
        return {}

    def check_extra_fields(self, operation: Dict[str, Any]) -> Optional[str]:
        """Report keys outside the allowed set for the record and its operation type.

        Returns ``extra-top-level:``, ``extra-plc-operation:``,
        ``extra-legacy-create:`` or ``extra-tombstone:`` followed by the
        unexpected keys (sorted, comma-separated). For ``plc_operation`` it
        also returns ``invalid-verification-method:<id>`` when a verification
        method maps to an object (values are expected to be key strings).
        Unknown operation types and non-object records are not checked.
        """
        if not isinstance(operation, dict):
            return None

        extra_top = operation.keys() - VALID_TOP_FIELDS
        if extra_top:
            return f"extra-top-level:{','.join(sorted(extra_top))}"

        op_data = self._op_data(operation)
        op_type = op_data.get('type')
        op_keys = op_data.keys()

        if op_type == 'plc_operation':
            extra = op_keys - VALID_PLC_OP_FIELDS
            if extra:
                return f"extra-plc-operation:{','.join(sorted(extra))}"
            vm = op_data.get('verificationMethods')
            if isinstance(vm, dict):
                for key, value in vm.items():
                    if isinstance(value, dict):
                        return f"invalid-verification-method:{key}"

        elif op_type == 'create':
            extra = op_keys - VALID_LEGACY_FIELDS
            if extra:
                return f"extra-legacy-create:{','.join(sorted(extra))}"

        elif op_type == 'plc_tombstone':
            extra = op_keys - VALID_TOMBSTONE_FIELDS
            if extra:
                return f"extra-tombstone:{','.join(sorted(extra))}"

        return None

    def check_signature_encoding(self, operation: Dict[str, Any]) -> Optional[str]:
        """Check that ``sig`` is 86 unpadded base64url characters (64 bytes).

        A missing or empty ``sig`` is not reported here. A non-string ``sig``
        is reported as ``sig-not-string`` instead of crashing the run.
        """
        sig = self._op_data(operation).get('sig')
        if not sig:
            return None
        if not isinstance(sig, str):
            return "sig-not-string"

        sig_len = len(sig)
        if sig_len != 86:
            return f"sig-wrong-length:{sig_len}"
        if '=' in sig:
            return "sig-has-padding"
        if not VALID_BASE64URL_CHARS.issuperset(sig):
            return "sig-invalid-chars"
        return None

    def check_high_s_signature(self, operation: Dict[str, Any]) -> Optional[str]:
        """Report ``high-s-signature`` when the s half of a 64-byte r||s signature is high.

        Signatures that do not decode to exactly 64 bytes are ignored here;
        their encoding problems are reported by ``check_signature_encoding``.
        """
        if self.skip_high_s:
            return None

        sig = self._op_data(operation).get('sig')
        if not sig or not isinstance(sig, str):
            return None

        try:
            sig_bytes = base64.urlsafe_b64decode(sig + '==')
        except ValueError:  # binascii.Error and non-ASCII input both derive from ValueError
            return None
        if len(sig_bytes) != 64:
            return None

        s = int.from_bytes(sig_bytes[32:], 'big')
        # NOTE(review): the signing curve is not known here. Exceeding either
        # half-order is the same as exceeding the smaller (P-256) one, so a
        # low-S secp256k1 signature with s between the two bounds is reported too.
        if s > min(P256_HALF_ORDER, K256_HALF_ORDER):
            return "high-s-signature"
        return None

    def check_duplicate_rotation_keys(self, operation: Dict[str, Any]) -> Optional[str]:
        """Report repeated entries in a plc_operation's ``rotationKeys`` list."""
        op_data = self._op_data(operation)
        if op_data.get('type') != 'plc_operation':
            return None

        keys = op_data.get('rotationKeys')
        if not isinstance(keys, list) or not keys:
            return None

        try:
            unique_len = len(set(keys))
        except TypeError:  # unhashable entries (e.g. objects) cannot be compared this way
            return None

        keys_len = len(keys)
        if unique_len < keys_len:
            return f"duplicate-rotation-keys:{keys_len}-total-{unique_len}-unique"
        return None

    def check_rotation_keys_count(self, operation: Dict[str, Any]) -> Optional[str]:
        """Require 1 to 5 ``rotationKeys`` on a plc_operation.

        A missing or null ``rotationKeys`` counts as zero keys and is
        reported. Non-list values are left to other checks.
        """
        op_data = self._op_data(operation)
        if op_data.get('type') != 'plc_operation':
            return None

        keys = op_data.get('rotationKeys')
        if keys is None:
            count = 0
        elif isinstance(keys, list):
            count = len(keys)
        else:
            return None

        if not 1 <= count <= 5:
            return f"invalid-rotation-keys-count:{count}"
        return None

    def check_verification_methods_count(self, operation: Dict[str, Any]) -> Optional[str]:
        """Report a plc_operation with more than 10 verification methods."""
        op_data = self._op_data(operation)
        if op_data.get('type') != 'plc_operation':
            return None

        vm = op_data.get('verificationMethods')
        if not isinstance(vm, (dict, list)) or not vm:
            return None

        count = len(vm)
        if count > 10:
            return f"too-many-verification-methods:{count}"
        return None

    def validate_operation(self, operation: Dict[str, Any]) -> List[str]:
        """Run every check on *operation* and return the failure reasons in check order.

        An empty list means no check reported a problem.
        """
        checks = (
            self.check_extra_fields,
            self.check_signature_encoding,
            self.check_high_s_signature,  # returns None itself when skip_high_s is set
            self.check_duplicate_rotation_keys,
            self.check_rotation_keys_count,
            self.check_verification_methods_count,
        )
        reasons = []
        for check in checks:
            reason = check(operation)
            if reason:
                reasons.append(reason)
        return reasons

    def process_stream(self, stream=None, out=None):
        """Validate JSON lines from *stream* and write invalid records as CSV to *out*.

        Args:
            stream: iterable of text lines; defaults to ``sys.stdin``.
            out: writable text stream for the report; defaults to ``sys.stdout``.

        The CSV columns are ``bundle, position, reason, opRaw``. ``reason``
        joins every failing check with ``|``, and ``opRaw`` is the record
        re-serialized as compact JSON. Blank lines are ignored. Lines that are
        not a JSON object still advance the bundle/position counter, but they
        are only counted (``skipped_count``), not written. Progress and a
        final summary go to stderr.
        """
        if stream is None:
            stream = sys.stdin
        if out is None:
            out = sys.stdout

        writer = csv.writer(out, lineterminator='\n')
        writer.writerow(['bundle', 'position', 'reason', 'opRaw'])

        per_bundle = self.OPS_PER_BUNDLE
        for line in stream:
            line = line.strip()
            if not line:
                continue

            self.op_count += 1
            # 1-based bundle number, 0-based position within the bundle.
            bundle, position = divmod(self.op_count - 1, per_bundle)
            bundle += 1

            try:
                operation = json.loads(line)
            except json.JSONDecodeError:
                operation = None

            if not isinstance(operation, dict):
                # Malformed line (bad JSON, or JSON that is not an object).
                self.skipped_count += 1
            else:
                reasons = self.validate_operation(operation)
                if reasons:
                    self.invalid_count += 1
                    for reason in reasons:
                        self.reason_counts[reason] += 1
                    op_raw = json.dumps(operation, separators=(',', ':'))
                    writer.writerow([bundle, position, '|'.join(reasons), op_raw])

            if self.op_count % per_bundle == 0:
                self._log_throttled_progress()

        self._log_summary()

    def _rate(self, elapsed: float) -> float:
        """Return operations per second, or ``inf`` when no measurable time has passed."""
        return self.op_count / elapsed if elapsed > 0 else float('inf')

    def _log_throttled_progress(self):
        """Log a progress line, at most once per second."""
        now = time.time()
        if now - self.last_progress_time < 1.0:
            return
        elapsed = now - self.start_time
        self.log_progress(
            f"Processed {self.op_count:,} ops "
            f"({self._rate(elapsed):,.0f} ops/sec) | "
            f"Invalid: {self.invalid_count:,}"
        )
        self.last_progress_time = now

    def _log_summary(self):
        """Log totals, throughput and the five most frequent reasons to stderr."""
        elapsed = time.time() - self.start_time
        if self.op_count == 0:
            self.log_progress("No operations processed")
            return

        percentage = (self.invalid_count / self.op_count) * 100
        self.log_progress("=" * 60)
        self.log_progress(f"Complete: {self.op_count:,} operations in {elapsed:.1f}s")
        self.log_progress(f"Rate: {self._rate(elapsed):,.0f} ops/sec")
        self.log_progress(f"Invalid: {self.invalid_count:,} ({percentage:.4f}%)")
        if self.skipped_count:
            self.log_progress(f"Skipped (not a JSON object): {self.skipped_count:,}")

        if self.reason_counts:
            self.log_progress("\nTop invalid reasons:")
            top_reasons = sorted(
                self.reason_counts.items(),
                key=lambda x: x[1],
                reverse=True
            )[:5]
            for reason, count in top_reasons:
                self.log_progress(f" {reason}: {count:,}")
308
309
def main():
    """Parse command-line options and validate stdin, writing the CSV report to stdout.

    Exits with status 1 on Ctrl-C. Exits with status 0 when the downstream
    reader closes the pipe early (e.g. ``| head``).
    """
    import argparse
    import os

    parser = argparse.ArgumentParser(
        description='Detect invalid PLC operations',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  plcbundle export -count 10000 | %(prog)s > invalid.csv
  plcbundle backfill | %(prog)s > invalid.csv

  # Skip High-S check for speed
  plcbundle backfill | %(prog)s --skip-high-s > invalid.csv

Performance:
  ~50,000-100,000 ops/sec on modern hardware
  (about 2-4 minutes per 10 million operations at that rate)
  Memory usage: <100MB
'''
    )

    parser.add_argument(
        '--skip-high-s',
        action='store_true',
        help='Skip High-S validation (faster)'
    )

    args = parser.parse_args()

    # NOTE(review): the High-S check decodes base64 and compares integers; it
    # does not appear to need `cryptography`. Confirm before removing this gate.
    if not HAS_CRYPTO and not args.skip_high_s:
        print("[WARNING] cryptography not installed. High-S validation disabled.", file=sys.stderr)
        print("[WARNING] Install with: pip install cryptography", file=sys.stderr)
        args.skip_high_s = True

    validator = OperationValidator(skip_high_s=args.skip_high_s)

    try:
        validator.process_stream()
        # Flush inside the try so a closed pipe surfaces here rather than
        # during interpreter shutdown.
        sys.stdout.flush()
    except KeyboardInterrupt:
        print("\n[INTERRUPTED]", file=sys.stderr)
        sys.exit(1)
    except BrokenPipeError:
        # Python flushes stdout again at exit; point it at devnull so that
        # final flush cannot raise a second BrokenPipeError.
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, sys.stdout.fileno())
        sys.exit(0)
353
354
if __name__ == "__main__":
    # Run the command-line tool only when executed as a script, not on import.
    main()