Auto-indexing service and GraphQL API for AT Protocol Records quickslice.slices.network/
atproto gleam graphql
at main 524 lines 16 kB view raw
1import activity_cleanup 2import backfill 3import backfill_state 4import database/connection 5import database/executor.{type Executor} 6import database/repositories/config as config_repo 7import database/repositories/oauth_clients 8import dotenv_gleam 9import envoy 10import gleam/erlang/process 11import gleam/http as gleam_http 12import gleam/http/request 13import gleam/int 14import gleam/option 15import gleam/string 16import gleam/uri 17import handlers/admin_graphql as admin_graphql_handler 18import handlers/admin_oauth_authorize as admin_oauth_authorize_handler 19import handlers/admin_oauth_callback as admin_oauth_callback_handler 20import handlers/graphiql as graphiql_handler 21import handlers/graphql as graphql_handler 22import handlers/graphql_ws as graphql_ws_handler 23import handlers/health as health_handler 24import handlers/index as index_handler 25import handlers/logout as logout_handler 26import handlers/mcp as mcp_handler 27import handlers/oauth/atp_callback as oauth_atp_callback_handler 28import handlers/oauth/atp_session as oauth_atp_session_handler 29import handlers/oauth/authorize as oauth_authorize_handler 30import handlers/oauth/client_metadata as oauth_client_metadata_handler 31import handlers/oauth/dpop_nonce as oauth_dpop_nonce_handler 32import handlers/oauth/jwks as oauth_jwks_handler 33import handlers/oauth/metadata as oauth_metadata_handler 34import handlers/oauth/par as oauth_par_handler 35import handlers/oauth/register as oauth_register_handler 36import handlers/oauth/token as oauth_token_handler 37import jetstream_consumer 38import lib/oauth/did_cache 39import logging 40import mist 41import pubsub 42import stats_pubsub 43import wisp 44import wisp/wisp_mist 45 46pub type Context { 47 Context( 48 db: Executor, 49 external_base_url: String, 50 backfill_state: process.Subject(backfill_state.Message), 51 jetstream_consumer: option.Option( 52 process.Subject(jetstream_consumer.ManagerMessage), 53 ), 54 did_cache: process.Subject(did_cache.Message), 55 oauth_signing_key: option.Option(String), 56 oauth_loopback_mode: Bool, 57 /// AT Protocol client_id for OAuth (metadata URL or loopback client_id) 58 atp_client_id: String, 59 ) 60} 61 62pub fn main() { 63 // Initialize logging 64 logging.configure() 65 logging.set_level(logging.Info) 66 67 // Load environment variables from .env file 68 let _ = dotenv_gleam.config() 69 70 // Get database URL from environment variable or use default 71 let database_url = case envoy.get("DATABASE_URL") { 72 Ok(url) -> url 73 Error(_) -> "quickslice.db" 74 } 75 76 // Connect to the database 77 // Note: Schema migrations must be run externally using dbmate before starting 78 let assert Ok(db) = connection.connect(database_url) 79 80 // Initialize config defaults 81 let _ = config_repo.initialize_config_defaults(db) 82 83 // Ensure the internal admin OAuth client exists (for admin UI authentication) 84 let _ = oauth_clients.ensure_admin_client(db) 85 86 // Initialize HTTP connection pool for backfill/DID resolution 87 backfill.configure_hackney_pool(150) 88 89 // Initialize PubSub registry for subscriptions 90 pubsub.start() 91 logging.log(logging.Info, "[server] PubSub registry initialized") 92 93 // Initialize Stats PubSub registry for real-time stats 94 stats_pubsub.start() 95 logging.log(logging.Info, "[server] Stats PubSub registry initialized") 96 97 // Start activity cleanup scheduler 98 case activity_cleanup.start(db) { 99 Ok(_cleanup_subject) -> 100 logging.log( 101 logging.Info, 102 "[server] Activity cleanup scheduler started (runs hourly)", 103 ) 104 Error(err) -> 105 logging.log( 106 logging.Warning, 107 "[server] Failed to start activity cleanup scheduler: " 108 <> string.inspect(err), 109 ) 110 } 111 112 // Start Jetstream consumer in background 113 let jetstream_subject = case jetstream_consumer.start(db) { 114 Ok(subject) -> option.Some(subject) 115 Error(err) -> { 116 logging.log( 117 logging.Error, 118 "[server] Failed to start Jetstream consumer: " <> err, 119 ) 120 logging.log( 121 logging.Warning, 122 "[server] Server will continue without real-time indexing", 123 ) 124 option.None 125 } 126 } 127 128 logging.log(logging.Info, "") 129 logging.log(logging.Info, "[server] === quickslice ===") 130 logging.log(logging.Info, "") 131 132 // Start server immediately (this blocks) 133 start_server(db, jetstream_subject) 134} 135 136fn start_server( 137 db: Executor, 138 jetstream_subject: option.Option( 139 process.Subject(jetstream_consumer.ManagerMessage), 140 ), 141) { 142 wisp.configure_logger() 143 144 // Get priv directory for serving static files 145 let assert Ok(priv_directory) = wisp.priv_directory("server") 146 let static_directory = priv_directory <> "/static" 147 148 // Get secret_key_base from environment or generate one 149 let secret_key_base = case envoy.get("SECRET_KEY_BASE") { 150 Ok(key) -> { 151 logging.log( 152 logging.Info, 153 "[server] Using SECRET_KEY_BASE from environment", 154 ) 155 key 156 } 157 Error(_) -> { 158 logging.log( 159 logging.Warning, 160 "[server] WARNING: SECRET_KEY_BASE not set, generating random key", 161 ) 162 logging.log( 163 logging.Warning, 164 "[server] Sessions will be invalidated on server restart. Set SECRET_KEY_BASE in .env for persistence.", 165 ) 166 wisp.random_string(64) 167 } 168 } 169 170 // Get HOST and PORT from environment variables or use defaults 171 let host = case envoy.get("HOST") { 172 Ok(h) -> h 173 Error(_) -> "localhost" 174 } 175 176 let port = case envoy.get("PORT") { 177 Ok(p) -> 178 case int.parse(p) { 179 Ok(port_num) -> port_num 180 Error(_) -> 8080 181 } 182 Error(_) -> 8080 183 } 184 185 // Determine external base URL from EXTERNAL_BASE_URL environment variable 186 let external_base_url = case envoy.get("EXTERNAL_BASE_URL") { 187 Ok(base_url) -> base_url 188 Error(_) -> "http://" <> host <> ":" <> int.to_string(port) 189 } 190 191 // Get OAuth signing key from environment variable (multibase format) 192 let oauth_signing_key = case envoy.get("OAUTH_SIGNING_KEY") { 193 Ok(key) if key != "" -> { 194 logging.log( 195 logging.Info, 196 "[oauth] Using OAUTH_SIGNING_KEY from environment", 197 ) 198 option.Some(key) 199 } 200 _ -> { 201 logging.log( 202 logging.Warning, 203 "[oauth] OAUTH_SIGNING_KEY not set, JWT signing and JWKS will be unavailable", 204 ) 205 option.None 206 } 207 } 208 209 // Get OAuth loopback mode from environment variable 210 // When true, uses loopback client IDs (http://localhost/?redirect_uri=...) 211 // instead of client metadata URLs, allowing local development without ngrok 212 let oauth_loopback_mode = case envoy.get("OAUTH_LOOPBACK_MODE") { 213 Ok("true") -> { 214 logging.log( 215 logging.Info, 216 "[oauth] Loopback mode enabled - using loopback client IDs", 217 ) 218 True 219 } 220 _ -> False 221 } 222 223 // Start backfill state actor to track backfill status across requests 224 let assert Ok(backfill_state_subject) = backfill_state.start() 225 logging.log(logging.Info, "[server] Backfill state actor initialized") 226 227 // Start DID cache actor 228 let assert Ok(did_cache_subject) = did_cache.start() 229 logging.log(logging.Info, "[server] DID cache actor initialized") 230 231 // Compute ATP client_id once (used for token refresh) 232 let atp_client_id = case oauth_loopback_mode { 233 True -> 234 build_loopback_client_id( 235 external_base_url <> "/oauth/atp/callback", 236 "atproto transition:generic", 237 ) 238 False -> external_base_url <> "/oauth-client-metadata.json" 239 } 240 241 let ctx = 242 Context( 243 db: db, 244 external_base_url: external_base_url, 245 backfill_state: backfill_state_subject, 246 jetstream_consumer: jetstream_subject, 247 did_cache: did_cache_subject, 248 oauth_signing_key: oauth_signing_key, 249 oauth_loopback_mode: oauth_loopback_mode, 250 atp_client_id: atp_client_id, 251 ) 252 253 let handler = fn(req) { handle_request(req, ctx, static_directory) } 254 255 logging.log( 256 logging.Info, 257 "[server] Server started on http://" <> host <> ":" <> int.to_string(port), 258 ) 259 260 // Create Wisp handler converted to Mist format 261 let wisp_handler = wisp_mist.handler(handler, secret_key_base) 262 263 // Wrap it to intercept WebSocket upgrades for GraphQL subscriptions 264 let mist_handler = fn(req: request.Request(mist.Connection)) { 265 let upgrade_header = request.get_header(req, "upgrade") 266 let path = request.path_segments(req) 267 268 case path { 269 // GraphQL WebSocket for subscriptions 270 ["graphql"] | ["", "graphql"] -> { 271 case upgrade_header { 272 Ok(upgrade_value) -> { 273 case string.lowercase(upgrade_value) { 274 "websocket" -> { 275 logging.log( 276 logging.Info, 277 "[server] Handling WebSocket upgrade for /graphql", 278 ) 279 let domain_authority = case 280 config_repo.get(ctx.db, "domain_authority") 281 { 282 Ok(authority) -> authority 283 Error(_) -> "" 284 } 285 graphql_ws_handler.handle_websocket( 286 req, 287 ctx.db, 288 ctx.did_cache, 289 ctx.oauth_signing_key, 290 ctx.atp_client_id, 291 config_repo.get_plc_directory_url(ctx.db), 292 domain_authority, 293 ) 294 } 295 _ -> wisp_handler(req) 296 } 297 } 298 _ -> wisp_handler(req) 299 } 300 } 301 302 _ -> wisp_handler(req) 303 } 304 } 305 306 let assert Ok(_) = 307 mist.new(mist_handler) 308 |> mist.bind(host) 309 |> mist.port(port) 310 |> mist.start 311 312 process.sleep_forever() 313} 314 315/// Build a loopback client ID for OAuth with native apps 316/// Format: http://localhost/?redirect_uri=...&scope=... 317/// Per RFC 8252, redirect_uri must use 127.0.0.1 (not localhost) 318fn build_loopback_client_id(redirect_uri: String, scope: String) -> String { 319 "http://localhost/?redirect_uri=" 320 <> uri.percent_encode(redirect_uri) 321 <> "&scope=" 322 <> uri.percent_encode(scope) 323} 324 325fn handle_request( 326 req: wisp.Request, 327 ctx: Context, 328 static_directory: String, 329) -> wisp.Response { 330 use _req <- middleware(req, static_directory) 331 332 let segments = wisp.path_segments(req) 333 334 case segments { 335 [] -> index_handler.handle() 336 ["health"] -> health_handler.handle(ctx.db) 337 ["logout"] -> logout_handler.handle(req, ctx.db) 338 ["admin", "oauth", "authorize"] -> { 339 let redirect_uri = ctx.external_base_url <> "/admin/oauth/callback" 340 let client_id = case ctx.oauth_loopback_mode { 341 True -> 342 build_loopback_client_id( 343 redirect_uri, 344 config_repo.get_oauth_supported_scopes(ctx.db), 345 ) 346 False -> ctx.external_base_url <> "/oauth-client-metadata.json" 347 } 348 admin_oauth_authorize_handler.handle( 349 req, 350 ctx.db, 351 ctx.did_cache, 352 redirect_uri, 353 client_id, 354 ctx.oauth_signing_key, 355 config_repo.get_oauth_supported_scopes_list(ctx.db), 356 ) 357 } 358 ["admin", "oauth", "callback"] -> { 359 let redirect_uri = ctx.external_base_url <> "/admin/oauth/callback" 360 let client_id = case ctx.oauth_loopback_mode { 361 True -> 362 build_loopback_client_id( 363 redirect_uri, 364 config_repo.get_oauth_supported_scopes(ctx.db), 365 ) 366 False -> ctx.external_base_url <> "/oauth-client-metadata.json" 367 } 368 admin_oauth_callback_handler.handle( 369 req, 370 ctx.db, 371 ctx.did_cache, 372 redirect_uri, 373 client_id, 374 ctx.oauth_signing_key, 375 ) 376 } 377 ["admin", "graphql"] -> 378 admin_graphql_handler.handle_admin_graphql_request( 379 req, 380 ctx.db, 381 ctx.jetstream_consumer, 382 ctx.did_cache, 383 config_repo.get_oauth_supported_scopes_list(ctx.db), 384 ctx.backfill_state, 385 ) 386 ["graphql"] -> 387 graphql_handler.handle_graphql_request( 388 req, 389 ctx.db, 390 ctx.did_cache, 391 ctx.oauth_signing_key, 392 ctx.atp_client_id, 393 config_repo.get_plc_directory_url(ctx.db), 394 ) 395 ["graphiql"] -> 396 graphiql_handler.handle_graphiql_request(req, ctx.db, ctx.did_cache) 397 ["graphiql", "admin"] -> 398 graphiql_handler.handle_admin_graphiql_request(req, ctx.db, ctx.did_cache) 399 // MCP endpoint for AI assistant introspection 400 ["mcp"] -> { 401 let mcp_ctx = 402 mcp_handler.McpContext( 403 db: ctx.db, 404 external_base_url: ctx.external_base_url, 405 did_cache: ctx.did_cache, 406 signing_key: ctx.oauth_signing_key, 407 plc_url: config_repo.get_plc_directory_url(ctx.db), 408 supported_scopes: config_repo.get_oauth_supported_scopes_list(ctx.db), 409 ) 410 mcp_handler.handle(req, mcp_ctx) 411 } 412 // New OAuth 2.0 endpoints 413 [".well-known", "oauth-authorization-server"] -> 414 oauth_metadata_handler.handle( 415 ctx.external_base_url, 416 config_repo.get_oauth_supported_scopes_list(ctx.db), 417 ) 418 [".well-known", "jwks.json"] -> 419 oauth_jwks_handler.handle(ctx.oauth_signing_key) 420 ["oauth-client-metadata.json"] -> 421 oauth_client_metadata_handler.handle( 422 ctx.external_base_url, 423 "Quickslice Server", 424 [ 425 ctx.external_base_url <> "/admin/oauth/callback", 426 ctx.external_base_url <> "/oauth/atp/callback", 427 ], 428 config_repo.get_oauth_supported_scopes(ctx.db), 429 option.None, 430 option.Some(ctx.external_base_url <> "/.well-known/jwks.json"), 431 ) 432 ["oauth", "dpop", "nonce"] -> oauth_dpop_nonce_handler.handle(ctx.db) 433 ["oauth", "register"] -> oauth_register_handler.handle(req, ctx.db) 434 ["oauth", "par"] -> oauth_par_handler.handle(req, ctx.db) 435 ["oauth", "authorize"] -> { 436 let redirect_uri = ctx.external_base_url <> "/oauth/atp/callback" 437 let client_id = case ctx.oauth_loopback_mode { 438 True -> 439 build_loopback_client_id( 440 redirect_uri, 441 config_repo.get_oauth_supported_scopes(ctx.db), 442 ) 443 False -> ctx.external_base_url <> "/oauth-client-metadata.json" 444 } 445 oauth_authorize_handler.handle( 446 req, 447 ctx.db, 448 ctx.did_cache, 449 redirect_uri, 450 client_id, 451 ctx.oauth_signing_key, 452 ) 453 } 454 455 ["oauth", "token"] -> 456 oauth_token_handler.handle(req, ctx.db, ctx.external_base_url) 457 ["oauth", "atp", "callback"] -> { 458 let redirect_uri = ctx.external_base_url <> "/oauth/atp/callback" 459 let client_id = case ctx.oauth_loopback_mode { 460 True -> 461 build_loopback_client_id( 462 redirect_uri, 463 config_repo.get_oauth_supported_scopes(ctx.db), 464 ) 465 False -> ctx.external_base_url <> "/oauth-client-metadata.json" 466 } 467 oauth_atp_callback_handler.handle( 468 req, 469 ctx.db, 470 ctx.did_cache, 471 redirect_uri, 472 client_id, 473 ctx.oauth_signing_key, 474 ) 475 } 476 ["api", "atp", "sessions", session_id] -> 477 oauth_atp_session_handler.handle(req, ctx.db, session_id) 478 // Fallback: serve SPA index.html for client-side routing 479 _ -> index_handler.handle() 480 } 481} 482 483fn middleware( 484 req: wisp.Request, 485 static_directory: String, 486 handle_request: fn(wisp.Request) -> wisp.Response, 487) -> wisp.Response { 488 use <- wisp.rescue_crashes 489 use <- wisp.log_request(req) 490 use req <- wisp.handle_head(req) 491 use <- wisp.serve_static(req, under: "/", from: static_directory) 492 493 // Get origin from request headers 494 let origin = case request.get_header(req, "origin") { 495 Ok(o) -> o 496 Error(_) -> "http://localhost:8080" 497 } 498 499 // Handle CORS preflight requests 500 case req.method { 501 gleam_http.Options -> { 502 wisp.response(200) 503 |> wisp.set_header("access-control-allow-origin", origin) 504 |> wisp.set_header("access-control-allow-credentials", "true") 505 |> wisp.set_header("access-control-allow-methods", "GET, POST, OPTIONS") 506 |> wisp.set_header( 507 "access-control-allow-headers", 508 "Content-Type, Authorization, DPoP", 509 ) 510 |> wisp.set_body(wisp.Text("")) 511 } 512 _ -> { 513 // Add CORS headers to all responses 514 handle_request(req) 515 |> wisp.set_header("access-control-allow-origin", origin) 516 |> wisp.set_header("access-control-allow-credentials", "true") 517 |> wisp.set_header("access-control-allow-methods", "GET, POST, OPTIONS") 518 |> wisp.set_header( 519 "access-control-allow-headers", 520 "Content-Type, Authorization, DPoP", 521 ) 522 } 523 } 524}