A trust and safety agent that interacts with Osprey for investigation, real-time analysis, and prevention implementations
at main 213 lines 6.4 kB view raw
1import logging 2from collections.abc import Callable 3from typing import Literal 4 5import click 6import httpx 7from aiokafka.client import asyncio 8 9from src.agent.agent import Agent 10from src.clickhouse.clickhouse import Clickhouse 11from src.config import CONFIG 12from src.osprey.osprey import Osprey 13from src.ozone.ozone import Ozone 14from src.tools.executor import ToolExecutor 15from src.tools.registry import TOOL_REGISTRY, ToolContext 16 17logging.basicConfig( 18 level=logging.INFO, 19 format="%(asctime)s - %(levelname)s - %(message)s", 20) 21 22logger = logging.getLogger(__name__) 23 24# disable httpx verbose logging 25httpx_logger = logging.getLogger("httpx") 26httpx_logger.setLevel(logging.WARNING) 27 28 29SHARED_OPTIONS: list[Callable[..., Callable[..., object]]] = [ 30 click.option("--clickhouse-host"), 31 click.option("--clickhouse-port"), 32 click.option("--clickhouse-user"), 33 click.option("--clickhouse-password"), 34 click.option("--clickhouse-database"), 35 click.option("--osprey-base-url"), 36 click.option("--osprey-repo-url"), 37 click.option("--osprey-ruleset-url"), 38 click.option("--model-api"), 39 click.option("--model-name"), 40 click.option("--model-api-key"), 41 click.option("--model-endpoint"), 42] 43 44 45def shared_options[F: Callable[..., object]](func: F) -> F: 46 for option in reversed(SHARED_OPTIONS): 47 func = option(func) # type: ignore[assignment] 48 return func 49 50 51def build_services( 52 clickhouse_host: str | None, 53 clickhouse_port: int | None, 54 clickhouse_user: str | None, 55 clickhouse_password: str | None, 56 clickhouse_database: str | None, 57 osprey_base_url: str | None, 58 osprey_repo_url: str | None, 59 osprey_ruleset_url: str | None, 60 model_api: Literal["anthropic", "openai", "openapi"] | None, 61 model_name: str | None, 62 model_api_key: str | None, 63 model_endpoint: str | None, 64) -> tuple[Clickhouse, Osprey, ToolExecutor, Agent]: 65 http_client = httpx.AsyncClient() 66 67 clickhouse = Clickhouse( 68 host=clickhouse_host or CONFIG.clickhouse_host, 69 port=clickhouse_port or CONFIG.clickhouse_port, 70 user=clickhouse_user or CONFIG.clickhouse_user, 71 password=clickhouse_password or CONFIG.clickhouse_password, 72 database=clickhouse_database or CONFIG.clickhouse_database, 73 ) 74 75 osprey = Osprey( 76 http_client=http_client, 77 base_url=osprey_base_url or CONFIG.osprey_base_url, 78 osprey_repo_url=osprey_repo_url or CONFIG.osprey_repo_url, 79 osprey_ruleset_url=osprey_ruleset_url or CONFIG.osprey_ruleset_url, 80 ) 81 82 ozone = Ozone() 83 84 tool_context = ToolContext( 85 clickhouse=clickhouse, 86 osprey=osprey, 87 ozone=ozone, 88 ) 89 90 executor = ToolExecutor( 91 registry=TOOL_REGISTRY, 92 ctx=tool_context, 93 ) 94 95 agent = Agent( 96 model_api=model_api or CONFIG.model_api, 97 model_name=model_name or CONFIG.model_name, 98 model_api_key=model_api_key or CONFIG.model_api_key, 99 model_endpoint=model_endpoint or CONFIG.model_endpoint or None, 100 tool_executor=executor, 101 ) 102 103 return clickhouse, osprey, executor, agent 104 105 106@click.group() 107def cli(): 108 pass 109 110 111@cli.command() 112@shared_options 113def main( 114 clickhouse_host: str | None, 115 clickhouse_port: int | None, 116 clickhouse_user: str | None, 117 clickhouse_password: str | None, 118 clickhouse_database: str | None, 119 osprey_base_url: str | None, 120 osprey_repo_url: str | None, 121 osprey_ruleset_url: str | None, 122 model_api: Literal["anthropic", "openai", "openapi"] | None, 123 model_name: str | None, 124 model_api_key: str | None, 125 model_endpoint: str | None, 126): 127 clickhouse, osprey, executor, agent = build_services( 128 clickhouse_host=clickhouse_host, 129 clickhouse_port=clickhouse_port, 130 clickhouse_user=clickhouse_user, 131 clickhouse_password=clickhouse_password, 132 clickhouse_database=clickhouse_database, 133 osprey_base_url=osprey_base_url, 134 osprey_repo_url=osprey_repo_url or CONFIG.osprey_repo_url, 135 osprey_ruleset_url=osprey_ruleset_url or CONFIG.osprey_ruleset_url, 136 model_api=model_api, 137 model_name=model_name, 138 model_api_key=model_api_key, 139 model_endpoint=model_endpoint, 140 ) 141 142 async def run(): 143 await clickhouse.initialize() 144 await executor.initialize() 145 await osprey.initialize() 146 async with asyncio.TaskGroup() as tg: 147 tg.create_task(agent.run()) 148 149 try: 150 asyncio.run(run()) 151 except KeyboardInterrupt: 152 logger.info("received keyboard interrupt") 153 154 155@cli.command(name="chat") 156@shared_options 157def chat( 158 clickhouse_host: str | None, 159 clickhouse_port: int | None, 160 clickhouse_user: str | None, 161 clickhouse_password: str | None, 162 clickhouse_database: str | None, 163 osprey_base_url: str | None, 164 osprey_repo_url: str | None, 165 osprey_ruleset_url: str | None, 166 model_api: Literal["anthropic", "openai", "openapi"] | None, 167 model_name: str | None, 168 model_api_key: str | None, 169 model_endpoint: str | None, 170): 171 clickhouse, osprey, executor, agent = build_services( 172 clickhouse_host=clickhouse_host, 173 clickhouse_port=clickhouse_port, 174 clickhouse_user=clickhouse_user, 175 clickhouse_password=clickhouse_password, 176 clickhouse_database=clickhouse_database, 177 osprey_base_url=osprey_base_url, 178 osprey_repo_url=osprey_repo_url or CONFIG.osprey_repo_url, 179 osprey_ruleset_url=osprey_ruleset_url or CONFIG.osprey_ruleset_url, 180 model_api=model_api, 181 model_name=model_name, 182 model_api_key=model_api_key, 183 model_endpoint=model_endpoint, 184 ) 185 186 async def run(): 187 await clickhouse.initialize() 188 await executor.initialize() 189 await osprey.initialize() 190 logger.info("Services initialized. Starting interactive chat.") 191 print("\nAgent ready. Type your message (Ctrl+C to exit).\n") 192 193 while True: 194 try: 195 user_input = input("You: ") 196 except EOFError: 197 break 198 199 if not user_input.strip(): 200 continue 201 202 logger.info("User: %s", user_input) 203 response = await agent.chat(user_input) 204 print(f"\nAgent: {response}\n") 205 206 try: 207 asyncio.run(run()) 208 except KeyboardInterrupt: 209 print("\nExiting.") 210 211 212if __name__ == "__main__": 213 cli()