qemu with hax to log dma reads & writes jcs.org/2018/11/12/vfio

Merge remote-tracking branch 'remotes/stefanha/tags/block-pull-request' into staging

Pull request

# gpg: Signature made Wed 11 Mar 2020 12:40:36 GMT
# gpg: using RSA key 8695A8BFD3F97CDAAC35775A9CA4ABB381AB73C8
# gpg: Good signature from "Stefan Hajnoczi <stefanha@redhat.com>" [full]
# gpg: aka "Stefan Hajnoczi <stefanha@gmail.com>" [full]
# Primary key fingerprint: 8695 A8BF D3F9 7CDA AC35 775A 9CA4 ABB3 81AB 73C8

* remotes/stefanha/tags/block-pull-request:
aio-posix: remove idle poll handlers to improve scalability
aio-posix: support userspace polling of fd monitoring
aio-posix: add io_uring fd monitoring implementation
aio-posix: simplify FDMonOps->update() prototype
aio-posix: extract ppoll(2) and epoll(7) fd monitoring
aio-posix: move RCU_READ_LOCK() into run_poll_handlers()
aio-posix: completely stop polling when disabled
aio-posix: remove confusing QLIST_SAFE_REMOVE()
qemu/queue.h: clear linked list pointers on remove

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>

+914 -312
+2
MAINTAINERS
··· 1885 1885 S: Supported 1886 1886 F: util/async.c 1887 1887 F: util/aio-*.c 1888 + F: util/aio-*.h 1889 + F: util/fdmon-*.c 1888 1890 F: block/io.c 1889 1891 F: migration/block* 1890 1892 F: include/block/aio.h
+5
configure
··· 4093 4093 linux_io_uring_cflags=$($pkg_config --cflags liburing) 4094 4094 linux_io_uring_libs=$($pkg_config --libs liburing) 4095 4095 linux_io_uring=yes 4096 + 4097 + # io_uring is used in libqemuutil.a where per-file -libs variables are not 4098 + # seen by programs linking the archive. It's not ideal, but just add the 4099 + # library dependency globally. 4100 + LIBS="$linux_io_uring_libs $LIBS" 4096 4101 else 4097 4102 if test "$linux_io_uring" = "yes" ; then 4098 4103 feature_not_found "linux io_uring" "Install liburing devel"
+69 -2
include/block/aio.h
··· 14 14 #ifndef QEMU_AIO_H 15 15 #define QEMU_AIO_H 16 16 17 + #ifdef CONFIG_LINUX_IO_URING 18 + #include <liburing.h> 19 + #endif 17 20 #include "qemu/queue.h" 18 21 #include "qemu/event_notifier.h" 19 22 #include "qemu/thread.h" ··· 52 55 struct LinuxAioState; 53 56 struct LuringState; 54 57 58 + /* Is polling disabled? */ 59 + bool aio_poll_disabled(AioContext *ctx); 60 + 61 + /* Callbacks for file descriptor monitoring implementations */ 62 + typedef struct { 63 + /* 64 + * update: 65 + * @ctx: the AioContext 66 + * @old_node: the existing handler or NULL if this file descriptor is being 67 + * monitored for the first time 68 + * @new_node: the new handler or NULL if this file descriptor is being 69 + * removed 70 + * 71 + * Add/remove/modify a monitored file descriptor. 72 + * 73 + * Called with ctx->list_lock acquired. 74 + */ 75 + void (*update)(AioContext *ctx, AioHandler *old_node, AioHandler *new_node); 76 + 77 + /* 78 + * wait: 79 + * @ctx: the AioContext 80 + * @ready_list: list for handlers that become ready 81 + * @timeout: maximum duration to wait, in nanoseconds 82 + * 83 + * Wait for file descriptors to become ready and place them on ready_list. 84 + * 85 + * Called with ctx->list_lock incremented but not locked. 86 + * 87 + * Returns: number of ready file descriptors. 88 + */ 89 + int (*wait)(AioContext *ctx, AioHandlerList *ready_list, int64_t timeout); 90 + 91 + /* 92 + * need_wait: 93 + * @ctx: the AioContext 94 + * 95 + * Tell aio_poll() when to stop userspace polling early because ->wait() 96 + * has fds ready. 97 + * 98 + * File descriptor monitoring implementations that cannot poll fd readiness 99 + * from userspace should use aio_poll_disabled() here. This ensures that 100 + * file descriptors are not starved by handlers that frequently make 101 + * progress via userspace polling. 102 + * 103 + * Returns: true if ->wait() should be called, false otherwise. 104 + */ 105 + bool (*need_wait)(AioContext *ctx); 106 + } FDMonOps; 107 + 55 108 /* 56 109 * Each aio_bh_poll() call carves off a slice of the BH list, so that newly 57 110 * scheduled BHs are not processed until the next aio_bh_poll() call. All ··· 64 117 BHList bh_list; 65 118 QSIMPLEQ_ENTRY(BHListSlice) next; 66 119 }; 120 + 121 + typedef QSLIST_HEAD(, AioHandler) AioHandlerSList; 67 122 68 123 struct AioContext { 69 124 GSource source; ··· 150 205 * locking. 151 206 */ 152 207 struct LuringState *linux_io_uring; 208 + 209 + /* State for file descriptor monitoring using Linux io_uring */ 210 + struct io_uring fdmon_io_uring; 211 + AioHandlerSList submit_list; 153 212 #endif 154 213 155 214 /* TimerLists for calling timers - one per clock type. Has its own ··· 168 227 int64_t poll_grow; /* polling time growth factor */ 169 228 int64_t poll_shrink; /* polling time shrink factor */ 170 229 230 + /* 231 + * List of handlers participating in userspace polling. Protected by 232 + * ctx->list_lock. Iterated and modified mostly by the event loop thread 233 + * from aio_poll() with ctx->list_lock incremented. aio_set_fd_handler() 234 + * only touches the list to delete nodes if ctx->list_lock's count is zero. 235 + */ 236 + AioHandlerList poll_aio_handlers; 237 + 171 238 /* Are we in polling mode or monitoring file descriptors? */ 172 239 bool poll_started; 173 240 174 241 /* epoll(7) state used when built with CONFIG_EPOLL */ 175 242 int epollfd; 176 - bool epoll_enabled; 177 - bool epoll_available; 243 + 244 + const FDMonOps *fdmon_ops; 178 245 }; 179 246 180 247 /**
+15 -4
include/qemu/queue.h
··· 142 142 (elm)->field.le_next->field.le_prev = \ 143 143 (elm)->field.le_prev; \ 144 144 *(elm)->field.le_prev = (elm)->field.le_next; \ 145 + (elm)->field.le_next = NULL; \ 146 + (elm)->field.le_prev = NULL; \ 145 147 } while (/*CONSTCOND*/0) 146 148 147 149 /* ··· 225 227 } while (/*CONSTCOND*/0) 226 228 227 229 #define QSLIST_REMOVE_HEAD(head, field) do { \ 228 - (head)->slh_first = (head)->slh_first->field.sle_next; \ 230 + typeof((head)->slh_first) elm = (head)->slh_first; \ 231 + (head)->slh_first = elm->field.sle_next; \ 232 + elm->field.sle_next = NULL; \ 229 233 } while (/*CONSTCOND*/0) 230 234 231 235 #define QSLIST_REMOVE_AFTER(slistelm, field) do { \ 232 - (slistelm)->field.sle_next = \ 233 - QSLIST_NEXT(QSLIST_NEXT((slistelm), field), field); \ 236 + typeof(slistelm) next = (slistelm)->field.sle_next; \ 237 + (slistelm)->field.sle_next = next->field.sle_next; \ 238 + next->field.sle_next = NULL; \ 234 239 } while (/*CONSTCOND*/0) 235 240 236 241 #define QSLIST_REMOVE(head, elm, type, field) do { \ ··· 241 246 while (curelm->field.sle_next != (elm)) \ 242 247 curelm = curelm->field.sle_next; \ 243 248 curelm->field.sle_next = curelm->field.sle_next->field.sle_next; \ 249 + (elm)->field.sle_next = NULL; \ 244 250 } \ 245 251 } while (/*CONSTCOND*/0) 246 252 ··· 304 310 } while (/*CONSTCOND*/0) 305 311 306 312 #define QSIMPLEQ_REMOVE_HEAD(head, field) do { \ 307 - if (((head)->sqh_first = (head)->sqh_first->field.sqe_next) == NULL)\ 313 + typeof((head)->sqh_first) elm = (head)->sqh_first; \ 314 + if (((head)->sqh_first = elm->field.sqe_next) == NULL) \ 308 315 (head)->sqh_last = &(head)->sqh_first; \ 316 + elm->field.sqe_next = NULL; \ 309 317 } while (/*CONSTCOND*/0) 310 318 311 319 #define QSIMPLEQ_SPLIT_AFTER(head, elm, field, removed) do { \ ··· 329 337 if ((curelm->field.sqe_next = \ 330 338 curelm->field.sqe_next->field.sqe_next) == NULL) \ 331 339 (head)->sqh_last = &(curelm)->field.sqe_next; \ 340 + (elm)->field.sqe_next = NULL; \ 332 341 } \ 333 342 } while (/*CONSTCOND*/0) 334 343 ··· 446 455 (head)->tqh_circ.tql_prev = (elm)->field.tqe_circ.tql_prev; \ 447 456 (elm)->field.tqe_circ.tql_prev->tql_next = (elm)->field.tqe_next; \ 448 457 (elm)->field.tqe_circ.tql_prev = NULL; \ 458 + (elm)->field.tqe_circ.tql_next = NULL; \ 459 + (elm)->field.tqe_next = NULL; \ 449 460 } while (/*CONSTCOND*/0) 450 461 451 462 /* remove @left, @right and all elements in between from @head */
+3
util/Makefile.objs
··· 5 5 util-obj-y += main-loop.o 6 6 util-obj-$(call lnot,$(CONFIG_ATOMIC64)) += atomic64.o 7 7 util-obj-$(CONFIG_POSIX) += aio-posix.o 8 + util-obj-$(CONFIG_POSIX) += fdmon-poll.o 9 + util-obj-$(CONFIG_EPOLL_CREATE1) += fdmon-epoll.o 10 + util-obj-$(CONFIG_LINUX_IO_URING) += fdmon-io_uring.o 8 11 util-obj-$(CONFIG_POSIX) += compatfd.o 9 12 util-obj-$(CONFIG_POSIX) += event_notifier-posix.o 10 13 util-obj-$(CONFIG_POSIX) += mmap-alloc.o
+143 -306
util/aio-posix.c
··· 20 20 #include "qemu/sockets.h" 21 21 #include "qemu/cutils.h" 22 22 #include "trace.h" 23 - #ifdef CONFIG_EPOLL_CREATE1 24 - #include <sys/epoll.h> 25 - #endif 23 + #include "aio-posix.h" 26 24 27 - struct AioHandler 25 + /* Stop userspace polling on a handler if it isn't active for some time */ 26 + #define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND) 27 + 28 + bool aio_poll_disabled(AioContext *ctx) 28 29 { 29 - GPollFD pfd; 30 - IOHandler *io_read; 31 - IOHandler *io_write; 32 - AioPollFn *io_poll; 33 - IOHandler *io_poll_begin; 34 - IOHandler *io_poll_end; 35 - void *opaque; 36 - bool is_external; 37 - QLIST_ENTRY(AioHandler) node; 38 - QLIST_ENTRY(AioHandler) node_ready; /* only used during aio_poll() */ 39 - QLIST_ENTRY(AioHandler) node_deleted; 40 - }; 30 + return atomic_read(&ctx->poll_disable_cnt); 31 + } 41 32 42 - /* Add a handler to a ready list */ 43 - static void add_ready_handler(AioHandlerList *ready_list, 44 - AioHandler *node, 45 - int revents) 33 + void aio_add_ready_handler(AioHandlerList *ready_list, 34 + AioHandler *node, 35 + int revents) 46 36 { 47 37 QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */ 48 38 node->pfd.revents = revents; 49 39 QLIST_INSERT_HEAD(ready_list, node, node_ready); 50 40 } 51 41 52 - #ifdef CONFIG_EPOLL_CREATE1 53 - 54 - /* The fd number threshold to switch to epoll */ 55 - #define EPOLL_ENABLE_THRESHOLD 64 56 - 57 - static void aio_epoll_disable(AioContext *ctx) 58 - { 59 - ctx->epoll_enabled = false; 60 - if (!ctx->epoll_available) { 61 - return; 62 - } 63 - ctx->epoll_available = false; 64 - close(ctx->epollfd); 65 - } 66 - 67 - static inline int epoll_events_from_pfd(int pfd_events) 68 - { 69 - return (pfd_events & G_IO_IN ? EPOLLIN : 0) | 70 - (pfd_events & G_IO_OUT ? EPOLLOUT : 0) | 71 - (pfd_events & G_IO_HUP ? EPOLLHUP : 0) | 72 - (pfd_events & G_IO_ERR ? EPOLLERR : 0); 73 - } 74 - 75 - static bool aio_epoll_try_enable(AioContext *ctx) 76 - { 77 - AioHandler *node; 78 - struct epoll_event event; 79 - 80 - QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 81 - int r; 82 - if (QLIST_IS_INSERTED(node, node_deleted) || !node->pfd.events) { 83 - continue; 84 - } 85 - event.events = epoll_events_from_pfd(node->pfd.events); 86 - event.data.ptr = node; 87 - r = epoll_ctl(ctx->epollfd, EPOLL_CTL_ADD, node->pfd.fd, &event); 88 - if (r) { 89 - return false; 90 - } 91 - } 92 - ctx->epoll_enabled = true; 93 - return true; 94 - } 95 - 96 - static void aio_epoll_update(AioContext *ctx, AioHandler *node, bool is_new) 97 - { 98 - struct epoll_event event; 99 - int r; 100 - int ctl; 101 - 102 - if (!ctx->epoll_enabled) { 103 - return; 104 - } 105 - if (!node->pfd.events) { 106 - ctl = EPOLL_CTL_DEL; 107 - } else { 108 - event.data.ptr = node; 109 - event.events = epoll_events_from_pfd(node->pfd.events); 110 - ctl = is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; 111 - } 112 - 113 - r = epoll_ctl(ctx->epollfd, ctl, node->pfd.fd, &event); 114 - if (r) { 115 - aio_epoll_disable(ctx); 116 - } 117 - } 118 - 119 - static int aio_epoll(AioContext *ctx, AioHandlerList *ready_list, 120 - int64_t timeout) 121 - { 122 - GPollFD pfd = { 123 - .fd = ctx->epollfd, 124 - .events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR, 125 - }; 126 - AioHandler *node; 127 - int i, ret = 0; 128 - struct epoll_event events[128]; 129 - 130 - if (timeout > 0) { 131 - ret = qemu_poll_ns(&pfd, 1, timeout); 132 - if (ret > 0) { 133 - timeout = 0; 134 - } 135 - } 136 - if (timeout <= 0 || ret > 0) { 137 - ret = epoll_wait(ctx->epollfd, events, 138 - ARRAY_SIZE(events), 139 - timeout); 140 - if (ret <= 0) { 141 - goto out; 142 - } 143 - for (i = 0; i < ret; i++) { 144 - int ev = events[i].events; 145 - int revents = (ev & EPOLLIN ? G_IO_IN : 0) | 146 - (ev & EPOLLOUT ? G_IO_OUT : 0) | 147 - (ev & EPOLLHUP ? G_IO_HUP : 0) | 148 - (ev & EPOLLERR ? G_IO_ERR : 0); 149 - 150 - node = events[i].data.ptr; 151 - add_ready_handler(ready_list, node, revents); 152 - } 153 - } 154 - out: 155 - return ret; 156 - } 157 - 158 - static bool aio_epoll_enabled(AioContext *ctx) 159 - { 160 - /* Fall back to ppoll when external clients are disabled. */ 161 - return !aio_external_disabled(ctx) && ctx->epoll_enabled; 162 - } 163 - 164 - static bool aio_epoll_check_poll(AioContext *ctx, GPollFD *pfds, 165 - unsigned npfd, int64_t timeout) 166 - { 167 - if (!ctx->epoll_available) { 168 - return false; 169 - } 170 - if (aio_epoll_enabled(ctx)) { 171 - return true; 172 - } 173 - if (npfd >= EPOLL_ENABLE_THRESHOLD) { 174 - if (aio_epoll_try_enable(ctx)) { 175 - return true; 176 - } else { 177 - aio_epoll_disable(ctx); 178 - } 179 - } 180 - return false; 181 - } 182 - 183 - #else 184 - 185 - static void aio_epoll_update(AioContext *ctx, AioHandler *node, bool is_new) 186 - { 187 - } 188 - 189 - static int aio_epoll(AioContext *ctx, AioHandlerList *ready_list, 190 - int64_t timeout) 191 - { 192 - assert(false); 193 - } 194 - 195 - static bool aio_epoll_enabled(AioContext *ctx) 196 - { 197 - return false; 198 - } 199 - 200 - static bool aio_epoll_check_poll(AioContext *ctx, GPollFD *pfds, 201 - unsigned npfd, int64_t timeout) 202 - { 203 - return false; 204 - } 205 - 206 - #endif 207 - 208 42 static AioHandler *find_aio_handler(AioContext *ctx, int fd) 209 43 { 210 44 AioHandler *node; ··· 231 65 g_source_remove_poll(&ctx->source, &node->pfd); 232 66 } 233 67 68 + node->pfd.revents = 0; 69 + 70 + /* If the fd monitor has already marked it deleted, leave it alone */ 71 + if (QLIST_IS_INSERTED(node, node_deleted)) { 72 + return false; 73 + } 74 + 234 75 /* If a read is in progress, just mark the node as deleted */ 235 76 if (qemu_lockcnt_count(&ctx->list_lock)) { 236 77 QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted); 237 - node->pfd.revents = 0; 238 78 return false; 239 79 } 240 80 /* Otherwise, delete it for real. We can't just mark it as 241 81 * deleted because deleted nodes are only cleaned up while 242 82 * no one is walking the handlers list. 243 83 */ 84 + QLIST_SAFE_REMOVE(node, node_poll); 244 85 QLIST_REMOVE(node, node); 245 86 return true; 246 87 } ··· 300 141 301 142 QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node); 302 143 } 303 - if (node) { 304 - deleted = aio_remove_fd_handler(ctx, node); 305 - } 306 144 307 145 /* No need to order poll_disable_cnt writes against other updates; 308 146 * the counter is only used to avoid wasting time and latency on ··· 313 151 atomic_set(&ctx->poll_disable_cnt, 314 152 atomic_read(&ctx->poll_disable_cnt) + poll_disable_change); 315 153 316 - if (new_node) { 317 - aio_epoll_update(ctx, new_node, is_new); 318 - } else if (node) { 319 - /* Unregister deleted fd_handler */ 320 - aio_epoll_update(ctx, node, false); 154 + ctx->fdmon_ops->update(ctx, node, new_node); 155 + if (node) { 156 + deleted = aio_remove_fd_handler(ctx, node); 321 157 } 322 158 qemu_lockcnt_unlock(&ctx->list_lock); 323 159 aio_notify(ctx); ··· 361 197 (IOHandler *)io_poll_end); 362 198 } 363 199 364 - static void poll_set_started(AioContext *ctx, bool started) 200 + static bool poll_set_started(AioContext *ctx, bool started) 365 201 { 366 202 AioHandler *node; 203 + bool progress = false; 367 204 368 205 if (started == ctx->poll_started) { 369 - return; 206 + return false; 370 207 } 371 208 372 209 ctx->poll_started = started; 373 210 374 211 qemu_lockcnt_inc(&ctx->list_lock); 375 - QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 212 + QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) { 376 213 IOHandler *fn; 377 214 378 215 if (QLIST_IS_INSERTED(node, node_deleted)) { ··· 388 225 if (fn) { 389 226 fn(node->opaque); 390 227 } 228 + 229 + /* Poll one last time in case ->io_poll_end() raced with the event */ 230 + if (!started) { 231 + progress = node->io_poll(node->opaque) || progress; 232 + } 391 233 } 392 234 qemu_lockcnt_dec(&ctx->list_lock); 235 + 236 + return progress; 393 237 } 394 238 395 239 ··· 446 290 while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) { 447 291 QLIST_REMOVE(node, node); 448 292 QLIST_REMOVE(node, node_deleted); 293 + QLIST_SAFE_REMOVE(node, node_poll); 449 294 g_free(node); 450 295 } 451 296 ··· 460 305 revents = node->pfd.revents & node->pfd.events; 461 306 node->pfd.revents = 0; 462 307 308 + /* 309 + * Start polling AioHandlers when they become ready because activity is 310 + * likely to continue. Note that starvation is theoretically possible when 311 + * fdmon_supports_polling(), but only until the fd fires for the first 312 + * time. 313 + */ 314 + if (!QLIST_IS_INSERTED(node, node_deleted) && 315 + !QLIST_IS_INSERTED(node, node_poll) && 316 + node->io_poll) { 317 + trace_poll_add(ctx, node, node->pfd.fd, revents); 318 + if (ctx->poll_started && node->io_poll_begin) { 319 + node->io_poll_begin(node->opaque); 320 + } 321 + QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll); 322 + } 323 + 463 324 if (!QLIST_IS_INSERTED(node, node_deleted) && 464 325 (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) && 465 326 aio_node_check(ctx, node->is_external) && ··· 493 354 AioHandler *node; 494 355 495 356 while ((node = QLIST_FIRST(ready_list))) { 496 - QLIST_SAFE_REMOVE(node, node_ready); 357 + QLIST_REMOVE(node, node_ready); 497 358 progress = aio_dispatch_handler(ctx, node) || progress; 498 359 } 499 360 ··· 524 385 timerlistgroup_run_timers(&ctx->tlg); 525 386 } 526 387 527 - /* These thread-local variables are used only in a small part of aio_poll 528 - * around the call to the poll() system call. In particular they are not 529 - * used while aio_poll is performing callbacks, which makes it much easier 530 - * to think about reentrancy! 531 - * 532 - * Stack-allocated arrays would be perfect but they have size limitations; 533 - * heap allocation is expensive enough that we want to reuse arrays across 534 - * calls to aio_poll(). And because poll() has to be called without holding 535 - * any lock, the arrays cannot be stored in AioContext. Thread-local data 536 - * has none of the disadvantages of these three options. 537 - */ 538 - static __thread GPollFD *pollfds; 539 - static __thread AioHandler **nodes; 540 - static __thread unsigned npfd, nalloc; 541 - static __thread Notifier pollfds_cleanup_notifier; 542 - 543 - static void pollfds_cleanup(Notifier *n, void *unused) 544 - { 545 - g_assert(npfd == 0); 546 - g_free(pollfds); 547 - g_free(nodes); 548 - nalloc = 0; 549 - } 550 - 551 - static void add_pollfd(AioHandler *node) 552 - { 553 - if (npfd == nalloc) { 554 - if (nalloc == 0) { 555 - pollfds_cleanup_notifier.notify = pollfds_cleanup; 556 - qemu_thread_atexit_add(&pollfds_cleanup_notifier); 557 - nalloc = 8; 558 - } else { 559 - g_assert(nalloc <= INT_MAX); 560 - nalloc *= 2; 561 - } 562 - pollfds = g_renew(GPollFD, pollfds, nalloc); 563 - nodes = g_renew(AioHandler *, nodes, nalloc); 564 - } 565 - nodes[npfd] = node; 566 - pollfds[npfd] = (GPollFD) { 567 - .fd = node->pfd.fd, 568 - .events = node->pfd.events, 569 - }; 570 - npfd++; 571 - } 572 - 573 - static bool run_poll_handlers_once(AioContext *ctx, int64_t *timeout) 388 + static bool run_poll_handlers_once(AioContext *ctx, 389 + int64_t now, 390 + int64_t *timeout) 574 391 { 575 392 bool progress = false; 576 393 AioHandler *node; 577 - 578 - /* 579 - * Optimization: ->io_poll() handlers often contain RCU read critical 580 - * sections and we therefore see many rcu_read_lock() -> rcu_read_unlock() 581 - * -> rcu_read_lock() -> ... sequences with expensive memory 582 - * synchronization primitives. Make the entire polling loop an RCU 583 - * critical section because nested rcu_read_lock()/rcu_read_unlock() calls 584 - * are cheap. 585 - */ 586 - RCU_READ_LOCK_GUARD(); 394 + AioHandler *tmp; 587 395 588 - QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 589 - if (!QLIST_IS_INSERTED(node, node_deleted) && node->io_poll && 590 - aio_node_check(ctx, node->is_external) && 396 + QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) { 397 + if (aio_node_check(ctx, node->is_external) && 591 398 node->io_poll(node->opaque)) { 399 + node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS; 400 + 592 401 /* 593 402 * Polling was successful, exit try_poll_mode immediately 594 403 * to adjust the next polling time. ··· 605 414 return progress; 606 415 } 607 416 417 + static bool fdmon_supports_polling(AioContext *ctx) 418 + { 419 + return ctx->fdmon_ops->need_wait != aio_poll_disabled; 420 + } 421 + 422 + static bool remove_idle_poll_handlers(AioContext *ctx, int64_t now) 423 + { 424 + AioHandler *node; 425 + AioHandler *tmp; 426 + bool progress = false; 427 + 428 + /* 429 + * File descriptor monitoring implementations without userspace polling 430 + * support suffer from starvation when a subset of handlers is polled 431 + * because fds will not be processed in a timely fashion. Don't remove 432 + * idle poll handlers. 433 + */ 434 + if (!fdmon_supports_polling(ctx)) { 435 + return false; 436 + } 437 + 438 + QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) { 439 + if (node->poll_idle_timeout == 0LL) { 440 + node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS; 441 + } else if (now >= node->poll_idle_timeout) { 442 + trace_poll_remove(ctx, node, node->pfd.fd); 443 + node->poll_idle_timeout = 0LL; 444 + QLIST_SAFE_REMOVE(node, node_poll); 445 + if (ctx->poll_started && node->io_poll_end) { 446 + node->io_poll_end(node->opaque); 447 + 448 + /* 449 + * Final poll in case ->io_poll_end() races with an event. 450 + * Nevermind about re-adding the handler in the rare case where 451 + * this causes progress. 452 + */ 453 + progress = node->io_poll(node->opaque) || progress; 454 + } 455 + } 456 + } 457 + 458 + return progress; 459 + } 460 + 608 461 /* run_poll_handlers: 609 462 * @ctx: the AioContext 610 463 * @max_ns: maximum time to poll for, in nanoseconds ··· 628 481 629 482 trace_run_poll_handlers_begin(ctx, max_ns, *timeout); 630 483 484 + /* 485 + * Optimization: ->io_poll() handlers often contain RCU read critical 486 + * sections and we therefore see many rcu_read_lock() -> rcu_read_unlock() 487 + * -> rcu_read_lock() -> ... sequences with expensive memory 488 + * synchronization primitives. Make the entire polling loop an RCU 489 + * critical section because nested rcu_read_lock()/rcu_read_unlock() calls 490 + * are cheap. 491 + */ 492 + RCU_READ_LOCK_GUARD(); 493 + 631 494 start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME); 632 495 do { 633 - progress = run_poll_handlers_once(ctx, timeout); 496 + progress = run_poll_handlers_once(ctx, start_time, timeout); 634 497 elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time; 635 498 max_ns = qemu_soonest_timeout(*timeout, max_ns); 636 499 assert(!(max_ns && progress)); 637 - } while (elapsed_time < max_ns && !atomic_read(&ctx->poll_disable_cnt)); 500 + } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx)); 501 + 502 + if (remove_idle_poll_handlers(ctx, start_time + elapsed_time)) { 503 + *timeout = 0; 504 + progress = true; 505 + } 638 506 639 507 /* If time has passed with no successful polling, adjust *timeout to 640 508 * keep the same ending time. ··· 660 528 */ 661 529 static bool try_poll_mode(AioContext *ctx, int64_t *timeout) 662 530 { 663 - int64_t max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns); 531 + int64_t max_ns; 532 + 533 + if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) { 534 + return false; 535 + } 664 536 665 - if (max_ns && !atomic_read(&ctx->poll_disable_cnt)) { 537 + max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns); 538 + if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) { 666 539 poll_set_started(ctx, true); 667 540 668 541 if (run_poll_handlers(ctx, max_ns, timeout)) { ··· 670 543 } 671 544 } 672 545 673 - poll_set_started(ctx, false); 546 + if (poll_set_started(ctx, false)) { 547 + *timeout = 0; 548 + return true; 549 + } 674 550 675 - /* Even if we don't run busy polling, try polling once in case it can make 676 - * progress and the caller will be able to avoid ppoll(2)/epoll_wait(2). 677 - */ 678 - return run_poll_handlers_once(ctx, timeout); 551 + return false; 679 552 } 680 553 681 554 bool aio_poll(AioContext *ctx, bool blocking) 682 555 { 683 556 AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list); 684 - AioHandler *node; 685 - int i; 686 557 int ret = 0; 687 558 bool progress; 688 559 int64_t timeout; ··· 714 585 /* If polling is allowed, non-blocking aio_poll does not need the 715 586 * system call---a single round of run_poll_handlers_once suffices. 716 587 */ 717 - if (timeout || atomic_read(&ctx->poll_disable_cnt)) { 718 - assert(npfd == 0); 719 - 720 - /* fill pollfds */ 721 - 722 - if (!aio_epoll_enabled(ctx)) { 723 - QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 724 - if (!QLIST_IS_INSERTED(node, node_deleted) && node->pfd.events 725 - && aio_node_check(ctx, node->is_external)) { 726 - add_pollfd(node); 727 - } 728 - } 729 - } 730 - 731 - /* wait until next event */ 732 - if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) { 733 - npfd = 0; /* pollfds[] is not being used */ 734 - ret = aio_epoll(ctx, &ready_list, timeout); 735 - } else { 736 - ret = qemu_poll_ns(pollfds, npfd, timeout); 737 - } 588 + if (timeout || ctx->fdmon_ops->need_wait(ctx)) { 589 + ret = ctx->fdmon_ops->wait(ctx, &ready_list, timeout); 738 590 } 739 591 740 592 if (blocking) { ··· 783 635 } 784 636 } 785 637 786 - /* if we have any readable fds, dispatch event */ 787 - if (ret > 0) { 788 - for (i = 0; i < npfd; i++) { 789 - int revents = pollfds[i].revents; 790 - 791 - if (revents) { 792 - add_ready_handler(&ready_list, nodes[i], revents); 793 - } 794 - } 795 - } 796 - 797 - npfd = 0; 798 - 799 638 progress |= aio_bh_poll(ctx); 800 639 801 640 if (ret > 0) { ··· 813 652 814 653 void aio_context_setup(AioContext *ctx) 815 654 { 816 - #ifdef CONFIG_EPOLL_CREATE1 817 - assert(!ctx->epollfd); 818 - ctx->epollfd = epoll_create1(EPOLL_CLOEXEC); 819 - if (ctx->epollfd == -1) { 820 - fprintf(stderr, "Failed to create epoll instance: %s", strerror(errno)); 821 - ctx->epoll_available = false; 822 - } else { 823 - ctx->epoll_available = true; 655 + ctx->fdmon_ops = &fdmon_poll_ops; 656 + ctx->epollfd = -1; 657 + 658 + /* Use the fastest fd monitoring implementation if available */ 659 + if (fdmon_io_uring_setup(ctx)) { 660 + return; 824 661 } 825 - #endif 662 + 663 + fdmon_epoll_setup(ctx); 826 664 } 827 665 828 666 void aio_context_destroy(AioContext *ctx) 829 667 { 830 - #ifdef CONFIG_EPOLL_CREATE1 831 - aio_epoll_disable(ctx); 832 - #endif 668 + fdmon_io_uring_destroy(ctx); 669 + fdmon_epoll_disable(ctx); 833 670 } 834 671 835 672 void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
+81
util/aio-posix.h
··· 1 + /* 2 + * AioContext POSIX event loop implementation internal APIs 3 + * 4 + * Copyright IBM, Corp. 2008 5 + * Copyright Red Hat, Inc. 2020 6 + * 7 + * Authors: 8 + * Anthony Liguori <aliguori@us.ibm.com> 9 + * 10 + * This work is licensed under the terms of the GNU GPL, version 2. See 11 + * the COPYING file in the top-level directory. 12 + * 13 + * Contributions after 2012-01-13 are licensed under the terms of the 14 + * GNU GPL, version 2 or (at your option) any later version. 15 + */ 16 + 17 + #ifndef AIO_POSIX_H 18 + #define AIO_POSIX_H 19 + 20 + #include "block/aio.h" 21 + 22 + struct AioHandler { 23 + GPollFD pfd; 24 + IOHandler *io_read; 25 + IOHandler *io_write; 26 + AioPollFn *io_poll; 27 + IOHandler *io_poll_begin; 28 + IOHandler *io_poll_end; 29 + void *opaque; 30 + QLIST_ENTRY(AioHandler) node; 31 + QLIST_ENTRY(AioHandler) node_ready; /* only used during aio_poll() */ 32 + QLIST_ENTRY(AioHandler) node_deleted; 33 + QLIST_ENTRY(AioHandler) node_poll; 34 + #ifdef CONFIG_LINUX_IO_URING 35 + QSLIST_ENTRY(AioHandler) node_submitted; 36 + unsigned flags; /* see fdmon-io_uring.c */ 37 + #endif 38 + int64_t poll_idle_timeout; /* when to stop userspace polling */ 39 + bool is_external; 40 + }; 41 + 42 + /* Add a handler to a ready list */ 43 + void aio_add_ready_handler(AioHandlerList *ready_list, AioHandler *node, 44 + int revents); 45 + 46 + extern const FDMonOps fdmon_poll_ops; 47 + 48 + #ifdef CONFIG_EPOLL_CREATE1 49 + bool fdmon_epoll_try_upgrade(AioContext *ctx, unsigned npfd); 50 + void fdmon_epoll_setup(AioContext *ctx); 51 + void fdmon_epoll_disable(AioContext *ctx); 52 + #else 53 + static inline bool fdmon_epoll_try_upgrade(AioContext *ctx, unsigned npfd) 54 + { 55 + return false; 56 + } 57 + 58 + static inline void fdmon_epoll_setup(AioContext *ctx) 59 + { 60 + } 61 + 62 + static inline void fdmon_epoll_disable(AioContext *ctx) 63 + { 64 + } 65 + #endif /* !CONFIG_EPOLL_CREATE1 */ 66 + 67 + #ifdef CONFIG_LINUX_IO_URING 68 + bool fdmon_io_uring_setup(AioContext *ctx); 69 + void fdmon_io_uring_destroy(AioContext *ctx); 70 + #else 71 + static inline bool fdmon_io_uring_setup(AioContext *ctx) 72 + { 73 + return false; 74 + } 75 + 76 + static inline void fdmon_io_uring_destroy(AioContext *ctx) 77 + { 78 + } 79 + #endif /* !CONFIG_LINUX_IO_URING */ 80 + 81 + #endif /* AIO_POSIX_H */
+155
util/fdmon-epoll.c
··· 1 + /* SPDX-License-Identifier: GPL-2.0-or-later */ 2 + /* 3 + * epoll(7) file descriptor monitoring 4 + */ 5 + 6 + #include "qemu/osdep.h" 7 + #include <sys/epoll.h> 8 + #include "qemu/rcu_queue.h" 9 + #include "aio-posix.h" 10 + 11 + /* The fd number threshold to switch to epoll */ 12 + #define EPOLL_ENABLE_THRESHOLD 64 13 + 14 + void fdmon_epoll_disable(AioContext *ctx) 15 + { 16 + if (ctx->epollfd >= 0) { 17 + close(ctx->epollfd); 18 + ctx->epollfd = -1; 19 + } 20 + 21 + /* Switch back */ 22 + ctx->fdmon_ops = &fdmon_poll_ops; 23 + } 24 + 25 + static inline int epoll_events_from_pfd(int pfd_events) 26 + { 27 + return (pfd_events & G_IO_IN ? EPOLLIN : 0) | 28 + (pfd_events & G_IO_OUT ? EPOLLOUT : 0) | 29 + (pfd_events & G_IO_HUP ? EPOLLHUP : 0) | 30 + (pfd_events & G_IO_ERR ? EPOLLERR : 0); 31 + } 32 + 33 + static void fdmon_epoll_update(AioContext *ctx, 34 + AioHandler *old_node, 35 + AioHandler *new_node) 36 + { 37 + struct epoll_event event = { 38 + .data.ptr = new_node, 39 + .events = new_node ? epoll_events_from_pfd(new_node->pfd.events) : 0, 40 + }; 41 + int r; 42 + 43 + if (!new_node) { 44 + r = epoll_ctl(ctx->epollfd, EPOLL_CTL_DEL, old_node->pfd.fd, &event); 45 + } else if (!old_node) { 46 + r = epoll_ctl(ctx->epollfd, EPOLL_CTL_ADD, new_node->pfd.fd, &event); 47 + } else { 48 + r = epoll_ctl(ctx->epollfd, EPOLL_CTL_MOD, new_node->pfd.fd, &event); 49 + } 50 + 51 + if (r) { 52 + fdmon_epoll_disable(ctx); 53 + } 54 + } 55 + 56 + static int fdmon_epoll_wait(AioContext *ctx, AioHandlerList *ready_list, 57 + int64_t timeout) 58 + { 59 + GPollFD pfd = { 60 + .fd = ctx->epollfd, 61 + .events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR, 62 + }; 63 + AioHandler *node; 64 + int i, ret = 0; 65 + struct epoll_event events[128]; 66 + 67 + /* Fall back while external clients are disabled */ 68 + if (atomic_read(&ctx->external_disable_cnt)) { 69 + return fdmon_poll_ops.wait(ctx, ready_list, timeout); 70 + } 71 + 72 + if (timeout > 0) { 73 + ret = qemu_poll_ns(&pfd, 1, timeout); 74 + if (ret > 0) { 75 + timeout = 0; 76 + } 77 + } 78 + if (timeout <= 0 || ret > 0) { 79 + ret = epoll_wait(ctx->epollfd, events, 80 + ARRAY_SIZE(events), 81 + timeout); 82 + if (ret <= 0) { 83 + goto out; 84 + } 85 + for (i = 0; i < ret; i++) { 86 + int ev = events[i].events; 87 + int revents = (ev & EPOLLIN ? G_IO_IN : 0) | 88 + (ev & EPOLLOUT ? G_IO_OUT : 0) | 89 + (ev & EPOLLHUP ? G_IO_HUP : 0) | 90 + (ev & EPOLLERR ? G_IO_ERR : 0); 91 + 92 + node = events[i].data.ptr; 93 + aio_add_ready_handler(ready_list, node, revents); 94 + } 95 + } 96 + out: 97 + return ret; 98 + } 99 + 100 + static const FDMonOps fdmon_epoll_ops = { 101 + .update = fdmon_epoll_update, 102 + .wait = fdmon_epoll_wait, 103 + .need_wait = aio_poll_disabled, 104 + }; 105 + 106 + static bool fdmon_epoll_try_enable(AioContext *ctx) 107 + { 108 + AioHandler *node; 109 + struct epoll_event event; 110 + 111 + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 112 + int r; 113 + if (QLIST_IS_INSERTED(node, node_deleted) || !node->pfd.events) { 114 + continue; 115 + } 116 + event.events = epoll_events_from_pfd(node->pfd.events); 117 + event.data.ptr = node; 118 + r = epoll_ctl(ctx->epollfd, EPOLL_CTL_ADD, node->pfd.fd, &event); 119 + if (r) { 120 + return false; 121 + } 122 + } 123 + 124 + ctx->fdmon_ops = &fdmon_epoll_ops; 125 + return true; 126 + } 127 + 128 + bool fdmon_epoll_try_upgrade(AioContext *ctx, unsigned npfd) 129 + { 130 + if (ctx->epollfd < 0) { 131 + return false; 132 + } 133 + 134 + /* Do not upgrade while external clients are disabled */ 135 + if (atomic_read(&ctx->external_disable_cnt)) { 136 + return false; 137 + } 138 + 139 + if (npfd >= EPOLL_ENABLE_THRESHOLD) { 140 + if (fdmon_epoll_try_enable(ctx)) { 141 + return true; 142 + } else { 143 + fdmon_epoll_disable(ctx); 144 + } 145 + } 146 + return false; 147 + } 148 + 149 + void fdmon_epoll_setup(AioContext *ctx) 150 + { 151 + ctx->epollfd = epoll_create1(EPOLL_CLOEXEC); 152 + if (ctx->epollfd == -1) { 153 + fprintf(stderr, "Failed to create epoll instance: %s", strerror(errno)); 154 + } 155 + }
+332
util/fdmon-io_uring.c
··· 1 + /* SPDX-License-Identifier: GPL-2.0-or-later */ 2 + /* 3 + * Linux io_uring file descriptor monitoring 4 + * 5 + * The Linux io_uring API supports file descriptor monitoring with a few 6 + * advantages over existing APIs like poll(2) and epoll(7): 7 + * 8 + * 1. Userspace polling of events is possible because the completion queue (cq 9 + * ring) is shared between the kernel and userspace. This allows 10 + * applications that rely on userspace polling to also monitor file 11 + * descriptors in the same userspace polling loop. 12 + * 13 + * 2. Submission and completion is batched and done together in a single system 14 + * call. This minimizes the number of system calls. 15 + * 16 + * 3. File descriptor monitoring is O(1) like epoll(7) so it scales better than 17 + * poll(2). 18 + * 19 + * 4. Nanosecond timeouts are supported so it requires fewer syscalls than 20 + * epoll(7). 21 + * 22 + * This code only monitors file descriptors and does not do asynchronous disk 23 + * I/O. Implementing disk I/O efficiently has other requirements and should 24 + * use a separate io_uring so it does not make sense to unify the code. 25 + * 26 + * File descriptor monitoring is implemented using the following operations: 27 + * 28 + * 1. IORING_OP_POLL_ADD - adds a file descriptor to be monitored. 29 + * 2. IORING_OP_POLL_REMOVE - removes a file descriptor being monitored. When 30 + * the poll mask changes for a file descriptor it is first removed and then 31 + * re-added with the new poll mask, so this operation is also used as part 32 + * of modifying an existing monitored file descriptor. 33 + * 3. IORING_OP_TIMEOUT - added every time a blocking syscall is made to wait 34 + * for events. This operation self-cancels if another event completes 35 + * before the timeout. 36 + * 37 + * io_uring calls the submission queue the "sq ring" and the completion queue 38 + * the "cq ring". Ring entries are called "sqe" and "cqe", respectively. 39 + * 40 + * The code is structured so that sq/cq rings are only modified within 41 + * fdmon_io_uring_wait(). Changes to AioHandlers are made by enqueuing them on 42 + * ctx->submit_list so that fdmon_io_uring_wait() can submit IORING_OP_POLL_ADD 43 + * and/or IORING_OP_POLL_REMOVE sqes for them. 44 + */ 45 + 46 + #include "qemu/osdep.h" 47 + #include <poll.h> 48 + #include "qemu/rcu_queue.h" 49 + #include "aio-posix.h" 50 + 51 + enum { 52 + FDMON_IO_URING_ENTRIES = 128, /* sq/cq ring size */ 53 + 54 + /* AioHandler::flags */ 55 + FDMON_IO_URING_PENDING = (1 << 0), 56 + FDMON_IO_URING_ADD = (1 << 1), 57 + FDMON_IO_URING_REMOVE = (1 << 2), 58 + }; 59 + 60 + static inline int poll_events_from_pfd(int pfd_events) 61 + { 62 + return (pfd_events & G_IO_IN ? POLLIN : 0) | 63 + (pfd_events & G_IO_OUT ? POLLOUT : 0) | 64 + (pfd_events & G_IO_HUP ? POLLHUP : 0) | 65 + (pfd_events & G_IO_ERR ? POLLERR : 0); 66 + } 67 + 68 + static inline int pfd_events_from_poll(int poll_events) 69 + { 70 + return (poll_events & POLLIN ? G_IO_IN : 0) | 71 + (poll_events & POLLOUT ? G_IO_OUT : 0) | 72 + (poll_events & POLLHUP ? G_IO_HUP : 0) | 73 + (poll_events & POLLERR ? G_IO_ERR : 0); 74 + } 75 + 76 + /* 77 + * Returns an sqe for submitting a request. Only be called within 78 + * fdmon_io_uring_wait(). 79 + */ 80 + static struct io_uring_sqe *get_sqe(AioContext *ctx) 81 + { 82 + struct io_uring *ring = &ctx->fdmon_io_uring; 83 + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); 84 + int ret; 85 + 86 + if (likely(sqe)) { 87 + return sqe; 88 + } 89 + 90 + /* No free sqes left, submit pending sqes first */ 91 + ret = io_uring_submit(ring); 92 + assert(ret > 1); 93 + sqe = io_uring_get_sqe(ring); 94 + assert(sqe); 95 + return sqe; 96 + } 97 + 98 + /* Atomically enqueue an AioHandler for sq ring submission */ 99 + static void enqueue(AioHandlerSList *head, AioHandler *node, unsigned flags) 100 + { 101 + unsigned old_flags; 102 + 103 + old_flags = atomic_fetch_or(&node->flags, FDMON_IO_URING_PENDING | flags); 104 + if (!(old_flags & FDMON_IO_URING_PENDING)) { 105 + QSLIST_INSERT_HEAD_ATOMIC(head, node, node_submitted); 106 + } 107 + } 108 + 109 + /* Dequeue an AioHandler for sq ring submission. Called by fill_sq_ring(). */ 110 + static AioHandler *dequeue(AioHandlerSList *head, unsigned *flags) 111 + { 112 + AioHandler *node = QSLIST_FIRST(head); 113 + 114 + if (!node) { 115 + return NULL; 116 + } 117 + 118 + /* Doesn't need to be atomic since fill_sq_ring() moves the list */ 119 + QSLIST_REMOVE_HEAD(head, node_submitted); 120 + 121 + /* 122 + * Don't clear FDMON_IO_URING_REMOVE. It's sticky so it can serve two 123 + * purposes: telling fill_sq_ring() to submit IORING_OP_POLL_REMOVE and 124 + * telling process_cqe() to delete the AioHandler when its 125 + * IORING_OP_POLL_ADD completes. 126 + */ 127 + *flags = atomic_fetch_and(&node->flags, ~(FDMON_IO_URING_PENDING | 128 + FDMON_IO_URING_ADD)); 129 + return node; 130 + } 131 + 132 + static void fdmon_io_uring_update(AioContext *ctx, 133 + AioHandler *old_node, 134 + AioHandler *new_node) 135 + { 136 + if (new_node) { 137 + enqueue(&ctx->submit_list, new_node, FDMON_IO_URING_ADD); 138 + } 139 + 140 + if (old_node) { 141 + /* 142 + * Deletion is tricky because IORING_OP_POLL_ADD and 143 + * IORING_OP_POLL_REMOVE are async. We need to wait for the original 144 + * IORING_OP_POLL_ADD to complete before this handler can be freed 145 + * safely. 146 + * 147 + * It's possible that the file descriptor becomes ready and the 148 + * IORING_OP_POLL_ADD cqe is enqueued before IORING_OP_POLL_REMOVE is 149 + * submitted, too. 150 + * 151 + * Mark this handler deleted right now but don't place it on 152 + * ctx->deleted_aio_handlers yet. Instead, manually fudge the list 153 + * entry to make QLIST_IS_INSERTED() think this handler has been 154 + * inserted and other code recognizes this AioHandler as deleted. 155 + * 156 + * Once the original IORING_OP_POLL_ADD completes we enqueue the 157 + * handler on the real ctx->deleted_aio_handlers list to be freed. 158 + */ 159 + assert(!QLIST_IS_INSERTED(old_node, node_deleted)); 160 + old_node->node_deleted.le_prev = &old_node->node_deleted.le_next; 161 + 162 + enqueue(&ctx->submit_list, old_node, FDMON_IO_URING_REMOVE); 163 + } 164 + } 165 + 166 + static void add_poll_add_sqe(AioContext *ctx, AioHandler *node) 167 + { 168 + struct io_uring_sqe *sqe = get_sqe(ctx); 169 + int events = poll_events_from_pfd(node->pfd.events); 170 + 171 + io_uring_prep_poll_add(sqe, node->pfd.fd, events); 172 + io_uring_sqe_set_data(sqe, node); 173 + } 174 + 175 + static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node) 176 + { 177 + struct io_uring_sqe *sqe = get_sqe(ctx); 178 + 179 + io_uring_prep_poll_remove(sqe, node); 180 + } 181 + 182 + /* Add a timeout that self-cancels when another cqe becomes ready */ 183 + static void add_timeout_sqe(AioContext *ctx, int64_t ns) 184 + { 185 + struct io_uring_sqe *sqe; 186 + struct __kernel_timespec ts = { 187 + .tv_sec = ns / NANOSECONDS_PER_SECOND, 188 + .tv_nsec = ns % NANOSECONDS_PER_SECOND, 189 + }; 190 + 191 + sqe = get_sqe(ctx); 192 + io_uring_prep_timeout(sqe, &ts, 1, 0); 193 + } 194 + 195 + /* Add sqes from ctx->submit_list for submission */ 196 + static void fill_sq_ring(AioContext *ctx) 197 + { 198 + AioHandlerSList submit_list; 199 + AioHandler *node; 200 + unsigned flags; 201 + 202 + QSLIST_MOVE_ATOMIC(&submit_list, &ctx->submit_list); 203 + 204 + while ((node = dequeue(&submit_list, &flags))) { 205 + /* Order matters, just in case both flags were set */ 206 + if (flags & FDMON_IO_URING_ADD) { 207 + add_poll_add_sqe(ctx, node); 208 + } 209 + if (flags & FDMON_IO_URING_REMOVE) { 210 + add_poll_remove_sqe(ctx, node); 211 + } 212 + } 213 + } 214 + 215 + /* Returns true if a handler became ready */ 216 + static bool process_cqe(AioContext *ctx, 217 + AioHandlerList *ready_list, 218 + struct io_uring_cqe *cqe) 219 + { 220 + AioHandler *node = io_uring_cqe_get_data(cqe); 221 + unsigned flags; 222 + 223 + /* poll_timeout and poll_remove have a zero user_data field */ 224 + if (!node) { 225 + return false; 226 + } 227 + 228 + /* 229 + * Deletion can only happen when IORING_OP_POLL_ADD completes. If we race 230 + * with enqueue() here then we can safely clear the FDMON_IO_URING_REMOVE 231 + * bit before IORING_OP_POLL_REMOVE is submitted. 232 + */ 233 + flags = atomic_fetch_and(&node->flags, ~FDMON_IO_URING_REMOVE); 234 + if (flags & FDMON_IO_URING_REMOVE) { 235 + QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted); 236 + return false; 237 + } 238 + 239 + aio_add_ready_handler(ready_list, node, pfd_events_from_poll(cqe->res)); 240 + 241 + /* IORING_OP_POLL_ADD is one-shot so we must re-arm it */ 242 + add_poll_add_sqe(ctx, node); 243 + return true; 244 + } 245 + 246 + static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list) 247 + { 248 + struct io_uring *ring = &ctx->fdmon_io_uring; 249 + struct io_uring_cqe *cqe; 250 + unsigned num_cqes = 0; 251 + unsigned num_ready = 0; 252 + unsigned head; 253 + 254 + io_uring_for_each_cqe(ring, head, cqe) { 255 + if (process_cqe(ctx, ready_list, cqe)) { 256 + num_ready++; 257 + } 258 + 259 + num_cqes++; 260 + } 261 + 262 + io_uring_cq_advance(ring, num_cqes); 263 + return num_ready; 264 + } 265 + 266 + static int fdmon_io_uring_wait(AioContext *ctx, AioHandlerList *ready_list, 267 + int64_t timeout) 268 + { 269 + unsigned wait_nr = 1; /* block until at least one cqe is ready */ 270 + int ret; 271 + 272 + /* Fall back while external clients are disabled */ 273 + if (atomic_read(&ctx->external_disable_cnt)) { 274 + return fdmon_poll_ops.wait(ctx, ready_list, timeout); 275 + } 276 + 277 + if (timeout == 0) { 278 + wait_nr = 0; /* non-blocking */ 279 + } else if (timeout > 0) { 280 + add_timeout_sqe(ctx, timeout); 281 + } 282 + 283 + fill_sq_ring(ctx); 284 + 285 + ret = io_uring_submit_and_wait(&ctx->fdmon_io_uring, wait_nr); 286 + assert(ret >= 0); 287 + 288 + return process_cq_ring(ctx, ready_list); 289 + } 290 + 291 + static bool fdmon_io_uring_need_wait(AioContext *ctx) 292 + { 293 + return io_uring_cq_ready(&ctx->fdmon_io_uring); 294 + } 295 + 296 + static const FDMonOps fdmon_io_uring_ops = { 297 + .update = fdmon_io_uring_update, 298 + .wait = fdmon_io_uring_wait, 299 + .need_wait = fdmon_io_uring_need_wait, 300 + }; 301 + 302 + bool fdmon_io_uring_setup(AioContext *ctx) 303 + { 304 + int ret; 305 + 306 + ret = io_uring_queue_init(FDMON_IO_URING_ENTRIES, &ctx->fdmon_io_uring, 0); 307 + if (ret != 0) { 308 + return false; 309 + } 310 + 311 + QSLIST_INIT(&ctx->submit_list); 312 + ctx->fdmon_ops = &fdmon_io_uring_ops; 313 + return true; 314 + } 315 + 316 + void fdmon_io_uring_destroy(AioContext *ctx) 317 + { 318 + if (ctx->fdmon_ops == &fdmon_io_uring_ops) { 319 + AioHandler *node; 320 + 321 + io_uring_queue_exit(&ctx->fdmon_io_uring); 322 + 323 + /* No need to submit these anymore, just free them. */ 324 + while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) { 325 + QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted); 326 + QLIST_REMOVE(node, node); 327 + g_free(node); 328 + } 329 + 330 + ctx->fdmon_ops = &fdmon_poll_ops; 331 + } 332 + }
+107
util/fdmon-poll.c
··· 1 + /* SPDX-License-Identifier: GPL-2.0-or-later */ 2 + /* 3 + * poll(2) file descriptor monitoring 4 + * 5 + * Uses ppoll(2) when available, g_poll() otherwise. 6 + */ 7 + 8 + #include "qemu/osdep.h" 9 + #include "aio-posix.h" 10 + #include "qemu/rcu_queue.h" 11 + 12 + /* 13 + * These thread-local variables are used only in fdmon_poll_wait() around the 14 + * call to the poll() system call. In particular they are not used while 15 + * aio_poll is performing callbacks, which makes it much easier to think about 16 + * reentrancy! 17 + * 18 + * Stack-allocated arrays would be perfect but they have size limitations; 19 + * heap allocation is expensive enough that we want to reuse arrays across 20 + * calls to aio_poll(). And because poll() has to be called without holding 21 + * any lock, the arrays cannot be stored in AioContext. Thread-local data 22 + * has none of the disadvantages of these three options. 23 + */ 24 + static __thread GPollFD *pollfds; 25 + static __thread AioHandler **nodes; 26 + static __thread unsigned npfd, nalloc; 27 + static __thread Notifier pollfds_cleanup_notifier; 28 + 29 + static void pollfds_cleanup(Notifier *n, void *unused) 30 + { 31 + g_assert(npfd == 0); 32 + g_free(pollfds); 33 + g_free(nodes); 34 + nalloc = 0; 35 + } 36 + 37 + static void add_pollfd(AioHandler *node) 38 + { 39 + if (npfd == nalloc) { 40 + if (nalloc == 0) { 41 + pollfds_cleanup_notifier.notify = pollfds_cleanup; 42 + qemu_thread_atexit_add(&pollfds_cleanup_notifier); 43 + nalloc = 8; 44 + } else { 45 + g_assert(nalloc <= INT_MAX); 46 + nalloc *= 2; 47 + } 48 + pollfds = g_renew(GPollFD, pollfds, nalloc); 49 + nodes = g_renew(AioHandler *, nodes, nalloc); 50 + } 51 + nodes[npfd] = node; 52 + pollfds[npfd] = (GPollFD) { 53 + .fd = node->pfd.fd, 54 + .events = node->pfd.events, 55 + }; 56 + npfd++; 57 + } 58 + 59 + static int fdmon_poll_wait(AioContext *ctx, AioHandlerList *ready_list, 60 + int64_t timeout) 61 + { 62 + AioHandler *node; 63 + int ret; 64 + 65 + assert(npfd == 0); 66 + 67 + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 68 + if (!QLIST_IS_INSERTED(node, node_deleted) && node->pfd.events 69 + && aio_node_check(ctx, node->is_external)) { 70 + add_pollfd(node); 71 + } 72 + } 73 + 74 + /* epoll(7) is faster above a certain number of fds */ 75 + if (fdmon_epoll_try_upgrade(ctx, npfd)) { 76 + return ctx->fdmon_ops->wait(ctx, ready_list, timeout); 77 + } 78 + 79 + ret = qemu_poll_ns(pollfds, npfd, timeout); 80 + if (ret > 0) { 81 + int i; 82 + 83 + for (i = 0; i < npfd; i++) { 84 + int revents = pollfds[i].revents; 85 + 86 + if (revents) { 87 + aio_add_ready_handler(ready_list, nodes[i], revents); 88 + } 89 + } 90 + } 91 + 92 + npfd = 0; 93 + return ret; 94 + } 95 + 96 + static void fdmon_poll_update(AioContext *ctx, 97 + AioHandler *old_node, 98 + AioHandler *new_node) 99 + { 100 + /* Do nothing, AioHandler already contains the state we'll need */ 101 + } 102 + 103 + const FDMonOps fdmon_poll_ops = { 104 + .update = fdmon_poll_update, 105 + .wait = fdmon_poll_wait, 106 + .need_wait = aio_poll_disabled, 107 + };
+2
util/trace-events
··· 5 5 run_poll_handlers_end(void *ctx, bool progress, int64_t timeout) "ctx %p progress %d new timeout %"PRId64 6 6 poll_shrink(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64 7 7 poll_grow(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64 8 + poll_add(void *ctx, void *node, int fd, unsigned revents) "ctx %p node %p fd %d revents 0x%x" 9 + poll_remove(void *ctx, void *node, int fd) "ctx %p node %p fd %d" 8 10 9 11 # async.c 10 12 aio_co_schedule(void *ctx, void *co) "ctx %p co %p"