semantic bufo search find-bufo.com
bufo

feat(bot): use zat's jetstream failover instead of single host

stop overriding .hosts with a single endpoint — use zat's 12 default
jetstream instances with round-robin failover, exponential backoff, and
cursor rewind on host switch. PREFERRED_JETSTREAM env var optionally
prepends a preferred host to try first (e.g. for canary testing).

adds onConnect callback so the stats dashboard shows whichever instance
is currently connected, updating live on failover.

also wires up the backend image resize proxy for non-GIF uploads.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+57 -13
+4 -2
bot/src/config.zig
··· 4 pub const Config = struct { 5 bsky_handle: []const u8, 6 bsky_app_password: []const u8, 7 - jetstream_endpoint: []const u8, 8 min_phrase_words: u32, 9 posting_enabled: bool, 10 cooldown_minutes: u32, 11 exclude_patterns: []const u8, 12 stats_port: u16, 13 14 pub fn fromEnv() Config { 15 return .{ 16 .bsky_handle = posix.getenv("BSKY_HANDLE") orelse "find-bufo.com", 17 .bsky_app_password = posix.getenv("BSKY_APP_PASSWORD") orelse "", 18 - .jetstream_endpoint = posix.getenv("JETSTREAM_ENDPOINT") orelse "jetstream2.us-east.bsky.network", 19 .min_phrase_words = parseU32(posix.getenv("MIN_PHRASE_WORDS"), 4), 20 .posting_enabled = parseBool(posix.getenv("POSTING_ENABLED")), 21 .cooldown_minutes = parseU32(posix.getenv("COOLDOWN_MINUTES"), 120), 22 .exclude_patterns = posix.getenv("EXCLUDE_PATTERNS") orelse "what-have-you-done,what-have-i-done,sad,crying,cant-take", 23 .stats_port = parseU16(posix.getenv("STATS_PORT"), 8080), 24 }; 25 } 26 };
··· 4 pub const Config = struct { 5 bsky_handle: []const u8, 6 bsky_app_password: []const u8, 7 + preferred_jetstream: ?[]const u8, 8 min_phrase_words: u32, 9 posting_enabled: bool, 10 cooldown_minutes: u32, 11 exclude_patterns: []const u8, 12 stats_port: u16, 13 + backend_url: []const u8, 14 15 pub fn fromEnv() Config { 16 return .{ 17 .bsky_handle = posix.getenv("BSKY_HANDLE") orelse "find-bufo.com", 18 .bsky_app_password = posix.getenv("BSKY_APP_PASSWORD") orelse "", 19 + .preferred_jetstream = posix.getenv("PREFERRED_JETSTREAM"), 20 .min_phrase_words = parseU32(posix.getenv("MIN_PHRASE_WORDS"), 4), 21 .posting_enabled = parseBool(posix.getenv("POSTING_ENABLED")), 22 .cooldown_minutes = parseU32(posix.getenv("COOLDOWN_MINUTES"), 120), 23 .exclude_patterns = posix.getenv("EXCLUDE_PATTERNS") orelse "what-have-you-done,what-have-i-done,sad,crying,cant-take", 24 .stats_port = parseU16(posix.getenv("STATS_PORT"), 8080), 25 + .backend_url = posix.getenv("BACKEND_URL") orelse "https://find-bufo.com", 26 }; 27 } 28 };
+5
bot/src/jetstream.zig
··· 36 37 pub const PostHandler = struct { 38 callback: *const fn (Post) void, 39 40 pub fn onEvent(self: *PostHandler, event: zat.JetstreamEvent) void { 41 switch (event) { ··· 46 47 pub fn onError(_: *PostHandler, err: anyerror) void { 48 std.debug.print("jetstream error: {s}\n", .{@errorName(err)}); 49 } 50 51 fn handleCommit(self: *PostHandler, c: zat.jetstream.CommitEvent) void {
··· 36 37 pub const PostHandler = struct { 38 callback: *const fn (Post) void, 39 + on_connect: ?*const fn ([]const u8) void = null, 40 41 pub fn onEvent(self: *PostHandler, event: zat.JetstreamEvent) void { 42 switch (event) { ··· 47 48 pub fn onError(_: *PostHandler, err: anyerror) void { 49 std.debug.print("jetstream error: {s}\n", .{@errorName(err)}); 50 + } 51 + 52 + pub fn onConnect(self: *PostHandler, host: []const u8) void { 53 + if (self.on_connect) |cb| cb(host); 54 } 55 56 fn handleCommit(self: *PostHandler, c: zat.jetstream.CommitEvent) void {
+32 -8
bot/src/main.zig
··· 55 var bot_stats = stats.Stats.init(allocator); 56 defer bot_stats.deinit(); 57 bot_stats.setBufosLoaded(@intCast(m.count())); 58 - bot_stats.jetstream_endpoint = cfg.jetstream_endpoint; 59 60 // init state 61 var state = BotState{ ··· 76 }; 77 defer stats_thread.join(); 78 79 - // start jetstream consumer 80 - var handler = jetstream.PostHandler{ .callback = onPost }; 81 var client = zat.JetstreamClient.init(allocator, .{ 82 - .hosts = &.{cfg.jetstream_endpoint}, 83 .wanted_collections = &.{"app.bsky.feed.post"}, 84 }); 85 defer client.deinit(); 86 client.subscribe(&handler); 87 } 88 89 fn onPost(post: jetstream.Post) void { ··· 152 } 153 154 fn tryPost(state: *BotState, post: jetstream.Post, match: matcher.Match, now: i64) !void { 155 - // fetch bufo image 156 - const img_data = try state.bsky_client.fetchImage(match.url); 157 - defer state.allocator.free(img_data); 158 159 - const is_gif = mem.endsWith(u8, match.url, ".gif"); 160 161 // build alt text (name without extension, dashes to spaces) 162 var alt_buf: [128]u8 = undefined;
··· 55 var bot_stats = stats.Stats.init(allocator); 56 defer bot_stats.deinit(); 57 bot_stats.setBufosLoaded(@intCast(m.count())); 58 59 // init state 60 var state = BotState{ ··· 75 }; 76 defer stats_thread.join(); 77 78 + // start jetstream consumer (use zat defaults with optional preferred relay) 79 + var handler = jetstream.PostHandler{ .callback = onPost, .on_connect = onConnect }; 80 + 81 + // prepend preferred relay to default host list if set 82 + var hosts_buf: [1 + zat.jetstream.default_hosts.len][]const u8 = undefined; 83 + var hosts_len: usize = 0; 84 + if (cfg.preferred_jetstream) |host| { 85 + hosts_buf[0] = host; 86 + hosts_len = 1; 87 + } 88 + for (zat.jetstream.default_hosts) |h| { 89 + hosts_buf[hosts_len] = h; 90 + hosts_len += 1; 91 + } 92 + 93 var client = zat.JetstreamClient.init(allocator, .{ 94 + .hosts = hosts_buf[0..hosts_len], 95 .wanted_collections = &.{"app.bsky.feed.post"}, 96 }); 97 defer client.deinit(); 98 client.subscribe(&handler); 99 + } 100 + 101 + fn onConnect(host: []const u8) void { 102 + const state = global_state orelse return; 103 + std.debug.print("connected to jetstream: {s}\n", .{host}); 104 + state.stats.setJetstreamHost(host); 105 } 106 107 fn onPost(post: jetstream.Post) void { ··· 170 } 171 172 fn tryPost(state: *BotState, post: jetstream.Post, match: matcher.Match, now: i64) !void { 173 + // fetch bufo image (route non-GIF images through resize proxy) 174 + const is_gif = mem.endsWith(u8, match.url, ".gif"); 175 176 + var url_buf: [1024]u8 = undefined; 177 + const fetch_url = if (is_gif) 178 + match.url 179 + else 180 + std.fmt.bufPrint(&url_buf, "{s}/api/image?url={s}&max_bytes=900000", .{ state.config.backend_url, match.url }) catch match.url; 181 + 182 + const img_data = try state.bsky_client.fetchImage(fetch_url); 183 + defer state.allocator.free(img_data); 184 185 // build alt text (name without extension, dashes to spaces) 186 var alt_buf: [128]u8 = undefined;
+15 -2
bot/src/stats.zig
··· 19 blocks_respected: std.atomic.Value(u64) = .init(0), 20 errors: std.atomic.Value(u64) = .init(0), 21 bufos_loaded: u64 = 0, 22 - jetstream_endpoint: []const u8 = "", 23 24 // track per-bufo match counts: name -> {count, url} 25 bufo_matches: std.StringHashMap(BufoMatchData), ··· 307 self.bufos_loaded = count; 308 } 309 310 fn formatUptime(seconds: i64, buf: []u8) []const u8 { 311 const s: u64 = @intCast(@max(0, seconds)); 312 const days = s / 86400; ··· 396 const html = try std.fmt.allocPrint(allocator, template.html, .{ 397 uptime, 398 uptime_str, 399 - self.jetstream_endpoint, 400 self.posts_checked.load(.monotonic), 401 self.posts_checked.load(.monotonic), 402 self.matches_found.load(.monotonic),
··· 19 blocks_respected: std.atomic.Value(u64) = .init(0), 20 errors: std.atomic.Value(u64) = .init(0), 21 bufos_loaded: u64 = 0, 22 + jetstream_host_buf: [256]u8 = undefined, 23 + jetstream_host_len: std.atomic.Value(usize) = .init(0), 24 25 // track per-bufo match counts: name -> {count, url} 26 bufo_matches: std.StringHashMap(BufoMatchData), ··· 308 self.bufos_loaded = count; 309 } 310 311 + pub fn setJetstreamHost(self: *Stats, host: []const u8) void { 312 + const len = @min(host.len, self.jetstream_host_buf.len); 313 + @memcpy(self.jetstream_host_buf[0..len], host[0..len]); 314 + self.jetstream_host_len.store(len, .release); 315 + } 316 + 317 + pub fn getJetstreamHost(self: *Stats) []const u8 { 318 + const len = self.jetstream_host_len.load(.acquire); 319 + if (len == 0) return "(connecting...)"; 320 + return self.jetstream_host_buf[0..len]; 321 + } 322 + 323 fn formatUptime(seconds: i64, buf: []u8) []const u8 { 324 const s: u64 = @intCast(@max(0, seconds)); 325 const days = s / 86400; ··· 409 const html = try std.fmt.allocPrint(allocator, template.html, .{ 410 uptime, 411 uptime_str, 412 + self.getJetstreamHost(), 413 self.posts_checked.load(.monotonic), 414 self.posts_checked.load(.monotonic), 415 self.matches_found.load(.monotonic),
+1 -1
bot/src/stats_template.zig
··· 181 \\ <span class="stat-value" id="uptime" data-seconds="{}">{s}</span> 182 \\</div> 183 \\<div class="stat"> 184 - \\ <span class="stat-label">relay</span> 185 \\ <span class="stat-value">{s}</span> 186 \\</div> 187 \\<div class="stat">
··· 181 \\ <span class="stat-value" id="uptime" data-seconds="{}">{s}</span> 182 \\</div> 183 \\<div class="stat"> 184 + \\ <span class="stat-label">jetstream</span> 185 \\ <span class="stat-value">{s}</span> 186 \\</div> 187 \\<div class="stat">