An asynchronous IO runtime
# Ourio

<p align="center">
  <img width="128" height="128" src="ouroboros.svg">
</p>

Ourio (pronounced "oreo", think "Ouroboros") is an asynchronous IO runtime
built heavily around the semantics of io_uring. The design is inspired by
[libxev](https://github.com/mitchellh/libxev), which is in turn inspired by
[TigerBeetle](https://github.com/tigerbeetle/tigerbeetle).

Ourio takes a slightly different approach: it is designed to encourage a
message passing approach to asynchronous IO. Users of the library give each task
a Context, which contains a pointer, a callback, *and a message*. The message is
implemented as a u16, and generally you should use an enum for it. The idea is
that you can minimize the number of callback functions required by tagging tasks
with a small amount of semantic meaning in the `msg` field.

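As a rough illustration of the tagging idea, the sketch below routes three kinds of completion through a single callback by switching on an enum stored in a `u16`. It does not use Ourio itself: `FakeTask`, `Server`, and its `Msg` enum are hypothetical stand-ins invented for this example, and the real `Task` and callback signatures may differ.

```zig
const std = @import("std");

// Hypothetical stand-in for a task. This is NOT Ourio's Task type; it only
// carries the two fields needed to show the dispatch pattern.
const FakeTask = struct {
    userdata: ?*anyopaque = null,
    msg: u16 = 0,
};

// Hypothetical owner of the tasks, counting each kind of completion.
const Server = struct {
    accepted: usize = 0,
    received: usize = 0,
    closed: usize = 0,

    // Semantic tags that fit in the u16 message field.
    const Msg = enum(u16) { accept, recv, close };

    // One callback serves every kind of task; the tag says which one it was.
    fn onCompletion(task: FakeTask) void {
        const self: *Server = @ptrCast(@alignCast(task.userdata.?));
        const msg: Msg = @enumFromInt(task.msg);
        switch (msg) {
            .accept => self.accepted += 1,
            .recv => self.received += 1,
            .close => self.closed += 1,
        }
    }
};

test "one callback dispatches on msg" {
    var server: Server = .{};
    const tags = [_]Server.Msg{ .accept, .recv, .recv, .close };
    for (tags) |tag| {
        Server.onCompletion(.{ .userdata = &server, .msg = @intFromEnum(tag) });
    }
    try std.testing.expectEqual(@as(usize, 1), server.accepted);
    try std.testing.expectEqual(@as(usize, 2), server.received);
    try std.testing.expectEqual(@as(usize, 1), server.closed);
}
```

Without the tag, each of the three operations would need its own callback function (or its own userdata wrapper) to tell the completions apart.
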
Ourio has io_uring and kqueue backends. Ourio supports the `msg_ring`
capability of io_uring to pass a completion from one ring to another. This
allows a multithreaded application to implement message passing using io_uring
(or kqueue, if that's your flavor). Multithreaded applications should plan to
use one `Ring` per thread. Submission onto the runtime is not thread safe, so
any message passing must occur using `msg_ring` rather than directly submitting
a task to another thread's `Ring`.

Ourio also includes a fully mockable IO runtime to make it easy to unit test
your async code.

## Tasks

### Deadlines and Cancelation

Each IO operation creates a `Task`. When scheduling a task on the runtime, the
caller receives a pointer to the `Task`, which they can use to cancel it or to
set a deadline.

```zig
// Timers always use relative time
const task = try rt.timer(.{ .sec = 3 }, .{ .cb = onCompletion, .msg = 0 });

// If the deadline expires, the task will be sent to the onCompletion callback
// with a result of error.Canceled. Deadlines always use absolute time
try task.setDeadline(rt, .{ .sec = std.time.timestamp() + 3 });

// Alternatively, we can hold on to the pointer for the task while it is with
// the runtime and cancel it. The Context we give to the cancel function lets
// us know the result of the cancelation, but we will also receive a message
// from the original task with error.Canceled. We can ignore the cancel result
// by using the default context value
try task.cancel(rt, .{});
```

### Passing tasks between threads

Say we `accept` a connection in one thread, and want to send the file descriptor
to another for handling.

```zig
// Spawn a thread with a queue of 16 entries. When this function returns, the
// thread is idle and waiting to receive tasks via msgRing
const thread = main_rt.spawnThread(16);
const target_task = try main_rt.getTask();
target_task.* = .{
    .userdata = &foo,
    .msg = @intFromEnum(Msg.some_message),
    .cb = Worker.onCompletion,
    .req = .{ .userfd = fd },
};

// Send target_task from main_rt to the spawned thread's Ring (thread.ring).
// That Ring will then process the task as a completion, i.e.
// Worker.onCompletion will be called with this task. That thread can then
// schedule a recv, a write, etc. on the file descriptor it just received, or
// do arbitrary work
_ = try main_rt.msgRing(&thread.ring, target_task, .{});
```

### Multiple Rings on the same thread

You can have multiple Rings in a single thread. One could be a priority
Ring, or handle specific types of tasks, etc. Any `Ring` can be polled from any
other `Ring`.

```zig
const fd = rt1.backend.pollableFd();
_ = try rt2.poll(fd, .{
    .cb = onCompletion,
    .msg = @intFromEnum(Msg.rt1_has_completions),
});
```

## Example

An example implementation of an asynchronous writer to two file descriptors:

```zig
const std = @import("std");
const io = @import("ourio");
const posix = std.posix;
const Allocator = std.mem.Allocator;

pub const MultiWriter = struct {
    fd1: posix.fd_t,
    fd1_written: usize = 0,

    fd2: posix.fd_t,
    fd2_written: usize = 0,

    buf: std.ArrayListUnmanaged(u8),

    pub const Msg = enum { fd1, fd2 };

    pub fn init(fd1: posix.fd_t, fd2: posix.fd_t) MultiWriter {
        // buf has no default value, so it must be initialized explicitly
        return .{ .fd1 = fd1, .fd2 = fd2, .buf = .empty };
    }

    // Buffer bytes in memory; nothing is written until flush is called
    pub fn write(self: *MultiWriter, gpa: Allocator, bytes: []const u8) !void {
        try self.buf.appendSlice(gpa, bytes);
    }

    // Schedule a write of the unwritten remainder to each file descriptor
    pub fn flush(self: *MultiWriter, rt: *io.Ring) !void {
        if (self.fd1_written < self.buf.items.len) {
            _ = try rt.write(self.fd1, self.buf.items[self.fd1_written..], .{
                .ptr = self,
                .msg = @intFromEnum(Msg.fd1),
                .cb = MultiWriter.onCompletion,
            });
        }

        if (self.fd2_written < self.buf.items.len) {
            _ = try rt.write(self.fd2, self.buf.items[self.fd2_written..], .{
                .ptr = self,
                .msg = @intFromEnum(Msg.fd2),
                .cb = MultiWriter.onCompletion,
            });
        }
    }

    // A single callback handles both writes; the msg tag tells them apart
    pub fn onCompletion(rt: *io.Ring, task: io.Task) anyerror!void {
        const self = task.userdataCast(MultiWriter);
        const result = task.result.?;

        const n = try result.write;
        switch (task.msgToEnum(MultiWriter.Msg)) {
            .fd1 => self.fd1_written += n,
            .fd2 => self.fd2_written += n,
        }

        const len = self.buf.items.len;

        // Short write on either descriptor: schedule the remainder
        if (self.fd1_written < len or self.fd2_written < len)
            return self.flush(rt);

        // Both descriptors have the full buffer; reset for the next batch
        self.fd1_written = 0;
        self.fd2_written = 0;
        self.buf.clearRetainingCapacity();
    }
};

pub fn main() !void {
    var gpa: std.heap.DebugAllocator(.{}) = .init;
    var rt: io.Ring = try .init(gpa.allocator(), 16);
    defer rt.deinit();

    // Pretend I created some files
    const fd1: posix.fd_t = 5;
    const fd2: posix.fd_t = 6;

    var mw: MultiWriter = .init(fd1, fd2);
    try mw.write(gpa.allocator(), "Hello, world!");
    try mw.flush(&rt);

    try rt.run(.until_done);
}
```