tangled
alpha
login
or
join now
microcosm.blue
/
microcosm-rs
65
fork
atom
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
65
fork
atom
overview
issues
8
pulls
2
pipelines
moar metrics
bad-example.com
7 months ago
1899b586
471f35e2
+80
-15
1 changed file
expand all
collapse all
unified
split
ufos
src
storage_fjall.rs
+80
-15
ufos/src/storage_fjall.rs
···
242
rollups,
243
queues,
244
};
0
245
Ok((reader, writer, js_cursor, sketch_secret))
246
}
247
}
···
392
"storage_fjall_l0_run_count",
393
Unit::Count,
394
"number of L0 runs in a partition"
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
395
);
396
}
397
···
1025
.set(self.rollups.tree.l0_run_count() as f64);
1026
gauge!("storage_fjall_l0_run_count", "partition" => "queues")
1027
.set(self.queues.tree.l0_run_count() as f64);
0
0
0
1028
}
1029
async fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
1030
let s = self.clone();
···
1117
}
1118
1119
impl FjallWriter {
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
1120
fn rollup_delete_account(
1121
&mut self,
1122
cursor: Cursor,
···
1284
1285
insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor)?;
1286
0
0
0
1287
batch.commit()?;
1288
Ok((cursors_advanced, dirty_nsids))
1289
}
···
1294
if self.bg_taken.swap(true, Ordering::SeqCst) {
1295
return Err(StorageError::BackgroundAlreadyStarted);
1296
}
1297
-
describe_histogram!(
1298
-
"storage_trim_dirty_nsids",
1299
-
Unit::Count,
1300
-
"number of NSIDs trimmed"
1301
-
);
1302
-
describe_histogram!(
1303
-
"storage_trim_duration",
1304
-
Unit::Microseconds,
1305
-
"how long it took to trim the dirty NSIDs"
1306
-
);
1307
-
describe_counter!(
1308
-
"storage_trim_removed",
1309
-
Unit::Count,
1310
-
"how many records were removed during trim"
1311
-
);
1312
if reroll {
1313
log::info!("reroll: resetting rollup cursor...");
1314
insert_static_neu::<NewRollupCursorKey>(&self.global, Cursor::from_start())?;
···
1403
latest.to_db_bytes()?,
1404
);
1405
0
0
0
1406
batch.commit()?;
1407
Ok(())
1408
}
···
1584
batch.remove(&self.records, key_bytes);
1585
records_deleted += 1;
1586
if batch.len() >= MAX_BATCHED_ACCOUNT_DELETE_RECORDS {
0
1587
batch.commit()?;
1588
batch = self.keyspace.batch();
1589
}
1590
}
0
0
1591
batch.commit()?;
1592
Ok(records_deleted)
1593
}
···
242
rollups,
243
queues,
244
};
245
+
writer.describe_metrics();
246
Ok((reader, writer, js_cursor, sketch_secret))
247
}
248
}
···
393
"storage_fjall_l0_run_count",
394
Unit::Count,
395
"number of L0 runs in a partition"
396
+
);
397
+
describe_gauge!(
398
+
"storage_fjall_keyspace_disk_space",
399
+
Unit::Bytes,
400
+
"total storage used according to fjall"
401
+
);
402
+
describe_gauge!(
403
+
"storage_fjall_journal_count",
404
+
Unit::Count,
405
+
"total keyspace journals according to fjall"
406
+
);
407
+
describe_gauge!(
408
+
"storage_fjall_keyspace_sequence",
409
+
Unit::Count,
410
+
"fjall keyspace sequence"
411
);
412
}
413
···
1041
.set(self.rollups.tree.l0_run_count() as f64);
1042
gauge!("storage_fjall_l0_run_count", "partition" => "queues")
1043
.set(self.queues.tree.l0_run_count() as f64);
1044
+
gauge!("storage_fjall_keyspace_disk_space").set(self.keyspace.disk_space() as f64);
1045
+
gauge!("storage_fjall_journal_count").set(self.keyspace.journal_count() as f64);
1046
+
gauge!("storage_fjall_keyspace_sequence").set(self.keyspace.instant() as f64);
1047
}
1048
async fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
1049
let s = self.clone();
···
1136
}
1137
1138
impl FjallWriter {
1139
+
fn describe_metrics(&self) {
1140
+
describe_histogram!(
1141
+
"storage_insert_batch_db_batch_items",
1142
+
Unit::Count,
1143
+
"how many items are in the fjall batch for batched inserts"
1144
+
);
1145
+
describe_histogram!(
1146
+
"storage_insert_batch_db_batch_size",
1147
+
Unit::Count,
1148
+
"in-memory size of the fjall batch for batched inserts"
1149
+
);
1150
+
describe_histogram!(
1151
+
"storage_rollup_counts_db_batch_items",
1152
+
Unit::Count,
1153
+
"how many items are in the fjall batch for a timlies rollup"
1154
+
);
1155
+
describe_histogram!(
1156
+
"storage_rollup_counts_db_batch_size",
1157
+
Unit::Count,
1158
+
"in-memory size of the fjall batch for a timelies rollup"
1159
+
);
1160
+
describe_counter!(
1161
+
"storage_delete_account_partial_commits",
1162
+
Unit::Count,
1163
+
"fjall checkpoint commits for cleaning up accounts with too many records"
1164
+
);
1165
+
describe_counter!(
1166
+
"storage_delete_account_completions",
1167
+
Unit::Count,
1168
+
"total count of account deletes handled"
1169
+
);
1170
+
describe_counter!(
1171
+
"storage_delete_account_records_deleted",
1172
+
Unit::Count,
1173
+
"total records deleted when handling account deletes"
1174
+
);
1175
+
describe_histogram!(
1176
+
"storage_trim_dirty_nsids",
1177
+
Unit::Count,
1178
+
"number of NSIDs trimmed"
1179
+
);
1180
+
describe_histogram!(
1181
+
"storage_trim_duration",
1182
+
Unit::Microseconds,
1183
+
"how long it took to trim the dirty NSIDs"
1184
+
);
1185
+
describe_counter!(
1186
+
"storage_trim_removed",
1187
+
Unit::Count,
1188
+
"how many records were removed during trim"
1189
+
);
1190
+
}
1191
fn rollup_delete_account(
1192
&mut self,
1193
cursor: Cursor,
···
1355
1356
insert_batch_static_neu::<NewRollupCursorKey>(&mut batch, &self.global, last_cursor)?;
1357
1358
+
histogram!("storage_rollup_counts_db_batch_items").record(batch.len() as f64);
1359
+
histogram!("storage_rollup_counts_db_batch_size")
1360
+
.record(std::mem::size_of_val(&batch) as f64);
1361
batch.commit()?;
1362
Ok((cursors_advanced, dirty_nsids))
1363
}
···
1368
if self.bg_taken.swap(true, Ordering::SeqCst) {
1369
return Err(StorageError::BackgroundAlreadyStarted);
1370
}
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
1371
if reroll {
1372
log::info!("reroll: resetting rollup cursor...");
1373
insert_static_neu::<NewRollupCursorKey>(&self.global, Cursor::from_start())?;
···
1462
latest.to_db_bytes()?,
1463
);
1464
1465
+
histogram!("storage_insert_batch_db_batch_items").record(batch.len() as f64);
1466
+
histogram!("storage_insert_batch_db_batch_size")
1467
+
.record(std::mem::size_of_val(&batch) as f64);
1468
batch.commit()?;
1469
Ok(())
1470
}
···
1646
batch.remove(&self.records, key_bytes);
1647
records_deleted += 1;
1648
if batch.len() >= MAX_BATCHED_ACCOUNT_DELETE_RECORDS {
1649
+
counter!("storage_delete_account_partial_commits").increment(1);
1650
batch.commit()?;
1651
batch = self.keyspace.batch();
1652
}
1653
}
1654
+
counter!("storage_delete_account_completions").increment(1);
1655
+
counter!("storage_delete_account_records_deleted").increment(records_deleted as u64);
1656
batch.commit()?;
1657
Ok(records_deleted)
1658
}