Cameron's void repo torn apart for local testing
at main 312 lines 12 kB view raw
#!/usr/bin/env python3
"""Aggregate structured event logs (events-*.jsonl) & legacy replay heuristics.

Features:
 - Event counts & tool usage (from tool_invocation response-phase events)
 - Context budget stats (avg initial/final, pressure levels, summaries, evictions)
 - Memory journal metrics (memory_write / memory_truncation events)
 - Warnings (context_budget_warning)
 - Legacy replay heuristics: tool_calls directory scan, recent structured failures, Ollama request count

Usage:
 ./scripts/log_report.py [--dir replay_logs] [--days 1] [--json] [--include pattern] [--event name]

Exit codes:
 0 success (even if no events found unless --strict specified in future)
"""
from __future__ import annotations

import argparse
import json
import re
from collections import Counter
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional

ROOT = Path(__file__).resolve().parent.parent
DEFAULT_DIR = ROOT / "replay_logs"

# Tool-call files are named ``request_<kind>_<rest>.json``. ``memory_search``
# is listed explicitly because it contains an underscore, which the generic
# alphanumeric alternative cannot capture.
_TOOL_KIND_RE = re.compile(r"request_(?P<kind>memory_search|[a-zA-Z0-9]+)_")
_COUNTED_TOOL_KINDS = frozenset({"webpage", "memory_search"})

# Event types synthesized by read_events() for unreadable input.
_SYNTHETIC_EVENTS = frozenset({"_malformed", "_read_error"})


# -------------------------
# Legacy helpers (retain)
# -------------------------
def load_json(path: Path):
    """Return the parsed JSON content of *path*, or ``None`` if it cannot be read or parsed."""
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        # Best effort: unreadable, undecodable or invalid files are skipped by callers.
        return None


def summarize_tool_calls(replay_dir: Path):
    """Tally tool-call request files under ``<replay_dir>/tool_calls``.

    Only files matching ``request_*_*.json`` with non-empty JSON content are
    considered. A file is counted under its kind when the kind is ``webpage``,
    ``memory_search`` or starts with ``stage``.

    Returns a dict with ``total_files`` (number of counted files), ``tools``
    (kind -> count) and ``phases`` (see the review note in the body).
    """
    tool_dir = replay_dir / "tool_calls"
    if not tool_dir.exists():
        return {"total_files": 0, "tools": {}, "phases": {}}
    tool_counter: Counter = Counter()
    phase_counter: Counter = Counter()
    for p in tool_dir.glob("request_*_*.json"):
        if not load_json(p):
            continue
        m = _TOOL_KIND_RE.match(p.name)
        if m:
            kind = m.group("kind")
            if kind in _COUNTED_TOOL_KINDS or kind.startswith("stage"):
                tool_counter[kind] += 1
        if "stage" in p.name:
            # NOTE(review): every globbed name starts with "request_", so this
            # key is always "request". Behaviour kept as-is; confirm the
            # intended phase key before changing it.
            phase_counter[p.name.split("_", 1)[0]] += 1
    return {
        "total_files": sum(tool_counter.values()),
        "tools": dict(tool_counter.most_common()),
        "phases": dict(phase_counter.most_common()),
    }


def summarize_ollama(replay_dir: Path):
    """Count ``request_*.json`` files under ``<replay_dir>/ollama_requests``."""
    o_dir = replay_dir / "ollama_requests"
    if not o_dir.exists():
        return {"requests": 0}
    return {"requests": sum(1 for _ in o_dir.glob("request_*.json"))}


def recent_failures(replay_dir: Path, limit: int = 5):
    """Return the names of the *limit* most recently modified ``structured_failure__*.json`` files."""
    fails = sorted(
        replay_dir.glob("structured_failure__*.json"),
        key=lambda p: p.stat().st_mtime,
        reverse=True,
    )
    return [p.name for p in fails[:limit]]


# -------------------------
# Event aggregation
# -------------------------
def iter_event_files(base: Path, days: Optional[int]) -> Iterable[Path]:
    """Return the sorted ``events-*.jsonl`` files in *base*.

    When *days* is given, only files whose name carries a ``YYYY-MM-DD`` stamp
    within the last *days* days (UTC, today included) are returned; files with
    an unparseable stamp are skipped. When *days* is ``None`` every matching
    file is returned.
    """
    if not base.exists():
        return []
    files = sorted(p for p in base.glob("events-*.jsonl") if p.is_file())
    if days is None:
        return files
    cutoff = datetime.now(timezone.utc).date() - timedelta(days=days - 1)
    selected = []
    for f in files:
        stamp = f.name[len("events-"):-len(".jsonl")]
        try:
            file_date = datetime.strptime(stamp, "%Y-%m-%d").date()
        except ValueError:
            continue
        if file_date >= cutoff:
            selected.append(f)
    return selected


def read_events(paths: Iterable[Path]) -> Iterable[Dict[str, Any]]:
    """Yield one event dict per non-blank line of each file in *paths*.

    Lines that are not valid JSON, or that decode to something other than a
    JSON object, are reported as ``{"event": "_malformed", ...}``. Files that
    vanish are skipped silently; other read errors are reported as a single
    ``{"event": "_read_error", ...}`` event.
    """
    for p in paths:
        try:
            with p.open("r", encoding="utf-8") as fh:
                for line in fh:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        parsed = json.loads(line)
                    except (ValueError, RecursionError):
                        parsed = None
                    if isinstance(parsed, dict):
                        yield parsed
                    else:
                        yield {"event": "_malformed", "raw": line, "file": str(p)}
        except FileNotFoundError:
            continue
        except (OSError, UnicodeError) as e:
            yield {"event": "_read_error", "file": str(p), "error": str(e)}


def _as_int(value: Any) -> int:
    """Coerce *value* to ``int``, treating missing or unparseable values as 0."""
    try:
        return int(value or 0)
    except (TypeError, ValueError, OverflowError):
        return 0


def aggregate_events(events: Iterable[Dict[str, Any]], include_pattern: Optional[str], limit_event: Optional[str]) -> Dict[str, Any]:
    """Fold *events* into counters and derived statistics.

    *include_pattern* keeps only events whose type contains that substring;
    *limit_event* keeps only events whose type equals it exactly. Malformed
    input (non-dict events, missing or non-string types, and the synthetic
    ``_malformed`` / ``_read_error`` events) is always tallied in
    ``malformed_or_errors``, regardless of the filters.

    Returns a dict with ``event_counts``, ``tool_usage``, ``context_budget``,
    ``warnings``, ``malformed_or_errors``, ``memory`` and ``filtered_total``.
    """
    counts: Counter = Counter()
    tool_usage: Counter = Counter()
    warnings: Counter = Counter()
    ctx_budget: Dict[str, Any] = {
        "events": 0,
        "initial_total_sum": 0,
        "final_total_sum": 0,
        "pressure_levels": Counter(),
        "summaries": 0,
        "evictions": 0,
    }
    memory_metrics: Dict[str, Any] = {
        "writes": 0,  # total memory_write events
        "suppressed_total": 0,  # sum of suppressed lines
        "added_total": 0,  # sum of added lines
        "truncated_flagged": 0,  # memory_write events where truncated true
        "truncations": 0,  # memory_truncation events
        "trunc_removed_lines": 0,  # sum removed_lines across truncation events
        "trunc_prev_chars": 0,  # sum previous_chars
        "trunc_final_chars": 0,  # sum final_chars
    }
    malformed = 0
    for ev in events:
        if not isinstance(ev, dict):
            malformed += 1
            continue
        etype = ev.get("event")
        if not etype or not isinstance(etype, str):
            malformed += 1
            continue
        # Tally read problems before filtering so that --include/--event
        # cannot hide them from the report.
        if etype in _SYNTHETIC_EVENTS:
            malformed += 1
        if include_pattern and include_pattern not in etype:
            continue
        if limit_event and etype != limit_event:
            continue
        counts[etype] += 1
        if etype == "tool_invocation":
            if ev.get("phase") == "response":
                tool = ev.get("tool") or ev.get("tool_name")
                if tool:
                    tool_usage[tool] += 1
        elif etype == "context_budget":
            ctx_budget["events"] += 1
            ctx_budget["initial_total_sum"] += _as_int(ev.get("initial_total"))
            ctx_budget["final_total_sum"] += _as_int(ev.get("final_total"))
            ctx_budget["pressure_levels"][ev.get("pressure_level") or "none"] += 1
            for budget_pass in (ev.get("passes") or []):
                if not isinstance(budget_pass, dict):
                    continue
                mode = budget_pass.get("mode")
                if mode == "summarize":
                    ctx_budget["summaries"] += 1
                elif mode == "evict":
                    ctx_budget["evictions"] += 1
        elif etype == "context_budget_warning":
            warnings[ev.get("pressure") or "unknown"] += 1
        elif etype == "memory_write":
            memory_metrics["writes"] += 1
            memory_metrics["suppressed_total"] += _as_int(ev.get("suppressed"))
            memory_metrics["added_total"] += _as_int(ev.get("added_lines"))
            if ev.get("truncated"):
                memory_metrics["truncated_flagged"] += 1
        elif etype == "memory_truncation":
            memory_metrics["truncations"] += 1
            memory_metrics["trunc_removed_lines"] += _as_int(ev.get("removed_lines"))
            memory_metrics["trunc_prev_chars"] += _as_int(ev.get("previous_chars"))
            memory_metrics["trunc_final_chars"] += _as_int(ev.get("final_chars"))

    if ctx_budget["events"]:
        ctx_budget["avg_initial_total"] = ctx_budget["initial_total_sum"] / ctx_budget["events"]
        ctx_budget["avg_final_total"] = ctx_budget["final_total_sum"] / ctx_budget["events"]
    else:
        ctx_budget["avg_initial_total"] = 0
        ctx_budget["avg_final_total"] = 0

    # Derived memory ratios
    if memory_metrics["writes"]:
        considered = memory_metrics["suppressed_total"] + memory_metrics["added_total"]
        # max(1, ...) avoids dividing by zero when writes carried no lines.
        memory_metrics["suppression_rate"] = memory_metrics["suppressed_total"] / max(1, considered)
        memory_metrics["avg_added_per_write"] = memory_metrics["added_total"] / memory_metrics["writes"]
    else:
        memory_metrics["suppression_rate"] = 0.0
        memory_metrics["avg_added_per_write"] = 0.0
    if memory_metrics["truncations"]:
        memory_metrics["avg_removed_lines_per_trunc"] = memory_metrics["trunc_removed_lines"] / memory_metrics["truncations"]
    else:
        memory_metrics["avg_removed_lines_per_trunc"] = 0.0
    return {
        "event_counts": counts,
        "tool_usage": tool_usage,
        "context_budget": ctx_budget,
        "warnings": warnings,
        "malformed_or_errors": malformed,
        "memory": memory_metrics,
        "filtered_total": sum(counts.values()),
    }


def format_events_human(report: Dict[str, Any]) -> str:
    """Render the dict produced by :func:`aggregate_events` as plain text, omitting empty sections."""
    lines = []
    ec = report["event_counts"]
    if ec:
        lines.append("=== Event Counts ===")
        lines.extend(f"{k:25} {v}" for k, v in ec.most_common())
    tc = report["tool_usage"]
    if tc:
        lines.append("\n=== Tool Usage (response-phase) ===")
        lines.extend(f"{k:25} {v}" for k, v in tc.most_common())
    cb = report["context_budget"]
    if cb.get("events"):
        lines.append("\n=== Context Budget ===")
        lines.append(f"events: {cb['events']} avg_initial_total: {cb['avg_initial_total']:.2f} avg_final_total: {cb['avg_final_total']:.2f}")
        lines.append(f"summaries: {cb['summaries']} evictions: {cb['evictions']}")
        if cb["pressure_levels"]:
            levels = ", ".join(f"{k}={v}" for k, v in cb["pressure_levels"].most_common())
            lines.append(f"pressure_levels: {levels}")
    mem = report.get("memory", {})
    if mem.get("writes") or mem.get("truncations"):
        lines.append("\n=== Memory Journal Metrics ===")
        lines.append(f"writes: {mem.get('writes',0)} added_total: {mem.get('added_total',0)} suppressed_total: {mem.get('suppressed_total',0)} suppression_rate: {mem.get('suppression_rate',0):.2f}")
        lines.append(f"truncated_flagged_in_write: {mem.get('truncated_flagged',0)} truncations: {mem.get('truncations',0)} avg_removed_lines_per_trunc: {mem.get('avg_removed_lines_per_trunc',0):.2f}")
    warn = report["warnings"]
    if warn:
        lines.append("\n=== Warnings ===")
        lines.extend(f"{k:25} {v}" for k, v in warn.most_common())
    if report["malformed_or_errors"]:
        lines.append(f"\nMalformed/Error lines: {report['malformed_or_errors']}")
    return "\n".join(lines)


# -------------------------
# Orchestration
# -------------------------
def build_report(base_dir: Path, days: Optional[int], include_pattern: Optional[str], event_name: Optional[str]):
    """Collect the structured-event report and the legacy heuristics for *base_dir*.

    Returns ``(events_report, legacy, event_files)``; ``events_report`` is
    ``None`` when no event file was selected.
    """
    event_files = list(iter_event_files(base_dir, days))
    events_report = None
    if event_files:
        events_report = aggregate_events(read_events(event_files), include_pattern, event_name)
    legacy = {
        "tool_calls": summarize_tool_calls(base_dir),
        "ollama": summarize_ollama(base_dir),
        "recent_failures": recent_failures(base_dir),
    }
    return events_report, legacy, event_files


def _events_to_jsonable(report: Dict[str, Any]) -> Dict[str, Any]:
    """Convert an aggregate_events() report into plain dicts for ``json.dumps``."""
    budget = report["context_budget"]
    return {
        "event_counts": dict(report["event_counts"]),
        "tool_usage": dict(report["tool_usage"]),
        "context_budget": {
            **{k: v for k, v in budget.items() if k != "pressure_levels"},
            "pressure_levels": dict(budget["pressure_levels"]),
        },
        "warnings": dict(report["warnings"]),
        "malformed_or_errors": report["malformed_or_errors"],
        "memory": dict(report["memory"]),
        "filtered_total": report["filtered_total"],
    }


def main(argv: Optional[List[str]] = None):
    """Command-line entry point.

    *argv* is the argument list to parse; ``None`` (the default) makes
    argparse read ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="Aggregate structured events & legacy replay stats")
    parser.add_argument("--dir", default=str(DEFAULT_DIR), help="Base directory containing events-*.jsonl (default: replay_logs)")
    parser.add_argument("--days", type=int, default=None, help="Limit to last N days of event files")
    parser.add_argument("--include", help="Substring filter on event type")
    parser.add_argument("--event", help="Exact event type filter")
    parser.add_argument("--json", action="store_true", help="Emit machine-readable JSON output")
    args = parser.parse_args(argv)

    base = Path(args.dir)
    events_report, legacy_report, event_files = build_report(base, args.days, args.include, args.event)

    if args.json:
        out = {
            "events": _events_to_jsonable(events_report) if events_report else None,
            "legacy": legacy_report,
            "event_files": [f.name for f in event_files],
        }
        print(json.dumps(out, indent=2))
        return

    # Human readable output
    print("Log directory:", base)
    if not base.exists():
        print("(directory missing)")
        return
    if events_report:
        print(f"Found {len(event_files)} event file(s)\n")
        print(format_events_human(events_report))
    else:
        print("No events-*.jsonl files found (skip structured aggregation)\n")
    print("=== Legacy Replay Heuristics ===")
    print(json.dumps(legacy_report, indent=2))


if __name__ == "__main__":
    main()