tangled
alpha
login
or
join now
ptr.pet
/
nsid-tracker
3
fork
atom
tracks lexicons and how many times they appeared on the jetstream
3
fork
atom
overview
issues
pulls
pipelines
refactor(server): run sync jobs in threadpool
ptr.pet
7 months ago
5e769184
84d25308
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+71
-9
4 changed files
expand all
collapse all
unified
split
server
Cargo.lock
Cargo.toml
src
db
mod.rs
main.rs
+26
server/Cargo.lock
···
643
643
checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5"
644
644
645
645
[[package]]
646
646
+
name = "hermit-abi"
647
647
+
version = "0.5.2"
648
648
+
source = "registry+https://github.com/rust-lang/crates.io-index"
649
649
+
checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
650
650
+
651
651
+
[[package]]
646
652
name = "http"
647
653
version = "1.3.1"
648
654
source = "registry+https://github.com/rust-lang/crates.io-index"
···
942
948
dependencies = [
943
949
"overload",
944
950
"winapi",
951
951
+
]
952
952
+
953
953
+
[[package]]
954
954
+
name = "num_cpus"
955
955
+
version = "1.17.0"
956
956
+
source = "registry+https://github.com/rust-lang/crates.io-index"
957
957
+
checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b"
958
958
+
dependencies = [
959
959
+
"hermit-abi",
960
960
+
"libc",
945
961
]
946
962
947
963
[[package]]
···
1478
1494
"serde",
1479
1495
"serde_json",
1480
1496
"smol_str",
1497
1497
+
"threadpool",
1481
1498
"tikv-jemallocator",
1482
1499
"tokio",
1483
1500
"tokio-util",
···
1629
1646
checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185"
1630
1647
dependencies = [
1631
1648
"cfg-if",
1649
1649
+
]
1650
1650
+
1651
1651
+
[[package]]
1652
1652
+
name = "threadpool"
1653
1653
+
version = "1.8.1"
1654
1654
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1655
1655
+
checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
1656
1656
+
dependencies = [
1657
1657
+
"num_cpus",
1632
1658
]
1633
1659
1634
1660
[[package]]
+1
server/Cargo.toml
···
25
25
scc = "2.3.4"
26
26
atomic-time = "0.1.5"
27
27
ordered-varint = "2.0.0"
28
28
+
threadpool = "1.8.1"
28
29
29
30
[target.'cfg(not(target_env = "msvc"))'.dependencies]
30
31
tikv-jemallocator = "0.6"
+39
-7
server/src/db/mod.rs
···
4
4
path::Path,
5
5
sync::{
6
6
Arc,
7
7
-
atomic::{AtomicUsize, Ordering as AtomicOrdering},
7
7
+
atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering},
8
8
},
9
9
time::{Duration, Instant},
10
10
};
···
162
162
pub struct Db {
163
163
inner: Keyspace,
164
164
hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>,
165
165
+
syncpool: threadpool::ThreadPool,
165
166
counts: Partition,
166
167
event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
167
168
eps: Rate,
169
169
+
shutting_down: AtomicBool,
168
170
min_block_size: usize,
169
171
max_block_size: usize,
170
172
max_last_activity: Duration,
···
178
180
.open()?;
179
181
Ok(Self {
180
182
hits: Default::default(),
183
183
+
syncpool: threadpool::Builder::new().num_threads(256).build(),
181
184
counts: ks.open_partition(
182
185
"_counts",
183
186
PartitionCreateOptions::default().compression(fjall::CompressionType::None),
···
185
188
inner: ks,
186
189
event_broadcaster: broadcast::channel(1000).0,
187
190
eps: Rate::new(Duration::from_secs(1)),
191
191
+
shutting_down: AtomicBool::new(false),
188
192
min_block_size: 512,
189
193
max_block_size: 500_000,
190
194
max_last_activity: Duration::from_secs(10),
191
195
})
192
196
}
193
197
198
198
+
pub fn shutdown(&self) -> AppResult<()> {
199
199
+
self.shutting_down.store(true, AtomicOrdering::Release);
200
200
+
self.sync(true)
201
201
+
}
202
202
+
203
203
+
pub fn is_shutting_down(&self) -> bool {
204
204
+
self.shutting_down.load(AtomicOrdering::Acquire)
205
205
+
}
206
206
+
194
207
pub fn sync(&self, all: bool) -> AppResult<()> {
208
208
+
let mut execs = Vec::with_capacity(self.hits.len());
195
209
let _guard = scc::ebr::Guard::new();
196
210
for (nsid, tree) in self.hits.iter(&_guard) {
197
211
let count = tree.item_count();
198
212
let is_max_block_size = count > self.min_block_size.max(tree.suggested_block_size());
199
213
let is_too_old = tree.last_insert().elapsed() > self.max_last_activity;
200
214
if count > 0 && (all || is_max_block_size || is_too_old) {
201
201
-
loop {
202
202
-
let synced = tree.sync(self.max_block_size)?;
203
203
-
if synced == 0 {
204
204
-
break;
215
215
+
let nsid = nsid.clone();
216
216
+
let tree = tree.clone();
217
217
+
let max_block_size = self.max_block_size;
218
218
+
execs.push(move || {
219
219
+
loop {
220
220
+
let synced = match tree.sync(max_block_size) {
221
221
+
Ok(synced) => synced,
222
222
+
Err(err) => {
223
223
+
tracing::error!("failed to sync {nsid}: {err}");
224
224
+
break;
225
225
+
}
226
226
+
};
227
227
+
if synced == 0 {
228
228
+
break;
229
229
+
}
230
230
+
tracing::info!("synced {synced} of {nsid} to db");
205
231
}
206
206
-
tracing::info!("synced {synced} of {nsid} to db");
207
207
-
}
232
232
+
});
208
233
}
209
234
}
235
235
+
drop(_guard);
236
236
+
237
237
+
for exec in execs {
238
238
+
self.syncpool.execute(exec);
239
239
+
}
240
240
+
self.syncpool.join();
241
241
+
210
242
Ok(())
211
243
}
212
244
+5
-2
server/src/main.rs
···
99
99
let db = db.clone();
100
100
move || {
101
101
loop {
102
102
+
if db.is_shutting_down() {
103
103
+
break;
104
104
+
}
102
105
match db.sync(false) {
103
106
Ok(_) => (),
104
107
Err(e) => tracing::error!("failed to sync db: {}", e),
105
108
}
106
106
-
std::thread::sleep(std::time::Duration::from_secs(1));
109
109
+
std::thread::sleep(std::time::Duration::from_secs(10));
107
110
}
108
111
}
109
112
});
···
129
132
}
130
133
131
134
tracing::info!("shutting down...");
132
132
-
db.sync(true).expect("couldnt sync db");
135
135
+
db.shutdown().expect("couldnt shutdown db");
133
136
}
134
137
135
138
fn debug() {