An MCP server for Osprey
1import httpx
2
3from src.osprey.config import OspreyConfig
4from src.osprey.udfs import UdfCatalog
5
6
7class Osprey:
8 def __init__(self, http_client: httpx.AsyncClient, base_url: str) -> None:
9 self._http_client = http_client
10 self._base_url = base_url
11
12 async def get_udfs(self) -> UdfCatalog:
13 url = f"{self._base_url}/docs/udfs"
14 resp = await self._http_client.get(url)
15 resp.raise_for_status()
16 return UdfCatalog.model_validate(resp.json())
17
18 async def get_config(self) -> OspreyConfig:
19 url = f"{self._base_url}/config"
20 resp = await self._http_client.get(url)
21 resp.raise_for_status()
22 return OspreyConfig.model_validate(resp.json())