- throw in some metrics including a basic request counter
- normalize handles to lowercase (should probably be fixed in atrium)
- refresh handles and DIDs on firehose identity events
- add aliases for
blue.microcosmxrpc nsids to eventually replace thecom.bad-exampleones
+41
-25
slingshot/src/consumer.rs
+41
-25
slingshot/src/consumer.rs
···
1
-
use crate::CachedRecord;
2
1
use crate::error::ConsumerError;
2
+
use crate::{CachedRecord, Identity, IdentityKey};
3
3
use foyer::HybridCache;
4
4
use jetstream::{
5
5
DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
···
11
11
jetstream_endpoint: String,
12
12
cursor: Option<Cursor>,
13
13
no_zstd: bool,
14
+
identity: Identity,
14
15
shutdown: CancellationToken,
15
16
cache: HybridCache<String, CachedRecord>,
16
17
) -> Result<(), ConsumerError> {
···
46
47
break;
47
48
};
48
49
49
-
if event.kind != EventKind::Commit {
50
-
continue;
51
-
}
52
-
let Some(ref mut commit) = event.commit else {
53
-
log::warn!("consumer: commit event missing commit data, ignoring");
54
-
continue;
55
-
};
50
+
match event.kind {
51
+
EventKind::Commit => {
52
+
let Some(ref mut commit) = event.commit else {
53
+
log::warn!("consumer: commit event missing commit data, ignoring");
54
+
continue;
55
+
};
56
56
57
-
// TODO: something a bit more robust
58
-
let at_uri = format!(
59
-
"at://{}/{}/{}",
60
-
&*event.did, &*commit.collection, &*commit.rkey
61
-
);
57
+
// TODO: something a bit more robust
58
+
let at_uri = format!(
59
+
"at://{}/{}/{}",
60
+
&*event.did, &*commit.collection, &*commit.rkey
61
+
);
62
62
63
-
if commit.operation == CommitOp::Delete {
64
-
cache.insert(at_uri, CachedRecord::Deleted);
65
-
} else {
66
-
let Some(record) = commit.record.take() else {
67
-
log::warn!("consumer: commit insert or update missing record, ignoring");
68
-
continue;
69
-
};
70
-
let Some(cid) = commit.cid.take() else {
71
-
log::warn!("consumer: commit insert or update missing CID, ignoring");
72
-
continue;
73
-
};
63
+
if commit.operation == CommitOp::Delete {
64
+
cache.insert(at_uri, CachedRecord::Deleted);
65
+
} else {
66
+
let Some(record) = commit.record.take() else {
67
+
log::warn!("consumer: commit insert or update missing record, ignoring");
68
+
continue;
69
+
};
70
+
let Some(cid) = commit.cid.take() else {
71
+
log::warn!("consumer: commit insert or update missing CID, ignoring");
72
+
continue;
73
+
};
74
74
75
-
cache.insert(at_uri, CachedRecord::Found((cid, record).into()));
75
+
cache.insert(at_uri, CachedRecord::Found((cid, record).into()));
76
+
}
77
+
}
78
+
EventKind::Identity => {
79
+
let Some(ident) = event.identity else {
80
+
log::warn!("consumer: identity event missing identity data, ignoring");
81
+
continue;
82
+
};
83
+
if let Some(handle) = ident.handle {
84
+
metrics::counter!("identity_handle_refresh_queued", "reason" => "identity event").increment(1);
85
+
identity.queue_refresh(IdentityKey::Handle(handle)).await;
86
+
}
87
+
metrics::counter!("identity_did_refresh_queued", "reason" => "identity event")
88
+
.increment(1);
89
+
identity.queue_refresh(IdentityKey::Did(ident.did)).await;
90
+
}
91
+
EventKind::Account => {} // TODO: handle account events (esp hiding content on deactivate, clearing on delete)
76
92
}
77
93
}
78
94
+20
-6
slingshot/src/identity.rs
+20
-6
slingshot/src/identity.rs
···
38
38
const MIN_NOT_FOUND_TTL: Duration = Duration::from_secs(60);
39
39
40
40
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
41
-
enum IdentityKey {
41
+
pub enum IdentityKey {
42
42
Handle(Handle),
43
43
Did(Did),
44
44
}
···
115
115
let Some(maybe_handle) = aka.strip_prefix("at://") else {
116
116
continue;
117
117
};
118
-
let Ok(valid_handle) = Handle::new(maybe_handle.to_string()) else {
118
+
let Ok(valid_handle) = Handle::new(maybe_handle.to_lowercase()) else {
119
119
continue;
120
120
};
121
121
unverified_handle = Some(valid_handle);
···
186
186
/// multi-producer *single consumer* queue
187
187
refresh_queue: Arc<Mutex<RefreshQueue>>,
188
188
/// just a lock to ensure only one refresher (queue consumer) is running (to be improved with a better refresher)
189
-
refresher: Arc<Mutex<()>>,
189
+
refresher_task: Arc<Mutex<()>>,
190
190
}
191
191
192
192
impl Identity {
···
225
225
did_resolver: Arc::new(did_resolver),
226
226
cache,
227
227
refresh_queue: Default::default(),
228
-
refresher: Default::default(),
228
+
refresher_task: Default::default(),
229
229
})
230
230
}
231
231
···
293
293
}
294
294
IdentityData::NotFound => {
295
295
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
296
+
metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
296
297
self.queue_refresh(key).await;
297
298
}
298
299
Ok(None)
299
300
}
300
301
IdentityData::Did(did) => {
301
302
if (now - *last_fetch) >= MIN_TTL {
303
+
metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
302
304
self.queue_refresh(key).await;
303
305
}
304
306
Ok(Some(did.clone()))
···
347
349
}
348
350
IdentityData::NotFound => {
349
351
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
352
+
metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
350
353
self.queue_refresh(key).await;
351
354
}
352
355
Ok(None)
353
356
}
354
357
IdentityData::Doc(mini_did) => {
355
358
if (now - *last_fetch) >= MIN_TTL {
359
+
metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
356
360
self.queue_refresh(key).await;
357
361
}
358
362
Ok(Some(mini_did.clone()))
···
363
367
/// put a refresh task on the queue
364
368
///
365
369
/// this can be safely called from multiple concurrent tasks
366
-
async fn queue_refresh(&self, key: IdentityKey) {
370
+
pub async fn queue_refresh(&self, key: IdentityKey) {
367
371
// todo: max queue size
368
372
let mut q = self.refresh_queue.lock().await;
369
373
if !q.items.contains(&key) {
···
440
444
/// run the refresh queue consumer
441
445
pub async fn run_refresher(&self, shutdown: CancellationToken) -> Result<(), IdentityError> {
442
446
let _guard = self
443
-
.refresher
447
+
.refresher_task
444
448
.try_lock()
445
449
.expect("there to only be one refresher running");
446
450
loop {
···
462
466
log::trace!("refreshing handle {handle:?}");
463
467
match self.handle_resolver.resolve(handle).await {
464
468
Ok(did) => {
469
+
metrics::counter!("identity_handle_refresh", "success" => "true")
470
+
.increment(1);
465
471
self.cache.insert(
466
472
task_key.clone(),
467
473
IdentityVal(UtcDateTime::now(), IdentityData::Did(did)),
468
474
);
469
475
}
470
476
Err(atrium_identity::Error::NotFound) => {
477
+
metrics::counter!("identity_handle_refresh", "success" => "false", "reason" => "not found").increment(1);
471
478
self.cache.insert(
472
479
task_key.clone(),
473
480
IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
474
481
);
475
482
}
476
483
Err(err) => {
484
+
metrics::counter!("identity_handle_refresh", "success" => "false", "reason" => "other").increment(1);
477
485
log::warn!(
478
486
"failed to refresh handle: {err:?}. leaving stale (should we eventually do something?)"
479
487
);
···
488
496
Ok(did_doc) => {
489
497
// TODO: fix in atrium: should verify id is did
490
498
if did_doc.id != did.to_string() {
499
+
metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "wrong did").increment(1);
491
500
log::warn!(
492
501
"refreshed did doc failed: wrong did doc id. dropping refresh."
493
502
);
···
496
505
let mini_doc = match did_doc.try_into() {
497
506
Ok(md) => md,
498
507
Err(e) => {
508
+
metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
499
509
log::warn!(
500
510
"converting mini doc failed: {e:?}. dropping refresh."
501
511
);
502
512
continue;
503
513
}
504
514
};
515
+
metrics::counter!("identity_did_refresh", "success" => "true")
516
+
.increment(1);
505
517
self.cache.insert(
506
518
task_key.clone(),
507
519
IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)),
508
520
);
509
521
}
510
522
Err(atrium_identity::Error::NotFound) => {
523
+
metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "not found").increment(1);
511
524
self.cache.insert(
512
525
task_key.clone(),
513
526
IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
514
527
);
515
528
}
516
529
Err(err) => {
530
+
metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "other").increment(1);
517
531
log::warn!(
518
532
"failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)"
519
533
);
+1
-1
slingshot/src/lib.rs
+1
-1
slingshot/src/lib.rs
+4
-1
slingshot/src/main.rs
+4
-1
slingshot/src/main.rs
···
154
154
155
155
let repo = Repo::new(identity.clone());
156
156
157
+
let identity_for_server = identity.clone();
157
158
let server_shutdown = shutdown.clone();
158
159
let server_cache_handle = cache.clone();
159
160
let bind = args.bind;
160
161
tasks.spawn(async move {
161
162
serve(
162
163
server_cache_handle,
163
-
identity,
164
+
identity_for_server,
164
165
repo,
165
166
args.acme_domain,
166
167
args.acme_contact,
···
173
174
Ok(())
174
175
});
175
176
177
+
let identity_refreshable = identity.clone();
176
178
let consumer_shutdown = shutdown.clone();
177
179
let consumer_cache = cache.clone();
178
180
tasks.spawn(async move {
···
180
182
args.jetstream,
181
183
None,
182
184
args.jetstream_no_zstd,
185
+
identity_refreshable,
183
186
consumer_shutdown,
184
187
consumer_cache,
185
188
)
+1
-1
slingshot/src/record.rs
+1
-1
slingshot/src/record.rs
+66
-4
slingshot/src/server.rs
+66
-4
slingshot/src/server.rs
···
12
12
use tokio_util::sync::CancellationToken;
13
13
14
14
use poem::{
15
-
Endpoint, EndpointExt, Route, Server,
15
+
Endpoint, EndpointExt, IntoResponse, Route, Server,
16
16
endpoint::{StaticFileEndpoint, make_sync},
17
17
http::Method,
18
18
listener::{
···
288
288
self.get_record_impl(repo, collection, rkey, cid).await
289
289
}
290
290
291
+
/// blue.microcosm.repo.getRecordByUri
292
+
///
293
+
/// alias of `com.bad-example.repo.getUriRecord` with intention to stabilize under this name
294
+
#[oai(
295
+
path = "/blue.microcosm.repo.getRecordByUri",
296
+
method = "get",
297
+
tag = "ApiTags::Custom"
298
+
)]
299
+
async fn get_record_by_uri(
300
+
&self,
301
+
/// The at-uri of the record
302
+
///
303
+
/// The identifier can be a DID or an atproto handle, and the collection
304
+
/// and rkey segments must be present.
305
+
#[oai(example = "example_uri")]
306
+
Query(at_uri): Query<String>,
307
+
/// Optional: the CID of the version of the record.
308
+
///
309
+
/// If not specified, then return the most recent version.
310
+
///
311
+
/// > [!tip]
312
+
/// > If specified and a newer version of the record exists, returns 404 not
313
+
/// > found. That is: slingshot only retains the most recent version of a
314
+
/// > record.
315
+
Query(cid): Query<Option<String>>,
316
+
) -> GetRecordResponse {
317
+
self.get_uri_record(Query(at_uri), Query(cid)).await
318
+
}
319
+
291
320
/// com.bad-example.repo.getUriRecord
292
321
///
293
322
/// Ergonomic complement to [`com.atproto.repo.getRecord`](https://docs.bsky.app/docs/api/com-atproto-repo-get-record)
···
375
404
#[oai(example = "example_handle")]
376
405
Query(handle): Query<String>,
377
406
) -> JustDidResponse {
378
-
let Ok(handle) = Handle::new(handle) else {
407
+
let Ok(handle) = Handle::new(handle.to_lowercase()) else {
379
408
return JustDidResponse::BadRequest(xrpc_error("InvalidRequest", "not a valid handle"));
380
409
};
381
410
···
413
442
}))
414
443
}
415
444
445
+
/// blue.microcosm.identity.resolveMiniDoc
446
+
///
447
+
/// alias of `com.bad-example.identity.resolveMiniDoc` with intention to stabilize under this name
448
+
#[oai(
449
+
path = "/blue.microcosm.identity.resolveMiniDoc",
450
+
method = "get",
451
+
tag = "ApiTags::Custom"
452
+
)]
453
+
async fn resolve_mini_doc(
454
+
&self,
455
+
/// Handle or DID to resolve
456
+
#[oai(example = "example_handle")]
457
+
Query(identifier): Query<String>,
458
+
) -> ResolveMiniIDResponse {
459
+
self.resolve_mini_id(Query(identifier)).await
460
+
}
461
+
416
462
/// com.bad-example.identity.resolveMiniDoc
417
463
///
418
464
/// Like [com.atproto.identity.resolveIdentity](https://docs.bsky.app/docs/api/com-atproto-identity-resolve-identity)
···
436
482
let did = match Did::new(identifier.clone()) {
437
483
Ok(did) => did,
438
484
Err(_) => {
439
-
let Ok(alleged_handle) = Handle::new(identifier) else {
485
+
let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else {
440
486
return invalid("Identifier was not a valid DID or handle");
441
487
};
442
488
···
513
559
let did = match Did::new(repo.clone()) {
514
560
Ok(did) => did,
515
561
Err(_) => {
516
-
let Ok(handle) = Handle::new(repo) else {
562
+
let Ok(handle) = Handle::new(repo.to_lowercase()) else {
517
563
return GetRecordResponse::BadRequest(xrpc_error(
518
564
"InvalidRequest",
519
565
"Repo was not a valid DID or handle",
···
772
818
.allow_credentials(false),
773
819
)
774
820
.with(CatchPanic::new())
821
+
.around(request_counter)
775
822
.with(Tracing);
823
+
776
824
Server::new(listener)
777
825
.name("slingshot")
778
826
.run_with_graceful_shutdown(app, shutdown.cancelled(), None)
···
780
828
.map_err(ServerError::ServerExited)
781
829
.inspect(|()| log::info!("server ended. goodbye."))
782
830
}
831
+
832
+
async fn request_counter<E: Endpoint>(next: E, req: poem::Request) -> poem::Result<poem::Response> {
833
+
let t0 = std::time::Instant::now();
834
+
let method = req.method().to_string();
835
+
let path = req.uri().path().to_string();
836
+
let res = next.call(req).await?.into_response();
837
+
metrics::histogram!(
838
+
"server_request",
839
+
"endpoint" => format!("{method} {path}"),
840
+
"status" => res.status().to_string(),
841
+
)
842
+
.record(t0.elapsed());
843
+
Ok(res)
844
+
}
History
1 round
0 comments
bad-example.com
submitted
#0
expand 0 comments
pull request successfully merged