tangled
alpha
login
or
join now
ptr.pet
/
nsid-tracker
3
fork
atom
tracks lexicons and how many times they appeared on the jetstream
3
fork
atom
overview
issues
pulls
pipelines
migrate thgingy
ptr.pet
7 months ago
80e6a430
d6b2e4c8
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+929
-7
3 changed files
expand all
collapse all
unified
split
server
src
db_old
block.rs
mod.rs
main.rs
+501
server/src/db_old/block.rs
···
1
1
+
use ordered_varint::Variable;
2
2
+
use rkyv::{
3
3
+
Archive, Deserialize, Serialize,
4
4
+
api::high::{HighSerializer, HighValidator},
5
5
+
bytecheck::CheckBytes,
6
6
+
de::Pool,
7
7
+
rancor::{self, Strategy},
8
8
+
ser::allocator::ArenaHandle,
9
9
+
util::AlignedVec,
10
10
+
};
11
11
+
use std::{
12
12
+
io::{self, Read, Write},
13
13
+
marker::PhantomData,
14
14
+
};
15
15
+
16
16
+
use crate::error::{AppError, AppResult};
17
17
+
18
18
+
/// A single timestamped record whose payload is stored as rkyv-serialized
/// bytes, allowing zero-copy access without deserializing.
pub struct Item<T> {
    /// Event timestamp (callers in this crate pass seconds — TODO confirm
    /// the unit is consistent across all call sites).
    pub timestamp: u64,
    // Serialized bytes of `T`, kept in an AlignedVec so archived access
    // meets rkyv's alignment requirements.
    data: AlignedVec,
    // Ties the logical payload type to the bytes without storing a `T`.
    phantom: PhantomData<T>,
}
23
23
+
24
24
+
impl<T: Archive> Item<T> {
    /// Zero-copy view of the archived payload.
    pub fn access(&self) -> &T::Archived {
        // SAFETY: `data` is assumed to contain a valid archived `T` — it is
        // either produced by `Item::new` or read back from this crate's own
        // storage. No byte validation is performed here; feeding untrusted
        // bytes through this path would be UB.
        unsafe { rkyv::access_unchecked::<T::Archived>(&self.data) }
    }
}
29
29
+
30
30
+
impl<T> Item<T>
where
    T: Archive,
    T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>>
        + Deserialize<T, Strategy<Pool, rancor::Error>>,
{
    /// Validates the archived bytes and deserializes them into an owned `T`.
    /// Unlike `access`, this path checks the bytes and can fail.
    pub fn deser(&self) -> AppResult<T> {
        rkyv::from_bytes(&self.data).map_err(AppError::from)
    }
}
40
40
+
41
41
+
impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> {
    /// Serializes `data` eagerly and pairs the bytes with `timestamp`.
    pub fn new(timestamp: u64, data: &T) -> Self {
        Item {
            timestamp,
            // SAFETY: serialization of these purely in-memory types is
            // assumed infallible. NOTE(review): `unwrap_unchecked` is UB if
            // it ever does error — consider `expect("serialization cannot
            // fail")` to fail loudly instead.
            data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() },
            phantom: PhantomData,
        }
    }
}
50
50
+
51
51
+
/// Streaming encoder that writes a block of `Item`s as delta-of-delta
/// varint timestamps, each followed by a length-prefixed payload.
pub struct ItemEncoder<W: Write, T> {
    writer: W,
    // Timestamp of the previously encoded item; 0 doubles as the
    // "nothing written yet" sentinel.
    prev_timestamp: u64,
    // Previous first-order delta; only the change in delta (second order)
    // is written to the stream.
    prev_delta: i64,
    _item: PhantomData<T>,
}
57
57
+
58
58
+
impl<W: Write, T> ItemEncoder<W, T> {
    /// Wraps `writer`; the first encoded item establishes the base timestamp.
    pub fn new(writer: W) -> Self {
        ItemEncoder {
            writer,
            prev_timestamp: 0,
            prev_delta: 0,
            _item: PhantomData,
        }
    }

    /// Encodes one item: a varint delta-of-delta (omitted for the first
    /// item) followed by the length-prefixed payload bytes.
    ///
    /// The first item's absolute timestamp is deliberately NOT written to
    /// the stream — the caller is expected to store it out of band and pass
    /// it back via `ItemDecoder::new` (see the block key in `db_old::mod`).
    ///
    /// NOTE(review): `prev_timestamp == 0` is used as the "first item"
    /// sentinel, so a genuine timestamp of 0 would make every subsequent
    /// item take the first-item path — confirm timestamps are always
    /// nonzero at call sites.
    pub fn encode(&mut self, item: &Item<T>) -> AppResult<()> {
        if self.prev_timestamp == 0 {
            // self.writer.write_varint(item.timestamp)?;
            self.prev_timestamp = item.timestamp;
            self.write_data(&item.data)?;
            return Ok(());
        }

        // Widen to i128 so the u64 difference cannot overflow; deltas may
        // be negative (timestamps are not required to be monotonic).
        let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64;

        self.writer.write_varint(delta - self.prev_delta)?;
        self.prev_timestamp = item.timestamp;
        self.prev_delta = delta;

        self.write_data(&item.data)?;

        Ok(())
    }

    // Writes a varint length prefix followed by the raw payload bytes.
    fn write_data(&mut self, data: &[u8]) -> AppResult<()> {
        self.writer.write_varint(data.len())?;
        self.writer.write_all(data)?;
        Ok(())
    }

    /// Flushes and returns the underlying writer.
    pub fn finish(mut self) -> AppResult<W> {
        self.writer.flush()?;
        Ok(self.writer)
    }
}
98
98
+
99
99
+
/// Streaming decoder for a block written by `ItemEncoder`.
///
/// The block does not contain its own base timestamp; the caller supplies
/// it (it is stored in the block's key).
pub struct ItemDecoder<R, T> {
    reader: R,
    // Timestamp of the most recently decoded item; starts at the supplied
    // base timestamp.
    current_timestamp: u64,
    // Accumulated first-order delta, reconstructed from the stream's
    // second-order (delta-of-delta) varints.
    current_delta: i64,
    // True until the first item (which has no delta on the wire) is read.
    first_item: bool,
    _item: PhantomData<T>,
}
106
106
+
107
107
+
impl<R: Read, T: Archive> ItemDecoder<R, T> {
    /// Creates a decoder over `reader`; `start_timestamp` is the absolute
    /// timestamp of the block's first item (stored out of band).
    pub fn new(reader: R, start_timestamp: u64) -> AppResult<Self> {
        Ok(ItemDecoder {
            reader,
            current_timestamp: start_timestamp,
            current_delta: 0,
            first_item: true,
            _item: PhantomData,
        })
    }

    /// Decodes the next item, or `Ok(None)` at a clean end of stream.
    ///
    /// A clean EOF is only valid at an item boundary; EOF between a delta
    /// and its payload is reported as an error.
    pub fn decode(&mut self) -> AppResult<Option<Item<T>>> {
        if self.first_item {
            // The first item carries no delta on the wire — its timestamp
            // is the `start_timestamp` passed to `new`.
            // let timestamp = match self.reader.read_varint::<u64>() {
            //     Ok(timestamp) => timestamp,
            //     Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
            //     Err(e) => return Err(e.into()),
            // };
            // self.current_timestamp = timestamp;

            let Some(data_raw) = self.read_item()? else {
                return Ok(None);
            };
            self.first_item = false;
            return Ok(Some(Item {
                timestamp: self.current_timestamp,
                data: data_raw,
                phantom: PhantomData,
            }));
        }

        // Advances current_timestamp as a side effect.
        let Some(_delta) = self.read_timestamp()? else {
            return Ok(None);
        };

        // read data — EOF here means a truncated block, not a clean end.
        let data_raw = match self.read_item()? {
            Some(data_raw) => data_raw,
            None => {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "expected data after delta",
                )
                .into());
            }
        };

        Ok(Some(Item {
            timestamp: self.current_timestamp,
            data: data_raw,
            phantom: PhantomData,
        }))
    }

    // Reads one delta-of-delta varint and folds it into the running delta
    // and timestamp. e.g. timestamps [10, 11, 12, 14] were encoded as
    // deltas [1, 1, 2] and stored as delta-of-deltas [0, 1].
    // Returns Ok(None) on clean EOF.
    fn read_timestamp(&mut self) -> AppResult<Option<u64>> {
        let delta = match self.reader.read_varint::<i64>() {
            Ok(delta) => delta,
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
            Err(e) => return Err(e.into()),
        };
        self.current_delta += delta;
        // i128 arithmetic so a negative running delta can't underflow u64
        // mid-computation; the final cast assumes the true value fits u64.
        self.current_timestamp =
            (self.current_timestamp as i128 + self.current_delta as i128) as u64;
        Ok(Some(self.current_timestamp))
    }

    // Reads one length-prefixed payload. Returns Ok(None) on clean EOF at
    // the length prefix.
    //
    // NOTE(review): `data_len` comes straight off the wire and is used as
    // an allocation size unchecked — a corrupted block could request a huge
    // allocation. Consider sanity-capping it. The push-zeroes loop is an
    // O(n) zero-fill because AlignedVec is grown element-by-element here.
    fn read_item(&mut self) -> AppResult<Option<AlignedVec>> {
        let data_len = match self.reader.read_varint::<usize>() {
            Ok(data_len) => data_len,
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
            Err(e) => return Err(e.into()),
        };
        let mut data_raw = AlignedVec::with_capacity(data_len);
        for _ in 0..data_len {
            data_raw.push(0);
        }
        self.reader.read_exact(data_raw.as_mut_slice())?;
        Ok(Some(data_raw))
    }
}
189
189
+
190
190
+
impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> {
191
191
+
type Item = AppResult<Item<T>>;
192
192
+
193
193
+
fn next(&mut self) -> Option<Self::Item> {
194
194
+
self.decode().transpose()
195
195
+
}
196
196
+
}
197
197
+
198
198
+
/// Extension trait adding order-preserving varint writes (via
/// `ordered_varint`) to any `Write`.
pub trait WriteVariableExt: Write {
    /// Encodes `value` in the variable-length format; returns bytes written.
    fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> {
        value.encode_variable(self)
    }
}
// Blanket impl: every writer gets `write_varint` for free.
impl<W: Write> WriteVariableExt for W {}
204
204
+
205
205
+
/// Extension trait adding order-preserving varint reads (via
/// `ordered_varint`) to any `Read`.
pub trait ReadVariableExt: Read {
    /// Decodes one value of type `T` from the variable-length format.
    fn read_varint<T: Variable>(&mut self) -> io::Result<T> {
        T::decode_variable(self)
    }
}
// Blanket impl: every reader gets `read_varint` for free.
impl<R: Read> ReadVariableExt for R {}
211
211
+
212
212
+
#[cfg(test)]
mod test {
    //! Round-trip tests for the delta-of-delta block encoder/decoder.
    use super::*;
    use rkyv::{Archive, Deserialize, Serialize};
    use std::io::Cursor;

    // Simple rkyv-serializable payload used by every test below.
    #[derive(Archive, Deserialize, Serialize, Debug, PartialEq)]
    #[rkyv(compare(PartialEq))]
    struct TestData {
        id: u32,
        value: String,
    }

    /// A single item round-trips with its timestamp and payload intact.
    #[test]
    fn test_encoder_decoder_single_item() {
        let data = TestData {
            id: 123,
            value: "test".to_string(),
        };

        let item = Item::new(1000, &data);

        // encode
        let mut buffer = Vec::new();
        let mut encoder = ItemEncoder::new(&mut buffer);
        encoder.encode(&item).unwrap();
        encoder.finish().unwrap();

        // decode — the base timestamp (1000) is supplied out of band.
        let cursor = Cursor::new(buffer);
        let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();

        let decoded_item = decoder.decode().unwrap().unwrap();
        assert_eq!(decoded_item.timestamp, 1000);

        let decoded_data = decoded_item.access();
        assert_eq!(decoded_data.id, 123);
        assert_eq!(decoded_data.value.as_str(), "test");
    }

    /// Several items with varying deltas round-trip in order.
    #[test]
    fn test_encoder_decoder_multiple_items() {
        let items = vec![
            Item::new(
                1000,
                &TestData {
                    id: 1,
                    value: "first".to_string(),
                },
            ),
            Item::new(
                1010,
                &TestData {
                    id: 2,
                    value: "second".to_string(),
                },
            ),
            Item::new(
                1015,
                &TestData {
                    id: 3,
                    value: "third".to_string(),
                },
            ),
            Item::new(
                1025,
                &TestData {
                    id: 4,
                    value: "fourth".to_string(),
                },
            ),
        ];

        // encode
        let mut buffer = Vec::new();
        let mut encoder = ItemEncoder::new(&mut buffer);

        for item in &items {
            encoder.encode(item).unwrap();
        }
        encoder.finish().unwrap();

        // decode
        let cursor = Cursor::new(buffer);
        let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();

        let mut decoded_items = Vec::new();
        while let Some(item) = decoder.decode().unwrap() {
            decoded_items.push(item);
        }

        assert_eq!(decoded_items.len(), 4);

        for (original, decoded) in items.iter().zip(decoded_items.iter()) {
            assert_eq!(original.timestamp, decoded.timestamp);
            assert_eq!(original.access().id, decoded.access().id);
            assert_eq!(
                original.access().value.as_str(),
                decoded.access().value.as_str()
            );
        }
    }

    /// The `Iterator` impl yields the same sequence as explicit `decode` calls.
    #[test]
    fn test_encoder_decoder_with_iterator() {
        let items = vec![
            Item::new(
                2000,
                &TestData {
                    id: 10,
                    value: "a".to_string(),
                },
            ),
            Item::new(
                2005,
                &TestData {
                    id: 20,
                    value: "b".to_string(),
                },
            ),
            Item::new(
                2012,
                &TestData {
                    id: 30,
                    value: "c".to_string(),
                },
            ),
        ];

        // encode
        let mut buffer = Vec::new();
        let mut encoder = ItemEncoder::new(&mut buffer);

        for item in &items {
            encoder.encode(item).unwrap();
        }
        encoder.finish().unwrap();

        // decode
        let cursor = Cursor::new(buffer);
        let decoder = ItemDecoder::<_, TestData>::new(cursor, 2000).unwrap();

        let decoded_items: Result<Vec<_>, _> = decoder.collect();
        let decoded_items = decoded_items.unwrap();

        assert_eq!(decoded_items.len(), 3);
        assert_eq!(decoded_items[0].timestamp, 2000);
        assert_eq!(decoded_items[1].timestamp, 2005);
        assert_eq!(decoded_items[2].timestamp, 2012);

        assert_eq!(decoded_items[0].access().id, 10);
        assert_eq!(decoded_items[1].access().id, 20);
        assert_eq!(decoded_items[2].access().id, 30);
    }

    /// Exercises repeated, zero, and negative delta-of-deltas.
    #[test]
    fn test_delta_compression() {
        let items = vec![
            Item::new(
                1000,
                &TestData {
                    id: 1,
                    value: "a".to_string(),
                },
            ),
            Item::new(
                1010,
                &TestData {
                    id: 2,
                    value: "b".to_string(),
                },
            ), // delta = 10
            Item::new(
                1020,
                &TestData {
                    id: 3,
                    value: "c".to_string(),
                },
            ), // delta = 10, delta-of-delta = 0
            Item::new(
                1025,
                &TestData {
                    id: 4,
                    value: "d".to_string(),
                },
            ), // delta = 5, delta-of-delta = -5
        ];

        let mut buffer = Vec::new();
        let mut encoder = ItemEncoder::new(&mut buffer);

        for item in &items {
            encoder.encode(item).unwrap();
        }
        encoder.finish().unwrap();

        // decode and verify
        let cursor = Cursor::new(buffer);
        let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();

        let decoded_items: Result<Vec<_>, _> = decoder.collect();
        let decoded_items = decoded_items.unwrap();

        for (original, decoded) in items.iter().zip(decoded_items.iter()) {
            assert_eq!(original.timestamp, decoded.timestamp);
            assert_eq!(original.access().id, decoded.access().id);
        }
    }

    /// An empty stream is a clean end, not an error.
    #[test]
    fn test_empty_decode() {
        let buffer = Vec::new();
        let cursor = Cursor::new(buffer);
        let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();

        let result = decoder.decode().unwrap();
        assert!(result.is_none());
    }

    /// Non-monotonic timestamps (negative deltas) survive the round trip.
    #[test]
    fn test_backwards_timestamp() {
        let items = vec![
            Item::new(
                1000,
                &TestData {
                    id: 1,
                    value: "first".to_string(),
                },
            ),
            Item::new(
                900,
                &TestData {
                    id: 2,
                    value: "second".to_string(),
                },
            ),
        ];

        let mut buffer = Vec::new();
        let mut encoder = ItemEncoder::new(&mut buffer);

        for item in &items {
            encoder.encode(item).unwrap();
        }
        encoder.finish().unwrap();

        let cursor = Cursor::new(buffer);
        let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();

        let decoded_items: Result<Vec<_>, _> = decoder.collect();
        let decoded_items = decoded_items.unwrap();

        assert_eq!(decoded_items.len(), 2);
        assert_eq!(decoded_items[0].timestamp, 1000);
        assert_eq!(decoded_items[1].timestamp, 900);
    }

    /// Payloads of very different lengths share one block correctly.
    #[test]
    fn test_different_data_sizes() {
        let small_data = TestData {
            id: 1,
            value: "x".to_string(),
        };
        let large_data = TestData {
            id: 2,
            value: "a".repeat(1000),
        };

        let items = vec![Item::new(1000, &small_data), Item::new(1001, &large_data)];

        let mut buffer = Vec::new();
        let mut encoder = ItemEncoder::new(&mut buffer);

        for item in &items {
            encoder.encode(item).unwrap();
        }
        encoder.finish().unwrap();

        let cursor = Cursor::new(buffer);
        let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();

        let decoded_items: Result<Vec<_>, _> = decoder.collect();
        let decoded_items = decoded_items.unwrap();

        assert_eq!(decoded_items.len(), 2);
        assert_eq!(decoded_items[0].access().value.as_str(), "x");
        assert_eq!(decoded_items[1].access().value.len(), 1000);
        assert_eq!(decoded_items[1].access().value.as_str(), "a".repeat(1000));
    }
}
+424
server/src/db_old/mod.rs
···
1
1
+
use std::{
2
2
+
io::Cursor,
3
3
+
ops::{Bound, Deref, RangeBounds},
4
4
+
path::Path,
5
5
+
sync::{
6
6
+
Arc,
7
7
+
atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering},
8
8
+
},
9
9
+
time::{Duration, Instant},
10
10
+
};
11
11
+
12
12
+
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice};
13
13
+
use ordered_varint::Variable;
14
14
+
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
15
15
+
use smol_str::SmolStr;
16
16
+
use tokio::sync::broadcast;
17
17
+
18
18
+
use crate::{
19
19
+
db_old::block::{ReadVariableExt, WriteVariableExt},
20
20
+
error::{AppError, AppResult},
21
21
+
jetstream::JetstreamEvent,
22
22
+
utils::{DefaultRateTracker, get_time},
23
23
+
};
24
24
+
25
25
+
mod block;
26
26
+
27
27
+
/// Aggregate counters kept per NSID in the `_counts` partition.
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidCounts {
    /// Total non-delete events observed.
    pub count: u128,
    /// Total delete events observed.
    pub deleted_count: u128,
    /// Timestamp of the most recent event (seconds, from `EventRecord`).
    pub last_seen: u64,
}
34
34
+
35
35
+
/// Per-event payload stored inside a block: currently just whether the
/// event was a delete.
#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidHit {
    pub deleted: bool,
}
40
40
+
41
41
+
/// A normalized jetstream event: which NSID, when, and whether it was a
/// delete.
#[derive(Clone)]
pub struct EventRecord {
    pub nsid: SmolStr,
    pub timestamp: u64, // seconds
    pub deleted: bool,
}
47
47
+
48
48
+
impl EventRecord {
49
49
+
pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
50
50
+
match event {
51
51
+
JetstreamEvent::Commit {
52
52
+
time_us, commit, ..
53
53
+
} => Some(Self {
54
54
+
nsid: commit.collection.into(),
55
55
+
timestamp: time_us / 1_000_000,
56
56
+
deleted: false,
57
57
+
}),
58
58
+
JetstreamEvent::Delete {
59
59
+
time_us, commit, ..
60
60
+
} => Some(Self {
61
61
+
nsid: commit.collection.into(),
62
62
+
timestamp: time_us / 1_000_000,
63
63
+
deleted: true,
64
64
+
}),
65
65
+
_ => None,
66
66
+
}
67
67
+
}
68
68
+
}
69
69
+
70
70
+
// Decoder over a value slice read back from a fjall partition.
type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
// Encoder writing a block into an in-memory buffer before insertion.
type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
// A single timestamped hit as stored in a block.
type Item = block::Item<NsidHit>;
73
73
+
74
74
+
/// Per-NSID state: the on-disk partition plus an in-memory buffer of
/// not-yet-flushed events and bookkeeping for the flush heuristics.
pub struct LexiconHandle {
    // fjall partition holding encoded blocks, keyed by varint (start, end)
    // timestamps.
    tree: Partition,
    // Lock-free queue of events awaiting a flush into a block.
    buf: Arc<scc::Queue<EventRecord>>,
    // Approximate length of `buf`, maintained separately alongside
    // pushes/pops.
    buf_len: AtomicUsize,
    // Wall-clock millis of the most recent insert (drives the idle flush).
    last_insert: AtomicU64,
    // Events-per-second tracker over a 5-second window.
    eps: DefaultRateTracker,
    // Suggested block size, derived from the observed event rate.
    block_size: AtomicUsize,
}
82
82
+
83
83
+
impl LexiconHandle {
84
84
+
fn new(keyspace: &Keyspace, nsid: &str) -> Self {
85
85
+
let opts = PartitionCreateOptions::default().compression(fjall::CompressionType::Miniz(9));
86
86
+
Self {
87
87
+
tree: keyspace.open_partition(nsid, opts).unwrap(),
88
88
+
buf: Default::default(),
89
89
+
buf_len: AtomicUsize::new(0),
90
90
+
last_insert: AtomicU64::new(0),
91
91
+
eps: DefaultRateTracker::new(Duration::from_secs(5)),
92
92
+
block_size: AtomicUsize::new(1000),
93
93
+
}
94
94
+
}
95
95
+
96
96
+
fn item_count(&self) -> usize {
97
97
+
self.buf_len.load(AtomicOrdering::Acquire)
98
98
+
}
99
99
+
100
100
+
fn last_insert(&self) -> u64 {
101
101
+
self.last_insert.load(AtomicOrdering::Acquire)
102
102
+
}
103
103
+
104
104
+
fn suggested_block_size(&self) -> usize {
105
105
+
self.block_size.load(AtomicOrdering::Relaxed)
106
106
+
}
107
107
+
108
108
+
fn insert(&self, event: EventRecord) {
109
109
+
self.buf.push(event);
110
110
+
self.buf_len.fetch_add(1, AtomicOrdering::Release);
111
111
+
self.last_insert
112
112
+
.store(get_time().as_millis() as u64, AtomicOrdering::Release);
113
113
+
self.eps.observe(1);
114
114
+
let rate = self.eps.rate() as usize;
115
115
+
if rate != 0 {
116
116
+
self.block_size.store(rate * 60, AtomicOrdering::Relaxed);
117
117
+
}
118
118
+
}
119
119
+
120
120
+
fn sync(&self, max_block_size: usize) -> AppResult<usize> {
121
121
+
let mut writer = ItemEncoder::new(Vec::with_capacity(
122
122
+
size_of::<u64>() + self.item_count().min(max_block_size) * size_of::<(u64, NsidHit)>(),
123
123
+
));
124
124
+
let mut start_timestamp = None;
125
125
+
let mut end_timestamp = None;
126
126
+
let mut written = 0_usize;
127
127
+
while let Some(event) = self.buf.pop() {
128
128
+
let item = Item::new(
129
129
+
event.timestamp,
130
130
+
&NsidHit {
131
131
+
deleted: event.deleted,
132
132
+
},
133
133
+
);
134
134
+
writer.encode(&item)?;
135
135
+
if start_timestamp.is_none() {
136
136
+
start_timestamp = Some(event.timestamp);
137
137
+
}
138
138
+
end_timestamp = Some(event.timestamp);
139
139
+
if written >= max_block_size {
140
140
+
break;
141
141
+
}
142
142
+
written += 1;
143
143
+
}
144
144
+
if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
145
145
+
self.buf_len.store(0, AtomicOrdering::Release);
146
146
+
let value = writer.finish()?;
147
147
+
let mut key = Vec::with_capacity(size_of::<u64>() * 2);
148
148
+
key.write_varint(start_timestamp)?;
149
149
+
key.write_varint(end_timestamp)?;
150
150
+
self.tree.insert(key, value)?;
151
151
+
}
152
152
+
Ok(written)
153
153
+
}
154
154
+
}
155
155
+
156
156
+
// Type-erased iterator used by query methods with branching return paths.
type BoxedIter<T> = Box<dyn Iterator<Item = T>>;
157
157
+
158
158
+
// Storage layout:
//   counts: one partition mapping nsid -> NsidCounts
//   hits:   one partition per nsid; key = varint start time + varint end
//           time, value = encoded block of hits
pub struct Db {
    // The fjall keyspace backing all partitions.
    inner: Keyspace,
    // Lazily-populated cache of per-NSID handles.
    hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>,
    // The `_counts` partition (nsid -> serialized NsidCounts).
    counts: Partition,
    // Fan-out channel for live count updates.
    event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
    // Global events-per-second tracker.
    eps: DefaultRateTracker,
    // Flush when a handle buffers more than max(min_block_size, suggested).
    min_block_size: usize,
    // Hard cap on events per encoded block.
    max_block_size: usize,
    // Flush handles idle longer than this.
    max_last_activity: Duration,
}
170
170
+
171
171
+
impl Db {
    /// Opens the keyspace at `path` and the shared `_counts` partition.
    pub fn new(path: impl AsRef<Path>) -> AppResult<Self> {
        tracing::info!("opening db...");
        let ks = Config::new(path)
            .cache_size(8 * 1024 * 1024) // from talna
            .open()?;
        Ok(Self {
            hits: Default::default(),
            counts: ks.open_partition(
                "_counts",
                PartitionCreateOptions::default().compression(fjall::CompressionType::None),
            )?,
            inner: ks,
            event_broadcaster: broadcast::channel(1000).0,
            eps: DefaultRateTracker::new(Duration::from_secs(1)),
            min_block_size: 512,
            max_block_size: 100_000,
            max_last_activity: Duration::from_secs(10),
        })
    }

    /// Flushes buffered events for every cached handle that is full, idle,
    /// or (when `all` is set) has anything buffered at all.
    pub fn sync(&self, all: bool) -> AppResult<()> {
        let _guard = scc::ebr::Guard::new();
        for (nsid, tree) in self.hits.iter(&_guard) {
            let count = tree.item_count();
            let is_max_block_size = count > self.min_block_size.max(tree.suggested_block_size());
            let is_too_old = (get_time().as_millis() as u64 - tree.last_insert())
                > self.max_last_activity.as_millis() as u64;
            if count > 0 && (all || is_max_block_size || is_too_old) {
                // A handle may need several blocks; loop until drained.
                loop {
                    let synced = tree.sync(self.max_block_size)?;
                    if synced == 0 {
                        break;
                    }
                    tracing::info!("synced {synced} of {nsid} to db");
                }
            }
        }
        Ok(())
    }

    /// Current global events-per-second.
    #[inline(always)]
    pub fn eps(&self) -> usize {
        self.eps.rate() as usize
    }

    /// Subscribes to live `(nsid, counts)` updates.
    #[inline(always)]
    pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
        self.event_broadcaster.subscribe()
    }

    /// Runs `f` against the handle for `nsid` if the partition already
    /// exists (caching the handle on first touch); `None` otherwise.
    /// Read-path counterpart of `run_in_nsid_tree` — never creates a
    /// partition.
    #[inline(always)]
    fn maybe_run_in_nsid_tree<T>(
        &self,
        nsid: &str,
        f: impl FnOnce(&LexiconHandle) -> T,
    ) -> Option<T> {
        let _guard = scc::ebr::Guard::new();
        let handle = match self.hits.peek(nsid, &_guard) {
            Some(handle) => handle.clone(),
            None => {
                if self.inner.partition_exists(nsid) {
                    let handle = Arc::new(LexiconHandle::new(&self.inner, nsid));
                    let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
                    handle
                } else {
                    return None;
                }
            }
        };
        Some(f(&handle))
    }

    /// Runs `f` against the handle for `nsid`, creating the handle (and
    /// underlying partition) if needed. Write-path counterpart of
    /// `maybe_run_in_nsid_tree`.
    #[inline(always)]
    fn run_in_nsid_tree<T>(
        &self,
        nsid: SmolStr,
        f: impl FnOnce(&LexiconHandle) -> AppResult<T>,
    ) -> AppResult<T> {
        f(self
            .hits
            .entry(nsid.clone())
            .or_insert_with(move || Arc::new(LexiconHandle::new(&self.inner, &nsid)))
            .get())
    }

    /// Records one event: buffers the hit, bumps the per-NSID counters, and
    /// broadcasts the new counts to any listeners.
    ///
    /// NOTE(review): the counts update is a read-modify-write without any
    /// lock — concurrent `record_event` calls for the same nsid can lose
    /// increments. Confirm whether ingestion is single-threaded per nsid.
    pub fn record_event(&self, e: EventRecord) -> AppResult<()> {
        let EventRecord {
            nsid,
            timestamp,
            deleted,
        } = e.clone();

        // insert event
        self.run_in_nsid_tree(nsid.clone(), move |tree| Ok(tree.insert(e)))?;
        // increment count
        let mut counts = self.get_count(&nsid)?;
        counts.last_seen = timestamp;
        if deleted {
            counts.deleted_count += 1;
        } else {
            counts.count += 1;
        }
        self.insert_count(&nsid, counts.clone())?;
        if self.event_broadcaster.receiver_count() > 0 {
            let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts));
        }
        self.eps.observe(1);
        Ok(())
    }

    // Serializes and stores the counters for `nsid`.
    // SAFETY of unwrap_unchecked: serialization of in-memory NsidCounts is
    // assumed infallible (UB if it ever errors).
    #[inline(always)]
    fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> {
        self.counts
            .insert(
                nsid,
                unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(),
            )
            .map_err(AppError::from)
    }

    /// Current counters for `nsid`; zeros if never seen.
    pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
        let Some(raw) = self.counts.get(nsid)? else {
            return Ok(NsidCounts::default());
        };
        // SAFETY: bytes were written by `insert_count`, so they are assumed
        // to be a valid archived NsidCounts; no validation is done.
        Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
    }

    /// Iterates all `(nsid, counts)` pairs in the counts partition.
    pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
        self.counts.iter().map(|res| {
            res.map_err(AppError::from).map(|(key, val)| {
                (
                    // SAFETY: keys were inserted from &str, so valid UTF-8.
                    SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
                    // SAFETY: values were written by `insert_count`.
                    unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
                )
            })
        })
    }

    /// All tracked NSIDs (every partition except the counts partition).
    pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str> + 'static> {
        self.inner
            .list_partitions()
            .into_iter()
            .filter(|k| k.deref() != "_counts")
    }

    /// Raw `(key, value)` block pairs for `nsid`, newest first. Debug aid.
    pub fn get_hits_debug(&self, nsid: &str) -> BoxedIter<AppResult<(Slice, Slice)>> {
        self.maybe_run_in_nsid_tree(nsid, |handle| -> BoxedIter<AppResult<(Slice, Slice)>> {
            Box::new(
                handle
                    .tree
                    .iter()
                    .rev()
                    .map(|res| res.map_err(AppError::from)),
            )
        })
        .unwrap_or_else(|| Box::new(std::iter::empty()))
    }

    /// Streams decoded hits for `nsid` whose timestamps fall in `range`.
    ///
    /// Blocks are selected by a varint-key range scan, then each block is
    /// decoded and truncated at the range's upper bound.
    ///
    /// NOTE(review): the first `.flatten()` iterates `Result`s, which
    /// silently drops block-level errors (key parse / decoder construction
    /// failures) instead of yielding them — confirm this is intentional.
    /// Also note the start bound compares against the composite
    /// (start, end) key, so a block that STARTS before the range but
    /// overlaps it may be skipped — verify against how blocks are cut.
    pub fn get_hits(
        &self,
        nsid: &str,
        range: impl RangeBounds<u64> + std::fmt::Debug,
    ) -> BoxedIter<AppResult<Item>> {
        // SAFETY of unwrap_unchecked: varint-encoding a u64 into a Vec is
        // assumed infallible.
        let start = range
            .start_bound()
            .cloned()
            .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() });
        let end = range
            .end_bound()
            .cloned()
            .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() });
        // Inclusive upper bound used to cut off items inside the last block.
        let limit = match range.end_bound().cloned() {
            Bound::Included(end) => end,
            Bound::Excluded(end) => end.saturating_sub(1),
            Bound::Unbounded => u64::MAX,
        };

        self.maybe_run_in_nsid_tree(nsid, move |handle| -> BoxedIter<AppResult<Item>> {
            // Decodes one block: the key's first varint is the block's base
            // timestamp for the delta decoder.
            let map_block = move |(key, val)| {
                let mut key_reader = Cursor::new(key);
                let start_timestamp = key_reader.read_varint::<u64>()?;
                let items =
                    ItemDecoder::new(Cursor::new(val), start_timestamp)?.take_while(move |item| {
                        item.as_ref().map_or(true, |item| item.timestamp <= limit)
                    });
                Ok(items)
            };

            Box::new(
                handle
                    .tree
                    .range(TimestampRange { start, end })
                    .map(move |res| res.map_err(AppError::from).and_then(map_block))
                    .flatten()
                    .flatten(),
            )
        })
        .unwrap_or_else(|| Box::new(std::iter::empty()))
    }

    /// Earliest tracked timestamp, approximated by the first block of the
    /// busiest lexicon.
    pub fn tracking_since(&self) -> AppResult<u64> {
        // HACK: we should actually store when we started tracking but im lazy
        // should be accurate enough
        self.maybe_run_in_nsid_tree("app.bsky.feed.like", |handle| {
            let Some((timestamps_raw, _)) = handle.tree.first_key_value()? else {
                return Ok(0);
            };
            let mut timestamp_reader = Cursor::new(timestamps_raw);
            timestamp_reader
                .read_varint::<u64>()
                .map_err(AppError::from)
        })
        .unwrap_or(Ok(0))
    }
}
387
387
+
388
388
+
// A block key bound: the varint-encoded byte form of a timestamp.
// ordered_varint encodings compare bytewise in numeric order, which is what
// makes a raw byte-range scan over these keys meaningful.
type TimestampRepr = Vec<u8>;

/// Owned `RangeBounds` over encoded timestamp keys, passed to
/// `Partition::range` in `Db::get_hits`.
struct TimestampRange {
    start: Bound<TimestampRepr>,
    end: Bound<TimestampRepr>,
}

impl RangeBounds<TimestampRepr> for TimestampRange {
    #[inline(always)]
    fn start_bound(&self) -> Bound<&TimestampRepr> {
        self.start.as_ref()
    }

    #[inline(always)]
    fn end_bound(&self) -> Bound<&TimestampRepr> {
        self.end.as_ref()
    }
}
406
406
+
407
407
+
// Legacy key bound: fixed 8-byte big-endian(?) timestamp keys from the old
// schema. Nothing in this file references these — presumably kept for the
// migration path; TODO confirm and remove once migration is done.
type TimestampReprOld = [u8; 8];

struct TimestampRangeOld {
    start: Bound<TimestampReprOld>,
    end: Bound<TimestampReprOld>,
}

impl RangeBounds<TimestampReprOld> for TimestampRangeOld {
    #[inline(always)]
    fn start_bound(&self) -> Bound<&TimestampReprOld> {
        self.start.as_ref()
    }

    #[inline(always)]
    fn end_bound(&self) -> Bound<&TimestampReprOld> {
        self.end.as_ref()
    }
}
+4
-7
server/src/main.rs
···
17
17
18
18
mod api;
19
19
mod db;
20
20
+
mod db_old;
20
21
mod error;
21
22
mod jetstream;
22
23
mod utils;
···
259
260
260
261
fn migrate() {
261
262
let cancel_token = CancellationToken::new();
262
262
-
let from = Arc::new(
263
263
-
Db::new(
264
264
-
DbConfig::default().path(".fjall_data_from"),
265
265
-
cancel_token.child_token(),
266
266
-
)
267
267
-
.expect("couldnt create db"),
268
268
-
);
263
263
+
264
264
+
let from = Arc::new(db_old::Db::new(".fjall_data_from").expect("couldnt create db"));
265
265
+
269
266
let to = Arc::new(
270
267
Db::new(
271
268
DbConfig::default().path(".fjall_data_to").ks(|c| {