tangled
alpha
login
or
join now
bad-example.com
/
microcosm-links
7
fork
atom
APIs for links and references in the ATmosphere
7
fork
atom
overview
issues
pulls
pipelines
forgot i could run fmt without clippy
anyway, fmt
bad-example.com
11 months ago
b8d4dc93
16adc37c
+120
-108
9 changed files
expand all
collapse all
unified
split
ufos
src
consumer.rs
db_types.rs
error.rs
lib.rs
main.rs
server.rs
storage.rs
storage_fjall.rs
store_types.rs
+12
-12
ufos/src/consumer.rs
···
8
use std::time::Duration;
9
use tokio::sync::mpsc::{channel, Receiver, Sender};
10
11
-
use crate::{DeleteAccount, EventBatch, UFOsCommit};
12
use crate::error::FirehoseEventError;
0
13
14
const MAX_BATCHED_RECORDS: usize = 128; // *non-blocking* limit. drops oldest batched record per collection once reached.
15
const MAX_ACCOUNT_REMOVES: usize = 1024; // hard limit, extremely unlikely to reach, but just in case
···
94
95
match event.kind {
96
EventKind::Commit => {
97
-
let commit = event.commit.ok_or(FirehoseEventError::CommitEventMissingCommit)?;
0
0
98
let (commit, nsid) = UFOsCommit::from_commit_info(commit, event.did, event.cursor)?;
99
self.handle_commit(commit, nsid).await?;
100
}
101
EventKind::Account => {
102
-
let account = event.account.ok_or(FirehoseEventError::AccountEventMissingAccount)?;
0
0
103
if !account.active {
104
self.handle_delete_account(event.did, event.cursor).await?;
105
}
···
109
110
// if the queue is empty and we have enough, send immediately. otherwise, let the current batch fill up.
111
if let Some(earliest) = &self.current_batch.initial_cursor {
112
-
if event.cursor.duration_since(earliest)?.as_secs_f64() > MIN_BATCH_SPAN_SECS &&
113
-
self.batch_sender.capacity() == BATCH_QUEUE_SIZE {
0
114
log::info!("queue empty: immediately sending batch.");
115
self.send_current_batch_now().await?;
116
}
···
119
}
120
121
async fn handle_commit(&mut self, commit: UFOsCommit, nsid: Nsid) -> anyhow::Result<()> {
122
-
if !self
123
-
.current_batch
124
-
.batch
125
-
.commits_by_nsid
126
-
.contains_key(&nsid)
127
&& self.current_batch.batch.commits_by_nsid.len() >= MAX_BATCHED_COLLECTIONS
128
{
129
self.send_current_batch_now().await?;
130
}
131
132
-
self
133
-
.current_batch
134
.batch
135
.commits_by_nsid
136
.entry(nsid)
···
8
use std::time::Duration;
9
use tokio::sync::mpsc::{channel, Receiver, Sender};
10
0
11
use crate::error::FirehoseEventError;
12
+
use crate::{DeleteAccount, EventBatch, UFOsCommit};
13
14
const MAX_BATCHED_RECORDS: usize = 128; // *non-blocking* limit. drops oldest batched record per collection once reached.
15
const MAX_ACCOUNT_REMOVES: usize = 1024; // hard limit, extremely unlikely to reach, but just in case
···
94
95
match event.kind {
96
EventKind::Commit => {
97
+
let commit = event
98
+
.commit
99
+
.ok_or(FirehoseEventError::CommitEventMissingCommit)?;
100
let (commit, nsid) = UFOsCommit::from_commit_info(commit, event.did, event.cursor)?;
101
self.handle_commit(commit, nsid).await?;
102
}
103
EventKind::Account => {
104
+
let account = event
105
+
.account
106
+
.ok_or(FirehoseEventError::AccountEventMissingAccount)?;
107
if !account.active {
108
self.handle_delete_account(event.did, event.cursor).await?;
109
}
···
113
114
// if the queue is empty and we have enough, send immediately. otherwise, let the current batch fill up.
115
if let Some(earliest) = &self.current_batch.initial_cursor {
116
+
if event.cursor.duration_since(earliest)?.as_secs_f64() > MIN_BATCH_SPAN_SECS
117
+
&& self.batch_sender.capacity() == BATCH_QUEUE_SIZE
118
+
{
119
log::info!("queue empty: immediately sending batch.");
120
self.send_current_batch_now().await?;
121
}
···
124
}
125
126
async fn handle_commit(&mut self, commit: UFOsCommit, nsid: Nsid) -> anyhow::Result<()> {
127
+
if !self.current_batch.batch.commits_by_nsid.contains_key(&nsid)
0
0
0
0
128
&& self.current_batch.batch.commits_by_nsid.len() >= MAX_BATCHED_COLLECTIONS
129
{
130
self.send_current_batch_now().await?;
131
}
132
133
+
self.current_batch
0
134
.batch
135
.commits_by_nsid
136
.entry(nsid)
+1
-1
ufos/src/db_types.rs
···
43
#[error("expected exclusive bound from lsm_tree (likely bug)")]
44
BadRangeBound,
45
#[error("expected an hourly-truncated u64, found remainder: {0}")]
46
-
InvalidHourlyTruncated(u64)
47
}
48
49
fn bincode_conf() -> impl Config {
···
43
#[error("expected exclusive bound from lsm_tree (likely bug)")]
44
BadRangeBound,
45
#[error("expected an hourly-truncated u64, found remainder: {0}")]
46
+
InvalidHourlyTruncated(u64),
47
}
48
49
fn bincode_conf() -> impl Config {
+1
-1
ufos/src/error.rs
···
1
-
use thiserror::Error;
2
use crate::db_types::EncodingError;
0
3
4
#[derive(Debug, Error)]
5
pub enum FirehoseEventError {
···
0
1
use crate::db_types::EncodingError;
2
+
use thiserror::Error;
3
4
#[derive(Debug, Error)]
5
pub enum FirehoseEventError {
+10
-7
ufos/src/lib.rs
···
6
pub mod storage_fjall;
7
pub mod store_types;
8
9
-
use jetstream::events::{Cursor, CommitEvent, CommitOp};
10
-
use jetstream::exports::{Did, Nsid, RecordKey};
11
-
use std::collections::{HashMap, VecDeque};
12
-
use serde_json::value::RawValue;
13
use cardinality_estimator::CardinalityEstimator;
14
use error::FirehoseEventError;
0
0
0
0
15
16
#[derive(Debug, Default, Clone)]
17
pub struct CollectionCommits {
···
42
}
43
44
#[derive(Debug, Clone)]
45
-
pub struct PutAction { record: Box<RawValue>, is_update: bool }
0
0
0
46
47
#[derive(Debug, Clone)]
48
pub struct UFOsCommit {
···
57
pub fn from_commit_info(
58
commit: CommitEvent,
59
did: Did,
60
-
cursor: Cursor
61
) -> Result<(Self, Nsid), FirehoseEventError> {
62
let action = match commit.operation {
63
CommitOp::Delete => CommitAction::Cut,
64
cru @ _ => CommitAction::Put(PutAction {
65
record: commit.record.ok_or(FirehoseEventError::CruMissingRecord)?,
66
is_update: cru == CommitOp::Update,
67
-
})
68
};
69
let batched = Self {
70
cursor,
···
6
pub mod storage_fjall;
7
pub mod store_types;
8
0
0
0
0
9
use cardinality_estimator::CardinalityEstimator;
10
use error::FirehoseEventError;
11
+
use jetstream::events::{CommitEvent, CommitOp, Cursor};
12
+
use jetstream::exports::{Did, Nsid, RecordKey};
13
+
use serde_json::value::RawValue;
14
+
use std::collections::{HashMap, VecDeque};
15
16
#[derive(Debug, Default, Clone)]
17
pub struct CollectionCommits {
···
42
}
43
44
#[derive(Debug, Clone)]
45
+
pub struct PutAction {
46
+
record: Box<RawValue>,
47
+
is_update: bool,
48
+
}
49
50
#[derive(Debug, Clone)]
51
pub struct UFOsCommit {
···
60
pub fn from_commit_info(
61
commit: CommitEvent,
62
did: Did,
63
+
cursor: Cursor,
64
) -> Result<(Self, Nsid), FirehoseEventError> {
65
let action = match commit.operation {
66
CommitOp::Delete => CommitAction::Cut,
67
cru @ _ => CommitAction::Put(PutAction {
68
record: commit.record.ok_or(FirehoseEventError::CruMissingRecord)?,
69
is_update: cru == CommitOp::Update,
70
+
}),
71
};
72
let batched = Self {
73
cursor,
+8
-3
ufos/src/main.rs
···
46
47
let args = Args::parse();
48
let jetstream = args.jetstream.clone();
49
-
let (_read_store, mut write_store, cursor) =
50
-
FjallStorage::init(args.data, jetstream, args.jetstream_force, Default::default())?;
0
0
0
0
51
52
// println!("starting server with storage...");
53
// let serving = server::serve(storage.clone());
···
73
write_store.step_rollup()?;
74
}
75
Ok::<(), StorageError>(())
76
-
}).await??;
0
77
78
// let r = storage.receive(batches).await;
79
log::warn!("storage.receive ended with");
···
46
47
let args = Args::parse();
48
let jetstream = args.jetstream.clone();
49
+
let (_read_store, mut write_store, cursor) = FjallStorage::init(
50
+
args.data,
51
+
jetstream,
52
+
args.jetstream_force,
53
+
Default::default(),
54
+
)?;
55
56
// println!("starting server with storage...");
57
// let serving = server::serve(storage.clone());
···
77
write_store.step_rollup()?;
78
}
79
Ok::<(), StorageError>(())
80
+
})
81
+
.await??;
82
83
// let r = storage.receive(batches).await;
84
log::warn!("storage.receive ended with");
+1
-1
ufos/src/server.rs
···
1
use crate::storage_fjall::{Storage, StorageInfo};
2
-
use crate::{Nsid};
3
use dropshot::endpoint;
4
use dropshot::ApiDescription;
5
use dropshot::ConfigDropshot;
···
1
use crate::storage_fjall::{Storage, StorageInfo};
2
+
use crate::Nsid;
3
use dropshot::endpoint;
4
use dropshot::ApiDescription;
5
use dropshot::ConfigDropshot;
+7
-4
ufos/src/storage.rs
···
1
-
use std::path::Path;
2
use jetstream::exports::Nsid;
3
-
use crate::{error::StorageError, Cursor, EventBatch};
4
5
-
pub trait StorageWhatever<R: StoreReader, W: StoreWriter, C> { // TODO: extract this
0
6
fn init(
7
path: impl AsRef<Path>,
8
endpoint: String,
9
force_endpoint: bool,
10
config: C,
11
-
) -> Result<(R, W, Option<Cursor>), StorageError> where Self: Sized;
0
0
12
}
13
14
pub trait StoreWriter {
···
1
+
use crate::{error::StorageError, Cursor, EventBatch};
2
use jetstream::exports::Nsid;
3
+
use std::path::Path;
4
5
+
pub trait StorageWhatever<R: StoreReader, W: StoreWriter, C> {
6
+
// TODO: extract this
7
fn init(
8
path: impl AsRef<Path>,
9
endpoint: String,
10
force_endpoint: bool,
11
config: C,
12
+
) -> Result<(R, W, Option<Cursor>), StorageError>
13
+
where
14
+
Self: Sized;
15
}
16
17
pub trait StoreWriter {
+65
-65
ufos/src/storage_fjall.rs
···
0
0
1
use crate::storage::{StorageWhatever, StoreReader, StoreWriter};
2
-
use crate::db_types::{db_complete, DbBytes, DbStaticStr, EncodingError, StaticStr};
3
use crate::store_types::{
4
ByCollectionKey, ByCollectionValue, ByCursorSeenKey, ByCursorSeenValue, ByIdKey, ByIdValue,
5
-
JetstreamCursorKey, JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue,
6
-
ModCursorKey, ModCursorValue, ModQueueItemKey, ModQueueItemStringValue, ModQueueItemValue,
7
-
RollupCursorKey, RollupCursorValue, SeenCounter,
8
-
NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey, RecordLocationVal,
9
-
LiveRecordsKey, LiveRecordsValue, LiveDidsKey, LiveDidsValue,
10
-
DeleteAccountQueueKey, DeleteAccountQueueVal,
11
-
NewRollupCursorKey, NewRollupCursorValue,
12
-
TakeoffKey, TakeoffValue,
13
};
14
-
use crate::{
15
-
DeleteAccount, Did, EventBatch, Nsid, RecordKey, CommitAction,
16
-
};
17
-
use crate::error::StorageError;
18
use fjall::{
19
Batch as FjallBatch, CompressionType, Config, Keyspace, PartitionCreateOptions, PartitionHandle,
20
};
···
41
struct Db {
42
keyspace: Keyspace,
43
global: PartitionHandle,
44
-
45
}
46
47
/**
···
153
let js_cursor = get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?;
154
155
if js_cursor.is_some() {
156
-
let stored_endpoint = get_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?;
0
157
158
-
let JetstreamEndpointValue(stored) = stored_endpoint
159
-
.ok_or(StorageError::InitError("found cursor but missing js_endpoint, refusing to start.".to_string()))?;
0
160
161
if stored != endpoint {
162
if force_endpoint {
···
175
&global,
176
JetstreamEndpointValue(endpoint.to_string()),
177
)?;
178
-
insert_static_neu::<TakeoffKey>(
179
-
&global,
180
-
Cursor::at(SystemTime::now()),
181
-
)?;
182
-
insert_static_neu::<NewRollupCursorKey>(
183
-
&global,
184
-
Cursor::from_start(),
185
-
)?;
186
}
187
188
let reader = FjallReader {
···
191
records: records.clone(),
192
rollups: rollups.clone(),
193
};
194
-
let writer = FjallWriter { keyspace, global, feeds, records, rollups, queues };
0
0
0
0
0
0
0
195
Ok((reader, writer, js_cursor))
196
}
197
}
···
205
}
206
207
impl StoreReader for FjallReader {
208
-
fn get_total_by_collection(&self, collection: &jetstream::exports::Nsid) -> Result<u64, StorageError> {
0
0
0
209
// TODO: start from rollup
210
let full_range = LiveRecordsKey::range_from_cursor(Cursor::from_start())?;
211
let mut total = 0;
···
234
pub fn step_rollup(&mut self) -> Result<(), StorageError> {
235
// let mut batch = self.keyspace.batch();
236
237
-
let rollup_cursor = get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?
238
-
.ok_or(StorageError::BadStateError("Could not find current rollup cursor".to_string()))?;
0
0
239
240
// timelies
241
let live_records_range = LiveRecordsKey::range_from_cursor(rollup_cursor)?;
···
245
let next_timely = timely_iter
246
.next()
247
.transpose()?
248
-
.map(|(key_bytes, val_bytes)|
249
-
db_complete::<LiveRecordsKey>(&key_bytes)
250
-
.map(|k| (k, val_bytes)))
251
.transpose()?;
252
253
// delete accounts
254
-
let delete_accounts_range = DeleteAccountQueueKey::new(rollup_cursor).range_to_prefix_end()?;
0
255
256
-
let next_delete = self.queues.range(delete_accounts_range)
0
0
257
.next()
258
.transpose()?
259
-
.map(|(key_bytes, val_bytes)|
260
-
db_complete::<DeleteAccountQueueKey>(&key_bytes)
261
-
.map(|k| (k.suffix, val_bytes)))
262
.transpose()?;
263
264
match (next_timely, next_delete) {
···
288
impl StoreWriter for FjallWriter {
289
fn insert_batch(&mut self, event_batch: EventBatch) -> Result<(), StorageError> {
290
if event_batch.is_empty() {
291
-
return Ok(())
292
}
293
294
let mut batch = self.keyspace.batch();
···
306
}
307
CommitAction::Put(put_action) => {
308
let feed_key = NsidRecordFeedKey::from_pair(nsid.clone(), commit.cursor);
309
-
let feed_val: NsidRecordFeedVal = (&commit.did, &commit.rkey, commit.rev.as_str()).into();
0
310
batch.insert(
311
&self.feeds,
312
feed_key.to_db_bytes()?,
313
feed_val.to_db_bytes()?,
314
);
315
316
-
let location_val: RecordLocationVal = (commit.cursor, commit.rev.as_str(), put_action).into();
0
317
batch.insert(
318
&self.records,
319
&location_key.to_db_bytes()?,
···
360
}
361
}
362
363
-
364
#[derive(Clone)]
365
pub struct Storage {
366
/// horrible: gate all db access behind this to force global serialization to avoid deadlock
···
375
PartitionCreateOptions::default().compression(CompressionType::None),
376
)?;
377
Ok(Self {
378
-
db: Db {
379
-
keyspace,
380
-
global,
381
-
},
382
})
383
}
384
···
681
}
682
683
/// Get a value from a fixed key
684
-
fn get_static_neu<K: StaticStr, V: DbBytes>(global: &PartitionHandle) -> Result<Option<V>, StorageError> {
0
0
685
let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
686
let value = global
687
.get(&key_bytes)?
···
736
}
737
738
/// Get stats that haven't been rolled up yet
739
-
fn get_unrolled_collection_seen(
740
-
global: &PartitionHandle,
741
-
collection: Nsid,
742
-
) -> anyhow::Result<u64> {
743
let range =
744
if let Some(cursor_value) = get_static::<RollupCursorKey, RollupCursorValue>(global)? {
745
eprintln!("found existing cursor");
···
773
Ok(collection_total)
774
}
775
776
-
fn get_unrolled_top_collections(
777
-
global: &PartitionHandle,
778
-
) -> anyhow::Result<HashMap<String, u64>> {
779
let range =
780
if let Some(cursor_value) = get_static::<RollupCursorKey, RollupCursorValue>(global)? {
781
eprintln!("found existing cursor");
···
905
906
log::trace!("delete_record: iterate over up to current cursor...");
907
908
-
for (i, pair) in self
909
-
.global
910
-
.range(key_prefix_bytes..key_limit)
911
-
.enumerate()
912
-
{
913
log::trace!("delete_record iter {i}: found");
914
// find all (hopefully 1)
915
let (key_bytes, _) = pair?;
···
1123
)
1124
}
1125
1126
-
1127
-
1128
#[cfg(test)]
1129
mod tests {
0
0
0
1130
use jetstream::exports::Cid;
1131
-
use jetstream::events::{CommitEvent, CommitOp};
1132
use serde_json::value::RawValue;
1133
-
use crate::{UFOsCommit, CollectionCommits};
1134
-
use super::*;
1135
1136
#[test]
1137
fn test_hello() -> anyhow::Result<()> {
···
1166
rev: "asdf".to_string(),
1167
operation: CommitOp::Create,
1168
record: Some(*Box::new(RawValue::from_string("{}".to_string()).unwrap())),
1169
-
cid: Some("bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy".parse().unwrap()),
0
0
0
0
1170
};
1171
-
let (commit, collection) = UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(100))?;
0
1172
1173
let mut commits = CollectionCommits::default();
1174
commits.total_seen += 1;
···
1
+
use crate::db_types::{db_complete, DbBytes, DbStaticStr, EncodingError, StaticStr};
2
+
use crate::error::StorageError;
3
use crate::storage::{StorageWhatever, StoreReader, StoreWriter};
0
4
use crate::store_types::{
5
ByCollectionKey, ByCollectionValue, ByCursorSeenKey, ByCursorSeenValue, ByIdKey, ByIdValue,
6
+
DeleteAccountQueueKey, DeleteAccountQueueVal, JetstreamCursorKey, JetstreamCursorValue,
7
+
JetstreamEndpointKey, JetstreamEndpointValue, LiveDidsKey, LiveDidsValue, LiveRecordsKey,
8
+
LiveRecordsValue, ModCursorKey, ModCursorValue, ModQueueItemKey, ModQueueItemStringValue,
9
+
ModQueueItemValue, NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey,
10
+
NsidRecordFeedVal, RecordLocationKey, RecordLocationVal, RollupCursorKey, RollupCursorValue,
11
+
SeenCounter, TakeoffKey, TakeoffValue,
0
0
12
};
13
+
use crate::{CommitAction, DeleteAccount, Did, EventBatch, Nsid, RecordKey};
0
0
0
14
use fjall::{
15
Batch as FjallBatch, CompressionType, Config, Keyspace, PartitionCreateOptions, PartitionHandle,
16
};
···
37
struct Db {
38
keyspace: Keyspace,
39
global: PartitionHandle,
0
40
}
41
42
/**
···
148
let js_cursor = get_static_neu::<JetstreamCursorKey, JetstreamCursorValue>(&global)?;
149
150
if js_cursor.is_some() {
151
+
let stored_endpoint =
152
+
get_static_neu::<JetstreamEndpointKey, JetstreamEndpointValue>(&global)?;
153
154
+
let JetstreamEndpointValue(stored) = stored_endpoint.ok_or(StorageError::InitError(
155
+
"found cursor but missing js_endpoint, refusing to start.".to_string(),
156
+
))?;
157
158
if stored != endpoint {
159
if force_endpoint {
···
172
&global,
173
JetstreamEndpointValue(endpoint.to_string()),
174
)?;
175
+
insert_static_neu::<TakeoffKey>(&global, Cursor::at(SystemTime::now()))?;
176
+
insert_static_neu::<NewRollupCursorKey>(&global, Cursor::from_start())?;
0
0
0
0
0
0
177
}
178
179
let reader = FjallReader {
···
182
records: records.clone(),
183
rollups: rollups.clone(),
184
};
185
+
let writer = FjallWriter {
186
+
keyspace,
187
+
global,
188
+
feeds,
189
+
records,
190
+
rollups,
191
+
queues,
192
+
};
193
Ok((reader, writer, js_cursor))
194
}
195
}
···
203
}
204
205
impl StoreReader for FjallReader {
206
+
fn get_total_by_collection(
207
+
&self,
208
+
collection: &jetstream::exports::Nsid,
209
+
) -> Result<u64, StorageError> {
210
// TODO: start from rollup
211
let full_range = LiveRecordsKey::range_from_cursor(Cursor::from_start())?;
212
let mut total = 0;
···
235
pub fn step_rollup(&mut self) -> Result<(), StorageError> {
236
// let mut batch = self.keyspace.batch();
237
238
+
let rollup_cursor =
239
+
get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)?.ok_or(
240
+
StorageError::BadStateError("Could not find current rollup cursor".to_string()),
241
+
)?;
242
243
// timelies
244
let live_records_range = LiveRecordsKey::range_from_cursor(rollup_cursor)?;
···
248
let next_timely = timely_iter
249
.next()
250
.transpose()?
251
+
.map(|(key_bytes, val_bytes)| {
252
+
db_complete::<LiveRecordsKey>(&key_bytes).map(|k| (k, val_bytes))
253
+
})
254
.transpose()?;
255
256
// delete accounts
257
+
let delete_accounts_range =
258
+
DeleteAccountQueueKey::new(rollup_cursor).range_to_prefix_end()?;
259
260
+
let next_delete = self
261
+
.queues
262
+
.range(delete_accounts_range)
263
.next()
264
.transpose()?
265
+
.map(|(key_bytes, val_bytes)| {
266
+
db_complete::<DeleteAccountQueueKey>(&key_bytes).map(|k| (k.suffix, val_bytes))
267
+
})
268
.transpose()?;
269
270
match (next_timely, next_delete) {
···
294
impl StoreWriter for FjallWriter {
295
fn insert_batch(&mut self, event_batch: EventBatch) -> Result<(), StorageError> {
296
if event_batch.is_empty() {
297
+
return Ok(());
298
}
299
300
let mut batch = self.keyspace.batch();
···
312
}
313
CommitAction::Put(put_action) => {
314
let feed_key = NsidRecordFeedKey::from_pair(nsid.clone(), commit.cursor);
315
+
let feed_val: NsidRecordFeedVal =
316
+
(&commit.did, &commit.rkey, commit.rev.as_str()).into();
317
batch.insert(
318
&self.feeds,
319
feed_key.to_db_bytes()?,
320
feed_val.to_db_bytes()?,
321
);
322
323
+
let location_val: RecordLocationVal =
324
+
(commit.cursor, commit.rev.as_str(), put_action).into();
325
batch.insert(
326
&self.records,
327
&location_key.to_db_bytes()?,
···
368
}
369
}
370
0
371
#[derive(Clone)]
372
pub struct Storage {
373
/// horrible: gate all db access behind this to force global serialization to avoid deadlock
···
382
PartitionCreateOptions::default().compression(CompressionType::None),
383
)?;
384
Ok(Self {
385
+
db: Db { keyspace, global },
0
0
0
386
})
387
}
388
···
685
}
686
687
/// Get a value from a fixed key
688
+
fn get_static_neu<K: StaticStr, V: DbBytes>(
689
+
global: &PartitionHandle,
690
+
) -> Result<Option<V>, StorageError> {
691
let key_bytes = DbStaticStr::<K>::default().to_db_bytes()?;
692
let value = global
693
.get(&key_bytes)?
···
742
}
743
744
/// Get stats that haven't been rolled up yet
745
+
fn get_unrolled_collection_seen(global: &PartitionHandle, collection: Nsid) -> anyhow::Result<u64> {
0
0
0
746
let range =
747
if let Some(cursor_value) = get_static::<RollupCursorKey, RollupCursorValue>(global)? {
748
eprintln!("found existing cursor");
···
776
Ok(collection_total)
777
}
778
779
+
fn get_unrolled_top_collections(global: &PartitionHandle) -> anyhow::Result<HashMap<String, u64>> {
0
0
780
let range =
781
if let Some(cursor_value) = get_static::<RollupCursorKey, RollupCursorValue>(global)? {
782
eprintln!("found existing cursor");
···
906
907
log::trace!("delete_record: iterate over up to current cursor...");
908
909
+
for (i, pair) in self.global.range(key_prefix_bytes..key_limit).enumerate() {
0
0
0
0
910
log::trace!("delete_record iter {i}: found");
911
// find all (hopefully 1)
912
let (key_bytes, _) = pair?;
···
1120
)
1121
}
1122
0
0
1123
#[cfg(test)]
1124
mod tests {
1125
+
use super::*;
1126
+
use crate::{CollectionCommits, UFOsCommit};
1127
+
use jetstream::events::{CommitEvent, CommitOp};
1128
use jetstream::exports::Cid;
0
1129
use serde_json::value::RawValue;
0
0
1130
1131
#[test]
1132
fn test_hello() -> anyhow::Result<()> {
···
1161
rev: "asdf".to_string(),
1162
operation: CommitOp::Create,
1163
record: Some(*Box::new(RawValue::from_string("{}".to_string()).unwrap())),
1164
+
cid: Some(
1165
+
"bafyreidofvwoqvd2cnzbun6dkzgfucxh57tirf3ohhde7lsvh4fu3jehgy"
1166
+
.parse()
1167
+
.unwrap(),
1168
+
),
1169
};
1170
+
let (commit, collection) =
1171
+
UFOsCommit::from_commit_info(event, did.clone(), Cursor::from_raw_u64(100))?;
1172
1173
let mut commits = CollectionCommits::default();
1174
commits.total_seen += 1;
+15
-14
ufos/src/store_types.rs
···
1
-
use cardinality_estimator::CardinalityEstimator;
2
use crate::db_types::{
3
-
DbBytes, DbConcat, DbEmpty, DbStaticStr, EncodingError, StaticStr, UseBincodePlz, SerdeBytes,
4
};
5
-
use crate::{Cursor, Did, Nsid, RecordKey, UFOsCommit, PutAction};
6
use bincode::{Decode, Encode};
0
7
use std::ops::Range;
8
9
/// key format: ["js_cursor"]
···
37
/// value format: [rollup_cursor(Cursor)|collection(Nsid)]
38
pub type RollupCursorValue = DbConcat<Cursor, Nsid>;
39
40
-
41
/// key format: ["rollup_cursor"]
42
#[derive(Debug, PartialEq)]
43
pub struct NewRollupCursorKey {}
···
49
// pub type NewRollupCursorKey = DbStaticStr<_NewRollupCursorKey>;
50
/// value format: [rollup_cursor(Cursor)]
51
pub type NewRollupCursorValue = Cursor;
52
-
53
54
/// key format: ["js_endpoint"]
55
#[derive(Debug, PartialEq)]
···
61
}
62
pub type TakeoffValue = Cursor;
63
64
-
65
/// key format: ["js_endpoint"]
66
#[derive(Debug, PartialEq)]
67
pub struct JetstreamEndpointKey {}
···
92
fn from((did, rkey, rev): (&Did, &RecordKey, &str)) -> Self {
93
Self::from_pair(
94
did.clone(),
95
-
DbConcat::from_pair(rkey.clone(), rev.to_string()))
0
96
}
97
}
98
99
pub type RecordLocationKey = DbConcat<Did, DbConcat<Nsid, RecordKey>>;
100
impl From<(&UFOsCommit, &Nsid)> for RecordLocationKey {
101
fn from((commit, collection): (&UFOsCommit, &Nsid)) -> Self {
102
-
Self::from_pair(commit.did.clone(), DbConcat::from_pair(collection.clone(), commit.rkey.clone()))
0
0
0
103
}
104
}
105
#[derive(Debug, PartialEq, Encode, Decode)]
···
193
}
194
}
195
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
196
-
pub struct LiveDidsValue(pub CardinalityEstimator::<Did>);
197
impl SerdeBytes for LiveDidsValue {}
198
impl DbBytes for LiveDidsValue {
199
fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
···
219
}
220
}
221
pub type DeleteAccountQueueVal = Did;
222
-
223
224
#[derive(Debug, Clone, Encode, Decode)]
225
pub struct SeenCounter(pub u64);
···
474
}
475
}
476
477
-
478
const HOUR_IN_MICROS: u64 = 1_000_000 * 3600;
479
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd)]
480
pub struct HourTrucatedCursor(u64);
···
487
pub fn try_from_raw_u64(time_us: u64) -> Result<Self, EncodingError> {
488
let rem = time_us % HOUR_IN_MICROS;
489
if rem != 0 {
490
-
return Err(EncodingError::InvalidHourlyTruncated(rem))
491
}
492
Ok(Self(time_us))
493
}
···
503
}
504
}
505
506
-
507
#[cfg(test)]
508
mod test {
509
-
use super::{ByCollectionKey, ByCollectionValue, Cursor, Did, EncodingError, Nsid, RecordKey, HourTrucatedCursor, HOUR_IN_MICROS};
0
0
0
510
use crate::db_types::DbBytes;
511
512
#[test]
···
0
1
use crate::db_types::{
2
+
DbBytes, DbConcat, DbEmpty, DbStaticStr, EncodingError, SerdeBytes, StaticStr, UseBincodePlz,
3
};
4
+
use crate::{Cursor, Did, Nsid, PutAction, RecordKey, UFOsCommit};
5
use bincode::{Decode, Encode};
6
+
use cardinality_estimator::CardinalityEstimator;
7
use std::ops::Range;
8
9
/// key format: ["js_cursor"]
···
37
/// value format: [rollup_cursor(Cursor)|collection(Nsid)]
38
pub type RollupCursorValue = DbConcat<Cursor, Nsid>;
39
0
40
/// key format: ["rollup_cursor"]
41
#[derive(Debug, PartialEq)]
42
pub struct NewRollupCursorKey {}
···
48
// pub type NewRollupCursorKey = DbStaticStr<_NewRollupCursorKey>;
49
/// value format: [rollup_cursor(Cursor)]
50
pub type NewRollupCursorValue = Cursor;
0
51
52
/// key format: ["js_endpoint"]
53
#[derive(Debug, PartialEq)]
···
59
}
60
pub type TakeoffValue = Cursor;
61
0
62
/// key format: ["js_endpoint"]
63
#[derive(Debug, PartialEq)]
64
pub struct JetstreamEndpointKey {}
···
89
fn from((did, rkey, rev): (&Did, &RecordKey, &str)) -> Self {
90
Self::from_pair(
91
did.clone(),
92
+
DbConcat::from_pair(rkey.clone(), rev.to_string()),
93
+
)
94
}
95
}
96
97
pub type RecordLocationKey = DbConcat<Did, DbConcat<Nsid, RecordKey>>;
98
impl From<(&UFOsCommit, &Nsid)> for RecordLocationKey {
99
fn from((commit, collection): (&UFOsCommit, &Nsid)) -> Self {
100
+
Self::from_pair(
101
+
commit.did.clone(),
102
+
DbConcat::from_pair(collection.clone(), commit.rkey.clone()),
103
+
)
104
}
105
}
106
#[derive(Debug, PartialEq, Encode, Decode)]
···
194
}
195
}
196
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize)]
197
+
pub struct LiveDidsValue(pub CardinalityEstimator<Did>);
198
impl SerdeBytes for LiveDidsValue {}
199
impl DbBytes for LiveDidsValue {
200
fn to_db_bytes(&self) -> Result<Vec<u8>, EncodingError> {
···
220
}
221
}
222
pub type DeleteAccountQueueVal = Did;
0
223
224
#[derive(Debug, Clone, Encode, Decode)]
225
pub struct SeenCounter(pub u64);
···
474
}
475
}
476
0
477
const HOUR_IN_MICROS: u64 = 1_000_000 * 3600;
478
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd)]
479
pub struct HourTrucatedCursor(u64);
···
486
pub fn try_from_raw_u64(time_us: u64) -> Result<Self, EncodingError> {
487
let rem = time_us % HOUR_IN_MICROS;
488
if rem != 0 {
489
+
return Err(EncodingError::InvalidHourlyTruncated(rem));
490
}
491
Ok(Self(time_us))
492
}
···
502
}
503
}
504
0
505
#[cfg(test)]
506
mod test {
507
+
use super::{
508
+
ByCollectionKey, ByCollectionValue, Cursor, Did, EncodingError, HourTrucatedCursor, Nsid,
509
+
RecordKey, HOUR_IN_MICROS,
510
+
};
511
use crate::db_types::DbBytes;
512
513
#[test]