tangled
alpha
login
or
join now
rockorager.dev
/
ourio
6
fork
atom
An asynchronous IO runtime
6
fork
atom
overview
issues
pulls
pipelines
op: add readv support
rockorager.dev
10 months ago
aa396c0f
858eec98
verified
This commit was signed with the committer's
known signature
.
rockorager.dev
SSH Key Fingerprint:
SHA256:qn/Fjy7CpbcogGEPB14Y53hLnQleZNFY9lkQnuudFLs=
+58
4 changed files
expand all
collapse all
unified
split
src
Kqueue.zig
Mock.zig
Uring.zig
main.zig
+36
src/Kqueue.zig
···
344
}
345
},
346
0
0
0
0
0
0
347
.recv => |req| {
348
self.in_flight.push(task);
349
const kevent = evSet(@intCast(req.fd), EVFILT.READ, EV.ADD | EV.ONESHOT, task);
···
485
const kevent = evSet(@intCast(cancel_req.fd), EVFILT.WRITE, EV.DELETE, task);
486
try self.submission_queue.append(self.gpa, kevent);
487
}
0
0
0
0
0
0
0
0
0
0
0
0
488
},
489
490
.recv => |cancel_req| {
···
654
// async tasks. These can be handled synchronously in a cancel all
655
.accept,
656
.poll,
0
657
.recv,
658
.write,
659
.writev,
···
700
.msg_ring => .{ .msg_ring = error.Canceled },
701
.noop => unreachable,
702
.poll => .{ .poll = error.Canceled },
0
703
.recv => .{ .recv = error.Canceled },
704
.socket => .{ .socket = error.Canceled },
705
.statx => .{ .statx = error.Canceled },
···
783
const err = unexpectedError(dataToE(event.data));
784
task.result = .{ .poll = err };
785
} else task.result = .{ .poll = {} };
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
786
return task.callback(rt, task.*);
787
},
788
···
344
}
345
},
346
347
+
.readv => |req| {
348
+
self.in_flight.push(task);
349
+
const kevent = evSet(@intCast(req.fd), EVFILT.READ, EV.ADD | EV.ONESHOT, task);
350
+
try self.submission_queue.append(self.gpa, kevent);
351
+
},
352
+
353
.recv => |req| {
354
self.in_flight.push(task);
355
const kevent = evSet(@intCast(req.fd), EVFILT.READ, EV.ADD | EV.ONESHOT, task);
···
491
const kevent = evSet(@intCast(cancel_req.fd), EVFILT.WRITE, EV.DELETE, task);
492
try self.submission_queue.append(self.gpa, kevent);
493
}
494
+
},
495
+
496
+
.readv => |cancel_req| {
497
+
self.in_flight.remove(task);
498
+
task.result = .{ .readv = error.Canceled };
499
+
const kevent = evSet(
500
+
@intCast(cancel_req.fd),
501
+
EVFILT.READ,
502
+
EV.DELETE,
503
+
task,
504
+
);
505
+
try self.submission_queue.append(self.gpa, kevent);
506
},
507
508
.recv => |cancel_req| {
···
672
// async tasks. These can be handled synchronously in a cancel all
673
.accept,
674
.poll,
675
+
.readv,
676
.recv,
677
.write,
678
.writev,
···
719
.msg_ring => .{ .msg_ring = error.Canceled },
720
.noop => unreachable,
721
.poll => .{ .poll = error.Canceled },
722
+
.readv => .{ .readv = error.Canceled },
723
.recv => .{ .recv = error.Canceled },
724
.socket => .{ .socket = error.Canceled },
725
.statx => .{ .statx = error.Canceled },
···
803
const err = unexpectedError(dataToE(event.data));
804
task.result = .{ .poll = err };
805
} else task.result = .{ .poll = {} };
806
+
return task.callback(rt, task.*);
807
+
},
808
+
809
+
.readv => |req| {
810
+
defer self.releaseTask(rt, task);
811
+
self.in_flight.remove(task);
812
+
if (event.flags & EV.ERROR != 0) {
813
+
// Interpret data as an errno
814
+
const err = unexpectedError(dataToE(event.data));
815
+
task.result = .{ .readv = err };
816
+
return task.callback(rt, task.*);
817
+
}
818
+
if (posix.readv(req.fd, req.vecs)) |n|
819
+
task.result = .{ .readv = n }
820
+
else |_|
821
+
task.result = .{ .readv = error.Unexpected };
822
return task.callback(rt, task.*);
823
},
824
+2
src/Mock.zig
···
19
msg_ring_cb: ?*const fn (*io.Task) io.Result = null,
20
noop_cb: ?*const fn (*io.Task) io.Result = null,
21
poll_cb: ?*const fn (*io.Task) io.Result = null,
0
22
recv_cb: ?*const fn (*io.Task) io.Result = null,
23
socket_cb: ?*const fn (*io.Task) io.Result = null,
24
statx_cb: ?*const fn (*io.Task) io.Result = null,
···
69
.msg_ring => if (self.msg_ring_cb) |cb| cb(task) else return error.NoMockCallback,
70
.noop => if (self.noop_cb) |cb| cb(task) else return error.NoMockCallback,
71
.poll => if (self.poll_cb) |cb| cb(task) else return error.NoMockCallback,
0
72
.recv => if (self.recv_cb) |cb| cb(task) else return error.NoMockCallback,
73
.socket => if (self.socket_cb) |cb| cb(task) else return error.NoMockCallback,
74
.statx => if (self.statx_cb) |cb| cb(task) else return error.NoMockCallback,
···
19
msg_ring_cb: ?*const fn (*io.Task) io.Result = null,
20
noop_cb: ?*const fn (*io.Task) io.Result = null,
21
poll_cb: ?*const fn (*io.Task) io.Result = null,
22
+
readv_cb: ?*const fn (*io.Task) io.Result = null,
23
recv_cb: ?*const fn (*io.Task) io.Result = null,
24
socket_cb: ?*const fn (*io.Task) io.Result = null,
25
statx_cb: ?*const fn (*io.Task) io.Result = null,
···
70
.msg_ring => if (self.msg_ring_cb) |cb| cb(task) else return error.NoMockCallback,
71
.noop => if (self.noop_cb) |cb| cb(task) else return error.NoMockCallback,
72
.poll => if (self.poll_cb) |cb| cb(task) else return error.NoMockCallback,
73
+
.readv => if (self.readv_cb) |cb| cb(task) else return error.NoMockCallback,
74
.recv => if (self.recv_cb) |cb| cb(task) else return error.NoMockCallback,
75
.socket => if (self.socket_cb) |cb| cb(task) else return error.NoMockCallback,
76
.statx => if (self.statx_cb) |cb| cb(task) else return error.NoMockCallback,
+14
src/Uring.zig
···
225
self.prepDeadline(task, sqe);
226
},
227
0
0
0
0
0
0
0
228
// user* is only sent internally between rings and higher level wrappers
229
.userfd, .usermsg, .userptr => unreachable,
230
}
···
349
350
.statx => |req| .{ .statx = switch (cqeToE(cqe.res)) {
351
.SUCCESS => req.result,
0
0
0
0
0
0
0
352
.INVAL => io.ResultError.Invalid,
353
.CANCELED => io.ResultError.Canceled,
354
else => |e| unexpectedError(e),
···
225
self.prepDeadline(task, sqe);
226
},
227
228
+
.readv => |req| {
229
+
const sqe = self.getSqe();
230
+
sqe.prep_readv(req.fd, req.vecs, 0);
231
+
sqe.user_data = @intFromPtr(task);
232
+
self.prepDeadline(task, sqe);
233
+
},
234
+
235
// user* is only sent internally between rings and higher level wrappers
236
.userfd, .usermsg, .userptr => unreachable,
237
}
···
356
357
.statx => |req| .{ .statx = switch (cqeToE(cqe.res)) {
358
.SUCCESS => req.result,
359
+
.INVAL => io.ResultError.Invalid,
360
+
.CANCELED => io.ResultError.Canceled,
361
+
else => |e| unexpectedError(e),
362
+
} },
363
+
364
+
.readv => .{ .readv = switch (cqeToE(cqe.res)) {
365
+
.SUCCESS => @intCast(cqe.res),
366
.INVAL => io.ResultError.Invalid,
367
.CANCELED => io.ResultError.Canceled,
368
else => |e| unexpectedError(e),
+6
src/main.zig
···
493
socket,
494
connect,
495
statx,
0
496
497
/// userfd is meant to send file descriptors between Ring instances (using msgRing)
498
userfd,
···
546
path: [:0]const u8,
547
result: *Statx, // this will be filled in by the op
548
},
0
0
0
0
549
550
userfd,
551
usermsg,
···
567
socket: ResultError!posix.fd_t,
568
connect: ResultError!void,
569
statx: ResultError!*Statx,
0
570
571
userfd: anyerror!posix.fd_t,
572
usermsg: u16,
···
493
socket,
494
connect,
495
statx,
496
+
readv,
497
498
/// userfd is meant to send file descriptors between Ring instances (using msgRing)
499
userfd,
···
547
path: [:0]const u8,
548
result: *Statx, // this will be filled in by the op
549
},
550
+
readv: struct {
551
+
fd: posix.fd_t,
552
+
vecs: []const posix.iovec,
553
+
},
554
555
userfd,
556
usermsg,
···
572
socket: ResultError!posix.fd_t,
573
connect: ResultError!void,
574
statx: ResultError!*Statx,
575
+
readv: ResultError!usize,
576
577
userfd: anyerror!posix.fd_t,
578
usermsg: u16,