optimizing a gate level bcm to the end of the earth and back
at main 620 lines 23 kB view raw
1#!/usr/bin/env python3 2""" 3Search for optimal BCD to 7-segment decoder circuit with 2, 3, and 4-input gates. 4""" 5 6import multiprocessing as mp 7from multiprocessing import Manager 8from concurrent.futures import ProcessPoolExecutor, as_completed 9import sys 10import os 11import time 12import threading 13import queue 14import termios 15import tty 16import signal 17import atexit 18 19from bcd_optimization.solver import BCDTo7SegmentSolver 20from bcd_optimization.truth_tables import SEGMENT_MINTERMS, SEGMENT_NAMES 21 22# ANSI escape codes for terminal control 23CLEAR_LINE = "\033[K" 24MOVE_UP = "\033[A" 25GREEN = "\033[92m" 26YELLOW = "\033[93m" 27CYAN = "\033[96m" 28DIM = "\033[2m" 29RESET = "\033[0m" 30BOLD = "\033[1m" 31HIDE_CURSOR = "\033[?25l" 32SHOW_CURSOR = "\033[?25h" 33 34 35class ProgressDisplay: 36 """Async progress display that updates in a separate thread.""" 37 38 def __init__(self, stats_queue=None): 39 self.lock = threading.Lock() 40 self.running = False 41 self.thread = None 42 self.stats_queue = stats_queue 43 self.original_termios = None 44 45 # State 46 self.completed = 0 47 self.total = 0 48 self.start_time = 0 49 self.last_config = "" 50 self.current_cost = 0 51 self.message = "" 52 53 # SAT solver stats (aggregated across all running configs) 54 self.active_configs = {} # config_str -> stats dict 55 self.total_conflicts = 0 56 self.total_decisions = 0 57 self.total_vars = 0 58 self.total_clauses = 0 59 60 def _disable_input(self): 61 """Disable keyboard echo and hide cursor.""" 62 try: 63 self.original_termios = termios.tcgetattr(sys.stdin) 64 new_termios = termios.tcgetattr(sys.stdin) 65 new_termios[3] = new_termios[3] & ~termios.ECHO & ~termios.ICANON 66 termios.tcsetattr(sys.stdin, termios.TCSANOW, new_termios) 67 except (termios.error, AttributeError): 68 pass 69 sys.stdout.write(HIDE_CURSOR) 70 sys.stdout.flush() 71 72 def _restore_input(self): 73 """Restore keyboard echo and show cursor.""" 74 sys.stdout.write(SHOW_CURSOR) 75 sys.stdout.flush() 76 if self.original_termios: 77 try: 78 termios.tcsetattr(sys.stdin, termios.TCSANOW, self.original_termios) 79 except (termios.error, AttributeError): 80 pass 81 self.original_termios = None 82 83 def start(self, total, cost): 84 """Start the progress display thread.""" 85 with self.lock: 86 self.completed = 0 87 self.total = total 88 self.start_time = time.time() 89 self.last_config = "" 90 self.current_cost = cost 91 self.message = "" 92 self.active_configs = {} 93 self.total_conflicts = 0 94 self.total_decisions = 0 95 self.running = True 96 97 self._disable_input() 98 self.thread = threading.Thread(target=self._update_loop, daemon=True) 99 self.thread.start() 100 101 def stop(self): 102 """Stop the progress display thread.""" 103 self.running = False 104 if self.thread: 105 self.thread.join(timeout=0.5) 106 self._restore_input() 107 # Clear the line 108 print(f"\r{CLEAR_LINE}", end="", flush=True) 109 110 def update(self, completed, last_config=""): 111 """Update progress state (called from main thread).""" 112 with self.lock: 113 self.completed = completed 114 if last_config: 115 self.last_config = last_config 116 # Remove completed config from active 117 if last_config in self.active_configs: 118 del self.active_configs[last_config] 119 120 def set_message(self, msg): 121 """Set a temporary message to display.""" 122 with self.lock: 123 self.message = msg 124 125 def _poll_stats(self): 126 """Poll stats queue for updates from worker processes.""" 127 if not self.stats_queue: 128 return 129 130 try: 131 while True: 132 stats = self.stats_queue.get_nowait() 133 config = stats.get('config', '') 134 with self.lock: 135 self.active_configs[config] = stats 136 # Aggregate stats 137 self.total_conflicts = sum(s.get('conflicts', 0) for s in self.active_configs.values()) 138 self.total_decisions = sum(s.get('decisions', 0) for s in self.active_configs.values()) 139 self.total_vars = sum(s.get('vars', 0) for s in self.active_configs.values()) 140 self.total_clauses = sum(s.get('clauses', 0) for s in self.active_configs.values()) 141 except: 142 pass # Queue empty 143 144 def _update_loop(self): 145 """Background thread that updates the display.""" 146 import shutil 147 148 spinner = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" 149 spin_idx = 0 150 last_conflicts = 0 151 conflict_rate = 0 152 153 def fmt_num(n): 154 if n >= 1_000_000: 155 return f"{n/1_000_000:.1f}M" 156 elif n >= 1_000: 157 return f"{n/1_000:.1f}K" 158 return str(int(n)) 159 160 while self.running: 161 self._poll_stats() 162 163 # Get terminal width each iteration (in case of resize) 164 try: 165 term_width = shutil.get_terminal_size().columns 166 except: 167 term_width = 80 168 169 with self.lock: 170 elapsed = time.time() - self.start_time 171 speed = self.completed / elapsed if elapsed > 0 else 0 172 remaining = self.total - self.completed 173 eta = remaining / speed if speed > 0 else 0 174 175 pct = 100 * self.completed / self.total if self.total > 0 else 0 176 spin = spinner[spin_idx % len(spinner)] 177 178 # Calculate conflict rate 179 conflict_delta = self.total_conflicts - last_conflicts 180 conflict_rate = conflict_rate * 0.7 + conflict_delta * 10 * 0.3 181 last_conflicts = self.total_conflicts 182 183 # Fixed parts (calculate their length) 184 # Format: " ⠹ [BAR] 2/6 (33%) 0.8/s [ACT] 29K" 185 prefix = f" {spin} [" 186 count_str = f"] {self.completed}/{self.total} ({pct:.0f}%) {speed:.1f}/s" 187 188 if self.total_conflicts > 0: 189 activity_level = min(max(conflict_rate / 50000, 0), 1.0) 190 conflict_str = f" {fmt_num(self.total_conflicts)}" 191 else: 192 activity_level = 0 193 conflict_str = "" 194 195 # Calculate available space for bars 196 fixed_len = len(prefix) + len(count_str) + len(conflict_str) + 4 # +4 for " []" around activity 197 available = term_width - fixed_len - 2 # -2 for safety margin 198 199 if available < 10: 200 # Too narrow, minimal display 201 content = f" {spin} {self.completed}/{self.total} {fmt_num(self.total_conflicts)}" 202 else: 203 # Split available space: 70% progress bar, 30% activity bar 204 if self.total_conflicts > 0: 205 progress_width = int(available * 0.65) 206 activity_width = available - progress_width 207 else: 208 progress_width = available 209 activity_width = 0 210 211 # Progress bar 212 filled = int(progress_width * self.completed / self.total) if self.total > 0 else 0 213 bar = "" * filled + "" * (progress_width - filled) 214 215 # Activity bar 216 if activity_width > 0: 217 activity_filled = int(activity_width * activity_level) 218 activity_bar = f" [{CYAN}{'' * activity_filled}{'' * (activity_width - activity_filled)}{RESET}]{conflict_str}" 219 else: 220 activity_bar = "" 221 222 content = f"{prefix}{bar}{count_str}{activity_bar}" 223 224 line = f"\r{content}{CLEAR_LINE}" 225 sys.stdout.write(line) 226 sys.stdout.flush() 227 228 spin_idx += 1 229 time.sleep(0.1) 230 231 232def progress_bar(current, total, width=30, label=""): 233 """Generate a progress bar string.""" 234 filled = int(width * current / total) if total > 0 else 0 235 bar = "" * filled + "" * (width - filled) 236 pct = 100 * current / total if total > 0 else 0 237 return f"{label}[{bar}] {current}/{total} ({pct:.0f}%)" 238 239 240def try_config_with_stats(n2, n3, n4, use_complements, restrict_functions, stats_queue, stop_event=None): 241 """Try a single (n2, n3, n4) configuration with stats reporting.""" 242 cost = n2 * 2 + n3 * 3 + n4 * 4 243 n_gates = n2 + n3 + n4 244 config_str = f"{n2}x2+{n3}x3+{n4}x4" 245 246 if n_gates < 7: 247 return None, cost, f"Skipped (only {n_gates} gates)", (n2, n3, n4) 248 249 from pysat.formula import CNF 250 from pysat.solvers import Solver 251 252 solver = BCDTo7SegmentSolver() 253 solver.generate_prime_implicants() 254 255 start = time.time() 256 try: 257 # Check early termination before building CNF 258 if stop_event and stop_event.is_set(): 259 return None, cost, "Cancelled", (n2, n3, n4) 260 261 # Build the CNF without solving 262 cnf = solver._build_general_cnf(n2, n3, n4, use_complements, restrict_functions) 263 if cnf is None: 264 return None, cost, f"UNSAT (no valid config)", (n2, n3, n4) 265 266 n_vars = cnf['n_vars'] 267 n_clauses = len(cnf['clauses']) 268 269 # Report initial stats 270 if stats_queue: 271 try: 272 stats_queue.put_nowait({ 273 'config': config_str, 274 'phase': 'solving', 275 'vars': n_vars, 276 'clauses': n_clauses, 277 'conflicts': 0, 278 'decisions': 0, 279 }) 280 except: 281 pass 282 283 # Solve with periodic stats updates 284 with Solver(name='g3', bootstrap_with=CNF(from_clauses=cnf['clauses'])) as sat_solver: 285 # Use solve_limited with conflict budget for progress updates 286 conflict_budget = 10000 287 total_conflicts = 0 288 total_decisions = 0 289 290 while True: 291 # Check early termination 292 if stop_event and stop_event.is_set(): 293 elapsed = time.time() - start 294 return None, cost, f"Cancelled ({elapsed:.1f}s, {total_conflicts} conflicts)", (n2, n3, n4) 295 296 sat_solver.conf_budget(conflict_budget) 297 status = sat_solver.solve_limited() 298 299 stats = sat_solver.accum_stats() 300 total_conflicts = stats.get('conflicts', 0) 301 total_decisions = stats.get('decisions', 0) 302 303 if stats_queue: 304 try: 305 stats_queue.put_nowait({ 306 'config': config_str, 307 'phase': 'solving', 308 'vars': n_vars, 309 'clauses': n_clauses, 310 'conflicts': total_conflicts, 311 'decisions': total_decisions, 312 }) 313 except: 314 pass 315 316 if status is not None: 317 # Solved (True = SAT, False = UNSAT) 318 break 319 # status is None means budget exhausted, continue 320 321 elapsed = time.time() - start 322 323 if status: 324 model = set(sat_solver.get_model()) 325 result = solver._decode_general_solution_from_cnf(model, cnf) 326 return result, cost, f"SAT ({elapsed:.1f}s, {total_conflicts} conflicts)", (n2, n3, n4) 327 else: 328 return None, cost, f"UNSAT ({elapsed:.1f}s, {total_conflicts} conflicts)", (n2, n3, n4) 329 330 except Exception as e: 331 import traceback 332 elapsed = time.time() - start 333 return None, cost, f"Error ({elapsed:.1f}s): {e}", (n2, n3, n4) 334 335 336def try_config(args): 337 """Try a single (n2, n3, n4) configuration. Run in separate process.""" 338 n2, n3, n4, use_complements, restrict_functions, stats_queue, stop_event = args 339 return try_config_with_stats(n2, n3, n4, use_complements, restrict_functions, stats_queue, stop_event) 340 341 342def worker_init(): 343 """Initialize worker process to ignore SIGINT (parent handles it).""" 344 signal.signal(signal.SIGINT, signal.SIG_IGN) 345 346 347def verify_result(result): 348 """Verify a synthesis result is correct.""" 349 def eval_func2(func, a, b): 350 return (func >> (a * 2 + b)) & 1 351 352 def eval_func3(func, a, b, c): 353 return (func >> (a * 4 + b * 2 + c)) & 1 354 355 def eval_func4(func, a, b, c, d): 356 return (func >> (a * 8 + b * 4 + c * 2 + d)) & 1 357 358 for digit in range(10): 359 A = (digit >> 3) & 1 360 B = (digit >> 2) & 1 361 C = (digit >> 1) & 1 362 D = digit & 1 363 364 nodes = [A, B, C, D, 1-A, 1-B, 1-C, 1-D] 365 366 for g in result.gates: 367 if isinstance(g.input2, tuple): 368 if len(g.input2) == 2: 369 k, l = g.input2 370 val = eval_func3(g.func, nodes[g.input1], nodes[k], nodes[l]) 371 else: 372 k, l, m = g.input2 373 val = eval_func4(g.func, nodes[g.input1], nodes[k], nodes[l], nodes[m]) 374 else: 375 val = eval_func2(g.func, nodes[g.input1], nodes[g.input2]) 376 nodes.append(val) 377 378 for seg in SEGMENT_NAMES: 379 expected = 1 if digit in SEGMENT_MINTERMS[seg] else 0 380 actual = nodes[result.output_map[seg]] 381 if actual != expected: 382 return False, f"Digit {digit}, {seg}: expected {expected}, got {actual}" 383 384 return True, "All correct" 385 386 387def cleanup_terminal(): 388 """Restore terminal to normal state.""" 389 sys.stdout.write(SHOW_CURSOR) 390 sys.stdout.flush() 391 392 393def main(): 394 # Register cleanup to ensure cursor is always restored 395 atexit.register(cleanup_terminal) 396 397 # Store original terminal settings for signal handler 398 original_termios = None 399 try: 400 original_termios = termios.tcgetattr(sys.stdin) 401 except (termios.error, AttributeError): 402 pass 403 404 # Create progress display early so signal handler can access it 405 progress = ProgressDisplay(None) # Queue set later 406 executor_ref = [None] # Mutable container for executor reference 407 # State for signal handler status message 408 search_state = { 409 'start_time': 0, 410 'group_start': 0, 411 'configs_tested': 0, 412 'current_cost': 0, 413 'group_completed': 0, 414 'group_size': 0, 415 } 416 417 def signal_handler(signum, frame): 418 """Handle interrupt signals gracefully.""" 419 # Stop progress display first to prevent overwrites 420 progress.running = False 421 if progress.thread: 422 progress.thread.join(timeout=0.2) 423 # Shutdown executor without waiting 424 if executor_ref[0]: 425 executor_ref[0].shutdown(wait=False, cancel_futures=True) 426 # Restore terminal 427 sys.stdout.write(f"\r{CLEAR_LINE}") 428 sys.stdout.write(SHOW_CURSOR) 429 sys.stdout.flush() 430 if original_termios: 431 try: 432 termios.tcsetattr(sys.stdin, termios.TCSANOW, original_termios) 433 except (termios.error, AttributeError): 434 pass 435 # Print status message 436 elapsed = time.time() - search_state['group_start'] if search_state['group_start'] else 0 437 conflicts = progress.total_conflicts 438 print(f" {YELLOW}● Interrupted at {search_state['group_completed']}/{search_state['group_size']} configurations{RESET} ({elapsed:.1f}s, {conflicts} conflicts)") 439 print(f"\n{YELLOW}Search interrupted by user{RESET}") 440 os._exit(0) # Hard exit to avoid cleanup errors from child processes 441 442 signal.signal(signal.SIGINT, signal_handler) 443 signal.signal(signal.SIGTERM, signal_handler) 444 445 print(f"{BOLD}{'=' * 60}{RESET}") 446 print(f"{BOLD}BCD to 7-Segment Optimal Circuit Search (with 4-input gates){RESET}") 447 print(f"{BOLD}{'=' * 60}{RESET}") 448 print() 449 print(f"Gates: AND, OR, XOR, XNOR, NAND, NOR (2, 3, and 4 input variants)") 450 print(f"Primary input complements (A', B', C', D') are free") 451 print() 452 453 # Generate configurations sorted by cost 454 configs = [] 455 for n2 in range(0, 12): 456 for n3 in range(0, 8): 457 for n4 in range(0, 6): 458 cost = n2 * 2 + n3 * 3 + n4 * 4 459 n_gates = n2 + n3 + n4 460 # Need at least 7 gates for 7 outputs 461 if 7 <= n_gates <= 11 and 14 <= cost <= 22: 462 configs.append((n2, n3, n4, True, True)) 463 464 # Sort by cost, then by number of gates 465 configs.sort(key=lambda x: (x[0]*2 + x[1]*3 + x[2]*4, x[0]+x[1]+x[2])) 466 467 # Group configs by cost 468 cost_groups = {} 469 for cfg in configs: 470 cost = cfg[0] * 2 + cfg[1] * 3 + cfg[2] * 4 471 if cost not in cost_groups: 472 cost_groups[cost] = [] 473 cost_groups[cost].append(cfg) 474 475 min_cost = min(cost_groups.keys()) 476 max_cost = max(cost_groups.keys()) 477 478 print(f"Searching {len(configs)} configurations from {min_cost} to {max_cost} inputs") 479 print(f"Using {mp.cpu_count()} CPU cores") 480 print() 481 482 best_result = None 483 best_cost = float('inf') 484 485 total_start = time.time() 486 search_state['start_time'] = total_start 487 configs_tested = 0 488 total_configs = len(configs) 489 490 # Create shared queue for stats and stop event for early termination 491 manager = Manager() 492 stats_queue = manager.Queue() 493 stop_event = manager.Event() 494 progress.stats_queue = stats_queue 495 496 with ProcessPoolExecutor(max_workers=mp.cpu_count(), initializer=worker_init) as executor: 497 executor_ref[0] = executor 498 for cost in sorted(cost_groups.keys()): 499 if cost >= best_cost: 500 continue 501 502 group = cost_groups[cost] 503 group_size = len(group) 504 group_start = time.time() 505 completed_in_group = 0 506 507 # Update state for signal handler 508 search_state['current_cost'] = cost 509 search_state['group_size'] = group_size 510 search_state['group_completed'] = 0 511 search_state['group_start'] = group_start 512 513 print(f"\n{CYAN}{BOLD}Testing {cost} inputs{RESET} ({group_size} configurations)") 514 print("-" * 50) 515 516 # Start async progress display 517 progress.start(group_size, cost) 518 519 # Add stats_queue and stop_event to each config 520 stop_event.clear() # Reset for new cost group 521 configs_with_queue = [(cfg[0], cfg[1], cfg[2], cfg[3], cfg[4], stats_queue, stop_event) for cfg in group] 522 futures = {executor.submit(try_config, cfg): cfg for cfg in configs_with_queue} 523 found_solution = False 524 525 for future in as_completed(futures): 526 cfg = futures[future] 527 n2, n3, n4 = cfg[0], cfg[1], cfg[2] # First 3 elements are gate counts 528 completed_in_group += 1 529 configs_tested += 1 530 search_state['group_completed'] = completed_in_group 531 search_state['configs_tested'] = configs_tested 532 config_str = f"{n2}x2+{n3}x3+{n4}x4" 533 534 try: 535 result, result_cost, status, _ = future.result(timeout=300) 536 537 # Update progress (always, even for UNSAT) 538 progress.update(completed_in_group, config_str) 539 540 if result is not None: 541 # Found a potential solution - stop progress to print 542 valid, msg = verify_result(result) 543 progress.stop() 544 if valid: 545 print(f"\n {GREEN}{n2}x2 + {n3}x3 + {n4}x4{RESET}: {status} {GREEN}(VERIFIED){RESET}") 546 if result_cost < best_cost: 547 best_result = result 548 best_cost = result_cost 549 found_solution = True 550 # Signal other workers to stop 551 stop_event.set() 552 for f in futures: 553 f.cancel() 554 break 555 else: 556 print(f"\n {YELLOW}{n2}x2 + {n3}x3 + {n4}x4{RESET}: INVALID - {msg}") 557 # Restart progress 558 progress.start(group_size, cost) 559 progress.update(completed_in_group, config_str) 560 # For UNSAT: just continue, progress bar updates automatically 561 562 except Exception as e: 563 progress.stop() 564 print(f"\n {n2}x2 + {n3}x3 + {n4}x4: Error - {e}") 565 progress.start(group_size, cost) 566 progress.update(completed_in_group) 567 568 # Stop progress and print summary 569 progress.stop() 570 group_elapsed = time.time() - group_start 571 572 if not found_solution: 573 print(f" {YELLOW}✗ All {group_size} configurations UNSAT{RESET} ({group_elapsed:.1f}s, {progress.total_conflicts} conflicts)") 574 575 if best_result is not None and best_cost <= cost: 576 print(f"\n{GREEN}{BOLD}Found solution at {best_cost} inputs!{RESET}") 577 break 578 579 total_elapsed = time.time() - total_start 580 581 print() 582 print(f"{BOLD}{'=' * 60}{RESET}") 583 print(f"{BOLD}RESULTS{RESET}") 584 print(f"{BOLD}{'=' * 60}{RESET}") 585 print(f"Search time: {total_elapsed:.1f} seconds ({configs_tested} configurations tested)") 586 print(f"Average speed: {configs_tested/total_elapsed:.2f} configurations/second") 587 588 if best_result: 589 print(f"\n{GREEN}{BOLD}Best solution: {best_cost} gate inputs{RESET}") 590 print() 591 print("Gates:") 592 593 node_names = ['A', 'B', 'C', 'D', "A'", "B'", "C'", "D'"] 594 for g in best_result.gates: 595 i1 = node_names[g.input1] 596 if isinstance(g.input2, tuple): 597 if len(g.input2) == 2: 598 k, l = g.input2 599 i2, i3 = node_names[k], node_names[l] 600 print(f" g{g.index}: {g.func_name}({i1}, {i2}, {i3})") 601 else: 602 k, l, m = g.input2 603 i2, i3, i4 = node_names[k], node_names[l], node_names[m] 604 print(f" g{g.index}: {g.func_name}({i1}, {i2}, {i3}, {i4})") 605 else: 606 i2 = node_names[g.input2] 607 print(f" g{g.index}: {g.func_name}({i1}, {i2})") 608 node_names.append(f"g{g.index}") 609 610 print() 611 print("Outputs:") 612 for seg in SEGMENT_NAMES: 613 print(f" {seg} = {node_names[best_result.output_map[seg]]}") 614 else: 615 print(f"\n{YELLOW}No solution found in the search range (14-22 inputs).{RESET}") 616 print("The minimum is likely 23 inputs (7x2 + 3x3 configuration).") 617 618 619if __name__ == "__main__": 620 main()