QEMU with HAX to log DMA reads & writes — jcs.org/2018/11/12/vfio

qed: protect table cache with CoMutex

This makes the driver thread-safe. The CoMutex is dropped temporarily
while accessing the data clusters or the backing file.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Message-Id: <20170629132749.997-10-pbonzini@redhat.com>
Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Reviewed-by: Fam Zheng <famz@redhat.com>
Signed-off-by: Fam Zheng <famz@redhat.com>

Authored by Paolo Bonzini and committed by Fam Zheng
(commit 1f01e50b, parent 61c7887e)

+129 -54
+2 -2
block/qed-cluster.c
··· 85 85 * 86 86 * On failure QED_CLUSTER_L2 or QED_CLUSTER_L1 is returned for missing L2 or L1 87 87 * table offset, respectively. len is number of contiguous unallocated bytes. 88 + * 89 + * Called with table_lock held. 88 90 */ 89 91 int coroutine_fn qed_find_cluster(BDRVQEDState *s, QEDRequest *request, 90 92 uint64_t pos, size_t *len, ··· 112 114 } 113 115 114 116 ret = qed_read_l2_table(s, request, l2_offset); 115 - qed_acquire(s); 116 117 if (ret) { 117 118 goto out; 118 119 } ··· 137 138 138 139 out: 139 140 *img_offset = offset; 140 - qed_release(s); 141 141 return ret; 142 142 }
+6
block/qed-l2-cache.c
··· 101 101 /** 102 102 * Decrease an entry's reference count and free if necessary when the reference 103 103 * count drops to zero. 104 + * 105 + * Called with table_lock held. 104 106 */ 105 107 void qed_unref_l2_cache_entry(CachedL2Table *entry) 106 108 { ··· 122 124 * 123 125 * For a cached entry, this function increases the reference count and returns 124 126 * the entry. 127 + * 128 + * Called with table_lock held. 125 129 */ 126 130 CachedL2Table *qed_find_l2_cache_entry(L2TableCache *l2_cache, uint64_t offset) 127 131 { ··· 150 154 * N.B. This function steals a reference to the l2_table from the caller so the 151 155 * caller must obtain a new reference by issuing a call to 152 156 * qed_find_l2_cache_entry(). 157 + * 158 + * Called with table_lock held. 153 159 */ 154 160 void qed_commit_l2_cache_entry(L2TableCache *l2_cache, CachedL2Table *l2_table) 155 161 {
+18 -6
block/qed-table.c
··· 18 18 #include "qed.h" 19 19 #include "qemu/bswap.h" 20 20 21 + /* Called either from qed_check or with table_lock held. */ 21 22 static int qed_read_table(BDRVQEDState *s, uint64_t offset, QEDTable *table) 22 23 { 23 24 QEMUIOVector qiov; ··· 32 33 33 34 trace_qed_read_table(s, offset, table); 34 35 36 + if (qemu_in_coroutine()) { 37 + qemu_co_mutex_unlock(&s->table_lock); 38 + } 35 39 ret = bdrv_preadv(s->bs->file, offset, &qiov); 40 + if (qemu_in_coroutine()) { 41 + qemu_co_mutex_lock(&s->table_lock); 42 + } 36 43 if (ret < 0) { 37 44 goto out; 38 45 } 39 46 40 47 /* Byteswap offsets */ 41 - qed_acquire(s); 42 48 noffsets = qiov.size / sizeof(uint64_t); 43 49 for (i = 0; i < noffsets; i++) { 44 50 table->offsets[i] = le64_to_cpu(table->offsets[i]); 45 51 } 46 - qed_release(s); 47 52 48 53 ret = 0; 49 54 out: ··· 61 66 * @index: Index of first element 62 67 * @n: Number of elements 63 68 * @flush: Whether or not to sync to disk 69 + * 70 + * Called either from qed_check or with table_lock held. 64 71 */ 65 72 static int qed_write_table(BDRVQEDState *s, uint64_t offset, QEDTable *table, 66 73 unsigned int index, unsigned int n, bool flush) ··· 97 104 /* Adjust for offset into table */ 98 105 offset += start * sizeof(uint64_t); 99 106 107 + if (qemu_in_coroutine()) { 108 + qemu_co_mutex_unlock(&s->table_lock); 109 + } 100 110 ret = bdrv_pwritev(s->bs->file, offset, &qiov); 111 + if (qemu_in_coroutine()) { 112 + qemu_co_mutex_lock(&s->table_lock); 113 + } 101 114 trace_qed_write_table_cb(s, table, flush, ret); 102 115 if (ret < 0) { 103 116 goto out; 104 117 } 105 118 106 119 if (flush) { 107 - qed_acquire(s); 108 120 ret = bdrv_flush(s->bs); 109 - qed_release(s); 110 121 if (ret < 0) { 111 122 goto out; 112 123 } ··· 123 134 return qed_read_table(s, s->header.l1_table_offset, s->l1_table); 124 135 } 125 136 137 + /* Called either from qed_check or with table_lock held. 
*/ 126 138 int qed_write_l1_table(BDRVQEDState *s, unsigned int index, unsigned int n) 127 139 { 128 140 BLKDBG_EVENT(s->bs->file, BLKDBG_L1_UPDATE); ··· 136 148 return qed_write_l1_table(s, index, n); 137 149 } 138 150 151 + /* Called either from qed_check or with table_lock held. */ 139 152 int qed_read_l2_table(BDRVQEDState *s, QEDRequest *request, uint64_t offset) 140 153 { 141 154 int ret; ··· 154 167 BLKDBG_EVENT(s->bs->file, BLKDBG_L2_LOAD); 155 168 ret = qed_read_table(s, offset, request->l2_table->table); 156 169 157 - qed_acquire(s); 158 170 if (ret) { 159 171 /* can't trust loaded L2 table anymore */ 160 172 qed_unref_l2_cache_entry(request->l2_table); ··· 170 182 request->l2_table = qed_find_l2_cache_entry(&s->l2_cache, offset); 171 183 assert(request->l2_table != NULL); 172 184 } 173 - qed_release(s); 174 185 175 186 return ret; 176 187 } ··· 180 191 return qed_read_l2_table(s, request, offset); 181 192 } 182 193 194 + /* Called either from qed_check or with table_lock held. */ 183 195 int qed_write_l2_table(BDRVQEDState *s, QEDRequest *request, 184 196 unsigned int index, unsigned int n, bool flush) 185 197 {
+96 -42
block/qed.c
··· 93 93 * 94 94 * This function only updates known header fields in-place and does not affect 95 95 * extra data after the QED header. 96 + * 97 + * No new allocating reqs can start while this function runs. 96 98 */ 97 99 static int coroutine_fn qed_write_header(BDRVQEDState *s) 98 100 { ··· 108 110 struct iovec iov; 109 111 QEMUIOVector qiov; 110 112 int ret; 113 + 114 + assert(s->allocating_acb || s->allocating_write_reqs_plugged); 111 115 112 116 buf = qemu_blockalign(s->bs, len); 113 117 iov = (struct iovec) { ··· 219 223 * This function only produces the offset where the new clusters should be 220 224 * written. It updates BDRVQEDState but does not make any changes to the image 221 225 * file. 226 + * 227 + * Called with table_lock held. 222 228 */ 223 229 static uint64_t qed_alloc_clusters(BDRVQEDState *s, unsigned int n) 224 230 { ··· 236 242 237 243 /** 238 244 * Allocate a new zeroed L2 table 245 + * 246 + * Called with table_lock held. 239 247 */ 240 248 static CachedL2Table *qed_new_l2_table(BDRVQEDState *s) 241 249 { ··· 249 257 return l2_table; 250 258 } 251 259 252 - static void qed_plug_allocating_write_reqs(BDRVQEDState *s) 260 + static bool qed_plug_allocating_write_reqs(BDRVQEDState *s) 253 261 { 262 + qemu_co_mutex_lock(&s->table_lock); 263 + 264 + /* No reentrancy is allowed. */ 254 265 assert(!s->allocating_write_reqs_plugged); 266 + if (s->allocating_acb != NULL) { 267 + /* Another allocating write came concurrently. This cannot happen 268 + * from bdrv_qed_co_drain, but it can happen when the timer runs. 
269 + */ 270 + qemu_co_mutex_unlock(&s->table_lock); 271 + return false; 272 + } 255 273 256 274 s->allocating_write_reqs_plugged = true; 275 + qemu_co_mutex_unlock(&s->table_lock); 276 + return true; 257 277 } 258 278 259 279 static void qed_unplug_allocating_write_reqs(BDRVQEDState *s) 260 280 { 281 + qemu_co_mutex_lock(&s->table_lock); 261 282 assert(s->allocating_write_reqs_plugged); 262 - 263 283 s->allocating_write_reqs_plugged = false; 264 - qemu_co_enter_next(&s->allocating_write_reqs); 284 + qemu_co_queue_next(&s->allocating_write_reqs); 285 + qemu_co_mutex_unlock(&s->table_lock); 265 286 } 266 287 267 288 static void coroutine_fn qed_need_check_timer_entry(void *opaque) ··· 269 290 BDRVQEDState *s = opaque; 270 291 int ret; 271 292 272 - /* The timer should only fire when allocating writes have drained */ 273 - assert(!s->allocating_acb); 274 - 275 293 trace_qed_need_check_timer_cb(s); 276 294 277 - qed_acquire(s); 278 - qed_plug_allocating_write_reqs(s); 295 + if (!qed_plug_allocating_write_reqs(s)) { 296 + return; 297 + } 279 298 280 299 /* Ensure writes are on disk before clearing flag */ 281 300 ret = bdrv_co_flush(s->bs->file->bs); 282 - qed_release(s); 283 301 if (ret < 0) { 284 302 qed_unplug_allocating_write_reqs(s); 285 303 return; ··· 299 317 { 300 318 Coroutine *co = qemu_coroutine_create(qed_need_check_timer_entry, opaque); 301 319 qemu_coroutine_enter(co); 302 - } 303 - 304 - void qed_acquire(BDRVQEDState *s) 305 - { 306 - aio_context_acquire(bdrv_get_aio_context(s->bs)); 307 - } 308 - 309 - void qed_release(BDRVQEDState *s) 310 - { 311 - aio_context_release(bdrv_get_aio_context(s->bs)); 312 320 } 313 321 314 322 static void qed_start_need_check_timer(BDRVQEDState *s) ··· 369 377 370 378 memset(s, 0, sizeof(BDRVQEDState)); 371 379 s->bs = bs; 380 + qemu_co_mutex_init(&s->table_lock); 372 381 qemu_co_queue_init(&s->allocating_write_reqs); 373 382 } 374 383 ··· 688 697 BlockDriverState **file; 689 698 } QEDIsAllocatedCB; 690 699 700 + /* Called 
with table_lock held. */ 691 701 static void qed_is_allocated_cb(void *opaque, int ret, uint64_t offset, size_t len) 692 702 { 693 703 QEDIsAllocatedCB *cb = opaque; ··· 735 745 uint64_t offset; 736 746 int ret; 737 747 748 + qemu_co_mutex_lock(&s->table_lock); 738 749 ret = qed_find_cluster(s, &request, cb.pos, &len, &offset); 739 750 qed_is_allocated_cb(&cb, ret, offset, len); 740 751 ··· 742 753 assert(cb.status != BDRV_BLOCK_OFFSET_MASK); 743 754 744 755 qed_unref_l2_cache_entry(request.l2_table); 756 + qemu_co_mutex_unlock(&s->table_lock); 745 757 746 758 return cb.status; 747 759 } ··· 872 884 * 873 885 * The cluster offset may be an allocated byte offset in the image file, the 874 886 * zero cluster marker, or the unallocated cluster marker. 887 + * 888 + * Called with table_lock held. 875 889 */ 876 890 static void coroutine_fn qed_update_l2_table(BDRVQEDState *s, QEDTable *table, 877 891 int index, unsigned int n, ··· 887 901 } 888 902 } 889 903 904 + /* Called with table_lock held. */ 890 905 static void coroutine_fn qed_aio_complete(QEDAIOCB *acb) 891 906 { 892 907 BDRVQEDState *s = acb_to_s(acb); ··· 910 925 if (acb == s->allocating_acb) { 911 926 s->allocating_acb = NULL; 912 927 if (!qemu_co_queue_empty(&s->allocating_write_reqs)) { 913 - qemu_co_enter_next(&s->allocating_write_reqs); 928 + qemu_co_queue_next(&s->allocating_write_reqs); 914 929 } else if (s->header.features & QED_F_NEED_CHECK) { 915 930 qed_start_need_check_timer(s); 916 931 } ··· 919 934 920 935 /** 921 936 * Update L1 table with new L2 table offset and write it out 937 + * 938 + * Called with table_lock held. 922 939 */ 923 940 static int coroutine_fn qed_aio_write_l1_update(QEDAIOCB *acb) 924 941 { ··· 947 964 948 965 /** 949 966 * Update L2 table with new cluster offsets and write them out 967 + * 968 + * Called with table_lock held. 
950 969 */ 951 970 static int coroutine_fn qed_aio_write_l2_update(QEDAIOCB *acb, uint64_t offset) 952 971 { ··· 983 1002 984 1003 /** 985 1004 * Write data to the image file 1005 + * 1006 + * Called with table_lock *not* held. 986 1007 */ 987 1008 static int coroutine_fn qed_aio_write_main(QEDAIOCB *acb) 988 1009 { ··· 999 1020 1000 1021 /** 1001 1022 * Populate untouched regions of new data cluster 1023 + * 1024 + * Called with table_lock held. 1002 1025 */ 1003 1026 static int coroutine_fn qed_aio_write_cow(QEDAIOCB *acb) 1004 1027 { 1005 1028 BDRVQEDState *s = acb_to_s(acb); 1006 1029 uint64_t start, len, offset; 1007 1030 int ret; 1031 + 1032 + qemu_co_mutex_unlock(&s->table_lock); 1008 1033 1009 1034 /* Populate front untouched region of new data cluster */ 1010 1035 start = qed_start_of_cluster(s, acb->cur_pos); ··· 1013 1038 trace_qed_aio_write_prefill(s, acb, start, len, acb->cur_cluster); 1014 1039 ret = qed_copy_from_backing_file(s, start, len, acb->cur_cluster); 1015 1040 if (ret < 0) { 1016 - return ret; 1041 + goto out; 1017 1042 } 1018 1043 1019 1044 /* Populate back untouched region of new data cluster */ ··· 1026 1051 trace_qed_aio_write_postfill(s, acb, start, len, offset); 1027 1052 ret = qed_copy_from_backing_file(s, start, len, offset); 1028 1053 if (ret < 0) { 1029 - return ret; 1054 + goto out; 1030 1055 } 1031 1056 1032 1057 ret = qed_aio_write_main(acb); 1033 1058 if (ret < 0) { 1034 - return ret; 1059 + goto out; 1035 1060 } 1036 1061 1037 1062 if (s->bs->backing) { ··· 1046 1071 * cluster and before updating the L2 table. 1047 1072 */ 1048 1073 ret = bdrv_co_flush(s->bs->file->bs); 1049 - if (ret < 0) { 1050 - return ret; 1051 - } 1052 1074 } 1053 1075 1054 - return 0; 1076 + out: 1077 + qemu_co_mutex_lock(&s->table_lock); 1078 + return ret; 1055 1079 } 1056 1080 1057 1081 /** ··· 1074 1098 * @len: Length in bytes 1075 1099 * 1076 1100 * This path is taken when writing to previously unallocated clusters. 
1101 + * 1102 + * Called with table_lock held. 1077 1103 */ 1078 1104 static int coroutine_fn qed_aio_write_alloc(QEDAIOCB *acb, size_t len) 1079 1105 { ··· 1088 1114 /* Freeze this request if another allocating write is in progress */ 1089 1115 if (s->allocating_acb != acb || s->allocating_write_reqs_plugged) { 1090 1116 if (s->allocating_acb != NULL) { 1091 - qemu_co_queue_wait(&s->allocating_write_reqs, NULL); 1117 + qemu_co_queue_wait(&s->allocating_write_reqs, &s->table_lock); 1092 1118 assert(s->allocating_acb == NULL); 1093 1119 } 1094 1120 s->allocating_acb = acb; ··· 1135 1161 * @len: Length in bytes 1136 1162 * 1137 1163 * This path is taken when writing to already allocated clusters. 1164 + * 1165 + * Called with table_lock held. 1138 1166 */ 1139 1167 static int coroutine_fn qed_aio_write_inplace(QEDAIOCB *acb, uint64_t offset, 1140 1168 size_t len) 1141 1169 { 1170 + BDRVQEDState *s = acb_to_s(acb); 1171 + int r; 1172 + 1173 + qemu_co_mutex_unlock(&s->table_lock); 1174 + 1142 1175 /* Allocate buffer for zero writes */ 1143 1176 if (acb->flags & QED_AIOCB_ZERO) { 1144 1177 struct iovec *iov = acb->qiov->iov; ··· 1146 1179 if (!iov->iov_base) { 1147 1180 iov->iov_base = qemu_try_blockalign(acb->bs, iov->iov_len); 1148 1181 if (iov->iov_base == NULL) { 1149 - return -ENOMEM; 1182 + r = -ENOMEM; 1183 + goto out; 1150 1184 } 1151 1185 memset(iov->iov_base, 0, iov->iov_len); 1152 1186 } ··· 1156 1190 acb->cur_cluster = offset; 1157 1191 qemu_iovec_concat(&acb->cur_qiov, acb->qiov, acb->qiov_offset, len); 1158 1192 1159 - /* Do the actual write */ 1160 - return qed_aio_write_main(acb); 1193 + /* Do the actual write. */ 1194 + r = qed_aio_write_main(acb); 1195 + out: 1196 + qemu_co_mutex_lock(&s->table_lock); 1197 + return r; 1161 1198 } 1162 1199 1163 1200 /** ··· 1167 1204 * @ret: QED_CLUSTER_FOUND, QED_CLUSTER_L2 or QED_CLUSTER_L1 1168 1205 * @offset: Cluster offset in bytes 1169 1206 * @len: Length in bytes 1207 + * 1208 + * Called with table_lock held. 
1170 1209 */ 1171 1210 static int coroutine_fn qed_aio_write_data(void *opaque, int ret, 1172 1211 uint64_t offset, size_t len) ··· 1198 1237 * @ret: QED_CLUSTER_FOUND, QED_CLUSTER_L2 or QED_CLUSTER_L1 1199 1238 * @offset: Cluster offset in bytes 1200 1239 * @len: Length in bytes 1240 + * 1241 + * Called with table_lock held. 1201 1242 */ 1202 1243 static int coroutine_fn qed_aio_read_data(void *opaque, int ret, 1203 1244 uint64_t offset, size_t len) ··· 1205 1246 QEDAIOCB *acb = opaque; 1206 1247 BDRVQEDState *s = acb_to_s(acb); 1207 1248 BlockDriverState *bs = acb->bs; 1249 + int r; 1250 + 1251 + qemu_co_mutex_unlock(&s->table_lock); 1208 1252 1209 1253 /* Adjust offset into cluster */ 1210 1254 offset += qed_offset_into_cluster(s, acb->cur_pos); ··· 1213 1257 1214 1258 qemu_iovec_concat(&acb->cur_qiov, acb->qiov, acb->qiov_offset, len); 1215 1259 1216 - /* Handle zero cluster and backing file reads */ 1260 + /* Handle zero cluster and backing file reads, otherwise read 1261 + * data cluster directly. 
1262 + */ 1217 1263 if (ret == QED_CLUSTER_ZERO) { 1218 1264 qemu_iovec_memset(&acb->cur_qiov, 0, 0, acb->cur_qiov.size); 1219 - return 0; 1265 + r = 0; 1220 1266 } else if (ret != QED_CLUSTER_FOUND) { 1221 - return qed_read_backing_file(s, acb->cur_pos, &acb->cur_qiov, 1222 - &acb->backing_qiov); 1267 + r = qed_read_backing_file(s, acb->cur_pos, &acb->cur_qiov, 1268 + &acb->backing_qiov); 1269 + } else { 1270 + BLKDBG_EVENT(bs->file, BLKDBG_READ_AIO); 1271 + r = bdrv_co_preadv(bs->file, offset, acb->cur_qiov.size, 1272 + &acb->cur_qiov, 0); 1223 1273 } 1224 1274 1225 - BLKDBG_EVENT(bs->file, BLKDBG_READ_AIO); 1226 - ret = bdrv_co_preadv(bs->file, offset, acb->cur_qiov.size, 1227 - &acb->cur_qiov, 0); 1228 - if (ret < 0) { 1229 - return ret; 1230 - } 1231 - return 0; 1275 + qemu_co_mutex_lock(&s->table_lock); 1276 + return r; 1232 1277 } 1233 1278 1234 1279 /** ··· 1241 1286 size_t len; 1242 1287 int ret; 1243 1288 1289 + qemu_co_mutex_lock(&s->table_lock); 1244 1290 while (1) { 1245 1291 trace_qed_aio_next_io(s, acb, 0, acb->cur_pos + acb->cur_qiov.size); 1246 1292 ··· 1280 1326 1281 1327 trace_qed_aio_complete(s, acb, ret); 1282 1328 qed_aio_complete(acb); 1329 + qemu_co_mutex_unlock(&s->table_lock); 1283 1330 return ret; 1284 1331 } 1285 1332 ··· 1469 1516 1470 1517 static void bdrv_qed_invalidate_cache(BlockDriverState *bs, Error **errp) 1471 1518 { 1519 + BDRVQEDState *s = bs->opaque; 1472 1520 Error *local_err = NULL; 1473 1521 int ret; 1474 1522 1475 1523 bdrv_qed_close(bs); 1476 1524 1477 1525 bdrv_qed_init_state(bs); 1526 + if (qemu_in_coroutine()) { 1527 + qemu_co_mutex_lock(&s->table_lock); 1528 + } 1478 1529 ret = bdrv_qed_do_open(bs, NULL, bs->open_flags, &local_err); 1530 + if (qemu_in_coroutine()) { 1531 + qemu_co_mutex_unlock(&s->table_lock); 1532 + } 1479 1533 if (local_err) { 1480 1534 error_propagate(errp, local_err); 1481 1535 error_prepend(errp, "Could not reopen qed layer: ");
+7 -4
block/qed.h
··· 151 151 152 152 typedef struct { 153 153 BlockDriverState *bs; /* device */ 154 - uint64_t file_size; /* length of image file, in bytes */ 155 154 155 + /* Written only by an allocating write or the timer handler (the latter 156 + * while allocating reqs are plugged). 157 + */ 156 158 QEDHeader header; /* always cpu-endian */ 159 + 160 + /* Protected by table_lock. */ 161 + CoMutex table_lock; 157 162 QEDTable *l1_table; 158 163 L2TableCache l2_cache; /* l2 table cache */ 159 164 uint32_t table_nelems; 160 165 uint32_t l1_shift; 161 166 uint32_t l2_shift; 162 167 uint32_t l2_mask; 168 + uint64_t file_size; /* length of image file, in bytes */ 163 169 164 170 /* Allocating write request queue */ 165 171 QEDAIOCB *allocating_acb; ··· 176 182 QED_CLUSTER_L2, /* cluster missing in L2 */ 177 183 QED_CLUSTER_L1, /* cluster missing in L1 */ 178 184 }; 179 - 180 - void qed_acquire(BDRVQEDState *s); 181 - void qed_release(BDRVQEDState *s); 182 185 183 186 /** 184 187 * Header functions