an experimental irc client

client: refactor read thread

+138 -279
+13 -10
src/app.zig
··· 163 163 164 164 // clean up clients 165 165 { 166 - for (self.clients.items, 0..) |_, i| { 167 - var client = self.clients.items[i]; 166 + // Loop first to close connections. This will help us close faster by getting the 167 + // threads exited 168 + for (self.clients.items) |client| { 169 + client.close(); 170 + } 171 + for (self.clients.items) |client| { 168 172 client.deinit(); 169 - if (builtin.mode == .Debug) { 170 - // We only clean up clients in Debug mode so we can check for memory leaks 171 - // without failing for this. We don't care about it in any other mode since we 172 - // are exiting anyways and we want to do it fast. If we destroy, our readthread 173 - // could panic so we don't do it unless we have to. 174 - self.alloc.destroy(client); 175 - } 173 + self.alloc.destroy(client); 176 174 } 177 175 self.clients.deinit(); 178 176 } ··· 227 225 }, 228 226 .tick => { 229 227 for (self.clients.items) |client| { 228 + if (client.status.load(.unordered) == .disconnected and 229 + client.retry_delay_s == 0) 230 + { 231 + ctx.redraw = true; 232 + try irc.Client.retryTickHandler(client, ctx, .tick); 233 + } 230 234 client.drainFifo(ctx); 231 235 } 232 236 try ctx.tick(8, self.widget()); ··· 947 951 pub fn connect(self: *App, cfg: irc.Client.Config) !void { 948 952 const client = try self.alloc.create(irc.Client); 949 953 client.* = try irc.Client.init(self.alloc, self, &self.write_queue, cfg); 950 - client.thread = try std.Thread.spawn(.{}, irc.Client.readLoop, .{client}); 951 954 try self.clients.append(client); 952 955 } 953 956
+1 -20
src/comlink.zig
··· 7 7 8 8 pub const App = app.App; 9 9 pub const Completer = completer.Completer; 10 - pub const EventLoop = vaxis.Loop(Event); 11 - pub const WriteQueue = vaxis.Queue(WriteEvent, 256); 10 + pub const WriteQueue = vaxis.Queue(WriteEvent, 32); 12 11 13 12 pub const Bind = struct { 14 13 key: vaxis.Key, ··· 70 69 else => false, 71 70 }; 72 71 } 73 - }; 74 - 75 - /// Any event our application will handle 76 - pub const Event = union(enum) { 77 - key_press: vaxis.Key, 78 - mouse: vaxis.Mouse, 79 - winsize: vaxis.Winsize, 80 - focus_out, 81 - irc: IrcEvent, 82 - connect: irc.Client.Config, 83 - redraw, 84 - paste_start, 85 - paste_end, 86 - }; 87 - 88 - pub const IrcEvent = struct { 89 - client: *irc.Client, 90 - msg: irc.Slice, 91 72 }; 92 73 93 74 /// An event our write thread will handle
+118 -133
src/irc.zig
··· 4 4 const tls = @import("tls"); 5 5 const vaxis = @import("vaxis"); 6 6 const zeit = @import("zeit"); 7 - const bytepool = @import("pool.zig"); 8 7 9 8 const Scrollbar = @import("Scrollbar.zig"); 10 9 const testing = std.testing; ··· 13 12 14 13 const Allocator = std.mem.Allocator; 15 14 const Base64Encoder = std.base64.standard.Encoder; 16 - pub const MessagePool = bytepool.BytePool(max_raw_msg_size * 4); 17 - pub const Slice = MessagePool.Slice; 18 15 19 16 const assert = std.debug.assert; 20 17 ··· 30 27 client: *Client, 31 28 channel: *Channel, 32 29 }; 33 - 34 - pub const Event = comlink.IrcEvent; 35 30 36 31 pub const Command = enum { 37 32 RPL_WELCOME, // 001 ··· 1207 1202 prefix: []const u8 = "", 1208 1203 }; 1209 1204 1205 + pub const Status = enum(u8) { 1206 + disconnected, 1207 + connecting, 1208 + connected, 1209 + }; 1210 + 1210 1211 alloc: std.mem.Allocator, 1211 1212 app: *comlink.App, 1212 1213 client: tls.Connection(std.net.Stream), ··· 1217 1218 users: std.StringHashMap(*User), 1218 1219 1219 1220 should_close: bool = false, 1220 - status: enum { 1221 - connected, 1222 - disconnected, 1223 - } = .disconnected, 1221 + status: std.atomic.Value(Status), 1224 1222 1225 1223 caps: Capabilities = .{}, 1226 1224 supports: ISupport = .{}, ··· 1231 1229 thread: ?std.Thread = null, 1232 1230 1233 1231 redraw: std.atomic.Value(bool), 1234 - fifo: std.fifo.LinearFifo(Event, .Dynamic), 1235 - fifo_mutex: std.Thread.Mutex, 1232 + read_buf_mutex: std.Thread.Mutex, 1233 + read_buf: std.ArrayList(u8), 1236 1234 1237 1235 has_mouse: bool, 1236 + retry_delay_s: u8, 1238 1237 1239 1238 pub fn init( 1240 1239 alloc: std.mem.Allocator, ··· 1252 1251 .users = std.StringHashMap(*User).init(alloc), 1253 1252 .batches = std.StringHashMap(*Channel).init(alloc), 1254 1253 .write_queue = wq, 1254 + .status = std.atomic.Value(Status).init(.disconnected), 1255 1255 .redraw = std.atomic.Value(bool).init(false), 1256 - .fifo = std.fifo.LinearFifo(Event, .Dynamic).init(alloc), 1257 - .fifo_mutex = .{}, 1256 + .read_buf_mutex = .{}, 1257 + .read_buf = std.ArrayList(u8).init(alloc), 1258 1258 .has_mouse = false, 1259 + .retry_delay_s = 0, 1259 1260 }; 1260 1261 } 1261 1262 1263 + /// Closes the connection 1264 + pub fn close(self: *Client) void { 1265 + self.should_close = true; 1266 + if (self.status.load(.unordered) == .disconnected) return; 1267 + if (self.config.tls) { 1268 + self.client.close() catch {}; 1269 + } 1270 + self.stream.close(); 1271 + } 1272 + 1262 1273 pub fn deinit(self: *Client) void { 1263 - self.should_close = true; 1264 - if (self.status == .connected) { 1265 - self.write("PING comlink\r\n") catch |err| 1266 - log.err("couldn't close tls conn: {}", .{err}); 1267 - if (self.thread) |thread| { 1268 - thread.detach(); 1269 - self.thread = null; 1270 - } 1274 + if (self.thread) |thread| { 1275 + thread.join(); 1276 + self.thread = null; 1271 1277 } 1272 1278 // id gets allocated in the main thread. We need to deallocate it here if 1273 1279 // we have one ··· 1295 1301 self.alloc.free(key.*); 1296 1302 } 1297 1303 batches.deinit(); 1298 - self.fifo.deinit(); 1304 + self.read_buf.deinit(); 1305 + } 1306 + 1307 + fn retryWidget(self: *Client) vxfw.Widget { 1308 + return .{ 1309 + .userdata = self, 1310 + .eventHandler = Client.retryTickHandler, 1311 + .drawFn = Client.typeErasedDrawNameSelected, 1312 + }; 1313 + } 1314 + 1315 + pub fn retryTickHandler(ptr: *anyopaque, ctx: *vxfw.EventContext, event: vxfw.Event) anyerror!void { 1316 + const self: *Client = @ptrCast(@alignCast(ptr)); 1317 + switch (event) { 1318 + .tick => { 1319 + const status = self.status.load(.unordered); 1320 + switch (status) { 1321 + .disconnected => { 1322 + // Clean up a thread if we have one 1323 + if (self.thread) |thread| { 1324 + thread.join(); 1325 + self.thread = null; 1326 + } 1327 + self.status.store(.connecting, .unordered); 1328 + self.thread = try std.Thread.spawn(.{}, Client.readThread, .{self}); 1329 + }, 1330 + .connecting => {}, 1331 + .connected => { 1332 + // Reset the delay 1333 + self.retry_delay_s = 0; 1334 + return; 1335 + }, 1336 + } 1337 + // Increment the retry and try again 1338 + self.retry_delay_s = @max(self.retry_delay_s <<| 1, 1); 1339 + log.debug("retry in {d} seconds", .{self.retry_delay_s}); 1340 + try ctx.tick(@as(u32, self.retry_delay_s) * std.time.ms_per_s, self.retryWidget()); 1341 + }, 1342 + else => {}, 1343 + } 1299 1344 } 1300 1345 1301 1346 pub fn view(self: *Client) vxfw.Widget { ··· 1326 1371 var style: vaxis.Style = .{}; 1327 1372 if (selected) style.reverse = true; 1328 1373 if (self.has_mouse) style.bg = .{ .index = 8 }; 1329 - if (self.status == .disconnected) style.fg = .{ .index = 8 }; 1374 + if (self.status.load(.unordered) == .disconnected) style.fg = .{ .index = 8 }; 1330 1375 1331 1376 const name = self.config.name orelse self.config.server; 1332 1377 ··· 1389 1434 } 1390 1435 1391 1436 pub fn drainFifo(self: *Client, ctx: *vxfw.EventContext) void { 1392 - self.fifo_mutex.lock(); 1393 - defer self.fifo_mutex.unlock(); 1394 - while (self.fifo.readItem()) |item| { 1395 - // We redraw if we have any items 1437 + self.read_buf_mutex.lock(); 1438 + defer self.read_buf_mutex.unlock(); 1439 + var i: usize = 0; 1440 + while (std.mem.indexOfPos(u8, self.read_buf.items, i, "\r\n")) |idx| { 1396 1441 ctx.redraw = true; 1397 - self.handleEvent(item) catch |err| { 1442 + defer i = idx + 2; 1443 + self.handleEvent(self.read_buf.items[i..idx]) catch |err| { 1398 1444 log.err("error: {}", .{err}); 1399 1445 }; 1400 1446 } 1447 + self.read_buf.replaceRangeAssumeCapacity(0, i, ""); 1401 1448 } 1402 1449 1403 - pub fn handleEvent(self: *Client, event: Event) !void { 1404 - const msg: Message = .{ .bytes = event.msg.slice() }; 1405 - const client = event.client; 1406 - defer event.msg.deinit(); 1450 + pub fn handleEvent(self: *Client, line: []const u8) !void { 1451 + const msg: Message = .{ .bytes = line }; 1452 + const client = self; 1407 1453 switch (msg.command()) { 1408 1454 .unknown => {}, 1409 1455 .CAP => { ··· 1887 1933 } 1888 1934 } 1889 1935 1890 - pub fn readLoop(self: *Client) !void { 1891 - var delay: u64 = 1 * std.time.ns_per_s; 1936 + pub fn readThread(self: *Client) !void { 1937 + defer self.status.store(.disconnected, .unordered); 1892 1938 1893 - while (!self.should_close) { 1894 - self.status = .disconnected; 1895 - log.debug("reconnecting in {d} seconds...", .{@divFloor(delay, std.time.ns_per_s)}); 1896 - self.connect() catch |err| { 1897 - log.err("connection error: {}", .{err}); 1898 - self.status = .disconnected; 1899 - log.debug("disconnected", .{}); 1900 - log.debug("reconnecting in {d} seconds...", .{@divFloor(delay, std.time.ns_per_s)}); 1901 - std.time.sleep(delay); 1902 - delay = delay * 2; 1903 - if (delay > std.time.ns_per_min) delay = std.time.ns_per_min; 1904 - continue; 1905 - }; 1906 - log.debug("connected", .{}); 1907 - self.status = .connected; 1908 - delay = 1 * std.time.ns_per_s; 1939 + self.connect() catch |err| { 1940 + log.warn("couldn't connect: {}", .{err}); 1941 + return; 1942 + }; 1909 1943 1910 - var buf: [16_384]u8 = undefined; 1944 + try self.queueWrite("CAP LS 302\r\n"); 1911 1945 1912 - // 4x max size. We will almost always be *way* under our maximum size, so we will have a 1913 - // lot more potential messages than just 4 1914 - var pool: MessagePool = .{}; 1915 - pool.init(); 1946 + const cap_names = std.meta.fieldNames(Capabilities); 1947 + for (cap_names) |cap| { 1948 + try self.print("CAP REQ :{s}\r\n", .{cap}); 1949 + } 1916 1950 1917 - errdefer |err| { 1918 - log.err("client: {s} error: {}", .{ self.config.network_id.?, err }); 1919 - } 1951 + try self.print("NICK {s}\r\n", .{self.config.nick}); 1920 1952 1921 - const timeout = std.mem.toBytes(std.posix.timeval{ 1922 - .tv_sec = 5, 1923 - .tv_usec = 0, 1924 - }); 1953 + try self.print("USER {s} 0 * {s}\r\n", .{ self.config.user, self.config.real_name }); 1925 1954 1926 - const keep_alive: i64 = 10 * std.time.ms_per_s; 1927 - // max round trip time equal to our timeout 1928 - const max_rt: i64 = 5 * std.time.ms_per_s; 1929 - var last_msg: i64 = std.time.milliTimestamp(); 1930 - var start: usize = 0; 1931 - 1932 - while (true) { 1933 - try std.posix.setsockopt( 1934 - self.stream.handle, 1935 - std.posix.SOL.SOCKET, 1936 - std.posix.SO.RCVTIMEO, 1937 - &timeout, 1938 - ); 1939 - const n = self.read(buf[start..]) catch |err| { 1940 - if (err != error.WouldBlock) break; 1941 - const now = std.time.milliTimestamp(); 1942 - if (now - last_msg > keep_alive + max_rt) { 1943 - // reconnect?? 1944 - self.status = .disconnected; 1945 - self.redraw.store(true, .unordered); 1946 - break; 1947 - } 1948 - if (now - last_msg > keep_alive) { 1949 - // send a ping 1950 - try self.queueWrite("PING comlink\r\n"); 1951 - continue; 1952 - } 1953 - continue; 1954 - }; 1955 - if (self.should_close) return; 1956 - if (n == 0) { 1957 - self.status = .disconnected; 1958 - self.redraw.store(true, .unordered); 1959 - break; 1960 - } 1961 - last_msg = std.time.milliTimestamp(); 1962 - var i: usize = 0; 1963 - while (std.mem.indexOfPos(u8, buf[0 .. n + start], i, "\r\n")) |idx| { 1964 - defer i = idx + 2; 1965 - const buffer = pool.alloc(idx - i); 1966 - // const line = try self.alloc.dupe(u8, buf[i..idx]); 1967 - @memcpy(buffer.slice(), buf[i..idx]); 1968 - assert(std.mem.eql(u8, buf[idx .. idx + 2], "\r\n")); 1969 - log.debug("[<-{s}] {s}", .{ self.config.name orelse self.config.server, buffer.slice() }); 1970 - self.fifo_mutex.lock(); 1971 - defer self.fifo_mutex.unlock(); 1972 - try self.fifo.writeItem(.{ .client = self, .msg = buffer }); 1973 - } 1974 - if (i != n) { 1975 - // we had a part of a line read. Copy it to the beginning of the 1976 - // buffer 1977 - std.mem.copyForwards(u8, buf[0 .. (n + start) - i], buf[i..(n + start)]); 1978 - start = (n + start) - i; 1979 - } else start = 0; 1980 - } 1955 + var buf: [4096]u8 = undefined; 1956 + while (true) { 1957 + const n = try self.read(&buf); 1958 + if (n == 0) return; 1959 + self.read_buf_mutex.lock(); 1960 + defer self.read_buf_mutex.unlock(); 1961 + try self.read_buf.appendSlice(buf[0..n]); 1981 1962 } 1982 1963 } 1983 1964 ··· 1999 1980 } 2000 1981 2001 1982 pub fn write(self: *Client, buf: []const u8) !void { 1983 + if (self.status.load(.unordered) == .disconnected) { 1984 + log.warn("disconnected: dropping write: {s}", .{buf[0 .. buf.len - 2]}); 1985 + return; 1986 + } 2002 1987 log.debug("[->{s}] {s}", .{ self.config.name orelse self.config.server, buf[0 .. buf.len - 2] }); 2003 1988 switch (self.config.tls) { 2004 1989 true => try self.client.writeAll(buf), ··· 2018 2003 const port: u16 = self.config.port orelse 6667; 2019 2004 self.stream = try std.net.tcpConnectToHost(self.alloc, self.config.server, port); 2020 2005 } 2006 + self.status.store(.connected, .unordered); 2021 2007 2022 - try self.queueWrite("CAP LS 302\r\n"); 2008 + try self.configureKeepalive(); 2009 + } 2010 + 2011 + pub fn configureKeepalive(self: *Client) !void { 2012 + const sock = self.stream.handle; 2013 + 2014 + const posix = std.posix; 2015 + const enable = std.mem.toBytes(@as(i32, 1)); 2016 + try posix.setsockopt(sock, posix.SOL.SOCKET, posix.SO.KEEPALIVE, &enable); 2023 2017 2024 - const cap_names = std.meta.fieldNames(Capabilities); 2025 - for (cap_names) |cap| { 2026 - try self.print( 2027 - "CAP REQ :{s}\r\n", 2028 - .{cap}, 2029 - ); 2030 - } 2018 + const idle = std.mem.toBytes(@as(i32, 10)); // 10 seconds 2019 + try posix.setsockopt(sock, posix.IPPROTO.TCP, posix.TCP.KEEPIDLE, &idle); 2031 2020 2032 - try self.print( 2033 - "NICK {s}\r\n", 2034 - .{self.config.nick}, 2035 - ); 2021 + const interval = std.mem.toBytes(@as(i32, 5)); // 5 seconds 2022 + try posix.setsockopt(sock, posix.IPPROTO.TCP, posix.TCP.KEEPINTVL, &interval); 2036 2023 2037 - try self.print( 2038 - "USER {s} 0 * {s}\r\n", 2039 - .{ self.config.user, self.config.real_name }, 2040 - ); 2024 + const count = std.mem.toBytes(@as(i32, 3)); // 3 probes before closing 2025 + try posix.setsockopt(sock, posix.IPPROTO.TCP, posix.TCP.KEEPCNT, &count); 2041 2026 } 2042 2027 2043 2028 pub fn getOrCreateChannel(self: *Client, name: []const u8) Allocator.Error!*Channel {
+6 -6
src/main.zig
··· 11 11 12 12 pub const panic = vaxis.panic_handler; 13 13 14 - pub const std_options: std.Options = .{ 15 - .log_scope_levels = &.{ 16 - .{ .scope = .vaxis, .level = .warn }, 17 - .{ .scope = .vaxis_parser, .level = .warn }, 18 - }, 19 - }; 14 + // pub const std_options: std.Options = .{ 15 + // .log_scope_levels = &.{ 16 + // .{ .scope = .vaxis, .level = .warn }, 17 + // .{ .scope = .vaxis_parser, .level = .warn }, 18 + // }, 19 + // }; 20 20 21 21 pub const version = options.version; 22 22
-110
src/pool.zig
··· 1 - const std = @import("std"); 2 - 3 - const Condition = std.Thread.Condition; 4 - const Mutex = std.Thread.Mutex; 5 - 6 - /// BytePool is a thread safe buffer. Use it by Allocating a given number of bytes, which will block 7 - /// until one is available. The returned Slice structure contains a reference to a slice within the 8 - /// pool. This slice will always belong to the Slice until deinit is called. 9 - /// 10 - /// This data structure is useful for receiving messages over-the-wire and sending to another thread 11 - /// for processing, while providing some level of backpressure on the read side. For example, we 12 - /// could be reading messages from the wire and sending into a queue for processing. We could read 13 - /// 10 messages off the connection, but the queue is blocked doing an expensive operation. We are 14 - /// still able to read until our BytePool is out of capacity. 15 - /// 16 - /// For IRC, we use this because messages over the wire *could* be up to 4192 bytes, but commonly 17 - /// are less than 100. Instead of a pool of buffers each 4192, we write messages of exact length 18 - /// into this pool to more efficiently pack the memory 19 - pub fn BytePool(comptime size: usize) type { 20 - return struct { 21 - const Self = @This(); 22 - 23 - pub const Slice = struct { 24 - idx: usize, 25 - len: usize, 26 - pool: *Self, 27 - 28 - /// Frees resources associated with Buffer 29 - pub fn deinit(self: Slice) void { 30 - self.pool.mutex.lock(); 31 - defer self.pool.mutex.unlock(); 32 - @memset(self.pool.free_list[self.idx .. self.idx + self.len], true); 33 - // Signal that we may have capacity now 34 - self.pool.buffer_deinited.signal(); 35 - } 36 - 37 - /// Returns the actual slice of this buffer 38 - pub fn slice(self: Slice) []u8 { 39 - return self.pool.buffer[self.idx .. self.idx + self.len]; 40 - } 41 - }; 42 - 43 - buffer: [size]u8 = undefined, 44 - free_list: [size]bool = undefined, 45 - mutex: Mutex = .{}, 46 - /// The index of the next potentially available byte 47 - next_idx: usize = 0, 48 - 49 - buffer_deinited: Condition = .{}, 50 - 51 - pub fn init(self: *Self) void { 52 - @memset(&self.free_list, true); 53 - } 54 - 55 - /// Get a buffer of size n. Blocks until one is available 56 - pub fn alloc(self: *Self, n: usize) Slice { 57 - std.debug.assert(n < size); 58 - self.mutex.lock(); 59 - defer self.mutex.unlock(); 60 - while (true) { 61 - if (self.getBuffer(n)) |buf| return buf; 62 - self.buffer_deinited.wait(&self.mutex); 63 - } 64 - } 65 - 66 - fn getBuffer(self: *Self, n: usize) ?Slice { 67 - var start: usize = self.next_idx; 68 - var did_wrap: bool = false; 69 - while (true) { 70 - if (start + n >= self.buffer.len) { 71 - if (did_wrap) return null; 72 - did_wrap = true; 73 - start = 0; 74 - } 75 - 76 - const next_true = std.mem.indexOfScalarPos(bool, &self.free_list, start, true) orelse { 77 - if (did_wrap) return null; 78 - did_wrap = true; 79 - start = 0; 80 - continue; 81 - }; 82 - 83 - if (next_true + n >= self.buffer.len) { 84 - if (did_wrap) return null; 85 - did_wrap = true; 86 - start = 0; 87 - continue; 88 - } 89 - 90 - // Get our potential slice 91 - const maybe_slice = self.free_list[next_true .. next_true + n]; 92 - // Check that the entire thing is true 93 - if (std.mem.indexOfScalar(bool, maybe_slice, false)) |idx| { 94 - // We have a false, increment and look again 95 - start = next_true + idx + 1; 96 - continue; 97 - } 98 - // Set this slice in the free_list as not free 99 - @memset(maybe_slice, false); 100 - // Update next_idx 101 - self.next_idx = next_true + n; 102 - return .{ 103 - .idx = next_true, 104 - .len = n, 105 - .pool = self, 106 - }; 107 - } 108 - } 109 - }; 110 - }