search for standard sites
pub-search.waow.tech
search
zig
blog
atproto
1const std = @import("std");
2const net = std.net;
3const posix = std.posix;
4const Thread = std.Thread;
5const logfire = @import("logfire");
6const db = @import("db/mod.zig");
7const tpuf = @import("tpuf.zig");
8const metrics = @import("metrics.zig");
9const server = @import("server.zig");
10const ingest = @import("ingest.zig");
11
12const MAX_HTTP_WORKERS = 16;
13const SOCKET_TIMEOUT_SECS = 5;
14
15pub fn main() !void {
16 var gpa = std.heap.GeneralPurposeAllocator(.{}){};
17 defer _ = gpa.deinit();
18 const allocator = gpa.allocator();
19
20 // configure logfire (reads LOGFIRE_WRITE_TOKEN from env)
21 _ = logfire.configure(.{
22 .service_name = "leaflet-search",
23 .service_version = "0.1.0",
24 .environment = posix.getenv("FLY_APP_NAME") orelse "development",
25 }) catch |err| {
26 std.debug.print("logfire init failed: {}, continuing without observability\n", .{err});
27 };
28
29 // start http server FIRST so Fly proxy doesn't timeout
30 const port: u16 = blk: {
31 const port_str = posix.getenv("PORT") orelse "3000";
32 break :blk std.fmt.parseInt(u16, port_str, 10) catch 3000;
33 };
34
35 const address = try net.Address.parseIp("0.0.0.0", port);
36 var listener = try address.listen(.{ .reuse_address = true });
37 defer listener.deinit();
38
39 const app_name = posix.getenv("APP_NAME") orelse "leaflet-search";
40 logfire.info("{s} listening on port {d} (max {d} workers)", .{ app_name, port, MAX_HTTP_WORKERS });
41
42 // init turso client synchronously (fast, needed for search fallback)
43 try db.initTurso();
44
45 // init thread pool for http connections
46 var pool: Thread.Pool = undefined;
47 try pool.init(.{
48 .allocator = allocator,
49 .n_jobs = MAX_HTTP_WORKERS,
50 });
51 defer pool.deinit();
52
53 // init local db and other services in background (slow)
54 const init_thread = try Thread.spawn(.{}, initServices, .{allocator});
55 init_thread.detach();
56
57 while (true) {
58 const conn = listener.accept() catch |err| {
59 logfire.err("accept error: {}", .{err});
60 continue;
61 };
62
63 setSocketTimeout(conn.stream.handle, SOCKET_TIMEOUT_SECS) catch |err| {
64 logfire.warn("failed to set socket timeout: {}", .{err});
65 };
66
67 const accepted_at = std.time.microTimestamp();
68 pool.spawn(server.handleConnection, .{ conn, accepted_at }) catch |err| {
69 logfire.err("pool spawn error: {}", .{err});
70 conn.stream.close();
71 };
72 }
73}
74
75fn initServices(allocator: std.mem.Allocator) void {
76 // init local db (slow - turso already initialized)
77 db.initLocalDb();
78 db.startSync();
79
80 // start activity tracker
81 metrics.activity.init();
82
83 // start stats buffer (background sync to Turso)
84 metrics.buffer.init();
85
86 // init vector store (reads TURBOPUFFER_API_KEY from env)
87 tpuf.init();
88
89 // start embedder (voyage-4-lite, 1024 dims, 1 worker)
90 ingest.embedder.start(allocator);
91
92 // start tap consumer
93 ingest.tap.consumer(allocator);
94}
95
96fn setSocketTimeout(fd: posix.fd_t, secs: u32) !void {
97 const timeout = std.mem.toBytes(posix.timeval{
98 .sec = @intCast(secs),
99 .usec = 0,
100 });
101 try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.RCVTIMEO, &timeout);
102 try posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.SNDTIMEO, &timeout);
103}