tangled
alpha
login
or
join now
microcosm.blue
/
microcosm-rs
65
fork
atom
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
65
fork
atom
overview
issues
8
pulls
2
pipelines
hide some rocks stuff so memstore-only tests work
bad-example.com
10 months ago
66f83f22
f1c708d9
+240
-238
2 changed files
expand all
collapse all
unified
split
constellation
src
bin
rocks-link-stats.rs
rocks-restore-from-backup.rs
+238
-238
constellation/src/bin/rocks-link-stats.rs
···
1
1
-
use bincode::config::Options;
2
2
-
use clap::Parser;
3
3
-
use serde::Serialize;
4
4
-
use std::collections::HashMap;
5
5
-
use std::path::PathBuf;
1
1
+
// use bincode::config::Options;
2
2
+
// use clap::Parser;
3
3
+
// use serde::Serialize;
4
4
+
// use std::collections::HashMap;
5
5
+
// use std::path::PathBuf;
6
6
7
7
-
use tokio_util::sync::CancellationToken;
7
7
+
// use tokio_util::sync::CancellationToken;
8
8
9
9
-
use constellation::storage::rocks_store::{
10
10
-
Collection, DidId, RKey, RPath, Target, TargetKey, TargetLinkers, _bincode_opts,
11
11
-
};
12
12
-
use constellation::storage::RocksStorage;
13
13
-
use constellation::Did;
9
9
+
// use constellation::storage::rocks_store::{
10
10
+
// Collection, DidId, RKey, RPath, Target, TargetKey, TargetLinkers, _bincode_opts,
11
11
+
// };
12
12
+
// use constellation::storage::RocksStorage;
13
13
+
// use constellation::Did;
14
14
15
15
-
use links::parse_any_link;
16
16
-
use rocksdb::IteratorMode;
17
17
-
use std::time;
15
15
+
// use links::parse_any_link;
16
16
+
// use rocksdb::IteratorMode;
17
17
+
// use std::time;
18
18
19
19
-
/// Aggregate links in the at-mosphere
20
20
-
#[derive(Parser, Debug)]
21
21
-
#[command(version, about, long_about = None)]
22
22
-
struct Args {
23
23
-
/// where is rocksdb's data
24
24
-
#[arg(short, long)]
25
25
-
data: PathBuf,
26
26
-
/// slow down so we don't kill the firehose consumer, if running concurrently
27
27
-
#[arg(short, long)]
28
28
-
limit: Option<u64>,
29
29
-
}
19
19
+
// /// Aggregate links in the at-mosphere
20
20
+
// #[derive(Parser, Debug)]
21
21
+
// #[command(version, about, long_about = None)]
22
22
+
// struct Args {
23
23
+
// /// where is rocksdb's data
24
24
+
// #[arg(short, long)]
25
25
+
// data: PathBuf,
26
26
+
// /// slow down so we don't kill the firehose consumer, if running concurrently
27
27
+
// #[arg(short, long)]
28
28
+
// limit: Option<u64>,
29
29
+
// }
30
30
31
31
-
type LinkType = String;
31
31
+
// type LinkType = String;
32
32
33
33
-
#[derive(Debug, Eq, Hash, PartialEq, Serialize)]
34
34
-
struct SourceLink(Collection, RPath, LinkType, Option<Collection>); // last is target collection, if it's an at-uri link with a collection
33
33
+
// #[derive(Debug, Eq, Hash, PartialEq, Serialize)]
34
34
+
// struct SourceLink(Collection, RPath, LinkType, Option<Collection>); // last is target collection, if it's an at-uri link with a collection
35
35
36
36
-
#[derive(Debug, Serialize)]
37
37
-
struct SourceSample {
38
38
-
did: String,
39
39
-
rkey: String,
40
40
-
}
36
36
+
// #[derive(Debug, Serialize)]
37
37
+
// struct SourceSample {
38
38
+
// did: String,
39
39
+
// rkey: String,
40
40
+
// }
41
41
42
42
-
#[derive(Debug, Default, Serialize)]
43
43
-
struct Bucket {
44
44
-
count: u64,
45
45
-
sum: u64,
46
46
-
sample: Option<SourceSample>,
47
47
-
}
42
42
+
// #[derive(Debug, Default, Serialize)]
43
43
+
// struct Bucket {
44
44
+
// count: u64,
45
45
+
// sum: u64,
46
46
+
// sample: Option<SourceSample>,
47
47
+
// }
48
48
49
49
-
#[derive(Debug, Default, Serialize)]
50
50
-
struct Buckets([Bucket; 23]);
49
49
+
// #[derive(Debug, Default, Serialize)]
50
50
+
// struct Buckets([Bucket; 23]);
51
51
52
52
-
const BUCKETS: [u64; 23] = [
53
53
-
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 16, 32, 64, 128, 256, 512, 1024, 4096, 16_384, 65_535,
54
54
-
262_144, 1_048_576,
55
55
-
];
52
52
+
// const BUCKETS: [u64; 23] = [
53
53
+
// 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 16, 32, 64, 128, 256, 512, 1024, 4096, 16_384, 65_535,
54
54
+
// 262_144, 1_048_576,
55
55
+
// ];
56
56
57
57
-
// b1, b2, b3, b4, b5, b6, b7, b8, b9, b10, b12, b16, b32, b64, b128, b256, b512, b1024, b4096, b16384, b65535, b262144, bmax
57
57
+
// // b1, b2, b3, b4, b5, b6, b7, b8, b9, b10, b12, b16, b32, b64, b128, b256, b512, b1024, b4096, b16384, b65535, b262144, bmax
58
58
59
59
-
static DID_IDS_CF: &str = "did_ids";
60
60
-
static TARGET_IDS_CF: &str = "target_ids";
61
61
-
static TARGET_LINKERS_CF: &str = "target_links";
59
59
+
// static DID_IDS_CF: &str = "did_ids";
60
60
+
// static TARGET_IDS_CF: &str = "target_ids";
61
61
+
// static TARGET_LINKERS_CF: &str = "target_links";
62
62
63
63
-
const REPORT_INTERVAL: usize = 50_000;
63
63
+
// const REPORT_INTERVAL: usize = 50_000;
64
64
65
65
-
type Stats = HashMap<SourceLink, Buckets>;
65
65
+
// type Stats = HashMap<SourceLink, Buckets>;
66
66
67
67
-
#[derive(Debug, Serialize)]
68
68
-
struct Printable {
69
69
-
collection: String,
70
70
-
path: String,
71
71
-
link_type: String,
72
72
-
target_collection: Option<String>,
73
73
-
buckets: Buckets,
74
74
-
}
67
67
+
// #[derive(Debug, Serialize)]
68
68
+
// struct Printable {
69
69
+
// collection: String,
70
70
+
// path: String,
71
71
+
// link_type: String,
72
72
+
// target_collection: Option<String>,
73
73
+
// buckets: Buckets,
74
74
+
// }
75
75
76
76
-
#[derive(Debug, Default)]
77
77
-
struct ErrStats {
78
78
-
failed_to_get_sample: usize,
79
79
-
failed_to_read_target_id: usize,
80
80
-
failed_to_deserialize_target_key: usize,
81
81
-
failed_to_parse_target_as_link: usize,
82
82
-
failed_to_get_links: usize,
83
83
-
failed_to_deserialize_linkers: usize,
84
84
-
}
76
76
+
// #[derive(Debug, Default)]
77
77
+
// struct ErrStats {
78
78
+
// failed_to_get_sample: usize,
79
79
+
// failed_to_read_target_id: usize,
80
80
+
// failed_to_deserialize_target_key: usize,
81
81
+
// failed_to_parse_target_as_link: usize,
82
82
+
// failed_to_get_links: usize,
83
83
+
// failed_to_deserialize_linkers: usize,
84
84
+
// }
85
85
86
86
-
fn thousands(n: usize) -> String {
87
87
-
n.to_string()
88
88
-
.as_bytes()
89
89
-
.rchunks(3)
90
90
-
.rev()
91
91
-
.map(std::str::from_utf8)
92
92
-
.collect::<Result<Vec<&str>, _>>()
93
93
-
.unwrap()
94
94
-
.join(",")
95
95
-
}
86
86
+
// fn thousands(n: usize) -> String {
87
87
+
// n.to_string()
88
88
+
// .as_bytes()
89
89
+
// .rchunks(3)
90
90
+
// .rev()
91
91
+
// .map(std::str::from_utf8)
92
92
+
// .collect::<Result<Vec<&str>, _>>()
93
93
+
// .unwrap()
94
94
+
// .join(",")
95
95
+
// }
96
96
97
97
-
fn main() {
98
98
-
let args = Args::parse();
97
97
+
// fn main() {
98
98
+
// let args = Args::parse();
99
99
100
100
-
let limit = args.limit.map(|amount| {
101
101
-
ratelimit::Ratelimiter::builder(amount, time::Duration::from_secs(1))
102
102
-
.max_tokens(amount)
103
103
-
.initial_available(amount)
104
104
-
.build()
105
105
-
.unwrap()
106
106
-
});
100
100
+
// let limit = args.limit.map(|amount| {
101
101
+
// ratelimit::Ratelimiter::builder(amount, time::Duration::from_secs(1))
102
102
+
// .max_tokens(amount)
103
103
+
// .initial_available(amount)
104
104
+
// .build()
105
105
+
// .unwrap()
106
106
+
// });
107
107
108
108
-
eprintln!("starting rocksdb...");
109
109
-
let rocks = RocksStorage::open_readonly(args.data).unwrap();
110
110
-
eprintln!("rocks ready.");
108
108
+
// eprintln!("starting rocksdb...");
109
109
+
// let rocks = RocksStorage::open_readonly(args.data).unwrap();
110
110
+
// eprintln!("rocks ready.");
111
111
112
112
-
let RocksStorage { ref db, .. } = rocks;
112
112
+
// let RocksStorage { ref db, .. } = rocks;
113
113
114
114
-
let stay_alive = CancellationToken::new();
115
115
-
ctrlc::set_handler({
116
116
-
let mut desperation: u8 = 0;
117
117
-
let stay_alive = stay_alive.clone();
118
118
-
move || match desperation {
119
119
-
0 => {
120
120
-
eprintln!("ok, shutting down...");
121
121
-
stay_alive.cancel();
122
122
-
desperation += 1;
123
123
-
}
124
124
-
1.. => panic!("fine, panicking!"),
125
125
-
}
126
126
-
})
127
127
-
.unwrap();
114
114
+
// let stay_alive = CancellationToken::new();
115
115
+
// ctrlc::set_handler({
116
116
+
// let mut desperation: u8 = 0;
117
117
+
// let stay_alive = stay_alive.clone();
118
118
+
// move || match desperation {
119
119
+
// 0 => {
120
120
+
// eprintln!("ok, shutting down...");
121
121
+
// stay_alive.cancel();
122
122
+
// desperation += 1;
123
123
+
// }
124
124
+
// 1.. => panic!("fine, panicking!"),
125
125
+
// }
126
126
+
// })
127
127
+
// .unwrap();
128
128
129
129
-
let mut stats = Stats::new();
130
130
-
let mut err_stats: ErrStats = Default::default();
129
129
+
// let mut stats = Stats::new();
130
130
+
// let mut err_stats: ErrStats = Default::default();
131
131
132
132
-
let did_ids_cf = db.cf_handle(DID_IDS_CF).unwrap();
133
133
-
let target_id_cf = db.cf_handle(TARGET_IDS_CF).unwrap();
134
134
-
let target_links_cf = db.cf_handle(TARGET_LINKERS_CF).unwrap();
132
132
+
// let did_ids_cf = db.cf_handle(DID_IDS_CF).unwrap();
133
133
+
// let target_id_cf = db.cf_handle(TARGET_IDS_CF).unwrap();
134
134
+
// let target_links_cf = db.cf_handle(TARGET_LINKERS_CF).unwrap();
135
135
136
136
-
let t0 = time::Instant::now();
137
137
-
let mut t_prev = t0;
136
136
+
// let t0 = time::Instant::now();
137
137
+
// let mut t_prev = t0;
138
138
139
139
-
let mut i = 0;
140
140
-
for item in db.iterator_cf(&target_id_cf, IteratorMode::Start) {
141
141
-
if stay_alive.is_cancelled() {
142
142
-
break;
143
143
-
}
139
139
+
// let mut i = 0;
140
140
+
// for item in db.iterator_cf(&target_id_cf, IteratorMode::Start) {
141
141
+
// if stay_alive.is_cancelled() {
142
142
+
// break;
143
143
+
// }
144
144
145
145
-
if let Some(ref limiter) = limit {
146
146
-
if let Err(dur) = limiter.try_wait() {
147
147
-
std::thread::sleep(dur)
148
148
-
}
149
149
-
}
145
145
+
// if let Some(ref limiter) = limit {
146
146
+
// if let Err(dur) = limiter.try_wait() {
147
147
+
// std::thread::sleep(dur)
148
148
+
// }
149
149
+
// }
150
150
151
151
-
if i > 0 && i % REPORT_INTERVAL == 0 {
152
152
-
let now = time::Instant::now();
153
153
-
let rate = (REPORT_INTERVAL as f32) / (now.duration_since(t_prev).as_secs_f32());
154
154
-
eprintln!(
155
155
-
"{i}\t({}k)\t{:.2}\t{rate:.1}/s",
156
156
-
thousands(i / 1000),
157
157
-
t0.elapsed().as_secs_f32()
158
158
-
);
159
159
-
t_prev = now;
160
160
-
}
161
161
-
i += 1;
151
151
+
// if i > 0 && i % REPORT_INTERVAL == 0 {
152
152
+
// let now = time::Instant::now();
153
153
+
// let rate = (REPORT_INTERVAL as f32) / (now.duration_since(t_prev).as_secs_f32());
154
154
+
// eprintln!(
155
155
+
// "{i}\t({}k)\t{:.2}\t{rate:.1}/s",
156
156
+
// thousands(i / 1000),
157
157
+
// t0.elapsed().as_secs_f32()
158
158
+
// );
159
159
+
// t_prev = now;
160
160
+
// }
161
161
+
// i += 1;
162
162
163
163
-
let Ok((target_key, target_id)) = item else {
164
164
-
err_stats.failed_to_read_target_id += 1;
165
165
-
continue;
166
166
-
};
163
163
+
// let Ok((target_key, target_id)) = item else {
164
164
+
// err_stats.failed_to_read_target_id += 1;
165
165
+
// continue;
166
166
+
// };
167
167
168
168
-
let Ok(TargetKey(Target(target), collection, rpath)) =
169
169
-
_bincode_opts().deserialize(&target_key)
170
170
-
else {
171
171
-
err_stats.failed_to_deserialize_target_key += 1;
172
172
-
continue;
173
173
-
};
168
168
+
// let Ok(TargetKey(Target(target), collection, rpath)) =
169
169
+
// _bincode_opts().deserialize(&target_key)
170
170
+
// else {
171
171
+
// err_stats.failed_to_deserialize_target_key += 1;
172
172
+
// continue;
173
173
+
// };
174
174
175
175
-
let source = {
176
176
-
let Some(parsed) = parse_any_link(&target) else {
177
177
-
err_stats.failed_to_parse_target_as_link += 1;
178
178
-
continue;
179
179
-
};
180
180
-
SourceLink(
181
181
-
collection,
182
182
-
rpath,
183
183
-
parsed.name().into(),
184
184
-
parsed.at_uri_collection().map(Collection),
185
185
-
)
186
186
-
};
175
175
+
// let source = {
176
176
+
// let Some(parsed) = parse_any_link(&target) else {
177
177
+
// err_stats.failed_to_parse_target_as_link += 1;
178
178
+
// continue;
179
179
+
// };
180
180
+
// SourceLink(
181
181
+
// collection,
182
182
+
// rpath,
183
183
+
// parsed.name().into(),
184
184
+
// parsed.at_uri_collection().map(Collection),
185
185
+
// )
186
186
+
// };
187
187
188
188
-
let Ok(Some(links_raw)) = db.get_cf(&target_links_cf, &target_id) else {
189
189
-
err_stats.failed_to_get_links += 1;
190
190
-
continue;
191
191
-
};
192
192
-
let Ok(linkers) = _bincode_opts().deserialize::<TargetLinkers>(&links_raw) else {
193
193
-
err_stats.failed_to_deserialize_linkers += 1;
194
194
-
continue;
195
195
-
};
196
196
-
let (n, _) = linkers.count();
188
188
+
// let Ok(Some(links_raw)) = db.get_cf(&target_links_cf, &target_id) else {
189
189
+
// err_stats.failed_to_get_links += 1;
190
190
+
// continue;
191
191
+
// };
192
192
+
// let Ok(linkers) = _bincode_opts().deserialize::<TargetLinkers>(&links_raw) else {
193
193
+
// err_stats.failed_to_deserialize_linkers += 1;
194
194
+
// continue;
195
195
+
// };
196
196
+
// let (n, _) = linkers.count();
197
197
198
198
-
if n == 0 {
199
199
-
continue;
200
200
-
}
198
198
+
// if n == 0 {
199
199
+
// continue;
200
200
+
// }
201
201
202
202
-
let mut bucket = 0;
203
203
-
for edge in BUCKETS {
204
204
-
if n <= edge || bucket == 22 {
205
205
-
break;
206
206
-
}
207
207
-
bucket += 1;
208
208
-
}
202
202
+
// let mut bucket = 0;
203
203
+
// for edge in BUCKETS {
204
204
+
// if n <= edge || bucket == 22 {
205
205
+
// break;
206
206
+
// }
207
207
+
// bucket += 1;
208
208
+
// }
209
209
210
210
-
let b = &mut stats.entry(source).or_default().0[bucket];
211
211
-
b.count += 1;
212
212
-
b.sum += n;
213
213
-
if b.sample.is_none() {
214
214
-
let (DidId(did_id), RKey(k)) = &linkers.0[(n - 1) as usize];
215
215
-
if let Ok(Some(did_bytes)) = db.get_cf(&did_ids_cf, did_id.to_be_bytes()) {
216
216
-
if let Ok(Did(did)) = _bincode_opts().deserialize(&did_bytes) {
217
217
-
b.sample = Some(SourceSample {
218
218
-
did,
219
219
-
rkey: k.clone(),
220
220
-
});
221
221
-
} else {
222
222
-
err_stats.failed_to_get_sample += 1;
223
223
-
}
224
224
-
} else {
225
225
-
err_stats.failed_to_get_sample += 1;
226
226
-
}
227
227
-
}
210
210
+
// let b = &mut stats.entry(source).or_default().0[bucket];
211
211
+
// b.count += 1;
212
212
+
// b.sum += n;
213
213
+
// if b.sample.is_none() {
214
214
+
// let (DidId(did_id), RKey(k)) = &linkers.0[(n - 1) as usize];
215
215
+
// if let Ok(Some(did_bytes)) = db.get_cf(&did_ids_cf, did_id.to_be_bytes()) {
216
216
+
// if let Ok(Did(did)) = _bincode_opts().deserialize(&did_bytes) {
217
217
+
// b.sample = Some(SourceSample {
218
218
+
// did,
219
219
+
// rkey: k.clone(),
220
220
+
// });
221
221
+
// } else {
222
222
+
// err_stats.failed_to_get_sample += 1;
223
223
+
// }
224
224
+
// } else {
225
225
+
// err_stats.failed_to_get_sample += 1;
226
226
+
// }
227
227
+
// }
228
228
229
229
-
// if i >= 40_000 {
230
230
-
// break;
231
231
-
// }
232
232
-
}
229
229
+
// // if i >= 40_000 {
230
230
+
// // break;
231
231
+
// // }
232
232
+
// }
233
233
234
234
-
let dt = t0.elapsed();
234
234
+
// let dt = t0.elapsed();
235
235
236
236
-
eprintln!("gathering stats for output...");
236
236
+
// eprintln!("gathering stats for output...");
237
237
238
238
-
let itemified = stats
239
239
-
.into_iter()
240
240
-
.map(
241
241
-
|(
242
242
-
SourceLink(Collection(collection), RPath(path), link_type, target_collection),
243
243
-
buckets,
244
244
-
)| Printable {
245
245
-
collection,
246
246
-
path,
247
247
-
link_type,
248
248
-
target_collection: target_collection.map(|Collection(c)| c),
249
249
-
buckets,
250
250
-
},
251
251
-
)
252
252
-
.collect::<Vec<_>>();
238
238
+
// let itemified = stats
239
239
+
// .into_iter()
240
240
+
// .map(
241
241
+
// |(
242
242
+
// SourceLink(Collection(collection), RPath(path), link_type, target_collection),
243
243
+
// buckets,
244
244
+
// )| Printable {
245
245
+
// collection,
246
246
+
// path,
247
247
+
// link_type,
248
248
+
// target_collection: target_collection.map(|Collection(c)| c),
249
249
+
// buckets,
250
250
+
// },
251
251
+
// )
252
252
+
// .collect::<Vec<_>>();
253
253
254
254
-
match serde_json::to_string(&itemified) {
255
255
-
Ok(s) => println!("{s}"),
256
256
-
Err(e) => eprintln!("failed to serialize results: {e:?}"),
257
257
-
}
254
254
+
// match serde_json::to_string(&itemified) {
255
255
+
// Ok(s) => println!("{s}"),
256
256
+
// Err(e) => eprintln!("failed to serialize results: {e:?}"),
257
257
+
// }
258
258
259
259
-
eprintln!(
260
260
-
"{} summarizing {} link targets in {:.1}s",
261
261
-
if stay_alive.is_cancelled() {
262
262
-
"STOPPED"
263
263
-
} else {
264
264
-
"FINISHED"
265
265
-
},
266
266
-
thousands(i),
267
267
-
dt.as_secs_f32()
268
268
-
);
269
269
-
eprintln!("{err_stats:?}");
270
270
-
eprintln!("bye.");
271
271
-
}
259
259
+
// eprintln!(
260
260
+
// "{} summarizing {} link targets in {:.1}s",
261
261
+
// if stay_alive.is_cancelled() {
262
262
+
// "STOPPED"
263
263
+
// } else {
264
264
+
// "FINISHED"
265
265
+
// },
266
266
+
// thousands(i),
267
267
+
// dt.as_secs_f32()
268
268
+
// );
269
269
+
// eprintln!("{err_stats:?}");
270
270
+
// eprintln!("bye.");
271
271
+
// }
272
272
273
273
-
// scan plan
273
273
+
// // scan plan
274
274
275
275
-
// buckets (backlink count)
276
276
-
// 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 16, 32, 64, 128, 256, 512, 1024, 4096, 16384, 65535, 262144, 1048576+
277
277
-
// by
278
278
-
// - collection
279
279
-
// - json path
280
280
-
// - link type
281
281
-
// samples for each bucket for each variation
275
275
+
// // buckets (backlink count)
276
276
+
// // 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 16, 32, 64, 128, 256, 512, 1024, 4096, 16384, 65535, 262144, 1048576+
277
277
+
// // by
278
278
+
// // - collection
279
279
+
// // - json path
280
280
+
// // - link type
281
281
+
// // samples for each bucket for each variation
+2
constellation/src/bin/rocks-restore-from-backup.rs
···
3
3
use clap::Parser;
4
4
use std::path::PathBuf;
5
5
6
6
+
#[cfg(feature = "rocks")]
6
7
use rocksdb::backup::{BackupEngine, BackupEngineOptions, RestoreOptions};
7
8
8
9
use std::time;
···
19
20
to_data_dir: PathBuf,
20
21
}
21
22
23
23
+
#[cfg(feature = "rocks")]
22
24
fn main() -> Result<()> {
23
25
let args = Args::parse();
24
26