qemu with hax to log dma reads & writes jcs.org/2018/11/12/vfio
at master 1001 lines 30 kB view raw
1/* 2 * virtio-fs glue for FUSE 3 * Copyright (C) 2018 Red Hat, Inc. and/or its affiliates 4 * 5 * Authors: 6 * Dave Gilbert <dgilbert@redhat.com> 7 * 8 * Implements the glue between libfuse and libvhost-user 9 * 10 * This program can be distributed under the terms of the GNU LGPLv2. 11 * See the file COPYING.LIB 12 */ 13 14#include "qemu/osdep.h" 15#include "qemu/iov.h" 16#include "qapi/error.h" 17#include "fuse_i.h" 18#include "standard-headers/linux/fuse.h" 19#include "fuse_misc.h" 20#include "fuse_opt.h" 21#include "fuse_virtio.h" 22 23#include <assert.h> 24#include <errno.h> 25#include <glib.h> 26#include <stdint.h> 27#include <stdio.h> 28#include <stdlib.h> 29#include <string.h> 30#include <sys/eventfd.h> 31#include <sys/socket.h> 32#include <sys/types.h> 33#include <sys/un.h> 34#include <unistd.h> 35 36#include "contrib/libvhost-user/libvhost-user.h" 37 38struct fv_VuDev; 39struct fv_QueueInfo { 40 pthread_t thread; 41 /* 42 * This lock protects the VuVirtq preventing races between 43 * fv_queue_thread() and fv_queue_worker(). 44 */ 45 pthread_mutex_t vq_lock; 46 47 struct fv_VuDev *virtio_dev; 48 49 /* Our queue index, corresponds to array position */ 50 int qidx; 51 int kick_fd; 52 int kill_fd; /* For killing the thread */ 53}; 54 55/* A FUSE request */ 56typedef struct { 57 VuVirtqElement elem; 58 struct fuse_chan ch; 59 60 /* Used to complete requests that involve no reply */ 61 bool reply_sent; 62} FVRequest; 63 64/* 65 * We pass the dev element into libvhost-user 66 * and then use it to get back to the outer 67 * container for other data. 68 */ 69struct fv_VuDev { 70 VuDev dev; 71 struct fuse_session *se; 72 73 /* 74 * Either handle virtqueues or vhost-user protocol messages. Don't do 75 * both at the same time since that could lead to race conditions if 76 * virtqueues or memory tables change while another thread is accessing 77 * them. 78 * 79 * The assumptions are: 80 * 1. fv_queue_thread() reads/writes to virtqueues and only reads VuDev. 81 * 2. virtio_loop() reads/writes virtqueues and VuDev. 82 */ 83 pthread_rwlock_t vu_dispatch_rwlock; 84 85 /* 86 * The following pair of fields are only accessed in the main 87 * virtio_loop 88 */ 89 size_t nqueues; 90 struct fv_QueueInfo **qi; 91}; 92 93/* From spec */ 94struct virtio_fs_config { 95 char tag[36]; 96 uint32_t num_queues; 97}; 98 99/* Callback from libvhost-user */ 100static uint64_t fv_get_features(VuDev *dev) 101{ 102 return 1ULL << VIRTIO_F_VERSION_1; 103} 104 105/* Callback from libvhost-user */ 106static void fv_set_features(VuDev *dev, uint64_t features) 107{ 108} 109 110/* 111 * Callback from libvhost-user if there's a new fd we're supposed to listen 112 * to, typically a queue kick? 113 */ 114static void fv_set_watch(VuDev *dev, int fd, int condition, vu_watch_cb cb, 115 void *data) 116{ 117 fuse_log(FUSE_LOG_WARNING, "%s: TODO! fd=%d\n", __func__, fd); 118} 119 120/* 121 * Callback from libvhost-user if we're no longer supposed to listen on an fd 122 */ 123static void fv_remove_watch(VuDev *dev, int fd) 124{ 125 fuse_log(FUSE_LOG_WARNING, "%s: TODO! fd=%d\n", __func__, fd); 126} 127 128/* Callback from libvhost-user to panic */ 129static void fv_panic(VuDev *dev, const char *err) 130{ 131 fuse_log(FUSE_LOG_ERR, "%s: libvhost-user: %s\n", __func__, err); 132 /* TODO: Allow reconnects?? */ 133 exit(EXIT_FAILURE); 134} 135 136/* 137 * Copy from an iovec into a fuse_buf (memory only) 138 * Caller must ensure there is space 139 */ 140static void copy_from_iov(struct fuse_buf *buf, size_t out_num, 141 const struct iovec *out_sg) 142{ 143 void *dest = buf->mem; 144 145 while (out_num) { 146 size_t onelen = out_sg->iov_len; 147 memcpy(dest, out_sg->iov_base, onelen); 148 dest += onelen; 149 out_sg++; 150 out_num--; 151 } 152} 153 154/* 155 * Copy from one iov to another, the given number of bytes 156 * The caller must have checked sizes. 157 */ 158static void copy_iov(struct iovec *src_iov, int src_count, 159 struct iovec *dst_iov, int dst_count, size_t to_copy) 160{ 161 size_t dst_offset = 0; 162 /* Outer loop copies 'src' elements */ 163 while (to_copy) { 164 assert(src_count); 165 size_t src_len = src_iov[0].iov_len; 166 size_t src_offset = 0; 167 168 if (src_len > to_copy) { 169 src_len = to_copy; 170 } 171 /* Inner loop copies contents of one 'src' to maybe multiple dst. */ 172 while (src_len) { 173 assert(dst_count); 174 size_t dst_len = dst_iov[0].iov_len - dst_offset; 175 if (dst_len > src_len) { 176 dst_len = src_len; 177 } 178 179 memcpy(dst_iov[0].iov_base + dst_offset, 180 src_iov[0].iov_base + src_offset, dst_len); 181 src_len -= dst_len; 182 to_copy -= dst_len; 183 src_offset += dst_len; 184 dst_offset += dst_len; 185 186 assert(dst_offset <= dst_iov[0].iov_len); 187 if (dst_offset == dst_iov[0].iov_len) { 188 dst_offset = 0; 189 dst_iov++; 190 dst_count--; 191 } 192 } 193 src_iov++; 194 src_count--; 195 } 196} 197 198/* 199 * Called back by ll whenever it wants to send a reply/message back 200 * The 1st element of the iov starts with the fuse_out_header 201 * 'unique'==0 means it's a notify message. 202 */ 203int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch, 204 struct iovec *iov, int count) 205{ 206 FVRequest *req = container_of(ch, FVRequest, ch); 207 struct fv_QueueInfo *qi = ch->qi; 208 VuDev *dev = &se->virtio_dev->dev; 209 VuVirtq *q = vu_get_queue(dev, qi->qidx); 210 VuVirtqElement *elem = &req->elem; 211 int ret = 0; 212 213 assert(count >= 1); 214 assert(iov[0].iov_len >= sizeof(struct fuse_out_header)); 215 216 struct fuse_out_header *out = iov[0].iov_base; 217 /* TODO: Endianness! */ 218 219 size_t tosend_len = iov_size(iov, count); 220 221 /* unique == 0 is notification, which we don't support */ 222 assert(out->unique); 223 assert(!req->reply_sent); 224 225 /* The 'in' part of the elem is to qemu */ 226 unsigned int in_num = elem->in_num; 227 struct iovec *in_sg = elem->in_sg; 228 size_t in_len = iov_size(in_sg, in_num); 229 fuse_log(FUSE_LOG_DEBUG, "%s: elem %d: with %d in desc of length %zd\n", 230 __func__, elem->index, in_num, in_len); 231 232 /* 233 * The elem should have room for a 'fuse_out_header' (out from fuse) 234 * plus the data based on the len in the header. 235 */ 236 if (in_len < sizeof(struct fuse_out_header)) { 237 fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for out_header\n", 238 __func__, elem->index); 239 ret = -E2BIG; 240 goto err; 241 } 242 if (in_len < tosend_len) { 243 fuse_log(FUSE_LOG_ERR, "%s: elem %d too small for data len %zd\n", 244 __func__, elem->index, tosend_len); 245 ret = -E2BIG; 246 goto err; 247 } 248 249 copy_iov(iov, count, in_sg, in_num, tosend_len); 250 251 pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock); 252 pthread_mutex_lock(&qi->vq_lock); 253 vu_queue_push(dev, q, elem, tosend_len); 254 vu_queue_notify(dev, q); 255 pthread_mutex_unlock(&qi->vq_lock); 256 pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock); 257 258 req->reply_sent = true; 259 260err: 261 return ret; 262} 263 264/* 265 * Callback from fuse_send_data_iov_* when it's virtio and the buffer 266 * is a single FD with FUSE_BUF_IS_FD | FUSE_BUF_FD_SEEK 267 * We need send the iov and then the buffer. 268 * Return 0 on success 269 */ 270int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch, 271 struct iovec *iov, int count, struct fuse_bufvec *buf, 272 size_t len) 273{ 274 FVRequest *req = container_of(ch, FVRequest, ch); 275 struct fv_QueueInfo *qi = ch->qi; 276 VuDev *dev = &se->virtio_dev->dev; 277 VuVirtq *q = vu_get_queue(dev, qi->qidx); 278 VuVirtqElement *elem = &req->elem; 279 int ret = 0; 280 281 assert(count >= 1); 282 assert(iov[0].iov_len >= sizeof(struct fuse_out_header)); 283 284 struct fuse_out_header *out = iov[0].iov_base; 285 /* TODO: Endianness! */ 286 287 size_t iov_len = iov_size(iov, count); 288 size_t tosend_len = iov_len + len; 289 290 out->len = tosend_len; 291 292 fuse_log(FUSE_LOG_DEBUG, "%s: count=%d len=%zd iov_len=%zd\n", __func__, 293 count, len, iov_len); 294 295 /* unique == 0 is notification which we don't support */ 296 assert(out->unique); 297 298 assert(!req->reply_sent); 299 300 /* The 'in' part of the elem is to qemu */ 301 unsigned int in_num = elem->in_num; 302 struct iovec *in_sg = elem->in_sg; 303 size_t in_len = iov_size(in_sg, in_num); 304 fuse_log(FUSE_LOG_DEBUG, "%s: elem %d: with %d in desc of length %zd\n", 305 __func__, elem->index, in_num, in_len); 306 307 /* 308 * The elem should have room for a 'fuse_out_header' (out from fuse) 309 * plus the data based on the len in the header. 310 */ 311 if (in_len < sizeof(struct fuse_out_header)) { 312 fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for out_header\n", 313 __func__, elem->index); 314 ret = E2BIG; 315 goto err; 316 } 317 if (in_len < tosend_len) { 318 fuse_log(FUSE_LOG_ERR, "%s: elem %d too small for data len %zd\n", 319 __func__, elem->index, tosend_len); 320 ret = E2BIG; 321 goto err; 322 } 323 324 /* TODO: Limit to 'len' */ 325 326 /* First copy the header data from iov->in_sg */ 327 copy_iov(iov, count, in_sg, in_num, iov_len); 328 329 /* 330 * Build a copy of the the in_sg iov so we can skip bits in it, 331 * including changing the offsets 332 */ 333 struct iovec *in_sg_cpy = calloc(sizeof(struct iovec), in_num); 334 assert(in_sg_cpy); 335 memcpy(in_sg_cpy, in_sg, sizeof(struct iovec) * in_num); 336 /* These get updated as we skip */ 337 struct iovec *in_sg_ptr = in_sg_cpy; 338 int in_sg_cpy_count = in_num; 339 340 /* skip over parts of in_sg that contained the header iov */ 341 size_t skip_size = iov_len; 342 343 size_t in_sg_left = 0; 344 do { 345 while (skip_size != 0 && in_sg_cpy_count) { 346 if (skip_size >= in_sg_ptr[0].iov_len) { 347 skip_size -= in_sg_ptr[0].iov_len; 348 in_sg_ptr++; 349 in_sg_cpy_count--; 350 } else { 351 in_sg_ptr[0].iov_len -= skip_size; 352 in_sg_ptr[0].iov_base += skip_size; 353 break; 354 } 355 } 356 357 int i; 358 for (i = 0, in_sg_left = 0; i < in_sg_cpy_count; i++) { 359 in_sg_left += in_sg_ptr[i].iov_len; 360 } 361 fuse_log(FUSE_LOG_DEBUG, 362 "%s: after skip skip_size=%zd in_sg_cpy_count=%d " 363 "in_sg_left=%zd\n", 364 __func__, skip_size, in_sg_cpy_count, in_sg_left); 365 ret = preadv(buf->buf[0].fd, in_sg_ptr, in_sg_cpy_count, 366 buf->buf[0].pos); 367 368 if (ret == -1) { 369 ret = errno; 370 fuse_log(FUSE_LOG_DEBUG, "%s: preadv failed (%m) len=%zd\n", 371 __func__, len); 372 free(in_sg_cpy); 373 goto err; 374 } 375 fuse_log(FUSE_LOG_DEBUG, "%s: preadv ret=%d len=%zd\n", __func__, 376 ret, len); 377 if (ret < len && ret) { 378 fuse_log(FUSE_LOG_DEBUG, "%s: ret < len\n", __func__); 379 /* Skip over this much next time around */ 380 skip_size = ret; 381 buf->buf[0].pos += ret; 382 len -= ret; 383 384 /* Lets do another read */ 385 continue; 386 } 387 if (!ret) { 388 /* EOF case? */ 389 fuse_log(FUSE_LOG_DEBUG, "%s: !ret in_sg_left=%zd\n", __func__, 390 in_sg_left); 391 break; 392 } 393 if (ret != len) { 394 fuse_log(FUSE_LOG_DEBUG, "%s: ret!=len\n", __func__); 395 ret = EIO; 396 free(in_sg_cpy); 397 goto err; 398 } 399 in_sg_left -= ret; 400 len -= ret; 401 } while (in_sg_left); 402 free(in_sg_cpy); 403 404 /* Need to fix out->len on EOF */ 405 if (len) { 406 struct fuse_out_header *out_sg = in_sg[0].iov_base; 407 408 tosend_len -= len; 409 out_sg->len = tosend_len; 410 } 411 412 ret = 0; 413 414 pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock); 415 pthread_mutex_lock(&qi->vq_lock); 416 vu_queue_push(dev, q, elem, tosend_len); 417 vu_queue_notify(dev, q); 418 pthread_mutex_unlock(&qi->vq_lock); 419 pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock); 420 421err: 422 if (ret == 0) { 423 req->reply_sent = true; 424 } 425 426 return ret; 427} 428 429static __thread bool clone_fs_called; 430 431/* Process one FVRequest in a thread pool */ 432static void fv_queue_worker(gpointer data, gpointer user_data) 433{ 434 struct fv_QueueInfo *qi = user_data; 435 struct fuse_session *se = qi->virtio_dev->se; 436 struct VuDev *dev = &qi->virtio_dev->dev; 437 FVRequest *req = data; 438 VuVirtqElement *elem = &req->elem; 439 struct fuse_buf fbuf = {}; 440 bool allocated_bufv = false; 441 struct fuse_bufvec bufv; 442 struct fuse_bufvec *pbufv; 443 444 assert(se->bufsize > sizeof(struct fuse_in_header)); 445 446 if (!clone_fs_called) { 447 int ret; 448 449 /* unshare FS for xattr operation */ 450 ret = unshare(CLONE_FS); 451 /* should not fail */ 452 assert(ret == 0); 453 454 clone_fs_called = true; 455 } 456 457 /* 458 * An element contains one request and the space to send our response 459 * They're spread over multiple descriptors in a scatter/gather set 460 * and we can't trust the guest to keep them still; so copy in/out. 461 */ 462 fbuf.mem = malloc(se->bufsize); 463 assert(fbuf.mem); 464 465 fuse_mutex_init(&req->ch.lock); 466 req->ch.fd = -1; 467 req->ch.qi = qi; 468 469 /* The 'out' part of the elem is from qemu */ 470 unsigned int out_num = elem->out_num; 471 struct iovec *out_sg = elem->out_sg; 472 size_t out_len = iov_size(out_sg, out_num); 473 fuse_log(FUSE_LOG_DEBUG, 474 "%s: elem %d: with %d out desc of length %zd\n", 475 __func__, elem->index, out_num, out_len); 476 477 /* 478 * The elem should contain a 'fuse_in_header' (in to fuse) 479 * plus the data based on the len in the header. 480 */ 481 if (out_len < sizeof(struct fuse_in_header)) { 482 fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n", 483 __func__, elem->index); 484 assert(0); /* TODO */ 485 } 486 if (out_len > se->bufsize) { 487 fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n", __func__, 488 elem->index); 489 assert(0); /* TODO */ 490 } 491 /* Copy just the first element and look at it */ 492 copy_from_iov(&fbuf, 1, out_sg); 493 494 pbufv = NULL; /* Compiler thinks an unitialised path */ 495 if (out_num > 2 && 496 out_sg[0].iov_len == sizeof(struct fuse_in_header) && 497 ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE && 498 out_sg[1].iov_len == sizeof(struct fuse_write_in)) { 499 /* 500 * For a write we don't actually need to copy the 501 * data, we can just do it straight out of guest memory 502 * but we must still copy the headers in case the guest 503 * was nasty and changed them while we were using them. 504 */ 505 fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__); 506 507 /* copy the fuse_write_in header afte rthe fuse_in_header */ 508 fbuf.mem += out_sg->iov_len; 509 copy_from_iov(&fbuf, 1, out_sg + 1); 510 fbuf.mem -= out_sg->iov_len; 511 fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len; 512 513 /* Allocate the bufv, with space for the rest of the iov */ 514 pbufv = malloc(sizeof(struct fuse_bufvec) + 515 sizeof(struct fuse_buf) * (out_num - 2)); 516 if (!pbufv) { 517 fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n", 518 __func__); 519 goto out; 520 } 521 522 allocated_bufv = true; 523 pbufv->count = 1; 524 pbufv->buf[0] = fbuf; 525 526 size_t iovindex, pbufvindex; 527 iovindex = 2; /* 2 headers, separate iovs */ 528 pbufvindex = 1; /* 2 headers, 1 fusebuf */ 529 530 for (; iovindex < out_num; iovindex++, pbufvindex++) { 531 pbufv->count++; 532 pbufv->buf[pbufvindex].pos = ~0; /* Dummy */ 533 pbufv->buf[pbufvindex].flags = 0; 534 pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base; 535 pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len; 536 } 537 } else { 538 /* Normal (non fast write) path */ 539 540 /* Copy the rest of the buffer */ 541 fbuf.mem += out_sg->iov_len; 542 copy_from_iov(&fbuf, out_num - 1, out_sg + 1); 543 fbuf.mem -= out_sg->iov_len; 544 fbuf.size = out_len; 545 546 /* TODO! Endianness of header */ 547 548 /* TODO: Add checks for fuse_session_exited */ 549 bufv.buf[0] = fbuf; 550 bufv.count = 1; 551 pbufv = &bufv; 552 } 553 pbufv->idx = 0; 554 pbufv->off = 0; 555 fuse_session_process_buf_int(se, pbufv, &req->ch); 556 557out: 558 if (allocated_bufv) { 559 free(pbufv); 560 } 561 562 /* If the request has no reply, still recycle the virtqueue element */ 563 if (!req->reply_sent) { 564 struct VuVirtq *q = vu_get_queue(dev, qi->qidx); 565 566 fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n", __func__, 567 elem->index); 568 569 pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock); 570 pthread_mutex_lock(&qi->vq_lock); 571 vu_queue_push(dev, q, elem, 0); 572 vu_queue_notify(dev, q); 573 pthread_mutex_unlock(&qi->vq_lock); 574 pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock); 575 } 576 577 pthread_mutex_destroy(&req->ch.lock); 578 free(fbuf.mem); 579 free(req); 580} 581 582/* Thread function for individual queues, created when a queue is 'started' */ 583static void *fv_queue_thread(void *opaque) 584{ 585 struct fv_QueueInfo *qi = opaque; 586 struct VuDev *dev = &qi->virtio_dev->dev; 587 struct VuVirtq *q = vu_get_queue(dev, qi->qidx); 588 struct fuse_session *se = qi->virtio_dev->se; 589 GThreadPool *pool; 590 591 pool = g_thread_pool_new(fv_queue_worker, qi, se->thread_pool_size, TRUE, 592 NULL); 593 if (!pool) { 594 fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__); 595 return NULL; 596 } 597 598 fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__, 599 qi->qidx, qi->kick_fd); 600 while (1) { 601 struct pollfd pf[2]; 602 int ret; 603 604 pf[0].fd = qi->kick_fd; 605 pf[0].events = POLLIN; 606 pf[0].revents = 0; 607 pf[1].fd = qi->kill_fd; 608 pf[1].events = POLLIN; 609 pf[1].revents = 0; 610 611 fuse_log(FUSE_LOG_DEBUG, "%s: Waiting for Queue %d event\n", __func__, 612 qi->qidx); 613 int poll_res = ppoll(pf, 2, NULL, NULL); 614 615 if (poll_res == -1) { 616 if (errno == EINTR) { 617 fuse_log(FUSE_LOG_INFO, "%s: ppoll interrupted, going around\n", 618 __func__); 619 continue; 620 } 621 fuse_log(FUSE_LOG_ERR, "fv_queue_thread ppoll: %m\n"); 622 break; 623 } 624 assert(poll_res >= 1); 625 if (pf[0].revents & (POLLERR | POLLHUP | POLLNVAL)) { 626 fuse_log(FUSE_LOG_ERR, "%s: Unexpected poll revents %x Queue %d\n", 627 __func__, pf[0].revents, qi->qidx); 628 break; 629 } 630 if (pf[1].revents & (POLLERR | POLLHUP | POLLNVAL)) { 631 fuse_log(FUSE_LOG_ERR, 632 "%s: Unexpected poll revents %x Queue %d killfd\n", 633 __func__, pf[1].revents, qi->qidx); 634 break; 635 } 636 if (pf[1].revents) { 637 fuse_log(FUSE_LOG_INFO, "%s: kill event on queue %d - quitting\n", 638 __func__, qi->qidx); 639 break; 640 } 641 assert(pf[0].revents & POLLIN); 642 fuse_log(FUSE_LOG_DEBUG, "%s: Got queue event on Queue %d\n", __func__, 643 qi->qidx); 644 645 eventfd_t evalue; 646 if (eventfd_read(qi->kick_fd, &evalue)) { 647 fuse_log(FUSE_LOG_ERR, "Eventfd_read for queue: %m\n"); 648 break; 649 } 650 /* Mutual exclusion with virtio_loop() */ 651 ret = pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock); 652 assert(ret == 0); /* there is no possible error case */ 653 pthread_mutex_lock(&qi->vq_lock); 654 /* out is from guest, in is too guest */ 655 unsigned int in_bytes, out_bytes; 656 vu_queue_get_avail_bytes(dev, q, &in_bytes, &out_bytes, ~0, ~0); 657 658 fuse_log(FUSE_LOG_DEBUG, 659 "%s: Queue %d gave evalue: %zx available: in: %u out: %u\n", 660 __func__, qi->qidx, (size_t)evalue, in_bytes, out_bytes); 661 662 while (1) { 663 FVRequest *req = vu_queue_pop(dev, q, sizeof(FVRequest)); 664 if (!req) { 665 break; 666 } 667 668 req->reply_sent = false; 669 670 g_thread_pool_push(pool, req, NULL); 671 } 672 673 pthread_mutex_unlock(&qi->vq_lock); 674 pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock); 675 } 676 677 g_thread_pool_free(pool, FALSE, TRUE); 678 679 return NULL; 680} 681 682static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx) 683{ 684 int ret; 685 struct fv_QueueInfo *ourqi; 686 687 assert(qidx < vud->nqueues); 688 ourqi = vud->qi[qidx]; 689 690 /* Kill the thread */ 691 if (eventfd_write(ourqi->kill_fd, 1)) { 692 fuse_log(FUSE_LOG_ERR, "Eventfd_write for queue %d: %s\n", 693 qidx, strerror(errno)); 694 } 695 ret = pthread_join(ourqi->thread, NULL); 696 if (ret) { 697 fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err %d\n", 698 __func__, qidx, ret); 699 } 700 pthread_mutex_destroy(&ourqi->vq_lock); 701 close(ourqi->kill_fd); 702 ourqi->kick_fd = -1; 703 free(vud->qi[qidx]); 704 vud->qi[qidx] = NULL; 705} 706 707/* Callback from libvhost-user on start or stop of a queue */ 708static void fv_queue_set_started(VuDev *dev, int qidx, bool started) 709{ 710 struct fv_VuDev *vud = container_of(dev, struct fv_VuDev, dev); 711 struct fv_QueueInfo *ourqi; 712 713 fuse_log(FUSE_LOG_INFO, "%s: qidx=%d started=%d\n", __func__, qidx, 714 started); 715 assert(qidx >= 0); 716 717 /* 718 * Ignore additional request queues for now. passthrough_ll.c must be 719 * audited for thread-safety issues first. It was written with a 720 * well-behaved client in mind and may not protect against all types of 721 * races yet. 722 */ 723 if (qidx > 1) { 724 fuse_log(FUSE_LOG_ERR, 725 "%s: multiple request queues not yet implemented, please only " 726 "configure 1 request queue\n", 727 __func__); 728 exit(EXIT_FAILURE); 729 } 730 731 if (started) { 732 /* Fire up a thread to watch this queue */ 733 if (qidx >= vud->nqueues) { 734 vud->qi = realloc(vud->qi, (qidx + 1) * sizeof(vud->qi[0])); 735 assert(vud->qi); 736 memset(vud->qi + vud->nqueues, 0, 737 sizeof(vud->qi[0]) * (1 + (qidx - vud->nqueues))); 738 vud->nqueues = qidx + 1; 739 } 740 if (!vud->qi[qidx]) { 741 vud->qi[qidx] = calloc(sizeof(struct fv_QueueInfo), 1); 742 assert(vud->qi[qidx]); 743 vud->qi[qidx]->virtio_dev = vud; 744 vud->qi[qidx]->qidx = qidx; 745 } else { 746 /* Shouldn't have been started */ 747 assert(vud->qi[qidx]->kick_fd == -1); 748 } 749 ourqi = vud->qi[qidx]; 750 ourqi->kick_fd = dev->vq[qidx].kick_fd; 751 752 ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE); 753 assert(ourqi->kill_fd != -1); 754 pthread_mutex_init(&ourqi->vq_lock, NULL); 755 756 if (pthread_create(&ourqi->thread, NULL, fv_queue_thread, ourqi)) { 757 fuse_log(FUSE_LOG_ERR, "%s: Failed to create thread for queue %d\n", 758 __func__, qidx); 759 assert(0); 760 } 761 } else { 762 fv_queue_cleanup_thread(vud, qidx); 763 } 764} 765 766static bool fv_queue_order(VuDev *dev, int qidx) 767{ 768 return false; 769} 770 771static const VuDevIface fv_iface = { 772 .get_features = fv_get_features, 773 .set_features = fv_set_features, 774 775 /* Don't need process message, we've not got any at vhost-user level */ 776 .queue_set_started = fv_queue_set_started, 777 778 .queue_is_processed_in_order = fv_queue_order, 779}; 780 781/* 782 * Main loop; this mostly deals with events on the vhost-user 783 * socket itself, and not actual fuse data. 784 */ 785int virtio_loop(struct fuse_session *se) 786{ 787 fuse_log(FUSE_LOG_INFO, "%s: Entry\n", __func__); 788 789 while (!fuse_session_exited(se)) { 790 struct pollfd pf[1]; 791 bool ok; 792 int ret; 793 pf[0].fd = se->vu_socketfd; 794 pf[0].events = POLLIN; 795 pf[0].revents = 0; 796 797 fuse_log(FUSE_LOG_DEBUG, "%s: Waiting for VU event\n", __func__); 798 int poll_res = ppoll(pf, 1, NULL, NULL); 799 800 if (poll_res == -1) { 801 if (errno == EINTR) { 802 fuse_log(FUSE_LOG_INFO, "%s: ppoll interrupted, going around\n", 803 __func__); 804 continue; 805 } 806 fuse_log(FUSE_LOG_ERR, "virtio_loop ppoll: %m\n"); 807 break; 808 } 809 assert(poll_res == 1); 810 if (pf[0].revents & (POLLERR | POLLHUP | POLLNVAL)) { 811 fuse_log(FUSE_LOG_ERR, "%s: Unexpected poll revents %x\n", __func__, 812 pf[0].revents); 813 break; 814 } 815 assert(pf[0].revents & POLLIN); 816 fuse_log(FUSE_LOG_DEBUG, "%s: Got VU event\n", __func__); 817 /* Mutual exclusion with fv_queue_thread() */ 818 ret = pthread_rwlock_wrlock(&se->virtio_dev->vu_dispatch_rwlock); 819 assert(ret == 0); /* there is no possible error case */ 820 821 ok = vu_dispatch(&se->virtio_dev->dev); 822 823 pthread_rwlock_unlock(&se->virtio_dev->vu_dispatch_rwlock); 824 825 if (!ok) { 826 fuse_log(FUSE_LOG_ERR, "%s: vu_dispatch failed\n", __func__); 827 break; 828 } 829 } 830 831 /* 832 * Make sure all fv_queue_thread()s quit on exit, as we're about to 833 * free virtio dev and fuse session, no one should access them anymore. 834 */ 835 for (int i = 0; i < se->virtio_dev->nqueues; i++) { 836 if (!se->virtio_dev->qi[i]) { 837 continue; 838 } 839 840 fuse_log(FUSE_LOG_INFO, "%s: Stopping queue %d thread\n", __func__, i); 841 fv_queue_cleanup_thread(se->virtio_dev, i); 842 } 843 844 fuse_log(FUSE_LOG_INFO, "%s: Exit\n", __func__); 845 846 return 0; 847} 848 849static void strreplace(char *s, char old, char new) 850{ 851 for (; *s; ++s) { 852 if (*s == old) { 853 *s = new; 854 } 855 } 856} 857 858static bool fv_socket_lock(struct fuse_session *se) 859{ 860 g_autofree gchar *sk_name = NULL; 861 g_autofree gchar *pidfile = NULL; 862 g_autofree gchar *dir = NULL; 863 Error *local_err = NULL; 864 865 dir = qemu_get_local_state_pathname("run/virtiofsd"); 866 867 if (g_mkdir_with_parents(dir, S_IRWXU) < 0) { 868 fuse_log(FUSE_LOG_ERR, "%s: Failed to create directory %s: %s", 869 __func__, dir, strerror(errno)); 870 return false; 871 } 872 873 sk_name = g_strdup(se->vu_socket_path); 874 strreplace(sk_name, '/', '.'); 875 pidfile = g_strdup_printf("%s/%s.pid", dir, sk_name); 876 877 if (!qemu_write_pidfile(pidfile, &local_err)) { 878 error_report_err(local_err); 879 return false; 880 } 881 882 return true; 883} 884 885static int fv_create_listen_socket(struct fuse_session *se) 886{ 887 struct sockaddr_un un; 888 mode_t old_umask; 889 890 /* Nothing to do if fd is already initialized */ 891 if (se->vu_listen_fd >= 0) { 892 return 0; 893 } 894 895 if (strlen(se->vu_socket_path) >= sizeof(un.sun_path)) { 896 fuse_log(FUSE_LOG_ERR, "Socket path too long\n"); 897 return -1; 898 } 899 900 if (!strlen(se->vu_socket_path)) { 901 fuse_log(FUSE_LOG_ERR, "Socket path is empty\n"); 902 return -1; 903 } 904 905 /* Check the vu_socket_path is already used */ 906 if (!fv_socket_lock(se)) { 907 return -1; 908 } 909 910 /* 911 * Create the Unix socket to communicate with qemu 912 * based on QEMU's vhost-user-bridge 913 */ 914 unlink(se->vu_socket_path); 915 strcpy(un.sun_path, se->vu_socket_path); 916 size_t addr_len = sizeof(un); 917 918 int listen_sock = socket(AF_UNIX, SOCK_STREAM, 0); 919 if (listen_sock == -1) { 920 fuse_log(FUSE_LOG_ERR, "vhost socket creation: %m\n"); 921 return -1; 922 } 923 un.sun_family = AF_UNIX; 924 925 /* 926 * Unfortunately bind doesn't let you set the mask on the socket, 927 * so set umask to 077 and restore it later. 928 */ 929 old_umask = umask(0077); 930 if (bind(listen_sock, (struct sockaddr *)&un, addr_len) == -1) { 931 fuse_log(FUSE_LOG_ERR, "vhost socket bind: %m\n"); 932 close(listen_sock); 933 umask(old_umask); 934 return -1; 935 } 936 umask(old_umask); 937 938 if (listen(listen_sock, 1) == -1) { 939 fuse_log(FUSE_LOG_ERR, "vhost socket listen: %m\n"); 940 close(listen_sock); 941 return -1; 942 } 943 944 se->vu_listen_fd = listen_sock; 945 return 0; 946} 947 948int virtio_session_mount(struct fuse_session *se) 949{ 950 int ret; 951 952 ret = fv_create_listen_socket(se); 953 if (ret < 0) { 954 return ret; 955 } 956 957 se->fd = -1; 958 959 fuse_log(FUSE_LOG_INFO, "%s: Waiting for vhost-user socket connection...\n", 960 __func__); 961 int data_sock = accept(se->vu_listen_fd, NULL, NULL); 962 if (data_sock == -1) { 963 fuse_log(FUSE_LOG_ERR, "vhost socket accept: %m\n"); 964 close(se->vu_listen_fd); 965 return -1; 966 } 967 close(se->vu_listen_fd); 968 se->vu_listen_fd = -1; 969 fuse_log(FUSE_LOG_INFO, "%s: Received vhost-user socket connection\n", 970 __func__); 971 972 /* TODO: Some cleanup/deallocation! */ 973 se->virtio_dev = calloc(sizeof(struct fv_VuDev), 1); 974 if (!se->virtio_dev) { 975 fuse_log(FUSE_LOG_ERR, "%s: virtio_dev calloc failed\n", __func__); 976 close(data_sock); 977 return -1; 978 } 979 980 se->vu_socketfd = data_sock; 981 se->virtio_dev->se = se; 982 pthread_rwlock_init(&se->virtio_dev->vu_dispatch_rwlock, NULL); 983 vu_init(&se->virtio_dev->dev, 2, se->vu_socketfd, fv_panic, fv_set_watch, 984 fv_remove_watch, &fv_iface); 985 986 return 0; 987} 988 989void virtio_session_close(struct fuse_session *se) 990{ 991 close(se->vu_socketfd); 992 993 if (!se->virtio_dev) { 994 return; 995 } 996 997 free(se->virtio_dev->qi); 998 pthread_rwlock_destroy(&se->virtio_dev->vu_dispatch_rwlock); 999 free(se->virtio_dev); 1000 se->virtio_dev = NULL; 1001}