An asynchronous IO runtime

thread: add spawnable threads that can wait for work

rockorager.dev 379d5190 5ceb12eb

verified
+142 -16
+9 -4
README.md
··· 57 57 to another for handling. 58 58 59 59 ```zig 60 + // Spawn a thread with a queue of 16 entries. When this function returns, the 61 + // the thread is idle and waiting to receive tasks via msgRing 62 + const thread = main_rt.spawnThread(16); 60 63 const target_task = try main_rt.getTask(); 61 64 target_task.* { 62 65 .userdata = &foo, ··· 65 68 .req = .{ .userfd = fd }, 66 69 }; 67 70 68 - // Send target_task from the main_rt thread to the thread_rt Ring. The 71 + // Send target_task from the main_rt thread to the thread Ring. The 69 72 // thread_rt Ring will then // process the task as a completion, ie 70 - // Worker.onCompletion will be called with // this task. That thread can then 71 - // schedule a recv, a write, etc on the file // descriptor it just received. 72 - _ = try main_rt.msgRing(thread_rt, target_task, .{}); 73 + // Worker.onCompletion will be called with this task. That thread can then 74 + // schedule a recv, a write, etc on the file descriptor it just received. Or do 75 + // arbitrary work 76 + _ = try main_rt.msgRing(&thread.ring, target_task, .{}); 73 77 ``` 74 78 75 79 ### Multiple Rings on the same thread ··· 84 88 .cb = onCompletion, 85 89 .msg = @intFromEnum(Msg.rt1_has_completions)} 86 90 ); 91 + 87 92 ``` 88 93 89 94 ## Example
+14 -12
src/Uring.zig
··· 346 346 .userfd, .userptr => unreachable, 347 347 }; 348 348 349 - try task.callback(rt, task.*); 350 - 351 - if (cqe.flags & msg_ring_received_cqe != 0) { 352 - // This message was received from another ring. We don't decrement inflight for this. 353 - // But we do need to set the task as free because we will add it to our free list 354 - rt.free_q.push(task); 355 - } else if (cqe.flags & linux.IORING_CQE_F_MORE == 0) { 356 - // If the cqe doesn't have IORING_CQE_F_MORE set, then this task is complete and free to 357 - // be rescheduled 358 - task.state = .complete; 359 - self.in_flight.remove(task); 360 - rt.free_q.push(task); 349 + defer { 350 + if (cqe.flags & msg_ring_received_cqe != 0) { 351 + // This message was received from another ring. We don't decrement inflight for this. 352 + // But we do need to set the task as free because we will add it to our free list 353 + rt.free_q.push(task); 354 + } else if (cqe.flags & linux.IORING_CQE_F_MORE == 0) { 355 + // If the cqe doesn't have IORING_CQE_F_MORE set, then this task is complete and free to 356 + // be rescheduled 357 + task.state = .complete; 358 + self.in_flight.remove(task); 359 + rt.free_q.push(task); 360 + } 361 361 } 362 + 363 + try task.callback(rt, task.*); 362 364 } 363 365 } 364 366
+119
src/main.zig
··· 394 394 self.submission_q.push(task); 395 395 return task; 396 396 } 397 + 398 + /// Spawns a thread with a Ring instance. The thread will be idle and waiting to receive work 399 + /// via msgRing when this function returns. Call kill on the returned thread to signal it to 400 + /// shutdown. 401 + pub fn spawnThread(self: *Ring, entries: u16) !*Thread { 402 + const thread = try self.gpa.create(Thread); 403 + errdefer self.gpa.destroy(thread); 404 + 405 + var wg: std.Thread.WaitGroup = .{}; 406 + wg.start(); 407 + thread.thread = try std.Thread.spawn(.{}, Thread.run, .{ thread, self, &wg, entries }); 408 + wg.wait(); 409 + 410 + return thread; 411 + } 412 + }; 413 + 414 + pub const Thread = struct { 415 + thread: std.Thread, 416 + ring: io.Ring = undefined, 417 + 418 + pub const Msg = enum { 419 + kill, 420 + }; 421 + 422 + pub fn run(self: *Thread, parent: *io.Ring, wg: *std.Thread.WaitGroup, entries: u16) !void { 423 + self.ring = try parent.initChild(entries); 424 + wg.finish(); 425 + 426 + defer self.ring.deinit(); 427 + 428 + // Run forever, because we may not start with a task. Inter-thread messaging means we could 429 + // receive work at any time 430 + self.ring.run(.forever) catch |err| { 431 + switch (err) { 432 + error.ThreadKilled => return, 433 + else => return err, 434 + } 435 + }; 436 + } 437 + 438 + /// Kill sends a message to the thread telling it to exit. Callers of this thread can safely 439 + /// join and deinit the Thread in the Context callback 440 + pub fn kill(self: *Thread, rt: *io.Ring, ctx: Context) Allocator.Error!*io.Task { 441 + const target_task = try rt.getTask(); 442 + target_task.* = .{ 443 + .userdata = self, 444 + .msg = @intFromEnum(Thread.Msg.kill), 445 + .callback = Thread.onCompletion, 446 + .result = .noop, 447 + }; 448 + 449 + return rt.msgRing(&self.ring, target_task, ctx); 450 + } 451 + 452 + pub fn join(self: Thread) void { 453 + self.thread.join(); 454 + } 455 + 456 + fn onCompletion(_: *io.Ring, task: Task) anyerror!void { 457 + switch (task.msgToEnum(Thread.Msg)) { 458 + .kill => return error.ThreadKilled, 459 + } 460 + } 397 461 }; 398 462 399 463 pub const Op = enum { ··· 715 779 try std.testing.expect(foo.rt1); 716 780 try std.testing.expect(foo.rt2); 717 781 } 782 + 783 + test "runtime: spawnThread" { 784 + const gpa = std.testing.allocator; 785 + var rt = try io.Ring.init(gpa, 16); 786 + defer rt.deinit(); 787 + 788 + const thread = try rt.spawnThread(4); 789 + 790 + const Foo2 = struct { 791 + kill: bool = false, 792 + did_work: bool = false, 793 + 794 + gpa: Allocator, 795 + thread: *Thread, 796 + 797 + const Msg = enum { kill, work }; 798 + 799 + fn callback(_: *io.Ring, task: io.Task) anyerror!void { 800 + const self = task.userdataCast(@This()); 801 + const msg = task.msgToEnum(Msg); 802 + switch (msg) { 803 + .kill => { 804 + self.kill = true; 805 + self.thread.join(); 806 + self.gpa.destroy(self.thread); 807 + }, 808 + .work => self.did_work = true, 809 + } 810 + } 811 + }; 812 + 813 + var foo: Foo2 = .{ .thread = thread, .gpa = gpa }; 814 + 815 + // Send work to the thread 816 + const target_task = try rt.getTask(); 817 + target_task.* = .{ 818 + .userdata = &foo, 819 + .callback = Foo2.callback, 820 + .msg = @intFromEnum(Foo2.Msg.work), 821 + .result = .{ .usermsg = 0 }, 822 + }; 823 + 824 + _ = try rt.msgRing(&thread.ring, target_task, .{}); 825 + 826 + try rt.run(.until_done); 827 + _ = try thread.kill(&rt, .{ 828 + .ptr = &foo, 829 + .cb = Foo2.callback, 830 + .msg = @intFromEnum(Foo2.Msg.kill), 831 + }); 832 + try rt.run(.until_done); 833 + 834 + try std.testing.expect(foo.did_work); 835 + try std.testing.expect(foo.kill); 836 + }