An asynchronous IO runtime
at main 176 lines 5.8 kB view raw view rendered
1# Ourio 2 3<p align="center"> 4 <img width="128" height="128" src="ouroboros.svg"> 5</p> 6 7Ourio (prounounced "oreo", think "Ouroboros") is an asynchronous IO runtime 8built heavily around the semantics of io_uring. The design is inspired by 9[libxev](https://github.com/mitchellh/libxev), which is in turn inspired by 10[TigerBeetle](https://github.com/tigerbeetle/tigerbeetle). 11 12Ourio has only a slightly different approach: it is designed to encourage 13message passing approach to asynchronous IO. Users of the library give each task 14a Context, which contains a pointer, a callback, *and a message*. The message is 15implemented as a u16, and generally you should use an enum for it. The idea is 16that you can minimize the number of callback functions required by tagging tasks 17with a small amount of semantic meaning in the `msg` field. 18 19Ourio has io_uring and kqueue backends. Ourio supports the `msg_ring` 20capability of io_uring to pass a completion from one ring to another. This 21allows a multithreaded application to implement message passing using io_uring 22(or kqueue, if that's your flavor). Multithreaded applications should plan to 23use one `Ring` per thread. Submission onto the runtime is not thread safe, 24any message passing must occur using `msg_ring` rather than directly submitting 25a task to another 26 27Ourio also includes a fully mockable IO runtime to make it easy to unit test 28your async code. 29 30## Tasks 31 32### Deadlines and Cancelation 33 34Each IO operation creates a `Task`. When scheduling a task on the runtime, the 35caller receives a pointer to the `Task` at which point they could cancel it, or 36set a deadline. 37 38```zig 39// Timers are always relative time 40const task = try rt.timer(.{.sec = 3}, .{.cb = onCompletion, .msg = 0}); 41 42// If the deadline expired, the task will be sent to the onCompletion callback 43// with a result of error.Canceled. Deadlines are always absolute time 44try task.setDeadline(rt, .{.sec = std.time.timestamp() + 3}); 45 46// Alternatively, we can hold on to the pointer for the task while it is with 47// the runtime and cancel it. The Context we give to the cancel function let's 48// us know the result of the cancelation, but we will also receive a message 49// from the original task with error.Canceled. We can ignore the cancel result 50// by using the default context value 51try task.cancel(rt, .{}); 52``` 53 54### Passing tasks between threads 55 56Say we `accept` a connection in one thread, and want to send the file descriptor 57to another for handling. 58 59```zig 60// Spawn a thread with a queue of 16 entries. When this function returns, the 61// the thread is idle and waiting to receive tasks via msgRing 62const thread = main_rt.spawnThread(16); 63const target_task = try main_rt.getTask(); 64target_task.* { 65 .userdata = &foo, 66 .msg = @intFromEnum(Msg.some_message), 67 .cb = Worker.onCompletion, 68 .req = .{ .userfd = fd }, 69}; 70 71// Send target_task from the main_rt thread to the thread Ring. The 72// thread_rt Ring will then // process the task as a completion, ie 73// Worker.onCompletion will be called with this task. That thread can then 74// schedule a recv, a write, etc on the file descriptor it just received. Or do 75// arbitrary work 76_ = try main_rt.msgRing(&thread.ring, target_task, .{}); 77``` 78 79### Multiple Rings on the same thread 80 81You can have multiple Rings in a single thread. One could be a priority 82Ring, or handle specific types of tasks, etc. Poll any `Ring` from any other 83`Ring`. 84 85```zig 86const fd = rt1.backend.pollableFd(); 87_ = try rt2.poll(fd, .{ 88 .cb = onCompletion, 89 .msg = @intFromEnum(Msg.rt1_has_completions)} 90); 91 92``` 93 94## Example 95 96An example implementation of an asynchronous writer to two file descriptors: 97 98```zig 99const std = @import("std"); 100const io = @import("ourio"); 101const posix = std.posix; 102 103pub const MultiWriter = struct { 104 fd1: posix.fd_t, 105 fd1_written: usize = 0, 106 107 fd2: posix.fd_t, 108 fd2_written: usize = 0, 109 110 buf: std.ArrayListUnmanaged(u8), 111 112 pub const Msg = enum { fd1, fd2 }; 113 114 pub fn init(fd1: posix.fd_t, fd2: posix.fd_t) MultiWriter { 115 return .{ .fd1 = fd1, .fd2 = fd2 }; 116 } 117 118 pub fn write(self: *MultiWriter, gpa: Allocator, bytes: []const u8) !void { 119 try self.buf.appendSlice(gpa, bytes); 120 } 121 122 pub fn flush(self: *MultiWriter, rt: *io.Ring) !void { 123 if (self.fd1_written < self.buf.items.len) { 124 _ = try rt.write(self.fd1, self.buf.items[self.fd1_written..], .{ 125 .ptr = self, 126 .msg = @intFromEnum(Msg.fd1), 127 .cb = MultiWriter.onCompletion, 128 }); 129 } 130 131 if (self.fd2_written < self.buf.items.len) { 132 _ = try rt.write(self.fd2, self.buf.items[self.fd2_written..], .{ 133 .ptr = self, 134 .msg = @intFromEnum(Msg.fd2), 135 .cb = MultiWriter.onCompletion, 136 }); 137 } 138 } 139 140 pub fn onCompletion(rt: *io.Ring, task: io.Task) anyerror!void { 141 const self = task.userdataCast(MultiWriter); 142 const result = task.result.?; 143 144 const n = try result.write; 145 switch (task.msgToEnum(MultiWriter.Msg)) { 146 .fd1 => self.fd1_written += n, 147 .fd2 => self.fd2_written += n, 148 } 149 150 const len = self.buf.items.len; 151 152 if (self.fd1_written < len or self.fd2_written < len) 153 return self.flush(rt); 154 155 self.fd1_written = 0; 156 self.fd2_written = 0; 157 self.buf.clearRetainingCapacity(); 158 } 159}; 160 161pub fn main() !void { 162 var gpa: std.heap.DebugAllocator(.{}) = .init; 163 var rt: io.Ring = try .init(gpa.allocator(), 16); 164 defer rt.deinit(); 165 166 // Pretend I created some files 167 const fd1: posix.fd_t = 5; 168 const fd2: posix.fd_t = 6; 169 170 var mw: MultiWriter = .init(fd1, fd2); 171 try mw.write(gpa.allocator(), "Hello, world!"); 172 try mw.flush(&rt); 173 174 try rt.run(.until_done); 175} 176```