this repo has no description
at e8ed148d7ade2d6da8c22b0d84160fce90d7bcbc 269 lines 11 kB view raw
#!/usr/bin/env python3
"""Interactive terminal tool for reviewing a directory of queued notifications.

Every ``*.json`` file in the queue directory (except
``processed_notifications.json``) is treated as one notification.  The browser
lets the user page through them, flag some for deletion, and then move the
flagged files into ``<queue>/deleted/`` while recording each moved
notification's ``uri`` in ``processed_notifications.json``.
"""

import argparse
import json
import os
import shutil
import sys
from pathlib import Path
from typing import List, Dict, Any, Optional

from rich.console import Console
from rich.markup import escape
from rich.table import Table
from rich.panel import Panel
from rich.text import Text

# Rich-markup help shown under the table on every redraw.
HELP_TEXT = (
    "Commands:\n"
    "  [cyan]n[/cyan] - Next page    [cyan]p[/cyan] - Previous page    [cyan]q[/cyan] - Quit\n"
    "  [cyan]d <idx>[/cyan] - Toggle delete flag for item at index\n"
    "  [cyan]v <idx>[/cyan] - View full notification at index\n"
    "  [cyan]execute[/cyan] - Execute deletions and quit\n"
    "  [cyan]clear[/cyan] - Clear all delete flags"
)


def _dict_or_empty(value: Any) -> Dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


class QueueManager:
    """Browse, flag and archive notification files stored in a queue directory."""

    def __init__(self, queue_dir: str = "queue"):
        """Bind the manager to *queue_dir* and load the processed-URI list.

        The queue directory itself must already exist; only its ``deleted/``
        subdirectory is created on demand.
        """
        self.queue_dir = Path(queue_dir)
        self.deleted_dir = self.queue_dir / "deleted"
        self.processed_file = self.queue_dir / "processed_notifications.json"
        self.console = Console()

        # Create deleted directory if it doesn't exist
        self.deleted_dir.mkdir(exist_ok=True)

        # Load existing processed notifications
        self.processed_notifications = self._load_processed_notifications()

    def _load_processed_notifications(self) -> List[str]:
        """Load the list of processed notification URIs.

        Returns an empty list when the file is missing, is not valid
        UTF-8 JSON, or does not contain a JSON array.
        """
        try:
            with open(self.processed_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (FileNotFoundError, ValueError):
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            return []
        # Callers append to the result, so anything but a list is unusable.
        return data if isinstance(data, list) else []

    def _save_processed_notifications(self):
        """Save the list of processed notification URIs."""
        with open(self.processed_file, 'w', encoding='utf-8') as f:
            json.dump(self.processed_notifications, f, indent=2)

    def _get_queue_files(self) -> List[Path]:
        """Get all JSON files directly in the queue directory.

        The glob is non-recursive, so files under ``deleted/`` are never
        returned; the processed-URI bookkeeping file is filtered out by name.
        """
        return [f for f in self.queue_dir.glob("*.json") if f.name != "processed_notifications.json"]

    def _load_notification(self, file_path: Path) -> Dict[str, Any]:
        """Load a notification from a JSON file.

        Always returns a dict.  When the file cannot be read, cannot be
        decoded, or holds something other than a JSON object, the dict has a
        single ``"error"`` key describing the failure.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            return {"error": f"Failed to load {file_path}: {e}"}
        if not isinstance(data, dict):
            # Downstream code calls .get() on the result.
            return {"error": f"Failed to load {file_path}: expected a JSON object, got {type(data).__name__}"}
        return data

    def _format_notification_summary(self, notification: Dict[str, Any], file_path: Path) -> str:
        """Create a short Rich-markup summary of the notification for display.

        All values taken from the notification or the file name are escaped so
        that square brackets in them are shown literally instead of being
        parsed as markup tags.
        """
        if "error" in notification:
            return f"[red]ERROR: {escape(str(notification['error']))}[/red]"

        reason = notification.get("reason", "unknown")
        # Tolerate null / non-object values for the nested sections.
        author = _dict_or_empty(notification.get("author"))
        handle = author.get("handle", "unknown")
        display_name = author.get("display_name", "")
        record = _dict_or_empty(notification.get("record"))
        text = record.get("text") or ""
        if not isinstance(text, str):
            text = str(text)

        # Truncate text if too long (before escaping, so the limit counts
        # visible characters rather than escape backslashes).
        if len(text) > 100:
            text = text[:97] + "..."

        summary = f"[cyan]{escape(str(reason))}[/cyan] from [green]{escape(str(handle))}[/green]"
        if display_name:
            summary += f" ([yellow]{escape(str(display_name))}[/yellow])"

        if text:
            summary += f"\n  [dim]{escape(text)}[/dim]"

        summary += f"\n  [magenta]{escape(file_path.name)}[/magenta]"

        return summary

    @staticmethod
    def _mtime(path: Path) -> float:
        """Return the file's modification time, or 0.0 if it cannot be stat'ed."""
        try:
            return path.stat().st_mtime
        except OSError:
            return 0.0

    def _pause(self):
        """Block until the user presses Enter."""
        input("Press Enter to continue...")

    def _parse_index(self, command: str, count: int, usage: str) -> Optional[int]:
        """Extract the index argument from a ``<letter> <idx>`` command.

        Returns the index when it is an integer in ``[0, count)``.  Otherwise
        prints an error, waits for Enter and returns ``None``.
        """
        try:
            idx = int(command.split()[1])
        except (ValueError, IndexError):
            self.console.print(f"[red]Invalid command format. Use: {usage}[/red]")
            self._pause()
            return None
        if not 0 <= idx < count:
            self.console.print(f"[red]Invalid index: {idx}[/red]")
            self._pause()
            return None
        return idx

    def _render_page(self, files: List[Path], current_page: int, total_pages: int,
                     page_size: int, marked_for_deletion: set):
        """Clear the screen and draw one page of the table, the stats and the help."""
        self.console.clear()

        # Calculate current page bounds
        start_idx = current_page * page_size
        current_files = files[start_idx:start_idx + page_size]

        table = Table(title=f"Queue Browser - Page {current_page + 1}/{total_pages}")
        table.add_column("Index", justify="center", style="cyan")
        table.add_column("Status", justify="center", style="magenta")
        table.add_column("Notification", style="white")

        # The index shown is the position in the full list, which is what the
        # d/v commands expect.
        for global_index, file_path in enumerate(current_files, start=start_idx):
            notification = self._load_notification(file_path)
            summary = self._format_notification_summary(notification, file_path)
            status = "[red]DELETE[/red]" if file_path in marked_for_deletion else "[green]KEEP[/green]"
            table.add_row(str(global_index), status, summary)

        self.console.print(table)

        stats_text = f"Total files: {len(files)} | Marked for deletion: {len(marked_for_deletion)}"
        self.console.print(Panel(stats_text, title="Statistics"))

        self.console.print(Panel(HELP_TEXT, title="Help"))

    def browse_queue(self, page_size: int = 10):
        """Interactive queue browser with paging and deletion.

        Args:
            page_size: Number of notifications shown per page; must be >= 1.

        Raises:
            ValueError: If *page_size* is smaller than 1.

        The loop ends on ``q``, on ``execute``, or when input is interrupted
        (Ctrl-C) or exhausted (EOF).
        """
        if page_size < 1:
            raise ValueError(f"page_size must be at least 1, got {page_size}")

        files = self._get_queue_files()
        if not files:
            self.console.print("[yellow]No files in queue.[/yellow]")
            return

        # Sort files by modification time (newest first)
        files.sort(key=self._mtime, reverse=True)

        current_page = 0
        total_pages = (len(files) + page_size - 1) // page_size
        marked_for_deletion = set()

        while True:
            self._render_page(files, current_page, total_pages, page_size, marked_for_deletion)

            try:
                command = input("\nEnter command: ").strip().lower()

                if command == 'q':
                    break
                elif command == 'n':
                    if current_page < total_pages - 1:
                        current_page += 1
                    else:
                        self.console.print("[yellow]Already on last page.[/yellow]")
                        self._pause()
                elif command == 'p':
                    if current_page > 0:
                        current_page -= 1
                    else:
                        self.console.print("[yellow]Already on first page.[/yellow]")
                        self._pause()
                elif command.startswith('d '):
                    idx = self._parse_index(command, len(files), "d <index>")
                    if idx is not None:
                        file_path = files[idx]
                        if file_path in marked_for_deletion:
                            marked_for_deletion.remove(file_path)
                        else:
                            marked_for_deletion.add(file_path)
                elif command.startswith('v '):
                    idx = self._parse_index(command, len(files), "v <index>")
                    if idx is not None:
                        self._view_notification(files[idx])
                elif command == 'execute':
                    if marked_for_deletion:
                        self._execute_deletions(marked_for_deletion)
                    else:
                        self.console.print("[yellow]No files marked for deletion.[/yellow]")
                        self._pause()
                    break
                elif command == 'clear':
                    marked_for_deletion.clear()
                    self.console.print("[green]All delete flags cleared.[/green]")
                    self._pause()
                else:
                    self.console.print("[red]Unknown command.[/red]")
                    self._pause()

            except (KeyboardInterrupt, EOFError):
                # EOF (Ctrl-D / closed stdin) is treated like Ctrl-C: leave quietly.
                break

    def _view_notification(self, file_path: Path):
        """Display full notification content as indented JSON."""
        self.console.clear()
        notification = self._load_notification(file_path)

        # Wrap in Text so the JSON is printed verbatim; a plain str would be
        # parsed as Rich markup.
        self.console.print(Panel(
            Text(json.dumps(notification, indent=2, ensure_ascii=False)),
            title=f"Notification: {escape(file_path.name)}",
            expand=False
        ))

        input("\nPress Enter to continue...")

    def _execute_deletions(self, marked_files: set):
        """Move marked files to deleted/ directory and update processed_notifications.json.

        A notification's ``uri`` is added to the processed list only after its
        file has actually been moved, so a failed move leaves both the file
        and the processed list untouched for that notification.
        """
        self.console.print(f"\n[yellow]Moving {len(marked_files)} files to deleted/ directory...[/yellow]")

        moved_count = 0
        added_to_processed = []

        # Sorted only to make the progress output deterministic.
        for file_path in sorted(marked_files):
            # Read the URI before the move; afterwards the path no longer exists.
            notification = self._load_notification(file_path)

            try:
                shutil.move(str(file_path), str(self.deleted_dir / file_path.name))
            except OSError as e:  # shutil.Error is an OSError subclass
                self.console.print(f"[red]✗[/red] Failed to move {escape(file_path.name)}: {escape(str(e))}")
                continue

            moved_count += 1
            self.console.print(f"[green]✓[/green] Moved {escape(file_path.name)}")

            if "uri" in notification:
                uri = notification["uri"]
                if uri not in self.processed_notifications:
                    self.processed_notifications.append(uri)
                    added_to_processed.append(uri)

        # Save updated processed notifications
        if added_to_processed:
            try:
                self._save_processed_notifications()
            except OSError as e:
                self.console.print(
                    f"\n[red]Failed to update processed_notifications.json: {escape(str(e))}[/red]"
                )
            else:
                self.console.print(
                    f"\n[green]Added {len(added_to_processed)} URIs to processed_notifications.json[/green]"
                )

        self.console.print(f"\n[green]Successfully moved {moved_count} files to deleted/ directory.[/green]")
        self._pause()


def main():
    """Main entry point for the queue manager.

    Returns:
        Process exit status: 0 on success, 1 if the queue directory is missing.
    """
    parser = argparse.ArgumentParser(description="Interactive queue management tool")
    parser.add_argument("--queue-dir", default="queue", help="Queue directory path")
    parser.add_argument("--page-size", type=int, default=10, help="Number of items per page")

    args = parser.parse_args()

    if args.page_size < 1:
        # parser.error prints usage and exits with status 2.
        parser.error("--page-size must be at least 1")

    # isdir rather than exists: a regular file at this path is just as unusable.
    if not os.path.isdir(args.queue_dir):
        print(f"Error: Queue directory '{args.queue_dir}' does not exist.", file=sys.stderr)
        return 1

    manager = QueueManager(args.queue_dir)
    manager.browse_queue(args.page_size)

    return 0


if __name__ == "__main__":
    sys.exit(main())