tangled
alpha
login
or
join now
ptr.pet
/
nsid-tracker
3
fork
atom
tracks lexicons and how many times they appeared on the jetstream
3
fork
atom
overview
issues
pulls
pipelines
feat(server): implement compaction :3
ptr.pet
7 months ago
bf7842d0
2a7d6005
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+520
-222
4 changed files
expand all
collapse all
unified
split
server
src
db
handle.rs
mod.rs
main.rs
utils.rs
+219
server/src/db/handle.rs
···
1
1
+
use std::{
2
2
+
fmt::Debug,
3
3
+
io::Cursor,
4
4
+
ops::{Bound, Deref, RangeBounds},
5
5
+
sync::atomic::{AtomicU64, Ordering as AtomicOrdering},
6
6
+
time::Duration,
7
7
+
};
8
8
+
9
9
+
use byteview::ByteView;
10
10
+
use fjall::{Keyspace, Partition, PartitionCreateOptions, Slice};
11
11
+
use itertools::Itertools;
12
12
+
use parking_lot::Mutex;
13
13
+
use rayon::iter::{IntoParallelIterator, ParallelIterator};
14
14
+
use rclite::Arc;
15
15
+
use smol_str::SmolStr;
16
16
+
17
17
+
use crate::{
18
18
+
db::{EventRecord, NsidHit, block},
19
19
+
error::AppResult,
20
20
+
utils::{
21
21
+
CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, WritableByteView,
22
22
+
varints_unsigned_encoded,
23
23
+
},
24
24
+
};
25
25
+
26
26
+
pub type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
27
27
+
pub type ItemEncoder = block::ItemEncoder<WritableByteView, NsidHit>;
28
28
+
pub type Item = block::Item<NsidHit>;
29
29
+
30
30
+
pub struct Block {
31
31
+
pub written: usize,
32
32
+
pub key: ByteView,
33
33
+
pub data: ByteView,
34
34
+
}
35
35
+
36
36
+
pub struct LexiconHandle {
37
37
+
tree: Partition,
38
38
+
nsid: SmolStr,
39
39
+
buf: Arc<Mutex<Vec<EventRecord>>>,
40
40
+
last_insert: AtomicU64, // relaxed
41
41
+
eps: DefaultRateTracker,
42
42
+
}
43
43
+
44
44
+
impl Debug for LexiconHandle {
45
45
+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46
46
+
f.debug_struct("LexiconHandle")
47
47
+
.field("nsid", self.nsid())
48
48
+
.finish()
49
49
+
}
50
50
+
}
51
51
+
52
52
+
impl Deref for LexiconHandle {
53
53
+
type Target = Partition;
54
54
+
55
55
+
fn deref(&self) -> &Self::Target {
56
56
+
&self.tree
57
57
+
}
58
58
+
}
59
59
+
60
60
+
impl LexiconHandle {
61
61
+
pub fn new(keyspace: &Keyspace, nsid: &str) -> Self {
62
62
+
let opts = PartitionCreateOptions::default().compression(fjall::CompressionType::Miniz(9));
63
63
+
Self {
64
64
+
tree: keyspace.open_partition(nsid, opts).unwrap(),
65
65
+
nsid: nsid.into(),
66
66
+
buf: Default::default(),
67
67
+
last_insert: AtomicU64::new(0),
68
68
+
eps: RateTracker::new(Duration::from_secs(10)),
69
69
+
}
70
70
+
}
71
71
+
72
72
+
pub fn nsid(&self) -> &SmolStr {
73
73
+
&self.nsid
74
74
+
}
75
75
+
76
76
+
pub fn item_count(&self) -> usize {
77
77
+
self.buf.lock().len()
78
78
+
}
79
79
+
80
80
+
pub fn since_last_activity(&self) -> u64 {
81
81
+
CLOCK.delta_as_nanos(self.last_insert.load(AtomicOrdering::Relaxed), CLOCK.raw())
82
82
+
}
83
83
+
84
84
+
pub fn suggested_block_size(&self) -> usize {
85
85
+
self.eps.rate() as usize * 60
86
86
+
}
87
87
+
88
88
+
pub fn queue(&self, event: EventRecord) {
89
89
+
self.buf.lock().push(event);
90
90
+
self.last_insert.store(CLOCK.raw(), AtomicOrdering::Relaxed);
91
91
+
self.eps.observe();
92
92
+
}
93
93
+
94
94
+
pub fn compact(&self, compact_to: usize, range: impl RangeBounds<u64>) -> AppResult<()> {
95
95
+
let start_limit = match range.start_bound().cloned() {
96
96
+
Bound::Included(start) => start,
97
97
+
Bound::Excluded(start) => start.saturating_add(1),
98
98
+
Bound::Unbounded => 0,
99
99
+
};
100
100
+
let end_limit = match range.end_bound().cloned() {
101
101
+
Bound::Included(end) => end,
102
102
+
Bound::Excluded(end) => end.saturating_sub(1),
103
103
+
Bound::Unbounded => u64::MAX,
104
104
+
};
105
105
+
106
106
+
let start_key = varints_unsigned_encoded([start_limit]);
107
107
+
let end_key = varints_unsigned_encoded([end_limit]);
108
108
+
109
109
+
let blocks_to_compact = self
110
110
+
.tree
111
111
+
.range(start_key..end_key)
112
112
+
.collect::<Result<Vec<_>, _>>()?;
113
113
+
if blocks_to_compact.len() < 2 {
114
114
+
tracing::info!("{}: nothing to compact", self.nsid);
115
115
+
return Ok(());
116
116
+
}
117
117
+
118
118
+
let start_blocks_size = blocks_to_compact.len();
119
119
+
let keys_to_delete = blocks_to_compact.iter().map(|(key, _)| key);
120
120
+
let all_items =
121
121
+
blocks_to_compact
122
122
+
.iter()
123
123
+
.try_fold(Vec::new(), |mut acc, (key, value)| {
124
124
+
let mut timestamps = Cursor::new(key);
125
125
+
let start_timestamp = timestamps.read_varint()?;
126
126
+
let decoder = block::ItemDecoder::new(Cursor::new(value), start_timestamp)?;
127
127
+
let mut items = decoder.collect::<Result<Vec<_>, _>>()?;
128
128
+
acc.append(&mut items);
129
129
+
AppResult::Ok(acc)
130
130
+
})?;
131
131
+
132
132
+
let new_blocks = all_items
133
133
+
.into_iter()
134
134
+
.chunks(compact_to)
135
135
+
.into_iter()
136
136
+
.map(|chunk| chunk.collect_vec())
137
137
+
.collect_vec()
138
138
+
.into_par_iter()
139
139
+
.map(|chunk| {
140
140
+
let count = chunk.len();
141
141
+
Self::encode_block_from_items(chunk, count)
142
142
+
})
143
143
+
.collect::<Result<Vec<_>, _>>()?;
144
144
+
let end_blocks_size = new_blocks.len();
145
145
+
146
146
+
for key in keys_to_delete {
147
147
+
self.tree.remove(key.clone())?;
148
148
+
}
149
149
+
for block in new_blocks {
150
150
+
self.tree.insert(block.key, block.data)?;
151
151
+
}
152
152
+
153
153
+
tracing::info!(
154
154
+
"{}: compacted {} blocks to {} blocks ({}% reduction)",
155
155
+
self.nsid,
156
156
+
start_blocks_size,
157
157
+
end_blocks_size,
158
158
+
((start_blocks_size - end_blocks_size) as f64 / start_blocks_size as f64) * 100.0,
159
159
+
);
160
160
+
161
161
+
Ok(())
162
162
+
}
163
163
+
164
164
+
pub fn encode_block_from_items(
165
165
+
items: impl IntoIterator<Item = Item>,
166
166
+
count: usize,
167
167
+
) -> AppResult<Block> {
168
168
+
let mut writer = ItemEncoder::new(
169
169
+
WritableByteView::with_size(ItemEncoder::encoded_len(count)),
170
170
+
count,
171
171
+
);
172
172
+
let mut start_timestamp = None;
173
173
+
let mut end_timestamp = None;
174
174
+
let mut written = 0_usize;
175
175
+
for item in items {
176
176
+
writer.encode(&item)?;
177
177
+
if start_timestamp.is_none() {
178
178
+
start_timestamp = Some(item.timestamp);
179
179
+
}
180
180
+
end_timestamp = Some(item.timestamp);
181
181
+
if written >= count {
182
182
+
break;
183
183
+
}
184
184
+
written += 1;
185
185
+
}
186
186
+
if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
187
187
+
let value = writer.finish()?;
188
188
+
let key = varints_unsigned_encoded([start_timestamp, end_timestamp]);
189
189
+
return Ok(Block {
190
190
+
written,
191
191
+
key,
192
192
+
data: value.into_inner(),
193
193
+
});
194
194
+
}
195
195
+
Err(std::io::Error::new(std::io::ErrorKind::WriteZero, "no items are in queue").into())
196
196
+
}
197
197
+
198
198
+
pub fn encode_block(&self, item_count: usize) -> AppResult<Block> {
199
199
+
let block = Self::encode_block_from_items(
200
200
+
self.buf.lock().drain(..).map(|event| {
201
201
+
Item::new(
202
202
+
event.timestamp,
203
203
+
&NsidHit {
204
204
+
deleted: event.deleted,
205
205
+
},
206
206
+
)
207
207
+
}),
208
208
+
item_count,
209
209
+
)?;
210
210
+
if block.written != item_count {
211
211
+
return Err(std::io::Error::new(
212
212
+
std::io::ErrorKind::InvalidData,
213
213
+
"unexpected number of items, invalid data?",
214
214
+
)
215
215
+
.into());
216
216
+
}
217
217
+
Ok(block)
218
218
+
}
219
219
+
}
+133
-198
server/src/db/mod.rs
···
1
1
use std::{
2
2
+
collections::HashMap,
3
3
+
fmt::Debug,
2
4
io::Cursor,
3
5
ops::{Bound, Deref, RangeBounds},
4
6
path::Path,
5
5
-
sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering},
6
7
time::Duration,
7
8
};
8
9
9
9
-
use byteview::ByteView;
10
10
-
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice};
10
10
+
use byteview::StrView;
11
11
+
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions};
11
12
use itertools::{Either, Itertools};
12
12
-
use parking_lot::Mutex;
13
13
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
14
14
use rclite::Arc;
15
15
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
16
16
-
use smol_str::SmolStr;
16
16
+
use smol_str::{SmolStr, ToSmolStr};
17
17
use tokio::sync::broadcast;
18
18
use tokio_util::sync::CancellationToken;
19
19
20
20
use crate::{
21
21
+
db::handle::{ItemDecoder, LexiconHandle},
21
22
error::{AppError, AppResult},
22
23
jetstream::JetstreamEvent,
23
23
-
utils::{
24
24
-
CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, WritableByteView,
25
25
-
varints_unsigned_encoded,
26
26
-
},
24
24
+
utils::{RateTracker, ReadVariableExt, varints_unsigned_encoded},
27
25
};
28
26
29
27
mod block;
28
28
+
mod handle;
30
29
31
30
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
32
31
#[rkyv(compare(PartialEq), derive(Debug))]
···
71
70
}
72
71
}
73
72
74
74
-
type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
75
75
-
type ItemEncoder = block::ItemEncoder<WritableByteView, NsidHit>;
76
76
-
type Item = block::Item<NsidHit>;
77
77
-
78
78
-
struct Block {
79
79
-
written: usize,
80
80
-
key: ByteView,
81
81
-
data: ByteView,
82
82
-
}
83
83
-
84
84
-
pub struct LexiconHandle {
85
85
-
tree: Partition,
86
86
-
nsid: SmolStr,
87
87
-
buf: Arc<Mutex<Vec<EventRecord>>>,
88
88
-
last_insert: AtomicU64, // relaxed
89
89
-
eps: DefaultRateTracker,
90
90
-
}
91
91
-
92
92
-
impl LexiconHandle {
93
93
-
fn new(keyspace: &Keyspace, nsid: &str) -> Self {
94
94
-
let opts = PartitionCreateOptions::default().compression(fjall::CompressionType::Miniz(9));
95
95
-
Self {
96
96
-
tree: keyspace.open_partition(nsid, opts).unwrap(),
97
97
-
nsid: nsid.into(),
98
98
-
buf: Default::default(),
99
99
-
last_insert: AtomicU64::new(0),
100
100
-
eps: RateTracker::new(Duration::from_secs(10)),
101
101
-
}
102
102
-
}
103
103
-
104
104
-
fn item_count(&self) -> usize {
105
105
-
self.buf.lock().len()
106
106
-
}
107
107
-
108
108
-
fn since_last_activity(&self) -> u64 {
109
109
-
CLOCK.delta_as_nanos(self.last_insert.load(AtomicOrdering::Relaxed), CLOCK.raw())
110
110
-
}
111
111
-
112
112
-
fn suggested_block_size(&self) -> usize {
113
113
-
self.eps.rate() as usize * 60
114
114
-
}
115
115
-
116
116
-
fn insert(&self, event: EventRecord) {
117
117
-
self.buf.lock().push(event);
118
118
-
self.last_insert.store(CLOCK.raw(), AtomicOrdering::Relaxed);
119
119
-
self.eps.observe();
120
120
-
}
121
121
-
122
122
-
fn compact(&self) {}
123
123
-
124
124
-
fn encode_block(&self, item_count: usize) -> AppResult<Block> {
125
125
-
let mut writer = ItemEncoder::new(
126
126
-
WritableByteView::with_size(ItemEncoder::encoded_len(item_count)),
127
127
-
item_count,
128
128
-
);
129
129
-
let mut start_timestamp = None;
130
130
-
let mut end_timestamp = None;
131
131
-
let mut written = 0_usize;
132
132
-
for event in self.buf.lock().drain(..) {
133
133
-
let item = Item::new(
134
134
-
event.timestamp,
135
135
-
&NsidHit {
136
136
-
deleted: event.deleted,
137
137
-
},
138
138
-
);
139
139
-
writer.encode(&item)?;
140
140
-
if start_timestamp.is_none() {
141
141
-
start_timestamp = Some(event.timestamp);
142
142
-
}
143
143
-
end_timestamp = Some(event.timestamp);
144
144
-
if written >= item_count {
145
145
-
break;
146
146
-
}
147
147
-
written += 1;
148
148
-
}
149
149
-
if written != item_count {
150
150
-
return Err(std::io::Error::new(
151
151
-
std::io::ErrorKind::InvalidData,
152
152
-
"unexpected number of items, invalid data?",
153
153
-
)
154
154
-
.into());
155
155
-
}
156
156
-
if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
157
157
-
let value = writer.finish()?;
158
158
-
let key = varints_unsigned_encoded([start_timestamp, end_timestamp]);
159
159
-
return Ok(Block {
160
160
-
written,
161
161
-
key,
162
162
-
data: value.into_inner(),
163
163
-
});
164
164
-
}
165
165
-
Err(std::io::Error::new(std::io::ErrorKind::WriteZero, "no items are in queue").into())
166
166
-
}
73
73
+
pub struct DbInfo {
74
74
+
pub nsids: HashMap<SmolStr, Vec<usize>>,
75
75
+
pub disk_size: u64,
167
76
}
168
77
169
78
// counts is nsid -> NsidCounts
···
176
85
event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
177
86
eps: RateTracker<100>,
178
87
cancel_token: CancellationToken,
179
179
-
min_block_size: usize,
180
180
-
max_block_size: usize,
181
181
-
max_last_activity: u64,
88
88
+
pub min_block_size: usize,
89
89
+
pub max_block_size: usize,
90
90
+
pub max_last_activity: u64,
182
91
}
183
92
184
93
impl Db {
···
231
140
for (_, handle) in self.hits.iter(&_guard) {
232
141
let mut nsid_data = Vec::with_capacity(2);
233
142
let is_too_old = handle.since_last_activity() > self.max_last_activity;
234
234
-
// if we disconnect for a long time, we want to sync all of what we have
235
235
-
// to avoid having many small blocks (even if we run compaction later)
143
143
+
// if we disconnect for a long time, we want to sync all of what we
144
144
+
// have to avoid having many small blocks (even if we run compaction
145
145
+
// later, it reduces work until we run compaction)
236
146
let block_size = is_too_old
237
147
.then_some(self.max_block_size)
238
148
.unwrap_or_else(|| {
···
275
185
let chunk = chunk?;
276
186
for (i, block, handle) in chunk {
277
187
self.sync_pool
278
278
-
.execute(move || match handle.tree.insert(block.key, block.data) {
188
188
+
.execute(move || match handle.insert(block.key, block.data) {
279
189
Ok(_) => {
280
190
tracing::info!(
281
191
"[{i}] synced {} of {} to db",
282
192
block.written,
283
283
-
handle.nsid
193
193
+
handle.nsid()
284
194
)
285
195
}
286
196
Err(err) => tracing::error!("failed to sync block: {}", err),
···
292
202
Ok(())
293
203
}
294
204
295
295
-
pub fn compact(&self) {}
205
205
+
pub fn compact(
206
206
+
&self,
207
207
+
nsid: impl AsRef<str>,
208
208
+
max_count: usize,
209
209
+
range: impl RangeBounds<u64>,
210
210
+
) -> AppResult<()> {
211
211
+
let Some(handle) = self.get_handle(nsid) else {
212
212
+
return Ok(());
213
213
+
};
214
214
+
handle.compact(max_count, range)
215
215
+
}
216
216
+
217
217
+
pub fn compact_all(
218
218
+
&self,
219
219
+
max_count: usize,
220
220
+
range: impl RangeBounds<u64> + Clone,
221
221
+
) -> AppResult<()> {
222
222
+
for nsid in self.get_nsids() {
223
223
+
self.compact(nsid, max_count, range.clone())?;
224
224
+
}
225
225
+
Ok(())
226
226
+
}
227
227
+
228
228
+
pub fn major_compact(&self) -> AppResult<()> {
229
229
+
self.compact_all(self.max_block_size, ..)?;
230
230
+
let _guard = scc::ebr::Guard::new();
231
231
+
for (_, handle) in self.hits.iter(&_guard) {
232
232
+
handle.deref().major_compact()?;
233
233
+
}
234
234
+
Ok(())
235
235
+
}
296
236
297
237
#[inline(always)]
298
298
-
fn maybe_run_in_nsid_tree<T>(
299
299
-
&self,
300
300
-
nsid: &str,
301
301
-
f: impl FnOnce(&LexiconHandle) -> T,
302
302
-
) -> Option<T> {
238
238
+
fn get_handle(&self, nsid: impl AsRef<str>) -> Option<Arc<LexiconHandle>> {
303
239
let _guard = scc::ebr::Guard::new();
304
304
-
let handle = match self.hits.peek(nsid, &_guard) {
240
240
+
let handle = match self.hits.peek(nsid.as_ref(), &_guard) {
305
241
Some(handle) => handle.clone(),
306
242
None => {
307
307
-
if self.ks.partition_exists(nsid) {
308
308
-
let handle = Arc::new(LexiconHandle::new(&self.ks, nsid));
243
243
+
if self.ks.partition_exists(nsid.as_ref()) {
244
244
+
let handle = Arc::new(LexiconHandle::new(&self.ks, nsid.as_ref()));
309
245
let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
310
246
handle
311
247
} else {
···
313
249
}
314
250
}
315
251
};
316
316
-
Some(f(&handle))
252
252
+
Some(handle)
317
253
}
318
254
319
255
#[inline(always)]
320
320
-
fn run_in_nsid_tree<T>(
321
321
-
&self,
322
322
-
nsid: &SmolStr,
323
323
-
f: impl FnOnce(&LexiconHandle) -> AppResult<T>,
324
324
-
) -> AppResult<T> {
325
325
-
f(self
326
326
-
.hits
256
256
+
fn ensure_handle(&self, nsid: &SmolStr) -> impl Deref<Target = Arc<LexiconHandle>> + use<'_> {
257
257
+
self.hits
327
258
.entry(nsid.clone())
328
259
.or_insert_with(|| Arc::new(LexiconHandle::new(&self.ks, &nsid)))
329
329
-
.get())
330
260
}
331
261
332
262
pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> {
333
263
for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() {
334
264
let mut counts = self.get_count(&key)?;
335
335
-
self.run_in_nsid_tree(&key, move |tree| {
336
336
-
for event in chunk {
337
337
-
let EventRecord {
338
338
-
timestamp, deleted, ..
339
339
-
} = event.clone();
265
265
+
let handle = self.ensure_handle(&key);
266
266
+
for event in chunk {
267
267
+
let EventRecord {
268
268
+
timestamp, deleted, ..
269
269
+
} = event.clone();
340
270
341
341
-
tree.insert(event);
271
271
+
handle.queue(event);
342
272
343
343
-
// increment count
344
344
-
counts.last_seen = timestamp;
345
345
-
if deleted {
346
346
-
counts.deleted_count += 1;
347
347
-
} else {
348
348
-
counts.count += 1;
349
349
-
}
273
273
+
// increment count
274
274
+
counts.last_seen = timestamp;
275
275
+
if deleted {
276
276
+
counts.deleted_count += 1;
277
277
+
} else {
278
278
+
counts.count += 1;
279
279
+
}
350
280
351
351
-
self.eps.observe();
352
352
-
}
353
353
-
Ok(())
354
354
-
})?;
281
281
+
self.eps.observe();
282
282
+
}
355
283
self.insert_count(&key, &counts)?;
356
284
if self.event_broadcaster.receiver_count() > 0 {
357
285
let _ = self.event_broadcaster.send((key, counts));
···
392
320
})
393
321
}
394
322
395
395
-
pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str> + 'static> {
323
323
+
pub fn get_nsids(&self) -> impl Iterator<Item = StrView> {
396
324
self.ks
397
325
.list_partitions()
398
326
.into_iter()
399
327
.filter(|k| k.deref() != "_counts")
400
328
}
401
329
402
402
-
pub fn get_hits_debug(&self, nsid: &str) -> impl Iterator<Item = AppResult<(Slice, Slice)>> {
403
403
-
self.maybe_run_in_nsid_tree(nsid, |handle| {
404
404
-
Either::Left(
405
405
-
handle
406
406
-
.tree
407
407
-
.iter()
408
408
-
.rev()
409
409
-
.map(|res| res.map_err(AppError::from)),
410
410
-
)
330
330
+
pub fn info(&self) -> AppResult<DbInfo> {
331
331
+
let mut nsids = HashMap::new();
332
332
+
for nsid in self.get_nsids() {
333
333
+
let Some(handle) = self.get_handle(&nsid) else {
334
334
+
continue;
335
335
+
};
336
336
+
let block_lens = handle.iter().rev().try_fold(Vec::new(), |mut acc, item| {
337
337
+
let (key, value) = item?;
338
338
+
let mut timestamps = Cursor::new(key);
339
339
+
let start_timestamp = timestamps.read_varint()?;
340
340
+
let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?;
341
341
+
acc.push(decoder.item_count());
342
342
+
AppResult::Ok(acc)
343
343
+
})?;
344
344
+
nsids.insert(nsid.to_smolstr(), block_lens);
345
345
+
}
346
346
+
Ok(DbInfo {
347
347
+
nsids,
348
348
+
disk_size: self.ks.disk_space(),
411
349
})
412
412
-
.unwrap_or_else(|| Either::Right(std::iter::empty()))
413
350
}
414
351
415
352
pub fn get_hits(
416
353
&self,
417
354
nsid: &str,
418
355
range: impl RangeBounds<u64> + std::fmt::Debug,
419
419
-
) -> impl Iterator<Item = AppResult<Item>> {
356
356
+
) -> impl Iterator<Item = AppResult<handle::Item>> {
420
357
let start_limit = match range.start_bound().cloned() {
421
358
Bound::Included(start) => start,
422
359
Bound::Excluded(start) => start.saturating_add(1),
···
429
366
};
430
367
let end_key = varints_unsigned_encoded([end_limit]);
431
368
432
432
-
self.maybe_run_in_nsid_tree(nsid, move |handle| {
433
433
-
let map_block = move |(key, val)| {
434
434
-
let mut key_reader = Cursor::new(key);
435
435
-
let start_timestamp = key_reader.read_varint::<u64>()?;
436
436
-
if start_timestamp < start_limit {
437
437
-
return Ok(None);
438
438
-
}
439
439
-
let items = ItemDecoder::new(Cursor::new(val), start_timestamp)?
440
440
-
.take_while(move |item| {
441
441
-
item.as_ref().map_or(true, |item| {
442
442
-
item.timestamp <= end_limit && item.timestamp >= start_limit
443
443
-
})
444
444
-
})
445
445
-
.map(|res| res.map_err(AppError::from));
446
446
-
Ok(Some(items))
447
447
-
};
369
369
+
let Some(handle) = self.get_handle(nsid) else {
370
370
+
return Either::Right(std::iter::empty());
371
371
+
};
448
372
449
449
-
Either::Left(
450
450
-
handle
451
451
-
.tree
452
452
-
.range(..end_key)
453
453
-
.rev()
454
454
-
.map_while(move |res| {
455
455
-
res.map_err(AppError::from).and_then(map_block).transpose()
373
373
+
let map_block = move |(key, val)| {
374
374
+
let mut key_reader = Cursor::new(key);
375
375
+
let start_timestamp = key_reader.read_varint::<u64>()?;
376
376
+
if start_timestamp < start_limit {
377
377
+
return Ok(None);
378
378
+
}
379
379
+
let items = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?
380
380
+
.take_while(move |item| {
381
381
+
item.as_ref().map_or(true, |item| {
382
382
+
item.timestamp <= end_limit && item.timestamp >= start_limit
456
383
})
457
457
-
.collect::<Vec<_>>()
458
458
-
.into_iter()
459
459
-
.rev()
460
460
-
.flatten()
461
461
-
.flatten(),
462
462
-
)
463
463
-
})
464
464
-
.unwrap_or_else(|| Either::Right(std::iter::empty()))
384
384
+
})
385
385
+
.map(|res| res.map_err(AppError::from));
386
386
+
Ok(Some(items))
387
387
+
};
388
388
+
389
389
+
Either::Left(
390
390
+
handle
391
391
+
.range(..end_key)
392
392
+
.rev()
393
393
+
.map_while(move |res| res.map_err(AppError::from).and_then(map_block).transpose())
394
394
+
.collect::<Vec<_>>()
395
395
+
.into_iter()
396
396
+
.rev()
397
397
+
.flatten()
398
398
+
.flatten(),
399
399
+
)
465
400
}
466
401
467
402
pub fn tracking_since(&self) -> AppResult<u64> {
468
403
// HACK: we should actually store when we started tracking but im lazy
469
469
-
// should be accurate enough
470
470
-
self.maybe_run_in_nsid_tree("app.bsky.feed.like", |handle| {
471
471
-
let Some((timestamps_raw, _)) = handle.tree.first_key_value()? else {
472
472
-
return Ok(0);
473
473
-
};
474
474
-
let mut timestamp_reader = Cursor::new(timestamps_raw);
475
475
-
timestamp_reader
476
476
-
.read_varint::<u64>()
477
477
-
.map_err(AppError::from)
478
478
-
})
479
479
-
.unwrap_or(Ok(0))
404
404
+
// this should be accurate enough
405
405
+
let Some(handle) = self.get_handle("app.bsky.feed.like") else {
406
406
+
return Ok(0);
407
407
+
};
408
408
+
let Some((timestamps_raw, _)) = handle.first_key_value()? else {
409
409
+
return Ok(0);
410
410
+
};
411
411
+
let mut timestamp_reader = Cursor::new(timestamps_raw);
412
412
+
timestamp_reader
413
413
+
.read_varint::<u64>()
414
414
+
.map_err(AppError::from)
480
415
}
481
416
}
+95
-24
server/src/main.rs
···
1
1
-
use std::ops::Deref;
1
1
+
use std::{ops::Deref, time::Duration};
2
2
3
3
use rclite::Arc;
4
4
use smol_str::ToSmolStr;
···
11
11
db::{Db, EventRecord},
12
12
error::AppError,
13
13
jetstream::JetstreamClient,
14
14
-
utils::CLOCK,
14
14
+
utils::{CLOCK, RelativeDateTime, get_time},
15
15
};
16
16
17
17
mod api;
···
37
37
match std::env::args().nth(1).as_deref() {
38
38
Some("compact") => {
39
39
compact();
40
40
+
return;
41
41
+
}
42
42
+
Some("migrate") => {
43
43
+
migrate();
40
44
return;
41
45
}
42
46
Some("debug") => {
···
106
110
}
107
111
});
108
112
109
109
-
let sync_task = tokio::task::spawn({
113
113
+
let db_task = tokio::task::spawn({
110
114
let db = db.clone();
111
115
async move {
116
116
+
let sync_period = Duration::from_secs(10);
117
117
+
let mut sync_interval = tokio::time::interval(sync_period);
118
118
+
sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
119
119
+
120
120
+
let compact_period = std::time::Duration::from_secs(60 * 30); // 30 mins
121
121
+
let mut compact_interval = tokio::time::interval(compact_period);
122
122
+
compact_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
123
123
+
112
124
loop {
113
113
-
let sync_db = tokio::task::spawn_blocking({
114
114
-
let db = db.clone();
115
115
-
move || {
116
116
-
if db.is_shutting_down() {
117
117
-
return;
125
125
+
let sync_db = async || {
126
126
+
tokio::task::spawn_blocking({
127
127
+
let db = db.clone();
128
128
+
move || {
129
129
+
if db.is_shutting_down() {
130
130
+
return;
131
131
+
}
132
132
+
match db.sync(false) {
133
133
+
Ok(_) => (),
134
134
+
Err(e) => tracing::error!("failed to sync db: {}", e),
135
135
+
}
118
136
}
119
119
-
match db.sync(false) {
120
120
-
Ok(_) => (),
121
121
-
Err(e) => tracing::error!("failed to sync db: {}", e),
137
137
+
})
138
138
+
.await
139
139
+
.unwrap();
140
140
+
};
141
141
+
let compact_db = async || {
142
142
+
tokio::task::spawn_blocking({
143
143
+
let db = db.clone();
144
144
+
move || {
145
145
+
if db.is_shutting_down() {
146
146
+
return;
147
147
+
}
148
148
+
let end = get_time() - compact_period / 2;
149
149
+
let start = end - compact_period;
150
150
+
let range = start.as_secs()..end.as_secs();
151
151
+
tracing::info!(
152
152
+
{
153
153
+
start = %RelativeDateTime::from_now(start),
154
154
+
end = %RelativeDateTime::from_now(end),
155
155
+
},
156
156
+
"running compaction...",
157
157
+
);
158
158
+
match db.compact_all(db.max_block_size, range) {
159
159
+
Ok(_) => (),
160
160
+
Err(e) => tracing::error!("failed to compact db: {}", e),
161
161
+
}
122
162
}
123
123
-
}
124
124
-
});
163
163
+
})
164
164
+
.await
165
165
+
.unwrap();
166
166
+
};
125
167
tokio::select! {
126
126
-
_ = sync_db => {}
127
127
-
_ = db.shutting_down() => break,
128
128
-
}
129
129
-
tokio::select! {
130
130
-
_ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {}
168
168
+
_ = sync_interval.tick() => sync_db().await,
169
169
+
_ = compact_interval.tick() => compact_db().await,
131
170
_ = db.shutting_down() => break,
132
171
}
133
172
}
···
157
196
tracing::info!("shutting down...");
158
197
cancel_token.cancel();
159
198
ingest_events.join().expect("failed to join ingest events");
160
160
-
sync_task.await.expect("cant join sync task");
199
199
+
db_task.await.expect("cant join db task");
161
200
db.sync(true).expect("cant sync db");
162
201
}
163
202
164
203
fn debug() {
165
204
let db = Db::new(".fjall_data", CancellationToken::new()).expect("couldnt create db");
166
166
-
for nsid in db.get_nsids() {
167
167
-
let nsid = nsid.deref();
168
168
-
for hit in db.get_hits(nsid, ..) {
169
169
-
let hit = hit.expect("cant read event");
170
170
-
println!("{nsid} {}", hit.timestamp);
205
205
+
let info = db.info().expect("cant get db info");
206
206
+
println!("disk size: {}", info.disk_size);
207
207
+
for (nsid, blocks) in info.nsids {
208
208
+
print!("{nsid}:");
209
209
+
let mut last_size = 0;
210
210
+
let mut same_size_count = 0;
211
211
+
for item_count in blocks {
212
212
+
if item_count == last_size {
213
213
+
same_size_count += 1;
214
214
+
} else {
215
215
+
if same_size_count > 1 {
216
216
+
print!("x{}", same_size_count);
217
217
+
}
218
218
+
print!(" {item_count}");
219
219
+
same_size_count = 0;
220
220
+
}
221
221
+
last_size = item_count;
171
222
}
223
223
+
print!("\n");
172
224
}
173
225
}
174
226
175
227
fn compact() {
228
228
+
let db = Db::new(".fjall_data", CancellationToken::new()).expect("couldnt create db");
229
229
+
let info = db.info().expect("cant get db info");
230
230
+
db.major_compact().expect("cant compact");
231
231
+
std::thread::sleep(Duration::from_secs(5));
232
232
+
let compacted_info = db.info().expect("cant get db info");
233
233
+
println!(
234
234
+
"disk size: {} -> {}",
235
235
+
info.disk_size, compacted_info.disk_size
236
236
+
);
237
237
+
for (nsid, blocks) in info.nsids {
238
238
+
println!(
239
239
+
"{nsid}: {} -> {}",
240
240
+
blocks.len(),
241
241
+
compacted_info.nsids[&nsid].len()
242
242
+
)
243
243
+
}
244
244
+
}
245
245
+
246
246
+
fn migrate() {
176
247
let cancel_token = CancellationToken::new();
177
248
let from = Arc::new(
178
249
Db::new(".fjall_data_from", cancel_token.child_token()).expect("couldnt create db"),
+73
server/src/utils.rs
···
5
5
use byteview::ByteView;
6
6
use ordered_varint::Variable;
7
7
8
8
+
/// Current wall-clock time as the duration elapsed since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
pub fn get_time() -> Duration {
    let since_epoch = std::time::UNIX_EPOCH.elapsed();
    since_epoch.unwrap()
}
13
13
+
8
14
pub trait WriteVariableExt: Write {
9
15
fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> {
10
16
value.encode_variable(self)
···
257
263
assert_eq!(rate, 40.0); // 40 events in 1 second
258
264
}
259
265
}
266
266
+
267
267
+
/// Whether a relative time points into the past or into the future.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TimeDirection {
    /// The moment lies in the past; this is the default.
    #[default]
    Backwards,
    /// The moment lies in the future.
    Forwards,
}
278
278
+
279
279
+
#[derive(Debug, Clone, PartialEq, Eq)]
280
280
+
pub struct RelativeDateTime {
281
281
+
duration: Duration,
282
282
+
direction: TimeDirection,
283
283
+
}
284
284
+
285
285
+
impl RelativeDateTime {
286
286
+
pub fn new(duration: Duration, direction: TimeDirection) -> Self {
287
287
+
Self {
288
288
+
duration,
289
289
+
direction,
290
290
+
}
291
291
+
}
292
292
+
293
293
+
pub fn ago(duration: Duration) -> Self {
294
294
+
Self::new(duration, TimeDirection::Backwards)
295
295
+
}
296
296
+
297
297
+
pub fn from_now(duration: Duration) -> Self {
298
298
+
let cur = get_time();
299
299
+
if duration > cur {
300
300
+
Self::new(duration - cur, TimeDirection::Forwards)
301
301
+
} else {
302
302
+
Self::new(cur - duration, TimeDirection::Backwards)
303
303
+
}
304
304
+
}
305
305
+
}
306
306
+
307
307
+
impl std::fmt::Display for RelativeDateTime {
308
308
+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
309
309
+
let secs = self.duration.as_secs();
310
310
+
311
311
+
if secs == 0 {
312
312
+
return write!(f, "now");
313
313
+
}
314
314
+
315
315
+
let (amount, unit) = match secs {
316
316
+
0 => unreachable!(), // handled above
317
317
+
1..=59 => (secs, "second"),
318
318
+
60..=3599 => (secs / 60, "minute"),
319
319
+
3600..=86399 => (secs / 3600, "hour"),
320
320
+
86400..=2591999 => (secs / 86400, "day"), // up to 29 days
321
321
+
2592000..=31535999 => (secs / 2592000, "month"), // 30 days to 364 days
322
322
+
_ => (secs / 31536000, "year"), // 365 days+
323
323
+
};
324
324
+
325
325
+
let plural = if amount != 1 { "s" } else { "" };
326
326
+
327
327
+
match self.direction {
328
328
+
TimeDirection::Forwards => write!(f, "in {} {}{}", amount, unit, plural),
329
329
+
TimeDirection::Backwards => write!(f, "{} {}{} ago", amount, unit, plural),
330
330
+
}
331
331
+
}
332
332
+
}