forked from
cameron.stream/void
this repo has no description
1#!/usr/bin/env python3
2
3import json
4import os
5import shutil
6from pathlib import Path
7from typing import List, Dict, Any
8from rich.console import Console
9from rich.table import Table
10from rich.panel import Panel
11from rich.text import Text
12
class QueueManager:
    """Interactive terminal browser for a directory of queued notification files.

    Every ``*.json`` file directly inside the queue directory (except
    ``processed_notifications.json``) is treated as one notification. Files
    can be paged through, inspected, flagged for deletion and finally moved
    into a ``deleted/`` subdirectory. The ``uri`` of every file that was
    moved is appended to ``processed_notifications.json``.
    """

    # Help panel content; kept as one constant so the loop body stays short.
    _HELP_TEXT = (
        "Commands:\n"
        " [cyan]n[/cyan] - Next page [cyan]p[/cyan] - Previous page [cyan]q[/cyan] - Quit\n"
        " [cyan]d <idx>[/cyan] - Toggle delete flag for item at index\n"
        " [cyan]v <idx>[/cyan] - View full notification at index\n"
        " [cyan]execute[/cyan] - Execute deletions and quit\n"
        " [cyan]clear[/cyan] - Clear all delete flags"
    )

    def __init__(self, queue_dir: str = "queue"):
        """Set up the paths below *queue_dir* and load the processed-URI list.

        ``<queue_dir>/deleted`` is created when missing. *queue_dir* itself
        must already exist, because ``mkdir`` is called without ``parents``.
        """
        self.queue_dir = Path(queue_dir)
        self.deleted_dir = self.queue_dir / "deleted"
        self.processed_file = self.queue_dir / "processed_notifications.json"
        self.console = Console()

        # Create deleted directory if it doesn't exist
        self.deleted_dir.mkdir(exist_ok=True)

        # Load existing processed notifications
        self.processed_notifications = self._load_processed_notifications()

    @staticmethod
    def _escape(value: Any) -> str:
        """Return *value* as text that is safe to embed in a rich markup string."""
        # Imported here because the file-level imports do not include rich.markup.
        from rich.markup import escape

        return escape(str(value))

    @staticmethod
    def _mtime(file_path: Path) -> float:
        """Return the modification time of *file_path*, or 0.0 if it cannot be stat'ed."""
        try:
            return file_path.stat().st_mtime
        except OSError:
            # The file vanished (or became unreadable) after it was listed.
            return 0.0

    def _pause(self) -> None:
        """Block until the user presses Enter."""
        input("Press Enter to continue...")

    def _load_processed_notifications(self) -> List[str]:
        """Load the list of processed notification URIs.

        Returns an empty list when the file is missing or is not valid JSON.
        """
        try:
            with open(self.processed_file, "r", encoding="utf-8") as f:
                return json.load(f)
        except (json.JSONDecodeError, FileNotFoundError):
            return []

    def _save_processed_notifications(self):
        """Save the list of processed notification URIs.

        The data is written to a sibling temporary file and then renamed over
        the real file, so an interrupted write cannot leave a truncated
        ``processed_notifications.json`` behind.
        """
        tmp_path = self.processed_file.with_name(self.processed_file.name + ".tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(self.processed_notifications, f, indent=2)
        os.replace(tmp_path, self.processed_file)

    def _get_queue_files(self) -> List[Path]:
        """Get all JSON files in the queue directory, excluding deleted/.

        The glob is not recursive, so files inside ``deleted/`` never match.
        """
        return [
            f
            for f in self.queue_dir.glob("*.json")
            if f.name != "processed_notifications.json"
        ]

    def _load_notification(self, file_path: Path) -> Dict[str, Any]:
        """Load a notification from a JSON file.

        Always returns a dict. When the file cannot be read, cannot be
        decoded, is not valid JSON, or does not contain a JSON object, the
        result is ``{"error": "<message>"}`` instead of raising.
        """
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            return {"error": f"Failed to load {file_path}: {e}"}

        if not isinstance(data, dict):
            return {
                "error": f"Failed to load {file_path}: expected a JSON object, "
                f"got {type(data).__name__}"
            }
        return data

    def _format_notification_summary(self, notification: Dict[str, Any], file_path: Path) -> str:
        """Create a short rich-markup summary of the notification for display.

        All values taken from the notification or the file name are escaped,
        so square brackets in user-written text are shown literally instead
        of being interpreted as markup.
        """
        if "error" in notification:
            return f"[red]ERROR: {self._escape(notification['error'])}[/red]"

        reason = notification.get("reason", "unknown")

        # Tolerate null / non-object values for the nested structures.
        author = notification.get("author")
        if not isinstance(author, dict):
            author = {}
        record = notification.get("record")
        if not isinstance(record, dict):
            record = {}

        handle = author.get("handle", "unknown")
        display_name = author.get("display_name") or ""
        text = str(record.get("text") or "")

        # Truncate the raw text first so escaping never gets cut in half.
        if len(text) > 100:
            text = text[:97] + "..."

        summary = f"[cyan]{self._escape(reason)}[/cyan] from [green]{self._escape(handle)}[/green]"
        if display_name:
            summary += f" ([yellow]{self._escape(display_name)}[/yellow])"

        if text:
            summary += f"\n [dim]{self._escape(text)}[/dim]"

        summary += f"\n [magenta]{self._escape(file_path.name)}[/magenta]"

        return summary

    def _render_page(
        self,
        files: List[Path],
        current_page: int,
        total_pages: int,
        page_size: int,
        marked_for_deletion: set,
    ) -> None:
        """Clear the screen and draw the table, statistics and help panels for one page."""
        self.console.clear()

        start_idx = current_page * page_size
        current_files = files[start_idx:start_idx + page_size]

        table = Table(title=f"Queue Browser - Page {current_page + 1}/{total_pages}")
        table.add_column("Index", justify="center", style="cyan")
        table.add_column("Status", justify="center", style="magenta")
        table.add_column("Notification", style="white")

        # The index column shows the position in the full list, which is what
        # the d/v commands expect.
        for global_index, file_path in enumerate(current_files, start=start_idx):
            notification = self._load_notification(file_path)
            summary = self._format_notification_summary(notification, file_path)
            status = "[red]DELETE[/red]" if file_path in marked_for_deletion else "[green]KEEP[/green]"
            table.add_row(str(global_index), status, summary)

        self.console.print(table)

        stats_text = f"Total files: {len(files)} | Marked for deletion: {len(marked_for_deletion)}"
        self.console.print(Panel(stats_text, title="Statistics"))
        self.console.print(Panel(self._HELP_TEXT, title="Help"))

    def _parse_index(self, command: str, count: int):
        """Extract the index from a ``d <idx>`` / ``v <idx>`` command.

        Returns the index as an int when it is a valid position in a list of
        *count* items. Otherwise prints an error, waits for Enter and returns
        ``None``.
        """
        action = command[0]
        try:
            idx = int(command.split()[1])
        except (ValueError, IndexError):
            self.console.print(f"[red]Invalid command format. Use: {action} <index>[/red]")
            self._pause()
            return None

        if not 0 <= idx < count:
            self.console.print(f"[red]Invalid index: {idx}[/red]")
            self._pause()
            return None
        return idx

    def browse_queue(self, page_size: int = 10):
        """Interactive queue browser with paging and deletion.

        Args:
            page_size: Number of notifications shown per page. Values below 1
                are treated as 1.

        The loop ends on ``q``, on ``execute``, on Ctrl-C, or when standard
        input is closed.
        """
        files = self._get_queue_files()
        if not files:
            self.console.print("[yellow]No files in queue.[/yellow]")
            return

        # A page size of zero would divide by zero below.
        page_size = max(1, page_size)

        # Sort files by modification time (newest first)
        files.sort(key=self._mtime, reverse=True)

        current_page = 0
        total_pages = (len(files) + page_size - 1) // page_size
        marked_for_deletion = set()

        while True:
            self._render_page(files, current_page, total_pages, page_size, marked_for_deletion)

            try:
                command = input("\nEnter command: ").strip().lower()

                if command == "q":
                    break
                elif command == "n":
                    if current_page < total_pages - 1:
                        current_page += 1
                    else:
                        self.console.print("[yellow]Already on last page.[/yellow]")
                        self._pause()
                elif command == "p":
                    if current_page > 0:
                        current_page -= 1
                    else:
                        self.console.print("[yellow]Already on first page.[/yellow]")
                        self._pause()
                elif command.startswith("d "):
                    idx = self._parse_index(command, len(files))
                    if idx is not None:
                        # Symmetric difference toggles membership of one item.
                        marked_for_deletion ^= {files[idx]}
                elif command.startswith("v "):
                    idx = self._parse_index(command, len(files))
                    if idx is not None:
                        self._view_notification(files[idx])
                elif command == "execute":
                    if marked_for_deletion:
                        self._execute_deletions(marked_for_deletion)
                    else:
                        self.console.print("[yellow]No files marked for deletion.[/yellow]")
                        self._pause()
                    break
                elif command == "clear":
                    marked_for_deletion.clear()
                    self.console.print("[green]All delete flags cleared.[/green]")
                    self._pause()
                else:
                    self.console.print("[red]Unknown command.[/red]")
                    self._pause()

            except (KeyboardInterrupt, EOFError):
                # Ctrl-C or a closed stdin both mean "leave the browser".
                break

    def _view_notification(self, file_path: Path):
        """Display full notification content as indented JSON and wait for Enter."""
        self.console.clear()
        notification = self._load_notification(file_path)

        # Wrapping in Text keeps rich from parsing brackets in the JSON (or in
        # the file name) as markup. ensure_ascii=False shows non-ASCII text
        # as written instead of as \uXXXX escapes.
        self.console.print(Panel(
            Text(json.dumps(notification, indent=2, ensure_ascii=False)),
            title=Text(f"Notification: {file_path.name}"),
            expand=False
        ))

        input("\nPress Enter to continue...")

    def _move_to_deleted(self, file_path: Path) -> Dict[str, Any]:
        """Move *file_path* into the deleted directory and return its loaded content.

        The notification is read before the move so its ``uri`` is still
        available afterwards. Errors from the move itself propagate to the
        caller.
        """
        notification = self._load_notification(file_path)
        shutil.move(str(file_path), str(self.deleted_dir / file_path.name))
        return notification

    def _execute_deletions(self, marked_files: set):
        """Move marked files to deleted/ directory and update processed_notifications.json.

        A URI is recorded as processed only after its file was moved
        successfully. A failure on one file is reported and does not stop the
        remaining files from being handled.
        """
        self.console.print(f"\n[yellow]Moving {len(marked_files)} files to deleted/ directory...[/yellow]")

        moved_count = 0
        added_to_processed = []

        # Sorted so the progress output has a stable order.
        for file_path in sorted(marked_files):
            try:
                notification = self._move_to_deleted(file_path)
            except OSError as e:
                self.console.print(
                    f"[red]✗[/red] Failed to move {self._escape(file_path.name)}: {self._escape(e)}"
                )
                continue

            moved_count += 1

            uri = notification.get("uri")
            if uri is not None and uri not in self.processed_notifications:
                self.processed_notifications.append(uri)
                added_to_processed.append(uri)

            self.console.print(f"[green]✓[/green] Moved {self._escape(file_path.name)}")

        # Save updated processed notifications
        if added_to_processed:
            self._save_processed_notifications()
            self.console.print(f"\n[green]Added {len(added_to_processed)} URIs to processed_notifications.json[/green]")

        self.console.print(f"\n[green]Successfully moved {moved_count} files to deleted/ directory.[/green]")
        self._pause()
246
247
def main(argv=None):
    """Main entry point for the queue manager.

    Args:
        argv: Command-line arguments without the program name. ``None`` (the
            default) makes argparse read ``sys.argv[1:]``.

    Returns:
        0 after the browser exits normally, 1 when the queue directory is
        missing or is not a directory. An invalid ``--page-size`` ends the
        process through ``parser.error`` (exit status 2).
    """
    import argparse
    import sys

    parser = argparse.ArgumentParser(description="Interactive queue management tool")
    parser.add_argument("--queue-dir", default="queue", help="Queue directory path")
    parser.add_argument("--page-size", type=int, default=10, help="Number of items per page")

    args = parser.parse_args(argv)

    # Zero or negative page sizes cannot be paged through.
    if args.page_size < 1:
        parser.error("--page-size must be at least 1")

    # isdir (not exists): a regular file at this path cannot hold a queue.
    if not os.path.isdir(args.queue_dir):
        print(
            f"Error: Queue directory '{args.queue_dir}' does not exist or is not a directory.",
            file=sys.stderr,
        )
        return 1

    manager = QueueManager(args.queue_dir)
    manager.browse_queue(args.page_size)

    return 0
266
267
if __name__ == "__main__":
    # The bare `exit` helper is installed by the `site` module for interactive
    # sessions and is missing under `python -S`; raising SystemExit always works.
    raise SystemExit(main())