this repo has no description
coral.waow.tech
1"""
2LLM-powered cluster curation via Claude Haiku.
3
4Periodically fetches the entity graph from the backend, extracts
5trending clusters, and asks Haiku to curate them into named groups.
6The LLM identifies coherent news stories from co-occurrence clusters,
7merges duplicates, drops garbage, and returns at most 5 named groups.
8
9The curator maintains a rolling history of recent topics so the LLM
10can see what it labeled before, how long topics have persisted, and
11what's new vs continuing. This produces more stable, contextual labels.
12
13The groups are POSTed back to the backend, which serves them in the
14entity-graph websocket data for frontend rendering.
15
16Runs standalone — fetches entity-graph, curates, posts groups, sleeps, repeat.
17"""
18
19import os
20import sys
21from dataclasses import dataclass
22from pathlib import Path
23from time import time, sleep
24
25import httpx
26from pydantic import BaseModel
27
# Tunables. NARRATOR_INTERVAL and BACKEND_URL are env-overridable.
NARRATOR_INTERVAL = int(os.getenv("NARRATOR_INTERVAL", "300"))  # seconds between curation cycles
MIN_CLUSTER_SIZE = 2 # need at least 2 entities to narrate
MIN_ENTITY_MENTIONS = 3 # filter noise entities with near-zero counts
MAX_CLUSTERS = 10 # cap clusters sent to LLM
MAX_HISTORY = 8 # max past topics shown to LLM
HISTORY_TTL = 60 * 60 # prune topics older than 1 hour
OVERLAP_THRESHOLD = 0.3 # Jaccard similarity to match a cluster to history
MAX_GROUPS = 5 # max groups returned to backend
CONTEXT_NOTES_PATH = Path(__file__).parent / "context_notes.txt"  # optional editorial notes fed into the prompt
BACKEND_URL = os.getenv("ZIG_BACKEND_URL", "http://127.0.0.1:3000")  # serves /entity-graph (GET) and /groups (POST)
38
39
class NamedGroup(BaseModel):
    """One curated topic group from the LLM: a short display name plus
    the exact entity texts (as they appear in the graph) it contains."""

    name: str
    entities: list[str]
43
44
class CuratorOutput(BaseModel):
    """Structured-output schema for the curation call: the named groups
    (capped to MAX_GROUPS in code after parsing) and a haiku."""

    groups: list[NamedGroup]
    haiku: str
48
49
@dataclass
class TopicRecord:
    """A previously labeled topic, kept so the LLM can reuse stable names
    for continuing stories."""

    label: str  # most recent LLM-assigned name
    entities: set[str]  # entity texts from the latest matching cycle
    first_seen: float  # epoch seconds when the topic was first labeled
    last_seen: float  # epoch seconds when the topic last matched a group
    times_labeled: int = 1  # number of cycles this topic has matched
57
58
# System prompt sent verbatim on every curation call. Runtime string —
# any change here changes model behavior, so edit deliberately.
SYSTEM_PROMPT = """\
You are curating trending topics from a real-time entity co-occurrence graph.
Each cluster below contains entities that frequently appear together in
social media posts. Your job is to identify the real stories.

For each coherent cluster, provide a short (2-4 word) topic name.
You may merge clusters that are about the same story.
Drop clusters that are incoherent or uninteresting.
Return at most 5 groups — quality over quantity.

Each entity has a "surprise" score (Poisson z-score) measuring how unusual
its current mention volume is compared to baseline. Higher surprise = more
newsworthy. Prioritize clusters with high-surprise entities over persistent
low-surprise ones.

Be specific: "Trump Budget Fight" not "Politics".
Use natural language: "SpaceX Launch" not "SPACEX/LAUNCH".
Preserve proper nouns and capitalization.
When a group matches a recent topic (shown below), prefer reusing or
refining that name rather than inventing a new one — unless the entities
have changed significantly.

For each group, list the entity names (exact text from the clusters) that
belong to it.

Also write a haiku about the real-world events in these clusters.
STRICT RULES for the haiku:
- Line 1: exactly 5 syllables
- Line 2: exactly 7 syllables
- Line 3: exactly 5 syllables
- Count syllables carefully before writing each line
- Write in the style of Bashō — concrete, grounded in the physical world
- Reference specific people, places, or events from the data
- The haiku is about the world, not about the internet"""
93
94
95def _jaccard(a: set, b: set) -> float:
96 if not a or not b:
97 return 0.0
98 return len(a & b) / len(a | b)
99
100
101def _fmt_duration(seconds: float) -> str:
102 if seconds < 60:
103 return f"{int(seconds)}s"
104 minutes = seconds / 60
105 if minutes < 60:
106 return f"{int(minutes)}min"
107 return f"{minutes / 60:.1f}hr"
108
109
class ClusterNarrator:
    """Periodic LLM curation loop.

    Each cycle: fetch the entity graph from the backend, extract trending
    clusters, prompt the model (with recent topic history and optional
    editorial notes) for at most MAX_GROUPS named groups plus a haiku,
    validate the response in code, and POST the result to the backend.
    """

    def __init__(self, backend_url: str, call_interval: int = NARRATOR_INTERVAL):
        self.backend_url = backend_url
        self.call_interval = call_interval
        self.current_groups: list[dict] = [] # [{name, entities}, ...]
        self.topic_history: list[TopicRecord] = []
        # calls/errors accumulate over the process lifetime;
        # groups/clusters_seen are overwritten each cycle.
        self.stats = {
            "calls": 0,
            "errors": 0,
            "groups": 0,
            "clusters_seen": 0,
        }

    def _fetch_graph(self, client: httpx.Client) -> dict | None:
        """GET the entity graph from the backend; None on error or non-200."""
        try:
            resp = client.get(f"{self.backend_url}/entity-graph", timeout=5)
            if resp.status_code == 200:
                return resp.json()
        except Exception as e:
            print(f"narrator: fetch error: {e}", flush=True)
        return None

    def _post_groups(self, client: httpx.Client, groups: list[dict], haiku: str = "") -> None:
        """POST curated groups (plus haiku when non-empty) to the backend.

        Best-effort: failures are logged and swallowed, never raised.
        """
        try:
            payload: dict = {"groups": groups}
            if haiku:
                payload["haiku"] = haiku
            resp = client.post(
                f"{self.backend_url}/groups",
                json=payload,
                timeout=5,
            )
            if resp.status_code != 200:
                print(f"narrator: POST failed: {resp.status_code} {resp.text[:100]}", flush=True)
        except Exception as e:
            print(f"narrator: POST error: {e}", flush=True)

    def _extract_clusters(self, graph: dict) -> dict[int, list[dict]]:
        """Bucket graph entities by cluster id, then filter.

        Keeps only entities with positive trend and at least
        MIN_ENTITY_MENTIONS mentions; drops clusters smaller than
        MIN_CLUSTER_SIZE. Cluster id 0 is skipped entirely — presumably
        "unclustered" in the backend's encoding (confirm against backend).
        """
        clusters: dict[int, list[dict]] = {}
        for entity in graph.get("entities", []):
            cid = entity.get("cluster", 0)
            if cid == 0:
                continue
            if entity.get("trend", 0) <= 0:
                continue
            if entity.get("count", 0) < MIN_ENTITY_MENTIONS:
                continue
            clusters.setdefault(cid, []).append(entity)
        return {
            cid: ents
            for cid, ents in clusters.items()
            if len(ents) >= MIN_CLUSTER_SIZE
        }

    def _match_history(self, entity_names: set[str]) -> TopicRecord | None:
        """Find the best-matching historical topic by entity overlap.

        Returns the record with the highest Jaccard similarity to
        entity_names, or None if no record reaches OVERLAP_THRESHOLD.
        """
        best, best_score = None, 0.0
        for record in self.topic_history:
            score = _jaccard(entity_names, record.entities)
            if score > best_score:
                best, best_score = record, score
        if best_score >= OVERLAP_THRESHOLD:
            return best
        return None

    def _update_history(self, groups: list[dict]) -> None:
        """Update topic history with the current cycle's results.

        Matched topics are refreshed in place (the LLM may have refined
        the label); unmatched groups start new records; records idle for
        HISTORY_TTL or longer are pruned.
        """
        now = time()

        for group in groups:
            name = group["name"]
            entity_names = set(group["entities"])
            if not entity_names:
                continue

            if existing := self._match_history(entity_names):
                existing.label = name
                existing.last_seen = now
                existing.entities = entity_names
                existing.times_labeled += 1
            else:
                self.topic_history.append(
                    TopicRecord(
                        label=name,
                        entities=entity_names,
                        first_seen=now,
                        last_seen=now,
                    )
                )

        # prune old topics
        self.topic_history = [
            r for r in self.topic_history if (now - r.last_seen) < HISTORY_TTL
        ]

    def _build_history_context(self) -> str:
        """Build a text block describing recent topic history for the LLM.

        Shows up to MAX_HISTORY topics, most recent first; a topic seen
        within ~2 call intervals is marked "active", otherwise "ended".
        Returns "" when there is no history.
        """
        if not self.topic_history:
            return ""

        now = time()
        # sort by last_seen descending (most recent first)
        recent = sorted(self.topic_history, key=lambda r: -r.last_seen)[
            :MAX_HISTORY
        ]

        lines = ["Recent topic history (reuse labels for continuing topics):"]
        for r in recent:
            age = now - r.first_seen
            since_seen = now - r.last_seen
            duration = _fmt_duration(age)
            # cap the entity list so one topic can't dominate the prompt
            members = ", ".join(sorted(r.entities)[:6])
            if since_seen < self.call_interval * 2:
                status = f"active for {duration}"
            else:
                status = f"ended {_fmt_duration(since_seen)} ago, lasted {duration}"
            lines.append(f'- "{r.label}" — {status} ({members})')

        return "\n".join(lines)

    def narrate(self, client: httpx.Client) -> None:
        """Run one curation cycle: fetch graph, prompt LLM, validate, post.

        LLM and prompt-building errors are caught, logged, and counted in
        stats["errors"]; a cycle that yields no valid groups keeps the
        previous groups instead of clearing them.
        """
        if not (graph := self._fetch_graph(client)):
            return

        clusters = self._extract_clusters(graph)
        self.stats["clusters_seen"] = len(clusters)
        if not clusters:
            return

        try:
            # build set of all entity texts in the graph (for validation)
            all_entity_texts = {
                e["text"] for e in graph.get("entities", [])
                if isinstance(e.get("text"), str)
            }

            # sort by total cluster surprise descending, take top N
            sorted_clusters = sorted(
                clusters.items(),
                key=lambda x: sum(e.get("surprise", 0) for e in x[1]),
                reverse=True,
            )[:MAX_CLUSTERS]

            # build prompt
            prompt_parts = []
            for cid, entities in sorted_clusters:
                # highest-surprise entities first within each cluster
                sorted_ents = sorted(entities, key=lambda x: -x.get("surprise", 0))
                lines = [f"Cluster {cid}:"]
                for e in sorted_ents:
                    lines.append(
                        f" - {e['text']} ({e['label']}, surprise={e.get('surprise', 0):.1f}, "
                        f"trend={e.get('trend', 0):.1f}, {e.get('count', 0)} mentions)"
                    )
                prompt_parts.append("\n".join(lines))

            entities_text = "\n\n".join(prompt_parts)

            # include history context
            history_context = self._build_history_context()

            # include editorial context if available
            editorial_context = ""
            if CONTEXT_NOTES_PATH.exists():
                try:
                    editorial_context = CONTEXT_NOTES_PATH.read_text().strip()
                except Exception:
                    pass  # notes are optional; unreadable file is ignored

            parts = []
            if history_context:
                parts.append(history_context)
            if editorial_context:
                parts.append(f"Editorial context:\n{editorial_context}")
            parts.append(f"Curate these trending clusters into named groups:\n\n{entities_text}")
            user_content = "\n\n".join(parts)

            # imported lazily so a missing SDK degrades to the
            # ImportError handler below instead of killing the module
            import anthropic

            ac = anthropic.Anthropic()
            response = ac.messages.parse(
                model="claude-haiku-4-5-20251001",
                max_tokens=1024,
                temperature=0.3,
                system=SYSTEM_PROMPT,
                output_format=CuratorOutput,
                messages=[
                    {
                        "role": "user",
                        "content": user_content,
                    }
                ],
            )

            result = response.parsed_output
            haiku = result.haiku.strip() if result.haiku else ""

            # code-based validation (not LLM)
            validated_groups = []
            for group in result.groups[:MAX_GROUPS]:
                # validate each entity text exists in the graph
                valid_entities = [
                    e for e in group.entities if e in all_entity_texts
                ]
                if valid_entities:
                    validated_groups.append(
                        {"name": group.name, "entities": valid_entities}
                    )

            # if LLM returned 0 valid groups, keep previous groups
            if not validated_groups:
                if self.current_groups:
                    print("narrator: 0 valid groups, keeping previous", flush=True)
                return
            else:
                self.current_groups = validated_groups

            # update history with this cycle's results
            self._update_history(self.current_groups)

            self.stats["groups"] = len(self.current_groups)
            self.stats["calls"] += 1

            groups_str = ", ".join(
                f"{g['name']!r} ({len(g['entities'])} ents)"
                for g in self.current_groups
            )
            history_size = len(self.topic_history)
            haiku_preview = haiku.replace("\n", " / ") if haiku else "(none)"
            print(
                f"narrator: {len(self.current_groups)} groups from "
                f"{len(sorted_clusters)} clusters, "
                f"{history_size} in history ({groups_str}) "
                f"haiku: {haiku_preview}",
                flush=True,
            )

            # push groups + haiku to backend
            if self.current_groups:
                self._post_groups(client, self.current_groups, haiku)

        except ImportError:
            print("narrator: anthropic package not available", flush=True)
            self.stats["errors"] += 1
        except Exception as e:
            print(f"narrator: error: {type(e).__name__}: {e}", flush=True)
            self.stats["errors"] += 1

    def run(self) -> None:
        """Blocking loop: sleep one interval, narrate, repeat forever.

        Note the sleep comes first — the initial curation happens one
        full interval after startup.
        """
        print(f"narrator: started (interval={self.call_interval}s, backend={self.backend_url})", flush=True)
        with httpx.Client() as client:
            while True:
                sleep(self.call_interval)
                try:
                    self.narrate(client)
                except Exception as e:
                    print(f"narrator: run loop error: {type(e).__name__}: {e}", flush=True)
                    self.stats["errors"] += 1
367
368
# Entry point: run the curation loop forever against the configured backend.
if __name__ == "__main__":
    narrator = ClusterNarrator(BACKEND_URL)
    narrator.run()