# Source: coral.waow.tech (main, 371 lines, 14 kB)
"""
LLM-powered cluster curation via Claude Haiku.

Periodically fetches the entity graph from the backend, extracts
trending clusters, and asks Haiku to curate them into named groups.
The LLM identifies coherent news stories from co-occurrence clusters,
merges duplicates, drops garbage, and returns at most 5 named groups.

The curator maintains a rolling history of recent topics so the LLM
can see what it labeled before, how long topics have persisted, and
what's new vs continuing. This produces more stable, contextual labels.

The groups are POSTed back to the backend, which serves them in the
entity-graph websocket data for frontend rendering.

Runs standalone — fetches entity-graph, curates, posts groups, sleeps, repeat.
"""

import os
import sys
from dataclasses import dataclass
from pathlib import Path
from time import time, sleep

import httpx
from pydantic import BaseModel

# Seconds between curation cycles (env-overridable).
NARRATOR_INTERVAL = int(os.getenv("NARRATOR_INTERVAL", "300"))
MIN_CLUSTER_SIZE = 2  # need at least 2 entities to narrate
MIN_ENTITY_MENTIONS = 3  # filter noise entities with near-zero counts
MAX_CLUSTERS = 10  # cap clusters sent to LLM
MAX_HISTORY = 8  # max past topics shown to LLM
HISTORY_TTL = 60 * 60  # prune topics older than 1 hour
OVERLAP_THRESHOLD = 0.3  # Jaccard similarity to match a cluster to history
MAX_GROUPS = 5  # max groups returned to backend
# Optional operator-written notes injected into the prompt when present.
CONTEXT_NOTES_PATH = Path(__file__).parent / "context_notes.txt"
BACKEND_URL = os.getenv("ZIG_BACKEND_URL", "http://127.0.0.1:3000")


class NamedGroup(BaseModel):
    """One curated topic: a short label plus the entity texts it covers."""

    name: str
    entities: list[str]


class CuratorOutput(BaseModel):
    """Structured output schema the LLM must fill in."""

    groups: list[NamedGroup]
    haiku: str


@dataclass
class TopicRecord:
    """A remembered topic, used to keep labels stable across cycles."""

    label: str  # most recent LLM-assigned name
    entities: set[str]  # entity texts from the latest matching group
    first_seen: float  # unix time the topic first appeared
    last_seen: float  # unix time the topic was last labeled
    times_labeled: int = 1  # how many cycles have labeled this topic


SYSTEM_PROMPT = """\
You are curating trending topics from a real-time entity co-occurrence graph.
Each cluster below contains entities that frequently appear together in
social media posts. Your job is to identify the real stories.

For each coherent cluster, provide a short (2-4 word) topic name.
You may merge clusters that are about the same story.
Drop clusters that are incoherent or uninteresting.
Return at most 5 groups — quality over quantity.

Each entity has a "surprise" score (Poisson z-score) measuring how unusual
its current mention volume is compared to baseline. Higher surprise = more
newsworthy. Prioritize clusters with high-surprise entities over persistent
low-surprise ones.

Be specific: "Trump Budget Fight" not "Politics".
Use natural language: "SpaceX Launch" not "SPACEX/LAUNCH".
Preserve proper nouns and capitalization.
When a group matches a recent topic (shown below), prefer reusing or
refining that name rather than inventing a new one — unless the entities
have changed significantly.

For each group, list the entity names (exact text from the clusters) that
belong to it.

Also write a haiku about the real-world events in these clusters.
STRICT RULES for the haiku:
- Line 1: exactly 5 syllables
- Line 2: exactly 7 syllables
- Line 3: exactly 5 syllables
- Count syllables carefully before writing each line
- Write in the style of Bashō — concrete, grounded in the physical world
- Reference specific people, places, or events from the data
- The haiku is about the world, not about the internet"""


def _jaccard(a: set, b: set) -> float:
    """Return Jaccard similarity |a∩b| / |a∪b|; 0.0 if either set is empty."""
    if not a or not b:
        return 0.0
    return len(a & b) / len(a | b)


def _fmt_duration(seconds: float) -> str:
    """Format a duration compactly: '45s', '12min', or '1.5hr'."""
    if seconds < 60:
        return f"{int(seconds)}s"
    minutes = seconds / 60
    if minutes < 60:
        return f"{int(minutes)}min"
    return f"{minutes / 60:.1f}hr"


class ClusterNarrator:
    """Fetches entity clusters, asks the LLM to curate them, posts results back.

    Holds a rolling ``topic_history`` so successive cycles can reuse labels
    for continuing stories instead of renaming them every time.
    """

    def __init__(self, backend_url: str, call_interval: int = NARRATOR_INTERVAL):
        self.backend_url = backend_url
        self.call_interval = call_interval
        self.current_groups: list[dict] = []  # [{name, entities}, ...]
        self.topic_history: list[TopicRecord] = []
        self.stats = {
            "calls": 0,  # successful LLM curation cycles
            "errors": 0,  # fetch/LLM/loop failures
            "groups": 0,  # group count from the last successful cycle
            "clusters_seen": 0,  # clusters extracted in the last cycle
        }

    def _fetch_graph(self, client: httpx.Client) -> dict | None:
        """GET the entity graph from the backend; None on any failure."""
        try:
            resp = client.get(f"{self.backend_url}/entity-graph", timeout=5)
            if resp.status_code == 200:
                return resp.json()
        except Exception as e:
            print(f"narrator: fetch error: {e}", flush=True)
        return None

    def _post_groups(self, client: httpx.Client, groups: list[dict], haiku: str = "") -> None:
        """POST curated groups (and optional haiku) to the backend; best-effort."""
        try:
            payload: dict = {"groups": groups}
            if haiku:
                payload["haiku"] = haiku
            resp = client.post(
                f"{self.backend_url}/groups",
                json=payload,
                timeout=5,
            )
            if resp.status_code != 200:
                print(f"narrator: POST failed: {resp.status_code} {resp.text[:100]}", flush=True)
        except Exception as e:
            print(f"narrator: POST error: {e}", flush=True)

    def _extract_clusters(self, graph: dict) -> dict[int, list[dict]]:
        """Group trending, non-noise entities by cluster id.

        Drops cluster 0 (unclustered), entities with non-positive trend,
        entities below MIN_ENTITY_MENTIONS, and clusters smaller than
        MIN_CLUSTER_SIZE.
        """
        clusters: dict[int, list[dict]] = {}
        for entity in graph.get("entities", []):
            cid = entity.get("cluster", 0)
            if cid == 0:
                continue
            if entity.get("trend", 0) <= 0:
                continue
            if entity.get("count", 0) < MIN_ENTITY_MENTIONS:
                continue
            clusters.setdefault(cid, []).append(entity)
        return {
            cid: ents
            for cid, ents in clusters.items()
            if len(ents) >= MIN_CLUSTER_SIZE
        }

    def _match_history(self, entity_names: set[str]) -> TopicRecord | None:
        """find the best-matching historical topic by entity overlap."""
        best, best_score = None, 0.0
        for record in self.topic_history:
            score = _jaccard(entity_names, record.entities)
            if score > best_score:
                best, best_score = record, score
        if best_score >= OVERLAP_THRESHOLD:
            return best
        return None

    def _update_history(self, groups: list[dict]) -> None:
        """update topic history with current cycle's results."""
        now = time()

        for group in groups:
            name = group["name"]
            entity_names = set(group["entities"])
            if not entity_names:
                continue

            if existing := self._match_history(entity_names):
                # Continuing topic: refresh label/entities, keep first_seen.
                existing.label = name
                existing.last_seen = now
                existing.entities = entity_names
                existing.times_labeled += 1
            else:
                self.topic_history.append(
                    TopicRecord(
                        label=name,
                        entities=entity_names,
                        first_seen=now,
                        last_seen=now,
                    )
                )

        # prune old topics
        self.topic_history = [
            r for r in self.topic_history if (now - r.last_seen) < HISTORY_TTL
        ]

    def _build_history_context(self) -> str:
        """build a text block describing recent topic history for the LLM."""
        if not self.topic_history:
            return ""

        now = time()
        # sort by last_seen descending (most recent first)
        recent = sorted(self.topic_history, key=lambda r: -r.last_seen)[
            :MAX_HISTORY
        ]

        lines = ["Recent topic history (reuse labels for continuing topics):"]
        for r in recent:
            age = now - r.first_seen
            since_seen = now - r.last_seen
            duration = _fmt_duration(age)
            members = ", ".join(sorted(r.entities)[:6])
            # "active" means seen within the last two cycles.
            if since_seen < self.call_interval * 2:
                status = f"active for {duration}"
            else:
                status = f"ended {_fmt_duration(since_seen)} ago, lasted {duration}"
            # FIX: separator between label and status was missing, producing
            # e.g. '- "Topic"active for 10min'; add ' — ' for readability.
            lines.append(f'- "{r.label}" — {status} ({members})')

        return "\n".join(lines)

    def narrate(self, client: httpx.Client) -> None:
        """Run one curation cycle: fetch graph, prompt LLM, validate, post."""
        if not (graph := self._fetch_graph(client)):
            return

        clusters = self._extract_clusters(graph)
        self.stats["clusters_seen"] = len(clusters)
        if not clusters:
            return

        try:
            # build set of all entity texts in the graph (for validation)
            all_entity_texts = {
                e["text"] for e in graph.get("entities", [])
                if isinstance(e.get("text"), str)
            }

            # sort by total cluster surprise descending, take top N
            sorted_clusters = sorted(
                clusters.items(),
                key=lambda x: sum(e.get("surprise", 0) for e in x[1]),
                reverse=True,
            )[:MAX_CLUSTERS]

            # build prompt
            prompt_parts = []
            for cid, entities in sorted_clusters:
                sorted_ents = sorted(entities, key=lambda x: -x.get("surprise", 0))
                lines = [f"Cluster {cid}:"]
                for e in sorted_ents:
                    lines.append(
                        f" - {e['text']} ({e['label']}, surprise={e.get('surprise', 0):.1f}, "
                        f"trend={e.get('trend', 0):.1f}, {e.get('count', 0)} mentions)"
                    )
                prompt_parts.append("\n".join(lines))

            entities_text = "\n\n".join(prompt_parts)

            # include history context
            history_context = self._build_history_context()

            # include editorial context if available
            editorial_context = ""
            if CONTEXT_NOTES_PATH.exists():
                try:
                    editorial_context = CONTEXT_NOTES_PATH.read_text().strip()
                except Exception:
                    pass  # best-effort: missing/unreadable notes are fine

            parts = []
            if history_context:
                parts.append(history_context)
            if editorial_context:
                parts.append(f"Editorial context:\n{editorial_context}")
            parts.append(f"Curate these trending clusters into named groups:\n\n{entities_text}")
            user_content = "\n\n".join(parts)

            # imported lazily so the service degrades gracefully when the
            # anthropic package is absent (handled by ImportError below)
            import anthropic

            ac = anthropic.Anthropic()
            response = ac.messages.parse(
                model="claude-haiku-4-5-20251001",
                max_tokens=1024,
                temperature=0.3,
                system=SYSTEM_PROMPT,
                output_format=CuratorOutput,
                messages=[
                    {
                        "role": "user",
                        "content": user_content,
                    }
                ],
            )

            result = response.parsed_output
            haiku = result.haiku.strip() if result.haiku else ""

            # code-based validation (not LLM)
            validated_groups = []
            for group in result.groups[:MAX_GROUPS]:
                # validate each entity text exists in the graph
                valid_entities = [
                    e for e in group.entities if e in all_entity_texts
                ]
                if valid_entities:
                    validated_groups.append(
                        {"name": group.name, "entities": valid_entities}
                    )

            # if LLM returned 0 valid groups, keep previous groups
            if not validated_groups:
                if self.current_groups:
                    print("narrator: 0 valid groups, keeping previous", flush=True)
                return
            else:
                self.current_groups = validated_groups

            # update history with this cycle's results
            self._update_history(self.current_groups)

            self.stats["groups"] = len(self.current_groups)
            self.stats["calls"] += 1

            groups_str = ", ".join(
                f"{g['name']!r} ({len(g['entities'])} ents)"
                for g in self.current_groups
            )
            history_size = len(self.topic_history)
            haiku_preview = haiku.replace("\n", " / ") if haiku else "(none)"
            print(
                f"narrator: {len(self.current_groups)} groups from "
                f"{len(sorted_clusters)} clusters, "
                f"{history_size} in history ({groups_str}) "
                f"haiku: {haiku_preview}",
                flush=True,
            )

            # push groups + haiku to backend
            if self.current_groups:
                self._post_groups(client, self.current_groups, haiku)

        except ImportError:
            print("narrator: anthropic package not available", flush=True)
            self.stats["errors"] += 1
        except Exception as e:
            print(f"narrator: error: {type(e).__name__}: {e}", flush=True)
            self.stats["errors"] += 1

    def run(self) -> None:
        """Main loop: sleep one interval, then curate; repeat forever.

        Sleeping first gives the backend time to accumulate graph data
        before the first curation attempt.
        """
        print(f"narrator: started (interval={self.call_interval}s, backend={self.backend_url})", flush=True)
        with httpx.Client() as client:
            while True:
                sleep(self.call_interval)
                try:
                    self.narrate(client)
                except Exception as e:
                    print(f"narrator: run loop error: {type(e).__name__}: {e}", flush=True)
                    self.stats["errors"] += 1


if __name__ == "__main__":
    narrator = ClusterNarrator(BACKEND_URL)
    narrator.run()