A trust and safety agent that interacts with Osprey for investigation, real-time analysis, and prevention implementations
1import logging
2from collections.abc import Callable
3from typing import Literal
4
5import click
6import httpx
7from aiokafka.client import asyncio
8
9from src.agent.agent import Agent
10from src.clickhouse.clickhouse import Clickhouse
11from src.config import CONFIG
12from src.osprey.osprey import Osprey
13from src.ozone.ozone import Ozone
14from src.tools.executor import ToolExecutor
15from src.tools.registry import TOOL_REGISTRY, ToolContext
16
# Root logging setup, applied at import time: INFO threshold and a
# "timestamp - level - message" line layout.
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO)

# Module-level logger for this file.
logger = logging.getLogger(__name__)

# Quiet httpx: only WARNING and above is let through from its logger.
httpx_logger = logging.getLogger("httpx")
httpx_logger.setLevel(logging.WARNING)
27
28
# Command-line options shared by every subcommand. None of them declares a
# default, so an option that is not passed reaches the command as None and
# the command can fall back to CONFIG.
SHARED_OPTIONS: list[Callable[..., Callable[..., object]]] = [
    click.option("--clickhouse-host"),
    # click delivers option values as str unless a type is given. The commands
    # annotate this parameter as `int | None`, so convert (and validate) the
    # value at the CLI boundary instead of passing a string port downstream.
    click.option("--clickhouse-port", type=int),
    click.option("--clickhouse-user"),
    click.option("--clickhouse-password"),
    click.option("--clickhouse-database"),
    click.option("--osprey-base-url"),
    click.option("--osprey-repo-url"),
    click.option("--osprey-ruleset-url"),
    click.option("--model-api"),
    click.option("--model-name"),
    click.option("--model-api-key"),
    click.option("--model-endpoint"),
]
43
44
45def shared_options[F: Callable[..., object]](func: F) -> F:
46 for option in reversed(SHARED_OPTIONS):
47 func = option(func) # type: ignore[assignment]
48 return func
49
50
def build_services(
    clickhouse_host: str | None,
    clickhouse_port: int | None,
    clickhouse_user: str | None,
    clickhouse_password: str | None,
    clickhouse_database: str | None,
    osprey_base_url: str | None,
    osprey_repo_url: str | None,
    osprey_ruleset_url: str | None,
    model_api: Literal["anthropic", "openai", "openapi"] | None,
    model_name: str | None,
    model_api_key: str | None,
    model_endpoint: str | None,
) -> tuple[Clickhouse, Osprey, ToolExecutor, Agent]:
    """Construct the ClickHouse, Osprey, tool-executor and agent objects.

    Every argument is an optional override. Because the fallback uses ``or``,
    any falsy value (``None``, an empty string, ``0``) is replaced by the
    attribute of the same name on ``CONFIG``.

    Returns:
        The tuple ``(clickhouse, osprey, executor, agent)``. The ``Ozone``
        instance and the ``httpx.AsyncClient`` created here are not returned
        to the caller.
    """
    # Created before anything else and handed to Osprey below.
    # NOTE(review): nothing in this function closes the client, and it is not
    # returned, so the caller has no direct handle to close it either.
    osprey_http = httpx.AsyncClient()

    # --- ClickHouse -------------------------------------------------------
    ch_host = clickhouse_host or CONFIG.clickhouse_host
    ch_port = clickhouse_port or CONFIG.clickhouse_port
    ch_user = clickhouse_user or CONFIG.clickhouse_user
    ch_password = clickhouse_password or CONFIG.clickhouse_password
    ch_database = clickhouse_database or CONFIG.clickhouse_database
    clickhouse = Clickhouse(
        host=ch_host,
        port=ch_port,
        user=ch_user,
        password=ch_password,
        database=ch_database,
    )

    # --- Osprey -----------------------------------------------------------
    base_url = osprey_base_url or CONFIG.osprey_base_url
    repo_url = osprey_repo_url or CONFIG.osprey_repo_url
    ruleset_url = osprey_ruleset_url or CONFIG.osprey_ruleset_url
    osprey = Osprey(
        http_client=osprey_http,
        base_url=base_url,
        osprey_repo_url=repo_url,
        osprey_ruleset_url=ruleset_url,
    )

    # --- Tools ------------------------------------------------------------
    # Ozone takes no arguments here; it only travels inside the tool context.
    tool_context = ToolContext(clickhouse=clickhouse, osprey=osprey, ozone=Ozone())
    executor = ToolExecutor(registry=TOOL_REGISTRY, ctx=tool_context)

    # --- Agent ------------------------------------------------------------
    api = model_api or CONFIG.model_api
    name = model_name or CONFIG.model_name
    api_key = model_api_key or CONFIG.model_api_key
    # The trailing `or None` turns a falsy CONFIG.model_endpoint into None.
    endpoint = model_endpoint or CONFIG.model_endpoint or None
    agent = Agent(
        model_api=api,
        model_name=name,
        model_api_key=api_key,
        model_endpoint=endpoint,
        tool_executor=executor,
    )

    return clickhouse, osprey, executor, agent
104
105
# Root click command group; subcommands attach to it through @cli.command.
# Documented with comments rather than a docstring because click would show a
# docstring as the group's --help text.
@click.group()
def cli():
    return None
109
110
# `main` subcommand: build the services, initialize them, then run the agent
# until it returns or the user interrupts with Ctrl+C.
# Documented with comments rather than a docstring because click would show a
# docstring as the command's --help text.
@cli.command()
@shared_options
def main(
    clickhouse_host: str | None,
    clickhouse_port: int | None,
    clickhouse_user: str | None,
    clickhouse_password: str | None,
    clickhouse_database: str | None,
    osprey_base_url: str | None,
    osprey_repo_url: str | None,
    osprey_ruleset_url: str | None,
    model_api: Literal["anthropic", "openai", "openapi"] | None,
    model_name: str | None,
    model_api_key: str | None,
    model_endpoint: str | None,
):
    # The two Osprey URLs are resolved against CONFIG here; build_services
    # applies the same fallback again. All other overrides are forwarded as-is.
    services = build_services(
        clickhouse_host=clickhouse_host,
        clickhouse_port=clickhouse_port,
        clickhouse_user=clickhouse_user,
        clickhouse_password=clickhouse_password,
        clickhouse_database=clickhouse_database,
        osprey_base_url=osprey_base_url,
        osprey_repo_url=osprey_repo_url or CONFIG.osprey_repo_url,
        osprey_ruleset_url=osprey_ruleset_url or CONFIG.osprey_ruleset_url,
        model_api=model_api,
        model_name=model_name,
        model_api_key=model_api_key,
        model_endpoint=model_endpoint,
    )
    clickhouse_client, osprey_client, tool_executor, bot = services

    async def _serve():
        # Initialization order: ClickHouse, tool executor, Osprey.
        for service in (clickhouse_client, tool_executor, osprey_client):
            await service.initialize()
        # The agent loop is the group's only task; leaving the `async with`
        # block waits for it to finish.
        async with asyncio.TaskGroup() as group:
            group.create_task(bot.run())

    try:
        asyncio.run(_serve())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop; log it instead of a traceback.
        logger.info("received keyboard interrupt")
153
154
# `chat` subcommand: build and initialize the services, then read lines from
# stdin, send each non-blank one to the agent and print the reply.
# Documented with comments rather than a docstring because click would show a
# docstring as the command's --help text.
@cli.command(name="chat")
@shared_options
def chat(
    clickhouse_host: str | None,
    clickhouse_port: int | None,
    clickhouse_user: str | None,
    clickhouse_password: str | None,
    clickhouse_database: str | None,
    osprey_base_url: str | None,
    osprey_repo_url: str | None,
    osprey_ruleset_url: str | None,
    model_api: Literal["anthropic", "openai", "openapi"] | None,
    model_name: str | None,
    model_api_key: str | None,
    model_endpoint: str | None,
):
    # The two Osprey URLs are resolved against CONFIG here; build_services
    # applies the same fallback again. All other overrides are forwarded as-is.
    services = build_services(
        clickhouse_host=clickhouse_host,
        clickhouse_port=clickhouse_port,
        clickhouse_user=clickhouse_user,
        clickhouse_password=clickhouse_password,
        clickhouse_database=clickhouse_database,
        osprey_base_url=osprey_base_url,
        osprey_repo_url=osprey_repo_url or CONFIG.osprey_repo_url,
        osprey_ruleset_url=osprey_ruleset_url or CONFIG.osprey_ruleset_url,
        model_api=model_api,
        model_name=model_name,
        model_api_key=model_api_key,
        model_endpoint=model_endpoint,
    )
    clickhouse_client, osprey_client, tool_executor, bot = services

    async def _converse():
        # Initialization order: ClickHouse, tool executor, Osprey.
        for service in (clickhouse_client, tool_executor, osprey_client):
            await service.initialize()
        logger.info("Services initialized. Starting interactive chat.")
        print("\nAgent ready. Type your message (Ctrl+C to exit).\n")

        while True:
            # NOTE(review): input() is a synchronous call made on the
            # event-loop thread, so nothing else scheduled on the loop runs
            # while it waits for the user.
            try:
                line = input("You: ")
            except EOFError:
                # stdin was closed: end the session without an error.
                return

            # Blank or whitespace-only lines are ignored and re-prompted.
            if line.strip():
                logger.info("User: %s", line)
                reply = await bot.chat(line)
                print(f"\nAgent: {reply}\n")

    try:
        asyncio.run(_converse())
    except KeyboardInterrupt:
        print("\nExiting.")
210
211
# Script entry point: when the file is executed directly, hand control to the
# click command group.
if __name__ == "__main__":
    cli()