an experimental irc client
at ec2fb6ec28aa8aa40e3c97fa3ebb23afeba605bf 110 lines 4.3 kB view raw
const std = @import("std");

const Condition = std.Thread.Condition;
const Mutex = std.Thread.Mutex;

/// BytePool is a thread-safe, fixed-capacity byte buffer. Use it by calling `alloc` with the number
/// of bytes needed; the call blocks until a contiguous run of that many bytes is free. The returned
/// Slice refers to a region inside the pool, and that region belongs to the Slice until
/// `Slice.deinit` is called.
///
/// This data structure is useful for receiving messages over-the-wire and sending to another thread
/// for processing, while providing some level of backpressure on the read side. For example, we
/// could be reading messages from the wire and sending into a queue for processing. We could read
/// 10 messages off the connection, but the queue is blocked doing an expensive operation. We are
/// still able to read until our BytePool is out of capacity.
///
/// For IRC, we use this because messages over the wire *could* be up to 4192 bytes, but commonly
/// are less than 100. Instead of a pool of buffers each 4192, we write messages of exact length
/// into this pool to more efficiently pack the memory.
///
/// `init` must be called before the first `alloc`: the free list defaults to `undefined`. Slices
/// hold a pointer to the pool, so the pool must not be moved or copied while any Slice is live.
pub fn BytePool(comptime size: usize) type {
    return struct {
        const Self = @This();

        /// A region of the pool handed out by `alloc`.
        pub const Slice = struct {
            /// Offset of the first byte of this region within the pool's buffer.
            idx: usize,
            /// Number of bytes in this region.
            len: usize,
            /// The pool this region was allocated from.
            pool: *Self,

            /// Returns this Slice's bytes to the pool. The Slice must not be used afterwards.
            pub fn deinit(self: Slice) void {
                self.pool.mutex.lock();
                defer self.pool.mutex.unlock();
                @memset(self.pool.free_list[self.idx .. self.idx + self.len], true);
                // Wake every waiter, not just one: waiters may be asking for different sizes, and
                // a single woken waiter that still does not fit would go back to sleep while a
                // smaller request that now fits is never notified. Each waiter re-checks in a loop.
                self.pool.buffer_deinited.broadcast();
            }

            /// Returns the bytes owned by this Slice.
            pub fn slice(self: Slice) []u8 {
                return self.pool.buffer[self.idx .. self.idx + self.len];
            }
        };

        /// Backing storage handed out in pieces by `alloc`.
        buffer: [size]u8 = undefined,
        /// One flag per byte of `buffer`; true means the byte is free. Set up by `init`.
        free_list: [size]bool = undefined,
        /// Guards `free_list` and `next_idx`.
        mutex: Mutex = .{},
        /// The index of the next potentially available byte
        next_idx: usize = 0,

        /// Notified whenever a Slice is returned to the pool.
        buffer_deinited: Condition = .{},

        /// Marks the whole pool as free. Must be called once before the first `alloc`.
        pub fn init(self: *Self) void {
            @memset(&self.free_list, true);
        }

        /// Get a buffer of size n. Blocks until a contiguous run of n free bytes is available.
        /// n must not exceed the pool's capacity, otherwise the request could never be satisfied.
        pub fn alloc(self: *Self, n: usize) Slice {
            std.debug.assert(n <= size);
            self.mutex.lock();
            defer self.mutex.unlock();
            while (true) {
                if (self.getBuffer(n)) |buf| return buf;
                self.buffer_deinited.wait(&self.mutex);
            }
        }

        /// Tries to reserve n contiguous bytes without blocking. Searches from `next_idx` to the
        /// end of the pool first, then once more from the start. Returns null when no run of n
        /// free bytes exists. The caller must hold `mutex`.
        fn getBuffer(self: *Self, n: usize) ?Slice {
            const idx = self.findFreeRun(self.next_idx, n) orelse blk: {
                // A scan from 0 already covered the entire pool; nothing more to try.
                if (self.next_idx == 0) return null;
                break :blk self.findFreeRun(0, n) orelse return null;
            };
            const end = idx + n;
            // Set this run in the free_list as not free
            @memset(self.free_list[idx..end], false);
            // A run may end exactly at the end of the pool; start the next search from the front.
            self.next_idx = if (end == size) 0 else end;
            return .{
                .idx = idx,
                .len = n,
                .pool = self,
            };
        }

        /// Returns the index of the first run of n free bytes that starts at or after `from` and
        /// lies entirely within the pool, or null if there is none.
        fn findFreeRun(self: *const Self, from: usize, n: usize) ?usize {
            var start = from;
            while (start + n <= size) {
                const first_free = std.mem.indexOfScalarPos(bool, &self.free_list, start, true) orelse
                    return null;
                // Later candidates start even further right, so none of them can fit either.
                if (first_free + n > size) return null;
                const candidate = self.free_list[first_free .. first_free + n];
                // If the whole candidate is free we are done; otherwise resume just past the
                // first in-use byte found inside it.
                const used = std.mem.indexOfScalar(bool, candidate, false) orelse return first_free;
                start = first_free + used + 1;
            }
            return null;
        }
    };
}