"""LLM-powered cluster curation via Claude Haiku.

Periodically fetches the entity graph from the backend, extracts trending
clusters, and asks Haiku to curate them into named groups. The LLM identifies
coherent news stories from co-occurrence clusters, merges duplicates, drops
garbage, and returns at most 5 named groups.

The curator maintains a rolling history of recent topics so the LLM can see
what it labeled before, how long topics have persisted, and what's new vs
continuing. This produces more stable, contextual labels.

The groups are POSTed back to the backend, which serves them in the
entity-graph websocket data for frontend rendering.

Runs standalone — fetches entity-graph, curates, posts groups, sleeps, repeat.
"""

import os
import sys
from dataclasses import dataclass
from pathlib import Path
from time import time, sleep

import httpx
from pydantic import BaseModel

NARRATOR_INTERVAL = int(os.getenv("NARRATOR_INTERVAL", "300"))
MIN_CLUSTER_SIZE = 2  # need at least 2 entities to narrate
MIN_ENTITY_MENTIONS = 3  # filter noise entities with near-zero counts
MAX_CLUSTERS = 10  # cap clusters sent to LLM
MAX_HISTORY = 8  # max past topics shown to LLM
HISTORY_TTL = 60 * 60  # prune topics older than 1 hour
OVERLAP_THRESHOLD = 0.3  # Jaccard similarity to match a cluster to history
MAX_GROUPS = 5  # max groups returned to backend (SYSTEM_PROMPT states the same number)
LLM_MODEL = "claude-haiku-4-5-20251001"  # model id used for curation
CONTEXT_NOTES_PATH = Path(__file__).parent / "context_notes.txt"
BACKEND_URL = os.getenv("ZIG_BACKEND_URL", "http://127.0.0.1:3000")


# NOTE: the two pydantic models below are documented with comments rather than
# docstrings on purpose: pydantic copies a model's docstring into the JSON
# schema "description", and that schema is what gets sent to the LLM.

# One curated topic: a short display name plus the entity texts assigned to it.
class NamedGroup(BaseModel):
    name: str
    entities: list[str]


# Structured response requested from the LLM: the curated groups and a haiku.
class CuratorOutput(BaseModel):
    groups: list[NamedGroup]
    haiku: str


@dataclass
class TopicRecord:
    """A topic the curator has labeled before, kept for continuity between cycles."""

    label: str  # most recent name given to the topic
    entities: set[str]  # entity texts from the most recent labeling
    first_seen: float  # time() when the topic was first recorded
    last_seen: float  # time() when the topic was last matched
    times_labeled: int = 1  # number of cycles in which this topic was labeled


SYSTEM_PROMPT = """\
You are curating trending topics from a real-time entity co-occurrence graph.
Each cluster below contains entities that frequently appear together in social
media posts. Your job is to identify the real stories.

For each coherent cluster, provide a short (2-4 word) topic name. You may merge
clusters that are about the same story. Drop clusters that are incoherent or
uninteresting. Return at most 5 groups — quality over quantity.

Each entity has a "surprise" score (Poisson z-score) measuring how unusual its
current mention volume is compared to baseline. Higher surprise = more newsworthy.
Prioritize clusters with high-surprise entities over persistent low-surprise ones.

Be specific: "Trump Budget Fight" not "Politics".
Use natural language: "SpaceX Launch" not "SPACEX/LAUNCH".
Preserve proper nouns and capitalization.

When a group matches a recent topic (shown below), prefer reusing or refining
that name rather than inventing a new one — unless the entities have changed
significantly.

For each group, list the entity names (exact text from the clusters) that belong to it.

Also write a haiku about the real-world events in these clusters.
STRICT RULES for the haiku:
- Line 1: exactly 5 syllables
- Line 2: exactly 7 syllables
- Line 3: exactly 5 syllables
- Count syllables carefully before writing each line
- Write in the style of Bashō — concrete, grounded in the physical world
- Reference specific people, places, or events from the data
- The haiku is about the world, not about the internet"""


def _jaccard(a: set, b: set) -> float:
    """Return |a ∩ b| / |a ∪ b|, or 0.0 when either set is empty."""
    if not a or not b:
        return 0.0
    return len(a & b) / len(a | b)


def _fmt_duration(seconds: float) -> str:
    """Format a duration as whole seconds, whole minutes, or hours with one decimal."""
    if seconds < 60:
        return f"{int(seconds)}s"
    minutes = seconds / 60
    if minutes < 60:
        return f"{int(minutes)}min"
    return f"{minutes / 60:.1f}hr"


def _read_editorial_context(path: Path = CONTEXT_NOTES_PATH) -> str:
    """Return the stripped contents of the optional notes file, or "" if unusable.

    A missing file is the normal case and stays silent; any other read or
    decode failure is logged so a broken notes file does not go unnoticed.
    """
    try:
        return path.read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        return ""
    except (OSError, UnicodeDecodeError) as e:
        print(f"narrator: could not read {path.name}: {e}", flush=True)
        return ""


class ClusterNarrator:
    """Fetch the entity graph, have the LLM curate clusters, and post the groups back.

    Attributes:
        backend_url: base URL of the backend serving /entity-graph and /groups.
        call_interval: seconds slept between curation cycles.
        current_groups: last accepted groups, as [{"name", "entities"}, ...].
        topic_history: rolling list of previously labeled topics.
        stats: counters — "calls" (LLM requests that returned), "errors",
            "groups" (size of current_groups), "clusters_seen" (last cycle).
    """

    def __init__(self, backend_url: str, call_interval: int = NARRATOR_INTERVAL):
        self.backend_url = backend_url
        self.call_interval = call_interval
        self.current_groups: list[dict] = []  # [{name, entities}, ...]
        self.topic_history: list[TopicRecord] = []
        self.stats = {
            "calls": 0,
            "errors": 0,
            "groups": 0,
            "clusters_seen": 0,
        }
        self._llm_client = None  # created lazily on the first LLM call, then reused

    def _fetch_graph(self, client: httpx.Client) -> dict | None:
        """GET /entity-graph; return the decoded JSON object, or None on any failure."""
        try:
            resp = client.get(f"{self.backend_url}/entity-graph", timeout=5)
            if resp.status_code != 200:
                print(f"narrator: fetch failed: {resp.status_code}", flush=True)
                return None
            data = resp.json()
            if isinstance(data, dict):
                return data
            # callers use dict access on the graph, so reject any other JSON shape here
            print("narrator: fetch returned non-object JSON", flush=True)
        except Exception as e:
            print(f"narrator: fetch error: {e}", flush=True)
        return None

    def _post_groups(self, client: httpx.Client, groups: list[dict], haiku: str = "") -> None:
        """POST the groups (and the haiku, when non-empty) to /groups; log failures."""
        try:
            payload: dict = {"groups": groups}
            if haiku:
                payload["haiku"] = haiku
            resp = client.post(
                f"{self.backend_url}/groups",
                json=payload,
                timeout=5,
            )
            if resp.status_code != 200:
                print(f"narrator: POST failed: {resp.status_code} {resp.text[:100]}", flush=True)
        except Exception as e:
            print(f"narrator: POST error: {e}", flush=True)

    def _extract_clusters(self, graph: dict) -> dict[int, list[dict]]:
        """Group trending entities by cluster id.

        An entity is kept only if it has a non-zero cluster id, a positive
        trend, at least MIN_ENTITY_MENTIONS mentions, and a string "text".
        Clusters with fewer than MIN_CLUSTER_SIZE kept entities are dropped.
        """
        clusters: dict[int, list[dict]] = {}
        for entity in graph.get("entities", []):
            cid = entity.get("cluster", 0)
            if cid == 0:
                continue
            if entity.get("trend", 0) <= 0:
                continue
            if entity.get("count", 0) < MIN_ENTITY_MENTIONS:
                continue
            # groups reference entities by their exact text, so an entity
            # without one can neither be shown to the LLM nor validated later
            if not isinstance(entity.get("text"), str):
                continue
            clusters.setdefault(cid, []).append(entity)
        return {
            cid: ents
            for cid, ents in clusters.items()
            if len(ents) >= MIN_CLUSTER_SIZE
        }

    def _match_history(self, entity_names: set[str]) -> TopicRecord | None:
        """find the best-matching historical topic by entity overlap."""
        best, best_score = None, 0.0
        for record in self.topic_history:
            score = _jaccard(entity_names, record.entities)
            if score > best_score:
                best, best_score = record, score
        if best_score >= OVERLAP_THRESHOLD:
            return best
        return None

    def _update_history(self, groups: list[dict]) -> None:
        """update topic history with current cycle's results."""
        now = time()
        for group in groups:
            name = group["name"]
            entity_names = set(group["entities"])
            if not entity_names:
                continue
            if existing := self._match_history(entity_names):
                existing.label = name
                existing.last_seen = now
                existing.entities = entity_names
                existing.times_labeled += 1
            else:
                self.topic_history.append(
                    TopicRecord(
                        label=name,
                        entities=entity_names,
                        first_seen=now,
                        last_seen=now,
                    )
                )
        # prune old topics
        self.topic_history = [
            r for r in self.topic_history if (now - r.last_seen) < HISTORY_TTL
        ]

    def _build_history_context(self) -> str:
        """build a text block describing recent topic history for the LLM."""
        if not self.topic_history:
            return ""
        now = time()
        # sort by last_seen descending (most recent first)
        recent = sorted(self.topic_history, key=lambda r: -r.last_seen)[:MAX_HISTORY]
        lines = ["Recent topic history (reuse labels for continuing topics):"]
        for r in recent:
            age = now - r.first_seen
            since_seen = now - r.last_seen
            duration = _fmt_duration(age)
            members = ", ".join(sorted(r.entities)[:6])
            # a topic seen within the last two cycles still counts as ongoing
            if since_seen < self.call_interval * 2:
                status = f"active for {duration}"
            else:
                status = f"ended {_fmt_duration(since_seen)} ago, lasted {duration}"
            lines.append(f'- "{r.label}" — {status} ({members})')
        return "\n".join(lines)

    def _build_user_content(self, sorted_clusters: list[tuple[int, list[dict]]]) -> str:
        """Assemble the user message: history, editorial notes, then the clusters."""
        prompt_parts = []
        for cid, entities in sorted_clusters:
            sorted_ents = sorted(entities, key=lambda x: -x.get("surprise", 0))
            lines = [f"Cluster {cid}:"]
            for e in sorted_ents:
                lines.append(
                    f" - {e['text']} ({e.get('label', '?')}, "
                    f"surprise={e.get('surprise', 0):.1f}, "
                    f"trend={e.get('trend', 0):.1f}, {e.get('count', 0)} mentions)"
                )
            prompt_parts.append("\n".join(lines))
        entities_text = "\n\n".join(prompt_parts)

        parts = []
        if history_context := self._build_history_context():
            parts.append(history_context)
        if editorial_context := _read_editorial_context():
            parts.append(f"Editorial context:\n{editorial_context}")
        parts.append(f"Curate these trending clusters into named groups:\n\n{entities_text}")
        return "\n\n".join(parts)

    def _get_llm_client(self):
        """Return the shared Anthropic client, creating it on first use.

        Raises ImportError when the anthropic package is not installed; the
        import is deliberately lazy so narrate() can report that case.
        """
        if self._llm_client is None:
            import anthropic

            self._llm_client = anthropic.Anthropic()
        return self._llm_client

    @staticmethod
    def _validate_groups(groups: list[NamedGroup], known_texts: set[str]) -> list[dict]:
        """Keep only entities present in the graph; return at most MAX_GROUPS groups.

        Validation runs before the cap so a group made entirely of unknown
        entities does not use up one of the MAX_GROUPS slots. Entities repeated
        inside one group are collapsed, keeping first-seen order.
        """
        validated: list[dict] = []
        for group in groups:
            members = list(dict.fromkeys(t for t in group.entities if t in known_texts))
            if not members:
                continue
            validated.append({"name": group.name, "entities": members})
            if len(validated) == MAX_GROUPS:
                break
        return validated

    def narrate(self, client: httpx.Client) -> None:
        """Run one curation cycle: fetch, curate via the LLM, validate, post.

        Errors inside the curation step are logged and counted in
        stats["errors"]. If the LLM yields no valid group while previous groups
        exist, the previous groups are kept and nothing is posted.
        """
        if not (graph := self._fetch_graph(client)):
            return

        clusters = self._extract_clusters(graph)
        self.stats["clusters_seen"] = len(clusters)
        if not clusters:
            return

        try:
            # build set of all entity texts in the graph (for validation)
            all_entity_texts = {
                e["text"]
                for e in graph.get("entities", [])
                if isinstance(e.get("text"), str)
            }

            # sort by total cluster surprise descending, take top N
            sorted_clusters = sorted(
                clusters.items(),
                key=lambda x: sum(e.get("surprise", 0) for e in x[1]),
                reverse=True,
            )[:MAX_CLUSTERS]

            user_content = self._build_user_content(sorted_clusters)

            response = self._get_llm_client().messages.parse(
                model=LLM_MODEL,
                max_tokens=1024,
                temperature=0.3,
                system=SYSTEM_PROMPT,
                output_format=CuratorOutput,
                messages=[
                    {
                        "role": "user",
                        "content": user_content,
                    }
                ],
            )
            # counted as soon as the request returns, so cycles that end up
            # keeping the previous groups are still reflected in the stats
            self.stats["calls"] += 1

            result = response.parsed_output
            if result is None:
                print("narrator: LLM returned no parsed output", flush=True)
                self.stats["errors"] += 1
                return

            haiku = result.haiku.strip() if result.haiku else ""

            # code-based validation (not LLM)
            validated_groups = self._validate_groups(result.groups, all_entity_texts)

            # if LLM returned 0 valid groups, keep previous groups
            if not validated_groups:
                if self.current_groups:
                    print("narrator: 0 valid groups, keeping previous", flush=True)
                    return
            else:
                self.current_groups = validated_groups

            # update history with this cycle's results
            self._update_history(self.current_groups)
            self.stats["groups"] = len(self.current_groups)

            groups_str = ", ".join(
                f"{g['name']!r} ({len(g['entities'])} ents)"
                for g in self.current_groups
            )
            history_size = len(self.topic_history)
            haiku_preview = haiku.replace("\n", " / ") if haiku else "(none)"
            print(
                f"narrator: {len(self.current_groups)} groups from "
                f"{len(sorted_clusters)} clusters, "
                f"{history_size} in history ({groups_str}) "
                f"haiku: {haiku_preview}",
                flush=True,
            )

            # push groups + haiku to backend
            if self.current_groups:
                self._post_groups(client, self.current_groups, haiku)
        except ImportError:
            print("narrator: anthropic package not available", flush=True)
            self.stats["errors"] += 1
        except Exception as e:
            print(f"narrator: error: {type(e).__name__}: {e}", flush=True)
            self.stats["errors"] += 1

    def run(self) -> None:
        """Loop forever: sleep call_interval seconds, then run one narrate() cycle."""
        print(
            f"narrator: started (interval={self.call_interval}s, backend={self.backend_url})",
            flush=True,
        )
        with httpx.Client() as client:
            while True:
                # the sleep comes first, so the first cycle runs one interval after startup
                sleep(self.call_interval)
                try:
                    self.narrate(client)
                except Exception as e:
                    print(f"narrator: run loop error: {type(e).__name__}: {e}", flush=True)
                    self.stats["errors"] += 1


if __name__ == "__main__":
    narrator = ClusterNarrator(BACKEND_URL)
    narrator.run()