- throw in some metrics including a basic request counter
- normalize handles to lowercase (should probably be fixed in atrium)
- refresh handles and DIDs on firehose identity events
- add aliases for `blue.microcosm` xrpc nsids to eventually replace the `com.bad-example` ones
+41
-25
slingshot/src/consumer.rs
+41
-25
slingshot/src/consumer.rs
···
1
-
use crate::CachedRecord;
2
use crate::error::ConsumerError;
3
use foyer::HybridCache;
4
use jetstream::{
5
DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
···
11
jetstream_endpoint: String,
12
cursor: Option<Cursor>,
13
no_zstd: bool,
14
shutdown: CancellationToken,
15
cache: HybridCache<String, CachedRecord>,
16
) -> Result<(), ConsumerError> {
···
46
break;
47
};
48
49
-
if event.kind != EventKind::Commit {
50
-
continue;
51
-
}
52
-
let Some(ref mut commit) = event.commit else {
53
-
log::warn!("consumer: commit event missing commit data, ignoring");
54
-
continue;
55
-
};
56
57
-
// TODO: something a bit more robust
58
-
let at_uri = format!(
59
-
"at://{}/{}/{}",
60
-
&*event.did, &*commit.collection, &*commit.rkey
61
-
);
62
63
-
if commit.operation == CommitOp::Delete {
64
-
cache.insert(at_uri, CachedRecord::Deleted);
65
-
} else {
66
-
let Some(record) = commit.record.take() else {
67
-
log::warn!("consumer: commit insert or update missing record, ignoring");
68
-
continue;
69
-
};
70
-
let Some(cid) = commit.cid.take() else {
71
-
log::warn!("consumer: commit insert or update missing CID, ignoring");
72
-
continue;
73
-
};
74
75
-
cache.insert(at_uri, CachedRecord::Found((cid, record).into()));
76
}
77
}
78
···
1
use crate::error::ConsumerError;
2
+
use crate::{CachedRecord, Identity, IdentityKey};
3
use foyer::HybridCache;
4
use jetstream::{
5
DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
···
11
jetstream_endpoint: String,
12
cursor: Option<Cursor>,
13
no_zstd: bool,
14
+
identity: Identity,
15
shutdown: CancellationToken,
16
cache: HybridCache<String, CachedRecord>,
17
) -> Result<(), ConsumerError> {
···
47
break;
48
};
49
50
+
match event.kind {
51
+
EventKind::Commit => {
52
+
let Some(ref mut commit) = event.commit else {
53
+
log::warn!("consumer: commit event missing commit data, ignoring");
54
+
continue;
55
+
};
56
57
+
// TODO: something a bit more robust
58
+
let at_uri = format!(
59
+
"at://{}/{}/{}",
60
+
&*event.did, &*commit.collection, &*commit.rkey
61
+
);
62
63
+
if commit.operation == CommitOp::Delete {
64
+
cache.insert(at_uri, CachedRecord::Deleted);
65
+
} else {
66
+
let Some(record) = commit.record.take() else {
67
+
log::warn!("consumer: commit insert or update missing record, ignoring");
68
+
continue;
69
+
};
70
+
let Some(cid) = commit.cid.take() else {
71
+
log::warn!("consumer: commit insert or update missing CID, ignoring");
72
+
continue;
73
+
};
74
75
+
cache.insert(at_uri, CachedRecord::Found((cid, record).into()));
76
+
}
77
+
}
78
+
EventKind::Identity => {
79
+
let Some(ident) = event.identity else {
80
+
log::warn!("consumer: identity event missing identity data, ignoring");
81
+
continue;
82
+
};
83
+
if let Some(handle) = ident.handle {
84
+
metrics::counter!("identity_handle_refresh_queued", "reason" => "identity event").increment(1);
85
+
identity.queue_refresh(IdentityKey::Handle(handle)).await;
86
+
}
87
+
metrics::counter!("identity_did_refresh_queued", "reason" => "identity event")
88
+
.increment(1);
89
+
identity.queue_refresh(IdentityKey::Did(ident.did)).await;
90
+
}
91
+
EventKind::Account => {} // TODO: handle account events (esp hiding content on deactivate, clearing on delete)
92
}
93
}
94
+20
-6
slingshot/src/identity.rs
+20
-6
slingshot/src/identity.rs
···
38
const MIN_NOT_FOUND_TTL: Duration = Duration::from_secs(60);
39
40
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
41
-
enum IdentityKey {
42
Handle(Handle),
43
Did(Did),
44
}
···
115
let Some(maybe_handle) = aka.strip_prefix("at://") else {
116
continue;
117
};
118
-
let Ok(valid_handle) = Handle::new(maybe_handle.to_string()) else {
119
continue;
120
};
121
unverified_handle = Some(valid_handle);
···
186
/// multi-producer *single consumer* queue
187
refresh_queue: Arc<Mutex<RefreshQueue>>,
188
/// just a lock to ensure only one refresher (queue consumer) is running (to be improved with a better refresher)
189
-
refresher: Arc<Mutex<()>>,
190
}
191
192
impl Identity {
···
225
did_resolver: Arc::new(did_resolver),
226
cache,
227
refresh_queue: Default::default(),
228
-
refresher: Default::default(),
229
})
230
}
231
···
293
}
294
IdentityData::NotFound => {
295
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
296
self.queue_refresh(key).await;
297
}
298
Ok(None)
299
}
300
IdentityData::Did(did) => {
301
if (now - *last_fetch) >= MIN_TTL {
302
self.queue_refresh(key).await;
303
}
304
Ok(Some(did.clone()))
···
347
}
348
IdentityData::NotFound => {
349
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
350
self.queue_refresh(key).await;
351
}
352
Ok(None)
353
}
354
IdentityData::Doc(mini_did) => {
355
if (now - *last_fetch) >= MIN_TTL {
356
self.queue_refresh(key).await;
357
}
358
Ok(Some(mini_did.clone()))
···
363
/// put a refresh task on the queue
364
///
365
/// this can be safely called from multiple concurrent tasks
366
-
async fn queue_refresh(&self, key: IdentityKey) {
367
// todo: max queue size
368
let mut q = self.refresh_queue.lock().await;
369
if !q.items.contains(&key) {
···
440
/// run the refresh queue consumer
441
pub async fn run_refresher(&self, shutdown: CancellationToken) -> Result<(), IdentityError> {
442
let _guard = self
443
-
.refresher
444
.try_lock()
445
.expect("there to only be one refresher running");
446
loop {
···
462
log::trace!("refreshing handle {handle:?}");
463
match self.handle_resolver.resolve(handle).await {
464
Ok(did) => {
465
self.cache.insert(
466
task_key.clone(),
467
IdentityVal(UtcDateTime::now(), IdentityData::Did(did)),
468
);
469
}
470
Err(atrium_identity::Error::NotFound) => {
471
self.cache.insert(
472
task_key.clone(),
473
IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
474
);
475
}
476
Err(err) => {
477
log::warn!(
478
"failed to refresh handle: {err:?}. leaving stale (should we eventually do something?)"
479
);
···
488
Ok(did_doc) => {
489
// TODO: fix in atrium: should verify id is did
490
if did_doc.id != did.to_string() {
491
log::warn!(
492
"refreshed did doc failed: wrong did doc id. dropping refresh."
493
);
···
496
let mini_doc = match did_doc.try_into() {
497
Ok(md) => md,
498
Err(e) => {
499
log::warn!(
500
"converting mini doc failed: {e:?}. dropping refresh."
501
);
502
continue;
503
}
504
};
505
self.cache.insert(
506
task_key.clone(),
507
IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)),
508
);
509
}
510
Err(atrium_identity::Error::NotFound) => {
511
self.cache.insert(
512
task_key.clone(),
513
IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
514
);
515
}
516
Err(err) => {
517
log::warn!(
518
"failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)"
519
);
···
38
const MIN_NOT_FOUND_TTL: Duration = Duration::from_secs(60);
39
40
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
41
+
pub enum IdentityKey {
42
Handle(Handle),
43
Did(Did),
44
}
···
115
let Some(maybe_handle) = aka.strip_prefix("at://") else {
116
continue;
117
};
118
+
let Ok(valid_handle) = Handle::new(maybe_handle.to_lowercase()) else {
119
continue;
120
};
121
unverified_handle = Some(valid_handle);
···
186
/// multi-producer *single consumer* queue
187
refresh_queue: Arc<Mutex<RefreshQueue>>,
188
/// just a lock to ensure only one refresher (queue consumer) is running (to be improved with a better refresher)
189
+
refresher_task: Arc<Mutex<()>>,
190
}
191
192
impl Identity {
···
225
did_resolver: Arc::new(did_resolver),
226
cache,
227
refresh_queue: Default::default(),
228
+
refresher_task: Default::default(),
229
})
230
}
231
···
293
}
294
IdentityData::NotFound => {
295
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
296
+
metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
297
self.queue_refresh(key).await;
298
}
299
Ok(None)
300
}
301
IdentityData::Did(did) => {
302
if (now - *last_fetch) >= MIN_TTL {
303
+
metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
304
self.queue_refresh(key).await;
305
}
306
Ok(Some(did.clone()))
···
349
}
350
IdentityData::NotFound => {
351
if (now - *last_fetch) >= MIN_NOT_FOUND_TTL {
352
+
metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1);
353
self.queue_refresh(key).await;
354
}
355
Ok(None)
356
}
357
IdentityData::Doc(mini_did) => {
358
if (now - *last_fetch) >= MIN_TTL {
359
+
metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1);
360
self.queue_refresh(key).await;
361
}
362
Ok(Some(mini_did.clone()))
···
367
/// put a refresh task on the queue
368
///
369
/// this can be safely called from multiple concurrent tasks
370
+
pub async fn queue_refresh(&self, key: IdentityKey) {
371
// todo: max queue size
372
let mut q = self.refresh_queue.lock().await;
373
if !q.items.contains(&key) {
···
444
/// run the refresh queue consumer
445
pub async fn run_refresher(&self, shutdown: CancellationToken) -> Result<(), IdentityError> {
446
let _guard = self
447
+
.refresher_task
448
.try_lock()
449
.expect("there to only be one refresher running");
450
loop {
···
466
log::trace!("refreshing handle {handle:?}");
467
match self.handle_resolver.resolve(handle).await {
468
Ok(did) => {
469
+
metrics::counter!("identity_handle_refresh", "success" => "true")
470
+
.increment(1);
471
self.cache.insert(
472
task_key.clone(),
473
IdentityVal(UtcDateTime::now(), IdentityData::Did(did)),
474
);
475
}
476
Err(atrium_identity::Error::NotFound) => {
477
+
metrics::counter!("identity_handle_refresh", "success" => "false", "reason" => "not found").increment(1);
478
self.cache.insert(
479
task_key.clone(),
480
IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
481
);
482
}
483
Err(err) => {
484
+
metrics::counter!("identity_handle_refresh", "success" => "false", "reason" => "other").increment(1);
485
log::warn!(
486
"failed to refresh handle: {err:?}. leaving stale (should we eventually do something?)"
487
);
···
496
Ok(did_doc) => {
497
// TODO: fix in atrium: should verify id is did
498
if did_doc.id != did.to_string() {
499
+
metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "wrong did").increment(1);
500
log::warn!(
501
"refreshed did doc failed: wrong did doc id. dropping refresh."
502
);
···
505
let mini_doc = match did_doc.try_into() {
506
Ok(md) => md,
507
Err(e) => {
508
+
metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1);
509
log::warn!(
510
"converting mini doc failed: {e:?}. dropping refresh."
511
);
512
continue;
513
}
514
};
515
+
metrics::counter!("identity_did_refresh", "success" => "true")
516
+
.increment(1);
517
self.cache.insert(
518
task_key.clone(),
519
IdentityVal(UtcDateTime::now(), IdentityData::Doc(mini_doc)),
520
);
521
}
522
Err(atrium_identity::Error::NotFound) => {
523
+
metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "not found").increment(1);
524
self.cache.insert(
525
task_key.clone(),
526
IdentityVal(UtcDateTime::now(), IdentityData::NotFound),
527
);
528
}
529
Err(err) => {
530
+
metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "other").increment(1);
531
log::warn!(
532
"failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)"
533
);
+1
-1
slingshot/src/lib.rs
+1
-1
slingshot/src/lib.rs
+4
-1
slingshot/src/main.rs
+4
-1
slingshot/src/main.rs
···
154
155
let repo = Repo::new(identity.clone());
156
157
let server_shutdown = shutdown.clone();
158
let server_cache_handle = cache.clone();
159
let bind = args.bind;
160
tasks.spawn(async move {
161
serve(
162
server_cache_handle,
163
-
identity,
164
repo,
165
args.acme_domain,
166
args.acme_contact,
···
173
Ok(())
174
});
175
176
let consumer_shutdown = shutdown.clone();
177
let consumer_cache = cache.clone();
178
tasks.spawn(async move {
···
180
args.jetstream,
181
None,
182
args.jetstream_no_zstd,
183
consumer_shutdown,
184
consumer_cache,
185
)
···
154
155
let repo = Repo::new(identity.clone());
156
157
+
let identity_for_server = identity.clone();
158
let server_shutdown = shutdown.clone();
159
let server_cache_handle = cache.clone();
160
let bind = args.bind;
161
tasks.spawn(async move {
162
serve(
163
server_cache_handle,
164
+
identity_for_server,
165
repo,
166
args.acme_domain,
167
args.acme_contact,
···
174
Ok(())
175
});
176
177
+
let identity_refreshable = identity.clone();
178
let consumer_shutdown = shutdown.clone();
179
let consumer_cache = cache.clone();
180
tasks.spawn(async move {
···
182
args.jetstream,
183
None,
184
args.jetstream_no_zstd,
185
+
identity_refreshable,
186
consumer_shutdown,
187
consumer_cache,
188
)
+1
-1
slingshot/src/record.rs
+1
-1
slingshot/src/record.rs
+66
-4
slingshot/src/server.rs
+66
-4
slingshot/src/server.rs
···
12
use tokio_util::sync::CancellationToken;
13
14
use poem::{
15
-
Endpoint, EndpointExt, Route, Server,
16
endpoint::{StaticFileEndpoint, make_sync},
17
http::Method,
18
listener::{
···
288
self.get_record_impl(repo, collection, rkey, cid).await
289
}
290
291
/// com.bad-example.repo.getUriRecord
292
///
293
/// Ergonomic complement to [`com.atproto.repo.getRecord`](https://docs.bsky.app/docs/api/com-atproto-repo-get-record)
···
375
#[oai(example = "example_handle")]
376
Query(handle): Query<String>,
377
) -> JustDidResponse {
378
-
let Ok(handle) = Handle::new(handle) else {
379
return JustDidResponse::BadRequest(xrpc_error("InvalidRequest", "not a valid handle"));
380
};
381
···
413
}))
414
}
415
416
/// com.bad-example.identity.resolveMiniDoc
417
///
418
/// Like [com.atproto.identity.resolveIdentity](https://docs.bsky.app/docs/api/com-atproto-identity-resolve-identity)
···
436
let did = match Did::new(identifier.clone()) {
437
Ok(did) => did,
438
Err(_) => {
439
-
let Ok(alleged_handle) = Handle::new(identifier) else {
440
return invalid("Identifier was not a valid DID or handle");
441
};
442
···
513
let did = match Did::new(repo.clone()) {
514
Ok(did) => did,
515
Err(_) => {
516
-
let Ok(handle) = Handle::new(repo) else {
517
return GetRecordResponse::BadRequest(xrpc_error(
518
"InvalidRequest",
519
"Repo was not a valid DID or handle",
···
772
.allow_credentials(false),
773
)
774
.with(CatchPanic::new())
775
.with(Tracing);
776
Server::new(listener)
777
.name("slingshot")
778
.run_with_graceful_shutdown(app, shutdown.cancelled(), None)
···
780
.map_err(ServerError::ServerExited)
781
.inspect(|()| log::info!("server ended. goodbye."))
782
}
···
12
use tokio_util::sync::CancellationToken;
13
14
use poem::{
15
+
Endpoint, EndpointExt, IntoResponse, Route, Server,
16
endpoint::{StaticFileEndpoint, make_sync},
17
http::Method,
18
listener::{
···
288
self.get_record_impl(repo, collection, rkey, cid).await
289
}
290
291
+
/// blue.microcosm.repo.getRecordByUri
292
+
///
293
+
/// alias of `com.bad-example.repo.getUriRecord` with intention to stabilize under this name
294
+
#[oai(
295
+
path = "/blue.microcosm.repo.getRecordByUri",
296
+
method = "get",
297
+
tag = "ApiTags::Custom"
298
+
)]
299
+
async fn get_record_by_uri(
300
+
&self,
301
+
/// The at-uri of the record
302
+
///
303
+
/// The identifier can be a DID or an atproto handle, and the collection
304
+
/// and rkey segments must be present.
305
+
#[oai(example = "example_uri")]
306
+
Query(at_uri): Query<String>,
307
+
/// Optional: the CID of the version of the record.
308
+
///
309
+
/// If not specified, then return the most recent version.
310
+
///
311
+
/// > [!tip]
312
+
/// > If specified and a newer version of the record exists, returns 404 not
313
+
/// > found. That is: slingshot only retains the most recent version of a
314
+
/// > record.
315
+
Query(cid): Query<Option<String>>,
316
+
) -> GetRecordResponse {
317
+
self.get_uri_record(Query(at_uri), Query(cid)).await
318
+
}
319
+
320
/// com.bad-example.repo.getUriRecord
321
///
322
/// Ergonomic complement to [`com.atproto.repo.getRecord`](https://docs.bsky.app/docs/api/com-atproto-repo-get-record)
···
404
#[oai(example = "example_handle")]
405
Query(handle): Query<String>,
406
) -> JustDidResponse {
407
+
let Ok(handle) = Handle::new(handle.to_lowercase()) else {
408
return JustDidResponse::BadRequest(xrpc_error("InvalidRequest", "not a valid handle"));
409
};
410
···
442
}))
443
}
444
445
+
/// blue.microcosm.identity.resolveMiniDoc
446
+
///
447
+
/// alias of `com.bad-example.identity.resolveMiniDoc` with intention to stabilize under this name
448
+
#[oai(
449
+
path = "/blue.microcosm.identity.resolveMiniDoc",
450
+
method = "get",
451
+
tag = "ApiTags::Custom"
452
+
)]
453
+
async fn resolve_mini_doc(
454
+
&self,
455
+
/// Handle or DID to resolve
456
+
#[oai(example = "example_handle")]
457
+
Query(identifier): Query<String>,
458
+
) -> ResolveMiniIDResponse {
459
+
self.resolve_mini_id(Query(identifier)).await
460
+
}
461
+
462
/// com.bad-example.identity.resolveMiniDoc
463
///
464
/// Like [com.atproto.identity.resolveIdentity](https://docs.bsky.app/docs/api/com-atproto-identity-resolve-identity)
···
482
let did = match Did::new(identifier.clone()) {
483
Ok(did) => did,
484
Err(_) => {
485
+
let Ok(alleged_handle) = Handle::new(identifier.to_lowercase()) else {
486
return invalid("Identifier was not a valid DID or handle");
487
};
488
···
559
let did = match Did::new(repo.clone()) {
560
Ok(did) => did,
561
Err(_) => {
562
+
let Ok(handle) = Handle::new(repo.to_lowercase()) else {
563
return GetRecordResponse::BadRequest(xrpc_error(
564
"InvalidRequest",
565
"Repo was not a valid DID or handle",
···
818
.allow_credentials(false),
819
)
820
.with(CatchPanic::new())
821
+
.around(request_counter)
822
.with(Tracing);
823
+
824
Server::new(listener)
825
.name("slingshot")
826
.run_with_graceful_shutdown(app, shutdown.cancelled(), None)
···
828
.map_err(ServerError::ServerExited)
829
.inspect(|()| log::info!("server ended. goodbye."))
830
}
831
+
832
+
async fn request_counter<E: Endpoint>(next: E, req: poem::Request) -> poem::Result<poem::Response> {
833
+
let t0 = std::time::Instant::now();
834
+
let method = req.method().to_string();
835
+
let path = req.uri().path().to_string();
836
+
let res = next.call(req).await?.into_response();
837
+
metrics::histogram!(
838
+
"server_request",
839
+
"endpoint" => format!("{method} {path}"),
840
+
"status" => res.status().to_string(),
841
+
)
842
+
.record(t0.elapsed());
843
+
Ok(res)
844
+
}
History
1 round
0 comments
bad-example.com
submitted
#0
expand 0 comments
pull request successfully merged