search for standard sites pub-search.waow.tech
search zig blog atproto
at 237b0322f9d0171f5ced5a04a72be7c5380ca0ca 103 lines 3.3 kB view raw
const std = @import("std");
const net = std.net;
const posix = std.posix;
const Thread = std.Thread;
const logfire = @import("logfire");
const db = @import("db/mod.zig");
const tpuf = @import("tpuf.zig");
const metrics = @import("metrics.zig");
const server = @import("server.zig");
const ingest = @import("ingest.zig");

/// Number of jobs in the thread pool that serves accepted HTTP connections.
const MAX_HTTP_WORKERS = 16;
/// Send/receive timeout applied to every accepted socket, in seconds.
const SOCKET_TIMEOUT_SECS = 5;
/// Port used when PORT is unset or cannot be parsed as a u16.
const DEFAULT_PORT: u16 = 3000;
/// Pause after a failed accept() so a persistent failure cannot turn the
/// accept loop into a busy spin that floods the log.
const ACCEPT_RETRY_DELAY_NS: u64 = 50 * std.time.ns_per_ms;

/// Process entry point.
///
/// Order of operations:
///   1. configure logfire (failure is reported and otherwise ignored),
///   2. bind and listen on 0.0.0.0:PORT,
///   3. initialise the turso client synchronously,
///   4. create the HTTP worker pool,
///   5. start `initServices` on a detached background thread,
///   6. accept connections forever, handing each one to the pool.
///
/// Returns an error only if one of the setup steps (address parsing, listen,
/// `db.initTurso`, pool init, thread spawn) fails. The accept loop itself
/// never exits, so the `defer`s below only run on such a setup failure.
pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // configure logfire (reads LOGFIRE_WRITE_TOKEN from env)
    _ = logfire.configure(.{
        .service_name = "leaflet-search",
        .service_version = "0.1.0",
        .environment = posix.getenv("FLY_APP_NAME") orelse "development",
    }) catch |err| {
        std.debug.print("logfire init failed: {}, continuing without observability\n", .{err});
    };

    // start http server FIRST so Fly proxy doesn't timeout
    const port: u16 = blk: {
        const port_str = posix.getenv("PORT") orelse break :blk DEFAULT_PORT;
        break :blk std.fmt.parseInt(u16, port_str, 10) catch |err| {
            // A malformed PORT is a deployment mistake; say so instead of
            // silently listening on the default port.
            logfire.warn("invalid PORT '{s}': {}, falling back to {d}", .{ port_str, err, DEFAULT_PORT });
            break :blk DEFAULT_PORT;
        };
    };

    const address = try net.Address.parseIp("0.0.0.0", port);
    var listener = try address.listen(.{ .reuse_address = true });
    defer listener.deinit();

    const app_name = posix.getenv("APP_NAME") orelse "leaflet-search";
    logfire.info("{s} listening on port {d} (max {d} workers)", .{ app_name, port, MAX_HTTP_WORKERS });

    // init turso client synchronously (fast, needed for search fallback)
    try db.initTurso();

    // init thread pool for http connections
    var pool: Thread.Pool = undefined;
    try pool.init(.{
        .allocator = allocator,
        .n_jobs = MAX_HTTP_WORKERS,
    });
    defer pool.deinit();

    // init local db and other services in background (slow)
    const init_thread = try Thread.spawn(.{}, initServices, .{allocator});
    init_thread.detach();

    while (true) {
        const conn = listener.accept() catch |err| {
            logfire.err("accept error: {}", .{err});
            // Back off briefly: retrying immediately on a persistent error
            // would spin the CPU and emit an unbounded stream of log lines.
            posix.nanosleep(0, ACCEPT_RETRY_DELAY_NS);
            continue;
        };

        // Best effort: a connection without timeouts is still served.
        setSocketTimeout(conn.stream.handle, SOCKET_TIMEOUT_SECS) catch |err| {
            logfire.warn("failed to set socket timeout: {}", .{err});
        };

        const accepted_at = std.time.microTimestamp();
        pool.spawn(server.handleConnection, .{ conn, accepted_at }) catch |err| {
            logfire.err("pool spawn error: {}", .{err});
            // The handler will never run, so the socket must be closed here.
            conn.stream.close();
        };
    }
}

/// Start the slower services. `main` runs this on a detached background
/// thread so the listener is already accepting while these initialise.
///
/// `allocator` is passed on to the embedder and the tap consumer.
fn initServices(allocator: std.mem.Allocator) void {
    // init local db (slow - turso already initialized)
    db.initLocalDb();
    db.startSync();

    // start activity tracker
    metrics.activity.init();

    // start stats buffer (background sync to Turso)
    metrics.buffer.init();

    // init vector store (reads TURBOPUFFER_API_KEY from env)
    tpuf.init();

    // start embedder (voyage-4-lite, 1024 dims, 1 worker)
    ingest.embedder.start(allocator);

    // start tap consumer
    ingest.tap.consumer(allocator);
}

/// Apply the same receive and send timeout of `secs` seconds to socket `fd`.
///
/// Returns the first `setsockopt` error encountered; if setting the receive
/// timeout fails, the send timeout is not attempted.
fn setSocketTimeout(fd: posix.fd_t, secs: u32) !void {
    const timeout = std.mem.toBytes(posix.timeval{
        .sec = @intCast(secs),
        .usec = 0,
    });
    try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.RCVTIMEO, &timeout);
    try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.SNDTIMEO, &timeout);
}