···1from abc import ABC, abstractmethod
2import asyncio
3+import json
4import logging
5+from dataclasses import dataclass
6from typing import Any, Literal
78import anthropic
9from anthropic.types import TextBlock, ToolUseBlock
10+import httpx
1112from src.agent.prompt import build_system_prompt
13from src.tools.executor import ToolExecutor
···15logger = logging.getLogger(__name__)
@dataclass
class AgentTextBlock:
    """A plain text block in a model response."""

    # the text emitted by the model
    text: str
@dataclass
class AgentToolUseBlock:
    """A tool invocation requested by the model."""

    # provider-assigned call id, echoed back in the tool result
    id: str
    # tool name, e.g. "execute_code"
    name: str
    # parsed tool arguments
    input: dict[str, Any]
@dataclass
class AgentResponse:
    """Provider-agnostic model response shared by all AgentClient backends."""

    # ordered text / tool-use blocks from the model
    content: list[AgentTextBlock | AgentToolUseBlock]
    # why generation stopped; "tool_use" means tool calls must be handled
    stop_reason: Literal["end_turn", "tool_use"]
    # optional chain-of-thought text from reasoning-capable backends
    reasoning_content: str | None = None
353637class AgentClient(ABC):
···41 messages: list[dict[str, Any]],
42 system: str | None = None,
43 tools: list[dict[str, Any]] | None = None,
44+ ) -> AgentResponse:
45 pass
4647···57 messages: list[dict[str, Any]],
58 system: str | None = None,
59 tools: list[dict[str, Any]] | None = None,
60+ ) -> AgentResponse:
61 system_text = system or build_system_prompt()
62 kwargs: dict[str, Any] = {
63 "model": self._model_name,
···73 }
7475 if tools:
76+ tools = [dict(t) for t in tools]
77 tools[-1]["cache_control"] = {"type": "ephemeral"}
78 kwargs["tools"] = tools
7980 async with self._client.messages.stream(**kwargs) as stream: # type: ignore
81+ msg = await stream.get_final_message()
82+83+ content: list[AgentTextBlock | AgentToolUseBlock] = []
84+ for block in msg.content:
85+ if isinstance(block, TextBlock):
86+ content.append(AgentTextBlock(text=block.text))
87+ elif isinstance(block, ToolUseBlock):
88+ content.append(
89+ AgentToolUseBlock(
90+ id=block.id,
91+ name=block.name,
92+ input=block.input, # type: ignore
93+ )
94+ )
95+96+ return AgentResponse(
97+ content=content,
98+ stop_reason=msg.stop_reason or "end_turn", # type: ignore TODO: fix this
99+ )
class OpenAICompatibleClient(AgentClient):
    """Client for OpenAI-compatible chat-completions APIs (OpenAI, Moonshot, etc.).

    Speaks the same Anthropic-style message/tool format as the rest of the
    agent and translates to/from the OpenAI wire format internally.
    """

    def __init__(self, api_key: str, model_name: str, endpoint: str) -> None:
        self._api_key = api_key
        self._model_name = model_name
        # strip any trailing slash so the path join below stays well-formed
        self._endpoint = endpoint.rstrip("/")
        self._http = httpx.AsyncClient(timeout=300.0)

    async def complete(
        self,
        messages: list[dict[str, Any]],
        system: str | None = None,
        tools: list[dict[str, Any]] | None = None,
    ) -> AgentResponse:
        """Send one chat-completions request and return the parsed response.

        Raises httpx.HTTPStatusError on a non-2xx response (body logged first).
        """
        oai_messages = self._convert_messages(messages, system or build_system_prompt())

        payload: dict[str, Any] = {
            "model": self._model_name,
            "messages": oai_messages,
            "max_tokens": 16_000,
        }

        if tools:
            payload["tools"] = self._convert_tools(tools)

        resp = await self._http.post(
            f"{self._endpoint}/chat/completions",
            headers={
                "Authorization": f"Bearer {self._api_key}",
                "Content-Type": "application/json",
            },
            json=payload,
        )
        if not resp.is_success:
            # log a bounded slice of the body so huge error pages don't flood logs
            logger.error(
                "API error %d: %s", resp.status_code, resp.text[:1000]
            )
            resp.raise_for_status()
        data = resp.json()

        return self._parse_response(data)

    def _convert_messages(
        self, messages: list[dict[str, Any]], system: str
    ) -> list[dict[str, Any]]:
        """Convert Anthropic-style messages into OpenAI chat format.

        The system prompt becomes the leading "system" message; assistant
        content blocks become text + tool_calls; user tool_result blocks
        become "tool" role messages.
        """
        result: list[dict[str, Any]] = [{"role": "system", "content": system}]

        for msg in messages:
            role = msg["role"]
            content = msg["content"]

            if isinstance(content, str):
                result.append({"role": role, "content": content})
            elif isinstance(content, list):
                if role == "assistant":
                    text_parts = []
                    tool_calls = []
                    for block in content:
                        if block.get("type") == "text":
                            text_parts.append(block["text"])
                        elif block.get("type") == "tool_use":
                            tool_calls.append(
                                {
                                    "id": block["id"],
                                    "type": "function",
                                    "function": {
                                        "name": block["name"],
                                        "arguments": json.dumps(block["input"]),
                                    },
                                }
                            )
                    oai_msg: dict[str, Any] = {"role": "assistant"}
                    if msg.get("reasoning_content"):
                        oai_msg["reasoning_content"] = msg["reasoning_content"]
                    # some openai-compatible apis reject content: null on
                    # assistant messages with tool_calls, so send "" when empty
                    if text_parts:
                        oai_msg["content"] = "\n".join(text_parts)
                    else:
                        oai_msg["content"] = ""
                    if tool_calls:
                        oai_msg["tool_calls"] = tool_calls
                    result.append(oai_msg)
                elif role == "user":
                    # a user turn may mix tool_result blocks with text blocks;
                    # handle each block by type instead of assuming the first
                    # block's type describes the whole list
                    tool_results = [
                        b for b in content if b.get("type") == "tool_result"
                    ]
                    for block in tool_results:
                        raw = block.get("content", "")
                        result.append(
                            {
                                "role": "tool",
                                "tool_call_id": block["tool_use_id"],
                                # OAI tool messages expect string content, but
                                # anthropic tool_result content may be a block
                                # list — serialize anything non-string
                                "content": raw
                                if isinstance(raw, str)
                                else json.dumps(raw),
                            }
                        )
                    others = [b for b in content if b.get("type") != "tool_result"]
                    if others or not tool_results:
                        text = " ".join(b.get("text", str(b)) for b in others)
                        result.append({"role": "user", "content": text})

        return result

    def _convert_tools(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Convert Anthropic tool definitions to OpenAI function-calling format."""
        result = []
        for t in tools:
            func: dict[str, Any] = {
                "name": t["name"],
                "description": t.get("description", ""),
            }
            if "input_schema" in t:
                func["parameters"] = t["input_schema"]
            result.append({"type": "function", "function": func})
        return result

    def _parse_response(self, data: dict[str, Any]) -> AgentResponse:
        """Convert an OpenAI chat-completion response into an AgentResponse."""
        choice = data["choices"][0]
        message = choice["message"]
        finish_reason = choice.get("finish_reason", "stop")

        content: list[AgentTextBlock | AgentToolUseBlock] = []

        if message.get("content"):
            content.append(AgentTextBlock(text=message["content"]))

        if message.get("tool_calls"):
            for tc in message["tool_calls"]:
                try:
                    args = json.loads(tc["function"]["arguments"])
                except (json.JSONDecodeError, KeyError):
                    # some models emit malformed or empty arguments; treat as {}
                    args = {}
                content.append(
                    AgentToolUseBlock(
                        id=tc["id"],
                        name=tc["function"]["name"],
                        input=args,
                    )
                )

        # anything other than an explicit tool_calls finish maps to end_turn
        stop_reason = "tool_use" if finish_reason == "tool_calls" else "end_turn"
        reasoning_content = message.get("reasoning_content")
        return AgentResponse(
            content=content,
            stop_reason=stop_reason,
            reasoning_content=reasoning_content,
        )
244245246MAX_TOOL_RESULT_LENGTH = 10_000
···252 model_api: Literal["anthropic", "openai", "openapi"],
253 model_name: str,
254 model_api_key: str | None,
255+ model_endpoint: str | None = None,
256 tool_executor: ToolExecutor | None = None,
257 ) -> None:
258+ match model_api:
259+ case "anthropic":
260+ assert model_api_key
261+ self._client: AgentClient = AnthropicClient(
262+ api_key=model_api_key, model_name=model_name
263+ )
264+ case "openai":
265+ assert model_api_key
266+ self._client = OpenAICompatibleClient(
267+ api_key=model_api_key,
268+ model_name=model_name,
269+ endpoint="https://api.openai.com/v1",
270+ )
271+ case "openapi":
272+ assert model_api_key
273+ assert model_endpoint, "model_endpoint is required for openapi"
274+ self._client = OpenAICompatibleClient(
275+ api_key=model_api_key,
276+ model_name=model_name,
277+ endpoint=model_endpoint,
278+ )
279280 self._tool_executor = tool_executor
281 self._conversation: list[dict[str, Any]] = []
···287 return None
288 return [self._tool_executor.get_execute_code_tool_definition()]
289290+ async def _handle_tool_call(self, tool_use: AgentToolUseBlock) -> dict[str, Any]:
291 """handle a tool call from the model"""
292 if tool_use.name == "execute_code" and self._tool_executor:
293+ code = tool_use.input.get("code", "")
294+ result = await self._tool_executor.execute_code(code)
295 return result
296 else:
297 return {"error": f"Unknown tool: {tool_use.name}"}
···310 text_response = ""
311312 for block in resp.content:
313+ if isinstance(block, AgentTextBlock):
314 assistant_content.append({"type": "text", "text": block.text})
315 text_response += block.text
316+ elif isinstance(block, AgentToolUseBlock): # type: ignore TODO: for now this errors because there are no other types, but ignore for now
317 assistant_content.append(
318 {
319 "type": "tool_use",
···323 }
324 )
325326+ assistant_msg: dict[str, Any] = {"role": "assistant", "content": assistant_content}
327+ if resp.reasoning_content:
328+ assistant_msg["reasoning_content"] = resp.reasoning_content
329+ self._conversation.append(assistant_msg)
330331 # find any tool calls that we need to handle
332 if resp.stop_reason == "tool_use":
333 tool_results: list[dict[str, Any]] = []
334 for block in resp.content:
335+ if isinstance(block, AgentToolUseBlock):
336+ code = block.input.get("code", "")
0000337 logger.info("Tool call: %s\n%s", block.name, code)
338 result = await self._handle_tool_call(block)
339 is_error = "error" in result
···345 )
346 content_str = str(result)
347 if len(content_str) > MAX_TOOL_RESULT_LENGTH:
348+ content_str = (
349+ content_str[:MAX_TOOL_RESULT_LENGTH]
350+ + "\n... (truncated)"
351+ )
352353 tool_results.append(
354 {
···360361 self._conversation.append({"role": "user", "content": tool_results})
362 else:
363+ # once there are no more tool calls, we proceed to the text response
364 return text_response
365366 async def run(self):
+46-10
src/tools/executor.py
···130 tools_path = DENO_DIR / "tools.ts"
131 tools_path.write_text(tools_ts)
13200000000133 async def _run_deno(self, script_path: str) -> dict[str, Any]:
134 """run the input script in a deno subprocess"""
135···172 # calculate remaining time against the total execution deadline
173 remaining = deadline - asyncio.get_event_loop().time()
174 if remaining <= 0:
175- process.kill()
176 error = f"execution timed out after {MAX_EXECUTION_TIME:.0f} seconds (total)"
177 break
178···193 # track total output size to prevent stdout flooding
194 total_output_bytes += len(line)
195 if total_output_bytes > MAX_OUTPUT_SIZE:
196- process.kill()
197 error = f"output exceeded {MAX_OUTPUT_SIZE} bytes, killed"
198 break
199···208 if "__tool_call__" in message:
209 tool_call_count += 1
210 if tool_call_count > MAX_TOOL_CALLS:
211- process.kill()
212 error = f"exceeded maximum of {MAX_TOOL_CALLS} tool calls"
213 break
214···225 logger.exception(f"Tool error: {tool_name}")
226 response = json.dumps({"__tool_error__": str(e)})
227228- process.stdin.write((response + "\n").encode())
229- await process.stdin.drain()
0000230231 elif "__output__" in message:
232 outputs.append(message["__output__"])
···239240 # make sure that we kill deno subprocess if the execution times out
241 except asyncio.TimeoutError:
242- process.kill()
243 error = "execution timed out"
244 # also kill it for any other exceptions we encounter
245 except Exception as e:
246- process.kill()
247 error = str(e)
248249 await process.wait()
···273 return result
274275 def get_execute_code_tool_definition(self) -> dict[str, Any]:
276- """get the anthropic tool definition for execute_code, including all the docs for available backend tools"""
277278 if self._tool_definition is not None:
279 return self._tool_definition
···290{self._database_schema}
291292Use these exact column names when writing SQL queries. Do NOT guess column names.
0000000000000000293"""
294295 osprey_section = ""
···329330Example:
331```typescript
332-const result = await tools.clickhouse.query("SELECT count() FROM events");
333-output(result);
00000000334```
335336{tool_docs}{schema_section}{osprey_section}"""
···130 tools_path = DENO_DIR / "tools.ts"
131 tools_path.write_text(tools_ts)
132133+ @staticmethod
134+ def _kill_process(process: asyncio.subprocess.Process) -> None:
135+ """kill a subprocess, ignoring errors if it's already dead"""
136+ try:
137+ process.kill()
138+ except ProcessLookupError:
139+ pass
140+141 async def _run_deno(self, script_path: str) -> dict[str, Any]:
142 """run the input script in a deno subprocess"""
143···180 # calculate remaining time against the total execution deadline
181 remaining = deadline - asyncio.get_event_loop().time()
182 if remaining <= 0:
183+ self._kill_process(process)
184 error = f"execution timed out after {MAX_EXECUTION_TIME:.0f} seconds (total)"
185 break
186···201 # track total output size to prevent stdout flooding
202 total_output_bytes += len(line)
203 if total_output_bytes > MAX_OUTPUT_SIZE:
204+ self._kill_process(process)
205 error = f"output exceeded {MAX_OUTPUT_SIZE} bytes, killed"
206 break
207···216 if "__tool_call__" in message:
217 tool_call_count += 1
218 if tool_call_count > MAX_TOOL_CALLS:
219+ self._kill_process(process)
220 error = f"exceeded maximum of {MAX_TOOL_CALLS} tool calls"
221 break
222···233 logger.exception(f"Tool error: {tool_name}")
234 response = json.dumps({"__tool_error__": str(e)})
235236+ try:
237+ process.stdin.write((response + "\n").encode())
238+ await process.stdin.drain()
239+ except (ConnectionResetError, BrokenPipeError):
240+ error = f"deno process exited while sending tool result for {tool_name}"
241+ break
242243 elif "__output__" in message:
244 outputs.append(message["__output__"])
···251252 # make sure that we kill deno subprocess if the execution times out
253 except asyncio.TimeoutError:
254+ self._kill_process(process)
255 error = "execution timed out"
256 # also kill it for any other exceptions we encounter
257 except Exception as e:
258+ self._kill_process(process)
259 error = str(e)
260261 await process.wait()
···285 return result
286287 def get_execute_code_tool_definition(self) -> dict[str, Any]:
288+ """get tool definition for execute_code, including all the docs for available backend tools"""
289290 if self._tool_definition is not None:
291 return self._tool_definition
···302{self._database_schema}
303304Use these exact column names when writing SQL queries. Do NOT guess column names.
305+306+## ClickHouse SQL Tips
307+308+- **DateTime filtering**: The `__timestamp` column is `DateTime64(3)`. Do NOT use raw ISO strings. Use `parseDateTimeBestEffort()`:
309+ ```sql
310+ WHERE __timestamp >= parseDateTimeBestEffort('2026-02-06 04:30:00')
311+ ```
312+ To compute a relative time in TypeScript, format it as `YYYY-MM-DD HH:MM:SS`:
313+ ```typescript
314+ const ts = new Date(Date.now() - 30 * 60 * 1000).toISOString().slice(0, 19).replace('T', ' ');
315+ ```
316+- **Array slicing**: ClickHouse does NOT support `array[1:5]` syntax. Use `arraySlice(array, offset, length)`:
317+ ```sql
318+ arraySlice(groupArray(DISTINCT UserId), 1, 5) as sample_accounts
319+ ```
320+- **Error handling**: When running multiple independent queries, use `Promise.allSettled()` instead of `Promise.all()` so one failure doesn't crash the rest. Check each result's `.status` field.
321"""
322323 osprey_section = ""
···357358Example:
359```typescript
360+// format a relative timestamp for ClickHouse DateTime64 columns
361+const thirtyMinAgo = new Date(Date.now() - 30 * 60 * 1000).toISOString().slice(0, 19).replace('T', ' ');
362+363+// run multiple independent queries safely with Promise.allSettled
364+const results = await Promise.allSettled([
365+ tools.clickhouse.query(`SELECT Count() as cnt FROM default.osprey_execution_results WHERE __timestamp >= parseDateTimeBestEffort('${{thirtyMinAgo}}') LIMIT 1`),
366+ tools.clickhouse.query(`SELECT UserId, Count() as n FROM default.osprey_execution_results WHERE __timestamp >= parseDateTimeBestEffort('${{thirtyMinAgo}}') GROUP BY UserId ORDER BY n DESC LIMIT 10`),
367+]);
368+369+output(results.map(r => r.status === 'fulfilled' ? r.value : r.reason?.message));
370```
371372{tool_docs}{schema_section}{osprey_section}"""