an experimental irc client

irc: roll our own keepalive

TCP keepalive isn't very cross platform. macOS doesn't let us set
certain settings. Roll our own logic to perform the same sequence as the
linux TCP stack

+53 -26
+53 -26
src/irc.zig
··· 24 /// maximum size message we can receive 25 const max_raw_msg_size = 512 + 8191; // see modernircdocs 26 27 pub const Buffer = union(enum) { 28 client: *Client, 29 channel: *Channel, ··· 1569 while (std.mem.indexOfPos(u8, self.read_buf.items, i, "\r\n")) |idx| { 1570 ctx.redraw = true; 1571 defer i = idx + 2; 1572 self.handleEvent(self.read_buf.items[i..idx]) catch |err| { 1573 log.err("error: {}", .{err}); 1574 }; ··· 2083 try self.print("USER {s} 0 * {s}\r\n", .{ self.config.user, self.config.real_name }); 2084 2085 var buf: [4096]u8 = undefined; 2086 while (true) { 2087 - const n = try self.read(&buf); 2088 if (n == 0) return; 2089 self.read_buf_mutex.lock(); 2090 defer self.read_buf_mutex.unlock(); 2091 try self.read_buf.appendSlice(buf[0..n]); ··· 2110 } 2111 2112 pub fn write(self: *Client, buf: []const u8) !void { 2113 if (self.status.load(.unordered) == .disconnected) { 2114 log.warn("disconnected: dropping write: {s}", .{buf[0 .. buf.len - 2]}); 2115 return; ··· 2135 } 2136 self.status.store(.connected, .unordered); 2137 2138 - try self.configureKeepalive(); 2139 } 2140 2141 - pub fn configureKeepalive(self: *Client) !void { 2142 - const sock = self.stream.handle; 2143 2144 - const os = std.c; 2145 - const size = @sizeOf(i32); 2146 - 2147 - const enable: i32 = 1; 2148 - if (os.setsockopt(sock, os.SOL.SOCKET, os.SO.KEEPALIVE, &enable, size) != 0) { 2149 - return error.SetSockOptError; 2150 - } 2151 - 2152 - const idle: i32 = 10; // 10 seconds 2153 - if (os.setsockopt(sock, os.IPPROTO.TCP, os.TCP.KEEPIDLE, &idle, size) != 0) { 2154 - return error.SetSockOptError; 2155 - } 2156 - 2157 - const interval: i32 = 5; // 5 seconds 2158 - if (os.setsockopt(sock, os.IPPROTO.TCP, os.TCP.KEEPINTVL, &interval, size) != 0) { 2159 - return error.SetSockOptError; 2160 - } 2161 - 2162 - const count: i32 = 3; // 3 probes before closing 2163 - if (os.setsockopt(sock, os.IPPROTO.TCP, os.TCP.KEEPCNT, &count, size) != 0) { 2164 - return error.SetSockOptError; 2165 - } 2166 } 2167 2168 pub fn getOrCreateChannel(self: *Client, name: []const u8) Allocator.Error!*Channel {
··· 24 /// maximum size message we can receive 25 const max_raw_msg_size = 512 + 8191; // see modernircdocs 26 27 + /// Seconds of idle connection before we start pinging 28 + const keepalive_idle: i32 = 15; 29 + 30 + /// Seconds between pings 31 + const keepalive_interval: i32 = 5; 32 + 33 + /// Number of failed pings before we consider the connection failed 34 + const keepalive_retries: i32 = 3; 35 + 36 pub const Buffer = union(enum) { 37 client: *Client, 38 channel: *Channel, ··· 1578 while (std.mem.indexOfPos(u8, self.read_buf.items, i, "\r\n")) |idx| { 1579 ctx.redraw = true; 1580 defer i = idx + 2; 1581 + log.debug("[<-{s}] {s}", .{ 1582 + self.config.name orelse self.config.server, 1583 + self.read_buf.items[i..idx], 1584 + }); 1585 self.handleEvent(self.read_buf.items[i..idx]) catch |err| { 1586 log.err("error: {}", .{err}); 1587 }; ··· 2096 try self.print("USER {s} 0 * {s}\r\n", .{ self.config.user, self.config.real_name }); 2097 2098 var buf: [4096]u8 = undefined; 2099 + var retries: u8 = 0; 2100 while (true) { 2101 + const n = self.read(&buf) catch |err| { 2102 + // WouldBlock means our socket timeout expired 2103 + switch (err) { 2104 + error.WouldBlock => {}, 2105 + else => return err, 2106 + } 2107 + 2108 + if (retries == keepalive_retries) { 2109 + log.debug("[{s}] connection closed", .{self.config.name orelse self.config.server}); 2110 + self.close(); 2111 + return; 2112 + } 2113 + 2114 + if (retries == 0) { 2115 + try self.configureKeepalive(keepalive_interval); 2116 + } 2117 + retries += 1; 2118 + try self.queueWrite("PING comlink\r\n"); 2119 + continue; 2120 + }; 2121 if (n == 0) return; 2122 + 2123 + // If we did a connection retry, we reset the state 2124 + if (retries > 0) { 2125 + retries = 0; 2126 + try self.configureKeepalive(keepalive_idle); 2127 + } 2128 self.read_buf_mutex.lock(); 2129 defer self.read_buf_mutex.unlock(); 2130 try self.read_buf.appendSlice(buf[0..n]); ··· 2149 } 2150 2151 pub fn write(self: *Client, buf: []const u8) !void { 2152 + assert(std.mem.endsWith(u8, buf, "\r\n")); 2153 if (self.status.load(.unordered) == .disconnected) { 2154 log.warn("disconnected: dropping write: {s}", .{buf[0 .. buf.len - 2]}); 2155 return; ··· 2175 } 2176 self.status.store(.connected, .unordered); 2177 2178 + try self.configureKeepalive(keepalive_idle); 2179 } 2180 2181 + pub fn configureKeepalive(self: *Client, seconds: i32) !void { 2182 + const timeout = std.mem.toBytes(std.posix.timeval{ 2183 + .tv_sec = seconds, 2184 + .tv_usec = 0, 2185 + }); 2186 2187 + try std.posix.setsockopt( 2188 + self.stream.handle, 2189 + std.posix.SOL.SOCKET, 2190 + std.posix.SO.RCVTIMEO, 2191 + &timeout, 2192 + ); 2193 } 2194 2195 pub fn getOrCreateChannel(self: *Client, name: []const u8) Allocator.Error!*Channel {