Cameron's void repo torn apart for local testing
at main 345 lines 13 kB view raw
#!/usr/bin/env python3
"""Queue management utilities for knbnnot bot."""
import argparse
import json
from collections import Counter
from pathlib import Path
from typing import Iterator, List, Optional, Tuple

from rich.console import Console
from rich.markup import escape
from rich.prompt import Confirm
from rich.table import Table

console = Console()

# Queue directories
QUEUE_DIR = Path("queue")
QUEUE_ERROR_DIR = QUEUE_DIR / "errors"
QUEUE_NO_REPLY_DIR = QUEUE_DIR / "no_reply"


def _queue_locations() -> List[Tuple[Path, str]]:
    """Return ``(directory, label)`` pairs for every queue location.

    Built on each call (rather than once at import) so the module-level
    directory constants are read at call time, as the original code did.
    """
    return [
        (QUEUE_DIR, "queue"),
        (QUEUE_ERROR_DIR, "errors"),
        (QUEUE_NO_REPLY_DIR, "no_reply"),
    ]


def load_notification(filepath: Path) -> Optional[dict]:
    """Load a notification from a JSON file.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The decoded JSON value, or ``None`` if the file could not be read or
        parsed (an error line is printed to the console in that case).
    """
    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        # Path and message are escaped so "[...]" is not parsed as markup.
        console.print(
            f"[red]Error loading {escape(str(filepath))}: {escape(str(e))}[/red]"
        )
        return None


def _iter_notifications(
    locations: List[Tuple[Path, str]],
) -> Iterator[Tuple[Path, dict, str]]:
    """Yield ``(filepath, notification, label)`` for each loadable JSON file.

    Missing directories are skipped, as are files that fail to load or whose
    content is not a non-empty JSON object.
    """
    for directory, label in locations:
        if not directory.exists():
            continue

        # Sorted so the scan order (and thus the output) is deterministic.
        for filepath in sorted(directory.glob("*.json")):
            # A directory may itself be named "*.json"; skip it.
            if filepath.is_dir():
                continue

            notif = load_notification(filepath)
            if notif and isinstance(notif, dict):
                yield filepath, notif, label


def _author_field(notif: dict, field: str, default: str = '') -> str:
    """Return ``notif['author'][field]`` if it is a string, else ``default``.

    Tolerates a missing or null ``author`` and a missing or null field.
    """
    author = notif.get('author')
    if not isinstance(author, dict):
        return default
    value = author.get(field)
    return value if isinstance(value, str) else default


def _record_text(notif: dict) -> str:
    """Return ``notif['record']['text']`` if it is a string, else ``''``."""
    record = notif.get('record')
    text = record.get('text') if isinstance(record, dict) else None
    return text if isinstance(text, str) else ''


def _indexed_at(notif: dict) -> str:
    """Return ``notif['indexed_at']`` if it is a string, else ``''``."""
    value = notif.get('indexed_at')
    return value if isinstance(value, str) else ''


def _truncate(text: str, limit: int) -> str:
    """Cut ``text`` to ``limit`` characters, appending ``...`` if shortened."""
    return text[:limit] + "..." if len(text) > limit else text


def list_notifications(handle_filter: Optional[str] = None, show_all: bool = False):
    """List all notifications in the queue, optionally filtered by handle.

    Args:
        handle_filter: Case-insensitive substring to match against the author
            handle. A leading ``@`` is ignored. ``None`` or empty disables
            filtering.
        show_all: When true, also scan the errors and no_reply directories;
            otherwise only the main queue directory is scanned.

    Returns:
        The matching notification dicts, newest ``indexed_at`` first. Each
        dict gains ``_filepath`` and ``_source`` keys. The list is empty when
        nothing matched.
    """
    locations = _queue_locations()
    if not show_all:
        locations = locations[:1]  # main queue only

    # Stored handles carry no "@", so strip it the same way delete does.
    needle = handle_filter.lstrip('@').lower() if handle_filter else ''

    all_notifications = []
    for filepath, notif, source in _iter_notifications(locations):
        if needle and needle not in _author_field(notif, 'handle').lower():
            continue
        notif['_filepath'] = filepath
        notif['_source'] = source
        all_notifications.append(notif)

    # Sort by indexed_at, newest first
    all_notifications.sort(key=_indexed_at, reverse=True)

    # Display results
    if not all_notifications:
        if handle_filter:
            console.print(
                "[yellow]No notifications found for handle containing "
                f"'{escape(handle_filter)}'[/yellow]"
            )
        else:
            console.print("[yellow]No notifications found in queue[/yellow]")
        return all_notifications

    table = Table(title=f"Queue Notifications ({len(all_notifications)} total)")
    table.add_column("File", style="cyan", width=20)
    table.add_column("Source", style="magenta", width=10)
    table.add_column("Handle", style="green", width=25)
    table.add_column("Display Name", width=25)
    table.add_column("Text", width=40)
    table.add_column("Time", style="dim", width=20)

    for notif in all_notifications:
        # Cell strings are parsed as Rich markup, so anything that comes from
        # the notification files is escaped before display.
        table.add_row(
            escape(notif['_filepath'].name[:20]),
            notif['_source'],
            f"@{escape(_author_field(notif, 'handle', 'unknown'))}",
            escape(_author_field(notif, 'display_name')),
            escape(_truncate(_record_text(notif), 40)),
            escape(_indexed_at(notif)[:19]),  # Trim milliseconds
        )

    console.print(table)
    return all_notifications


def delete_by_handle(handle: str, dry_run: bool = False, force: bool = False):
    """Delete all notifications from a specific handle.

    Scans the queue, errors and no_reply directories for notifications whose
    author handle equals ``handle`` (case-insensitive), shows them, and then
    deletes the files.

    Args:
        handle: Author handle to match; a leading ``@`` is ignored.
        dry_run: When true, only show what would be deleted.
        force: When true, skip the interactive confirmation prompt.
    """
    # Remove @ if present
    handle = handle.lstrip('@')
    shown_handle = escape(handle)
    wanted = handle.lower()

    # Find all notifications from this handle
    console.print(f"\n[bold]Searching for notifications from @{shown_handle}...[/bold]\n")

    to_delete = [
        {'filepath': filepath, 'notif': notif, 'source': filepath.parent.name}
        for filepath, notif, _label in _iter_notifications(_queue_locations())
        if _author_field(notif, 'handle').lower() == wanted
    ]

    if not to_delete:
        console.print(f"[yellow]No notifications found from @{shown_handle}[/yellow]")
        return

    # Display what will be deleted
    table = Table(title=f"Notifications to Delete from @{shown_handle}")
    table.add_column("File", style="cyan")
    table.add_column("Location", style="magenta")
    table.add_column("Text", width=50)
    table.add_column("Time", style="dim")

    for item in to_delete:
        notif = item['notif']
        table.add_row(
            escape(item['filepath'].name),
            escape(item['source']),
            escape(_truncate(_record_text(notif), 50)),
            escape(_indexed_at(notif)[:19]),
        )

    console.print(table)
    console.print(f"\n[bold red]Found {len(to_delete)} notifications to delete[/bold red]")

    if dry_run:
        console.print("\n[yellow]DRY RUN - No files were deleted[/yellow]")
        return

    # Confirm deletion
    if not force and not Confirm.ask("\nDo you want to delete these notifications?"):
        console.print("[yellow]Deletion cancelled[/yellow]")
        return

    # Delete the files; one failure must not stop the remaining deletions.
    deleted_count = 0
    for item in to_delete:
        name = escape(item['filepath'].name)
        try:
            item['filepath'].unlink()
        except OSError as e:
            console.print(f"[red]✗[/red] Failed to delete {name}: {escape(str(e))}")
        else:
            deleted_count += 1
            console.print(f"[green]✓[/green] Deleted {name}")

    console.print(f"\n[bold green]Successfully deleted {deleted_count} notifications[/bold green]")
    failed_count = len(to_delete) - deleted_count
    if failed_count:
        console.print(f"[bold red]Failed to delete {failed_count} notifications[/bold red]")


def count_by_handle():
    """Show detailed count of notifications by handle.

    Prints one row per author handle with its count in each location and the
    total, ordered by total descending, followed by summary statistics.
    """
    handle_counts = {}

    # Collect counts from all directories
    for _filepath, notif, location in _iter_notifications(_queue_locations()):
        handle = _author_field(notif, 'handle', 'unknown')
        counts = handle_counts.setdefault(
            handle, {'queue': 0, 'errors': 0, 'no_reply': 0, 'total': 0}
        )
        counts[location] += 1
        counts['total'] += 1

    if not handle_counts:
        console.print("[yellow]No notifications found in any queue[/yellow]")
        return

    # Sort by total count
    sorted_handles = sorted(handle_counts.items(), key=lambda x: x[1]['total'], reverse=True)

    # Display results
    table = Table(title=f"Notification Count by Handle ({len(handle_counts)} unique handles)")
    table.add_column("Handle", style="green", width=30)
    table.add_column("Queue", style="cyan", justify="right")
    table.add_column("Errors", style="red", justify="right")
    table.add_column("No Reply", style="yellow", justify="right")
    table.add_column("Total", style="bold magenta", justify="right")

    for handle, counts in sorted_handles:
        table.add_row(
            f"@{escape(handle)}",
            str(counts['queue']) if counts['queue'] > 0 else "-",
            str(counts['errors']) if counts['errors'] > 0 else "-",
            str(counts['no_reply']) if counts['no_reply'] > 0 else "-",
            str(counts['total']),
        )

    console.print(table)

    # Summary statistics (handle_counts is non-empty here, so no zero division)
    total_notifications = sum(h['total'] for h in handle_counts.values())
    avg_per_handle = total_notifications / len(handle_counts)

    console.print("\n[bold]Summary:[/bold]")
    console.print(f"  Total notifications: {total_notifications}")
    console.print(f"  Unique handles: {len(handle_counts)}")
    console.print(f"  Average per handle: {avg_per_handle:.1f}")

    # Top user info
    top_handle, top_counts = sorted_handles[0]
    percentage = (top_counts['total'] / total_notifications) * 100
    console.print(
        f"  Most active: @{escape(top_handle)} "
        f"({top_counts['total']} notifications, {percentage:.1f}% of total)"
    )


def stats():
    """Show queue statistics.

    Prints the notification count and number of unique handles for each
    location, then the ten handles with the most notifications across all
    locations combined.
    """
    stats_data = {
        'queue': {'count': 0, 'handles': set()},
        'errors': {'count': 0, 'handles': set()},
        'no_reply': {'count': 0, 'handles': set()},
    }
    # Notifications per handle over all locations. The per-location sets above
    # only record presence, so they cannot provide these counts.
    handle_totals = Counter()

    # Collect stats
    for _filepath, notif, key in _iter_notifications(_queue_locations()):
        handle = _author_field(notif, 'handle', 'unknown')
        stats_data[key]['count'] += 1
        stats_data[key]['handles'].add(handle)
        handle_totals[handle] += 1

    # Display stats
    table = Table(title="Queue Statistics")
    table.add_column("Location", style="cyan")
    table.add_column("Count", style="yellow")
    table.add_column("Unique Handles", style="green")

    for key, label in [('queue', 'Active Queue'), ('errors', 'Errors'), ('no_reply', 'No Reply')]:
        table.add_row(
            label,
            str(stats_data[key]['count']),
            str(len(stats_data[key]['handles'])),
        )

    console.print(table)

    # Show top handles
    if handle_totals:
        top_table = Table(title="Top 10 Handles by Notification Count")
        top_table.add_column("Handle", style="green")
        top_table.add_column("Count", style="yellow")

        for handle, count in handle_totals.most_common(10):
            top_table.add_row(f"@{escape(handle)}", str(count))

        console.print()  # blank line between the two tables
        console.print(top_table)


def main():
    """Parse the command line and run the selected queue command.

    Supported commands are ``list``, ``delete``, ``stats`` and ``count``.
    Help is printed when no command is given.
    """
    parser = argparse.ArgumentParser(description="Manage bot notification queue")
    subparsers = parser.add_subparsers(dest='command', help='Commands')

    # List command
    list_parser = subparsers.add_parser('list', help='List notifications in queue')
    list_parser.add_argument('--handle', help='Filter by handle (partial match)')
    list_parser.add_argument('--all', action='store_true', help='Include errors and no_reply folders')

    # Delete command
    delete_parser = subparsers.add_parser('delete', help='Delete notifications from a specific handle')
    delete_parser.add_argument('handle', help='Handle to delete notifications from')
    delete_parser.add_argument('--dry-run', action='store_true', help='Show what would be deleted without deleting')
    delete_parser.add_argument('--force', action='store_true', help='Skip confirmation prompt')

    # Stats command
    subparsers.add_parser('stats', help='Show queue statistics')

    # Count command
    subparsers.add_parser('count', help='Show detailed count by handle')

    args = parser.parse_args()

    if args.command == 'list':
        list_notifications(args.handle, args.all)
    elif args.command == 'delete':
        delete_by_handle(args.handle, args.dry_run, args.force)
    elif args.command == 'stats':
        stats()
    elif args.command == 'count':
        count_by_handle()
    else:
        parser.print_help()


if __name__ == "__main__":
    main()