# Cameron's void repo, torn apart for local testing
#!/usr/bin/env python3
"""Aggregate structured event logs (events-*.jsonl) & legacy replay heuristics.

Features:
 - Event counts & tool usage (from tool_invocation response-phase events)
 - Context budget stats (avg initial/final, pressure levels, summaries, evictions)
 - Warnings (context_budget_warning)
 - Legacy replay heuristics: tool_calls directory scan, recent structured failures, Ollama request count

Usage:
  ./scripts/log_report.py [--dir replay_logs] [--days 1] [--json] [--include pattern] [--event name]

Exit codes:
 0 success (even if no events found unless --strict specified in future)
"""
from __future__ import annotations

import argparse
import json
import re
from collections import Counter
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Iterable, Optional

# This script lives in <repo>/scripts/, so two .parent hops reach the repo root.
ROOT = Path(__file__).resolve().parent.parent
DEFAULT_DIR = ROOT / "replay_logs"
28
29
# -------------------------
# Legacy helpers (retain)
# -------------------------
def load_json(path: Path):
    """Parse *path* as JSON; return the parsed value, or None on any failure.

    Best-effort by design: missing files, unreadable files and malformed
    JSON all collapse to None so callers can simply skip the entry.
    """
    try:
        text = path.read_text()
        return json.loads(text)
    except Exception:
        return None
38
39
def summarize_tool_calls(replay_dir: Path):
    """Scan replay_dir/tool_calls and tally request files by tool kind and phase."""
    tool_dir = replay_dir / "tool_calls"
    if not tool_dir.exists():
        return {"total_files": 0, "tools": {}, "phases": {}}
    tools: Counter = Counter()
    phases: Counter = Counter()
    name_pattern = re.compile(r"request_(?P<kind>[a-zA-Z0-9]+)_")
    for path in tool_dir.glob("request_*_*.json"):
        if not load_json(path):  # unreadable or empty payload -> skip
            continue
        match = name_pattern.match(path.name)
        if not match:
            continue
        kind = match.group("kind")
        # Only known tool kinds (or stage* pipeline kinds) are tallied.
        if kind in {"webpage", "memory_search"} or kind.startswith("stage"):
            tools[kind] += 1
        if "stage" in path.name:
            # NOTE(review): split("_", 1)[0] is the prefix before the first
            # underscore of the file name — preserved exactly as before.
            phases[path.name.split("_", 1)[0]] += 1
    return {
        "total_files": sum(tools.values()),
        "tools": dict(tools.most_common()),
        "phases": dict(phases.most_common()),
    }
62
63
def summarize_ollama(replay_dir: Path):
    """Count saved Ollama request payloads under replay_dir/ollama_requests."""
    req_dir = replay_dir / "ollama_requests"
    count = sum(1 for _ in req_dir.glob("request_*.json")) if req_dir.exists() else 0
    return {"requests": count}
69
70
def recent_failures(replay_dir: Path, limit: int = 5):
    """Names of the *limit* most recently modified structured_failure__*.json files."""
    candidates = list(replay_dir.glob("structured_failure__*.json"))
    candidates.sort(key=lambda p: p.stat().st_mtime, reverse=True)
    return [p.name for p in candidates[:limit]]
74
75
# -------------------------
# Event aggregation
# -------------------------
def iter_event_files(base: Path, days: Optional[int]) -> Iterable[Path]:
    """Return events-*.jsonl files under *base*, sorted by name.

    When *days* is given, keep only files whose date stamp (the
    ``events-YYYY-MM-DD.jsonl`` part of the name) falls within the last
    *days* days (inclusive of today, UTC). Files with unparseable stamps
    are silently skipped. Returns [] when *base* does not exist.
    """
    if not base.exists():
        return []
    files = sorted(p for p in base.glob("events-*.jsonl") if p.is_file())
    if days is None:
        return files
    # utcnow() is deprecated; timezone-aware now(UTC).date() yields the same UTC date.
    from datetime import timezone
    cutoff = datetime.now(timezone.utc).date() - timedelta(days=days - 1)
    selected = []
    for f in files:
        try:
            stamp = f.name[len("events-") : -len(".jsonl")]
            if datetime.strptime(stamp, "%Y-%m-%d").date() >= cutoff:
                selected.append(f)
        except Exception:
            # Malformed stamp — not an event file we can date-filter; skip.
            continue
    return selected
96
97
def read_events(paths: Iterable[Path]) -> Iterable[Dict[str, Any]]:
    """Lazily yield parsed JSON event dicts from each file in *paths*.

    Blank lines are skipped. A line that fails to parse is surfaced as a
    synthetic ``{"event": "_malformed", "raw": ..., "file": ...}`` record.
    Missing files are skipped entirely; any other read failure is surfaced
    as a single ``{"event": "_read_error", ...}`` record for that file.
    """
    for path in paths:
        try:
            with path.open("r", encoding="utf-8") as handle:
                for raw in handle:
                    raw = raw.strip()
                    if not raw:
                        continue
                    try:
                        parsed = json.loads(raw)
                    except Exception:
                        parsed = {"event": "_malformed", "raw": raw, "file": str(path)}
                    yield parsed
        except FileNotFoundError:
            continue
        except Exception as exc:
            yield {"event": "_read_error", "file": str(path), "error": str(exc)}
114
115
def _ingest_context_budget(ev: Dict[str, Any], ctx: Dict[str, Any]) -> None:
    """Fold one context_budget event into the running aggregate *ctx*."""
    ctx["events"] += 1
    ctx["initial_total_sum"] += int(ev.get("initial_total") or 0)
    ctx["final_total_sum"] += int(ev.get("final_total") or 0)
    ctx["pressure_levels"][ev.get("pressure_level") or "none"] += 1
    for budget_pass in (ev.get("passes") or []):
        mode = budget_pass.get("mode")
        if mode == "summarize":
            ctx["summaries"] += 1
        elif mode == "evict":
            ctx["evictions"] += 1


def _ingest_memory_event(etype: str, ev: Dict[str, Any], mem: Dict[str, Any]) -> None:
    """Fold one memory_write / memory_truncation event into *mem*."""
    if etype == "memory_write":
        mem["writes"] += 1
        mem["suppressed_total"] += int(ev.get("suppressed") or 0)
        mem["added_total"] += int(ev.get("added_lines") or 0)
        if ev.get("truncated"):
            mem["truncated_flagged"] += 1
    else:  # memory_truncation
        mem["truncations"] += 1
        mem["trunc_removed_lines"] += int(ev.get("removed_lines") or 0)
        mem["trunc_prev_chars"] += int(ev.get("previous_chars") or 0)
        mem["trunc_final_chars"] += int(ev.get("final_chars") or 0)


def _derive_averages(ctx: Dict[str, Any], mem: Dict[str, Any]) -> None:
    """Attach derived averages/ratios to *ctx* and *mem*; zero them when no data."""
    if ctx["events"]:
        ctx["avg_initial_total"] = ctx["initial_total_sum"] / ctx["events"]
        ctx["avg_final_total"] = ctx["final_total_sum"] / ctx["events"]
    else:
        ctx["avg_initial_total"] = 0
        ctx["avg_final_total"] = 0
    if mem["writes"]:
        # max(1, ...) guards division when suppressed + added is 0 despite writes.
        denom = max(1, mem["suppressed_total"] + mem["added_total"])
        mem["suppression_rate"] = mem["suppressed_total"] / denom
        mem["avg_added_per_write"] = mem["added_total"] / mem["writes"]
    else:
        mem["suppression_rate"] = 0.0
        mem["avg_added_per_write"] = 0.0
    if mem["truncations"]:
        mem["avg_removed_lines_per_trunc"] = mem["trunc_removed_lines"] / mem["truncations"]
    else:
        mem["avg_removed_lines_per_trunc"] = 0.0


def aggregate_events(events: Iterable[Dict[str, Any]], include_pattern: Optional[str], limit_event: Optional[str]) -> Dict[str, Any]:
    """Aggregate a stream of structured event dicts into a summary report.

    Parameters:
        events: parsed event dicts (as produced by read_events), each keyed
            by "event"; events without an "event" key count as malformed.
        include_pattern: substring filter on the event type, or None.
        limit_event: exact event-type filter, or None.

    Returns a dict with event_counts / tool_usage / warnings Counters,
    context_budget and memory aggregate dicts (including derived averages),
    malformed_or_errors count, and filtered_total (events passing filters).
    """
    counts: Counter = Counter()
    tool_usage: Counter = Counter()
    warnings: Counter = Counter()
    ctx_budget: Dict[str, Any] = {
        "events": 0,
        "initial_total_sum": 0,
        "final_total_sum": 0,
        "pressure_levels": Counter(),
        "summaries": 0,
        "evictions": 0,
    }
    memory_metrics: Dict[str, Any] = {
        "writes": 0,  # total memory_write events
        "suppressed_total": 0,  # sum of suppressed lines
        "added_total": 0,  # sum of added lines
        "truncated_flagged": 0,  # memory_write events where truncated true
        "truncations": 0,  # memory_truncation events
        "trunc_removed_lines": 0,  # sum removed_lines across truncation events
        "trunc_prev_chars": 0,  # sum previous_chars
        "trunc_final_chars": 0,  # sum final_chars
    }
    malformed = 0
    for ev in events:
        etype = ev.get("event")
        if not etype:
            malformed += 1
            continue
        # Filters apply to every event type, including the synthetic ones.
        if include_pattern and include_pattern not in etype:
            continue
        if limit_event and etype != limit_event:
            continue
        counts[etype] += 1
        if etype == "tool_invocation":
            # Only response-phase invocations count toward tool usage.
            if ev.get("phase") == "response":
                tool = ev.get("tool") or ev.get("tool_name")
                if tool:
                    tool_usage[tool] += 1
        elif etype == "context_budget":
            _ingest_context_budget(ev, ctx_budget)
        elif etype == "context_budget_warning":
            warnings[ev.get("pressure") or "unknown"] += 1
        elif etype in {"_malformed", "_read_error"}:
            malformed += 1
        elif etype in {"memory_write", "memory_truncation"}:
            _ingest_memory_event(etype, ev, memory_metrics)
    _derive_averages(ctx_budget, memory_metrics)
    return {
        "event_counts": counts,
        "tool_usage": tool_usage,
        "context_budget": ctx_budget,
        "warnings": warnings,
        "malformed_or_errors": malformed,
        "memory": memory_metrics,
        "filtered_total": sum(counts.values()),
    }
211
212
def format_events_human(report: Dict[str, Any]) -> str:
    """Render an aggregate_events() report as a human-readable multi-line string."""
    out: list = []
    add = out.append

    def add_counter_section(header: str, counter) -> None:
        # Shared layout for the Counter-backed sections.
        add(header)
        for name, count in counter.most_common():
            add(f"{name:25} {count}")

    if report["event_counts"]:
        add_counter_section("=== Event Counts ===", report["event_counts"])
    if report["tool_usage"]:
        add_counter_section("\n=== Tool Usage (response-phase) ===", report["tool_usage"])
    cb = report["context_budget"]
    if cb.get("events"):
        add("\n=== Context Budget ===")
        add(f"events: {cb['events']} avg_initial_total: {cb['avg_initial_total']:.2f} avg_final_total: {cb['avg_final_total']:.2f}")
        add(f"summaries: {cb['summaries']} evictions: {cb['evictions']}")
        if cb["pressure_levels"]:
            levels = ", ".join(f"{k}={v}" for k, v in cb["pressure_levels"].most_common())
            add(f"pressure_levels: {levels}")
    mem = report.get("memory", {})
    if mem.get("writes") or mem.get("truncations"):
        add("\n=== Memory Journal Metrics ===")
        add(f"writes: {mem.get('writes',0)} added_total: {mem.get('added_total',0)} suppressed_total: {mem.get('suppressed_total',0)} suppression_rate: {mem.get('suppression_rate',0):.2f}")
        add(f"truncated_flagged_in_write: {mem.get('truncated_flagged',0)} truncations: {mem.get('truncations',0)} avg_removed_lines_per_trunc: {mem.get('avg_removed_lines_per_trunc',0):.2f}")
    if report["warnings"]:
        add_counter_section("\n=== Warnings ===", report["warnings"])
    if report["malformed_or_errors"]:
        add(f"\nMalformed/Error lines: {report['malformed_or_errors']}")
    return "\n".join(out)
246
247
# -------------------------
# Orchestration
# -------------------------
def build_report(base_dir: Path, days: Optional[int], include_pattern: Optional[str], event_name: Optional[str]):
    """Assemble (events_report, legacy_report, event_files) for *base_dir*.

    events_report is None when no events-*.jsonl files are in range; the
    legacy replay heuristics are always computed.
    """
    event_files = list(iter_event_files(base_dir, days))
    events_report = (
        aggregate_events(read_events(event_files), include_pattern, event_name)
        if event_files
        else None
    )
    legacy = {
        "tool_calls": summarize_tool_calls(base_dir),
        "ollama": summarize_ollama(base_dir),
        "recent_failures": recent_failures(base_dir),
    }
    return events_report, legacy, event_files
262
263
def main():
    """CLI entry point: parse flags, build the report, print JSON or text."""
    parser = argparse.ArgumentParser(description="Aggregate structured events & legacy replay stats")
    parser.add_argument("--dir", default=str(DEFAULT_DIR), help="Base directory containing events-*.jsonl (default: replay_logs)")
    parser.add_argument("--days", type=int, default=None, help="Limit to last N days of event files")
    parser.add_argument("--include", help="Substring filter on event type")
    parser.add_argument("--event", help="Exact event type filter")
    parser.add_argument("--json", action="store_true", help="Emit machine-readable JSON output")
    args = parser.parse_args()

    base = Path(args.dir)
    events_report, legacy_report, event_files = build_report(base, args.days, args.include, args.event)

    if args.json:
        payload = {
            "events": None,
            "legacy": legacy_report,
            "event_files": [f.name for f in event_files],
        }
        if events_report:
            # Counters are not JSON-serializable; convert them to plain dicts.
            ctx = events_report["context_budget"]
            serializable_ctx = {k: v for k, v in ctx.items() if k != "pressure_levels"}
            serializable_ctx["pressure_levels"] = dict(ctx["pressure_levels"])
            payload["events"] = {
                "event_counts": dict(events_report["event_counts"]),
                "tool_usage": dict(events_report["tool_usage"]),
                "context_budget": serializable_ctx,
                "warnings": dict(events_report["warnings"]),
                "malformed_or_errors": events_report["malformed_or_errors"],
                "filtered_total": events_report["filtered_total"],
            }
        print(json.dumps(payload, indent=2))
        return

    # Human-readable output path.
    print("Log directory:", base)
    if not base.exists():
        print("(directory missing)")
        return
    if events_report:
        print(f"Found {len(event_files)} event file(s)\n")
        print(format_events_human(events_report))
    else:
        print("No events-*.jsonl files found (skip structured aggregation)\n")
    print("=== Legacy Replay Heuristics ===")
    print(json.dumps(legacy_report, indent=2))


if __name__ == "__main__":
    main()