Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver

feat: otel

mia.omg.lol 0c46bb0a acf91e50

verified
+619 -25
+351 -12
Cargo.lock
··· 286 286 ] 287 287 288 288 [[package]] 289 + name = "axum-tracing-opentelemetry" 290 + version = "0.32.1" 291 + source = "registry+https://github.com/rust-lang/crates.io-index" 292 + checksum = "328c8ddd5ca871b2a5acb00be0b4f103aa62f5d6b6db4071ccf3b12b0629e7c1" 293 + dependencies = [ 294 + "axum", 295 + "futures-core", 296 + "futures-util", 297 + "http", 298 + "opentelemetry", 299 + "opentelemetry-semantic-conventions", 300 + "pin-project-lite", 301 + "tower", 302 + "tracing", 303 + "tracing-opentelemetry", 304 + "tracing-opentelemetry-instrumentation-sdk", 305 + ] 306 + 307 + [[package]] 289 308 name = "backtrace" 290 309 version = "0.3.74" 291 310 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 347 366 "proc-macro2", 348 367 "quote", 349 368 "regex", 350 - "rustc-hash", 369 + "rustc-hash 1.1.0", 351 370 "shlex", 352 371 "syn", 353 372 "which", ··· 465 484 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" 466 485 467 486 [[package]] 487 + name = "cfg_aliases" 488 + version = "0.2.1" 489 + source = "registry+https://github.com/rust-lang/crates.io-index" 490 + checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" 491 + 492 + [[package]] 468 493 name = "chrono" 469 494 version = "0.4.41" 470 495 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1370 1395 checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" 1371 1396 dependencies = [ 1372 1397 "cfg-if", 1398 + "js-sys", 1373 1399 "libc", 1374 1400 "r-efi", 1375 1401 "wasi 0.14.2+wasi-0.2.4", 1402 + "wasm-bindgen", 1376 1403 ] 1377 1404 1378 1405 [[package]] ··· 2167 2194 dependencies = [ 2168 2195 "linked-hash-map", 2169 2196 ] 2197 + 2198 + [[package]] 2199 + name = "lru-slab" 2200 + version = "0.1.2" 2201 + source = "registry+https://github.com/rust-lang/crates.io-index" 2202 + checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" 2170 2203 2171 2204 [[package]] 2172 2205 name = "lz4-sys" ··· 2185 2218 checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" 2186 2219 2187 2220 [[package]] 2221 + name = "matchers" 2222 + version = "0.1.0" 2223 + source = "registry+https://github.com/rust-lang/crates.io-index" 2224 + checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" 2225 + dependencies = [ 2226 + "regex-automata 0.1.10", 2227 + ] 2228 + 2229 + [[package]] 2188 2230 name = "matchit" 2189 2231 version = "0.8.4" 2190 2232 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2513 2555 ] 2514 2556 2515 2557 [[package]] 2558 + name = "opentelemetry" 2559 + version = "0.31.0" 2560 + source = "registry+https://github.com/rust-lang/crates.io-index" 2561 + checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" 2562 + dependencies = [ 2563 + "futures-core", 2564 + "futures-sink", 2565 + "js-sys", 2566 + "pin-project-lite", 2567 + "thiserror 2.0.12", 2568 + "tracing", 2569 + ] 2570 + 2571 + [[package]] 2572 + name = "opentelemetry-http" 2573 + version = "0.31.0" 2574 + source = "registry+https://github.com/rust-lang/crates.io-index" 2575 + checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" 2576 + dependencies = [ 2577 + "async-trait", 2578 + "bytes", 2579 + "http", 2580 + "opentelemetry", 2581 + "reqwest", 2582 + ] 2583 + 2584 + [[package]] 2585 + name = "opentelemetry-otlp" 2586 + version = "0.31.0" 2587 + source = "registry+https://github.com/rust-lang/crates.io-index" 2588 + checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" 2589 + dependencies = [ 2590 + "http", 2591 + "opentelemetry", 2592 + "opentelemetry-http", 2593 + "opentelemetry-proto", 2594 + "opentelemetry_sdk", 2595 + "prost 0.14.1", 2596 + "reqwest", 2597 + "thiserror 2.0.12", 2598 + "tracing", 2599 + ] 2600 + 2601 + [[package]] 2602 + name = "opentelemetry-proto" 2603 + version = "0.31.0" 2604 + source = "registry+https://github.com/rust-lang/crates.io-index" 2605 + checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" 2606 + dependencies = [ 2607 + "opentelemetry", 2608 + "opentelemetry_sdk", 2609 + "prost 0.14.1", 2610 + "tonic 0.14.2", 2611 + "tonic-prost", 2612 + ] 2613 + 2614 + [[package]] 2615 + name = "opentelemetry-semantic-conventions" 2616 + version = "0.31.0" 2617 + source = "registry+https://github.com/rust-lang/crates.io-index" 2618 + checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" 2619 + 2620 + [[package]] 2621 + name = "opentelemetry_sdk" 2622 + version = "0.31.0" 2623 + source = "registry+https://github.com/rust-lang/crates.io-index" 2624 + checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" 2625 + dependencies = [ 2626 + "futures-channel", 2627 + "futures-executor", 2628 + "futures-util", 2629 + "opentelemetry", 2630 + "percent-encoding", 2631 + "rand 0.9.1", 2632 + "thiserror 2.0.12", 2633 + ] 2634 + 2635 + [[package]] 2516 2636 name = "overload" 2517 2637 version = "0.1.1" 2518 2638 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2549 2669 "async-recursion", 2550 2670 "axum", 2551 2671 "axum-extra", 2672 + "axum-tracing-opentelemetry", 2552 2673 "base64 0.22.1", 2553 2674 "chrono", 2554 2675 "dataloader", ··· 2563 2684 "jsonwebtoken", 2564 2685 "lexica", 2565 2686 "multibase", 2687 + "opentelemetry", 2688 + "opentelemetry-otlp", 2689 + "opentelemetry_sdk", 2566 2690 "parakeet-db", 2567 2691 "parakeet-index", 2568 2692 "redis", ··· 2571 2695 "serde_ipld_dagcbor", 2572 2696 "serde_json", 2573 2697 "tokio", 2698 + "tower", 2574 2699 "tower-http", 2575 2700 "tracing", 2701 + "tracing-opentelemetry", 2576 2702 "tracing-subscriber", 2577 2703 ] 2578 2704 ··· 2594 2720 "eyre", 2595 2721 "figment", 2596 2722 "itertools 0.14.0", 2597 - "prost", 2723 + "opentelemetry", 2724 + "opentelemetry-otlp", 2725 + "opentelemetry_sdk", 2726 + "prost 0.13.5", 2598 2727 "rocksdb", 2599 2728 "serde", 2600 2729 "tokio", 2601 - "tonic", 2730 + "tonic 0.13.1", 2602 2731 "tonic-build", 2603 2732 "tonic-health", 2733 + "tonic-tracing-opentelemetry", 2734 + "tower", 2604 2735 "tracing", 2736 + "tracing-opentelemetry", 2605 2737 "tracing-subscriber", 2606 2738 ] 2607 2739 ··· 2907 3039 checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" 2908 3040 dependencies = [ 2909 3041 "bytes", 2910 - "prost-derive", 3042 + "prost-derive 0.13.5", 3043 + ] 3044 + 3045 + [[package]] 3046 + name = "prost" 3047 + version = "0.14.1" 3048 + source = "registry+https://github.com/rust-lang/crates.io-index" 3049 + checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" 3050 + dependencies = [ 3051 + "bytes", 3052 + "prost-derive 0.14.1", 2911 3053 ] 2912 3054 2913 3055 [[package]] ··· 2923 3065 "once_cell", 2924 3066 "petgraph", 2925 3067 "prettyplease", 2926 - "prost", 3068 + "prost 0.13.5", 2927 3069 "prost-types", 2928 3070 "regex", 2929 3071 "syn", ··· 2944 3086 ] 2945 3087 2946 3088 [[package]] 3089 + name = "prost-derive" 3090 + version = "0.14.1" 3091 + source = "registry+https://github.com/rust-lang/crates.io-index" 3092 + checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" 3093 + dependencies = [ 3094 + "anyhow", 3095 + "itertools 0.14.0", 3096 + "proc-macro2", 3097 + "quote", 3098 + "syn", 3099 + ] 3100 + 3101 + [[package]] 2947 3102 name = "prost-types" 2948 3103 version = "0.13.5" 2949 3104 source = "registry+https://github.com/rust-lang/crates.io-index" 2950 3105 checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" 2951 3106 dependencies = [ 2952 - "prost", 3107 + "prost 0.13.5", 2953 3108 ] 2954 3109 2955 3110 [[package]] ··· 2974 3129 checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" 2975 3130 2976 3131 [[package]] 3132 + name = "quinn" 3133 + version = "0.11.9" 3134 + source = "registry+https://github.com/rust-lang/crates.io-index" 3135 + checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" 3136 + dependencies = [ 3137 + "bytes", 3138 + "cfg_aliases", 3139 + "pin-project-lite", 3140 + "quinn-proto", 3141 + "quinn-udp", 3142 + "rustc-hash 2.1.1", 3143 + "rustls", 3144 + "socket2 0.5.8", 3145 + "thiserror 2.0.12", 3146 + "tokio", 3147 + "tracing", 3148 + "web-time", 3149 + ] 3150 + 3151 + [[package]] 3152 + name = "quinn-proto" 3153 + version = "0.11.13" 3154 + source = "registry+https://github.com/rust-lang/crates.io-index" 3155 + checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" 3156 + dependencies = [ 3157 + "bytes", 3158 + "getrandom 0.3.3", 3159 + "lru-slab", 3160 + "rand 0.9.1", 3161 + "ring", 3162 + "rustc-hash 2.1.1", 3163 + "rustls", 3164 + "rustls-pki-types", 3165 + "slab", 3166 + "thiserror 2.0.12", 3167 + "tinyvec", 3168 + "tracing", 3169 + "web-time", 3170 + ] 3171 + 3172 + [[package]] 3173 + name = "quinn-udp" 3174 + version = "0.5.14" 3175 + source = "registry+https://github.com/rust-lang/crates.io-index" 3176 + checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" 3177 + dependencies = [ 3178 + "cfg_aliases", 3179 + "libc", 3180 + "once_cell", 3181 + "socket2 0.5.8", 3182 + "tracing", 3183 + "windows-sys 0.52.0", 3184 + ] 3185 + 3186 + [[package]] 2977 3187 name = "quote" 2978 3188 version = "1.0.38" 2979 3189 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3135 3345 dependencies = [ 3136 3346 "aho-corasick", 3137 3347 "memchr", 3138 - "regex-automata", 3139 - "regex-syntax", 3348 + "regex-automata 0.4.9", 3349 + "regex-syntax 0.8.5", 3350 + ] 3351 + 3352 + [[package]] 3353 + name = "regex-automata" 3354 + version = "0.1.10" 3355 + source = "registry+https://github.com/rust-lang/crates.io-index" 3356 + checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" 3357 + dependencies = [ 3358 + "regex-syntax 0.6.29", 3140 3359 ] 3141 3360 3142 3361 [[package]] ··· 3147 3366 dependencies = [ 3148 3367 "aho-corasick", 3149 3368 "memchr", 3150 - "regex-syntax", 3369 + "regex-syntax 0.8.5", 3151 3370 ] 3152 3371 3153 3372 [[package]] 3154 3373 name = "regex-syntax" 3374 + version = "0.6.29" 3375 + source = "registry+https://github.com/rust-lang/crates.io-index" 3376 + checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" 3377 + 3378 + [[package]] 3379 + name = "regex-syntax" 3155 3380 version = "0.8.5" 3156 3381 source = "registry+https://github.com/rust-lang/crates.io-index" 3157 3382 checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" ··· 3166 3391 "base64 0.22.1", 3167 3392 "bytes", 3168 3393 "encoding_rs", 3394 + "futures-channel", 3169 3395 "futures-core", 3170 3396 "futures-util", 3171 3397 "h2", ··· 3184 3410 "once_cell", 3185 3411 "percent-encoding", 3186 3412 "pin-project-lite", 3413 + "quinn", 3414 + "rustls", 3415 + "rustls-native-certs", 3187 3416 "rustls-pemfile", 3417 + "rustls-pki-types", 3188 3418 "serde", 3189 3419 "serde_json", 3190 3420 "serde_urlencoded", ··· 3192 3422 "system-configuration", 3193 3423 "tokio", 3194 3424 "tokio-native-tls", 3425 + "tokio-rustls", 3195 3426 "tokio-util", 3196 3427 "tower", 3197 3428 "tower-service", ··· 3281 3512 checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" 3282 3513 3283 3514 [[package]] 3515 + name = "rustc-hash" 3516 + version = "2.1.1" 3517 + source = "registry+https://github.com/rust-lang/crates.io-index" 3518 + checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" 3519 + 3520 + [[package]] 3284 3521 name = "rustc_version" 3285 3522 version = "0.4.1" 3286 3523 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3310 3547 dependencies = [ 3311 3548 "aws-lc-rs", 3312 3549 "once_cell", 3550 + "ring", 3313 3551 "rustls-pki-types", 3314 3552 "rustls-webpki", 3315 3553 "subtle", ··· 3342 3580 version = "1.11.0" 3343 3581 source = "registry+https://github.com/rust-lang/crates.io-index" 3344 3582 checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" 3583 + dependencies = [ 3584 + "web-time", 3585 + ] 3345 3586 3346 3587 [[package]] 3347 3588 name = "rustls-webpki" ··· 4185 4426 "hyper-util", 4186 4427 "percent-encoding", 4187 4428 "pin-project", 4188 - "prost", 4429 + "prost 0.13.5", 4189 4430 "socket2 0.5.8", 4190 4431 "tokio", 4191 4432 "tokio-stream", ··· 4196 4437 ] 4197 4438 4198 4439 [[package]] 4440 + name = "tonic" 4441 + version = "0.14.2" 4442 + source = "registry+https://github.com/rust-lang/crates.io-index" 4443 + checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" 4444 + dependencies = [ 4445 + "async-trait", 4446 + "base64 0.22.1", 4447 + "bytes", 4448 + "http", 4449 + "http-body", 4450 + "http-body-util", 4451 + "percent-encoding", 4452 + "pin-project", 4453 + "sync_wrapper", 4454 + "tokio-stream", 4455 + "tower-layer", 4456 + "tower-service", 4457 + "tracing", 4458 + ] 4459 + 4460 + [[package]] 4199 4461 name = "tonic-build" 4200 4462 version = "0.13.0" 4201 4463 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 4215 4477 source = "registry+https://github.com/rust-lang/crates.io-index" 4216 4478 checksum = "cb87334d340313fefa513b6e60794d44a86d5f039b523229c99c323e4e19ca4b" 4217 4479 dependencies = [ 4218 - "prost", 4480 + "prost 0.13.5", 4219 4481 "tokio", 4220 4482 "tokio-stream", 4221 - "tonic", 4483 + "tonic 0.13.1", 4484 + ] 4485 + 4486 + [[package]] 4487 + name = "tonic-prost" 4488 + version = "0.14.2" 4489 + source = "registry+https://github.com/rust-lang/crates.io-index" 4490 + checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" 4491 + dependencies = [ 4492 + "bytes", 4493 + "prost 0.14.1", 4494 + "tonic 0.14.2", 4495 + ] 4496 + 4497 + [[package]] 4498 + name = "tonic-tracing-opentelemetry" 4499 + version = "0.32.0" 4500 + source = "registry+https://github.com/rust-lang/crates.io-index" 4501 + checksum = "31f57ac46b32b08989476b498239364c300b09d75928c1fa2e46cb489a41c8e3" 4502 + dependencies = [ 4503 + "futures-core", 4504 + "futures-util", 4505 + "http", 4506 + "http-body", 4507 + "hyper", 4508 + "opentelemetry", 4509 + "pin-project-lite", 4510 + "tonic 0.14.2", 4511 + "tower", 4512 + "tracing", 4513 + "tracing-opentelemetry", 4514 + "tracing-opentelemetry-instrumentation-sdk", 4222 4515 ] 4223 4516 4224 4517 [[package]] ··· 4313 4606 ] 4314 4607 4315 4608 [[package]] 4609 + name = "tracing-opentelemetry" 4610 + version = "0.32.0" 4611 + source = "registry+https://github.com/rust-lang/crates.io-index" 4612 + checksum = "1e6e5658463dd88089aba75c7791e1d3120633b1bfde22478b28f625a9bb1b8e" 4613 + dependencies = [ 4614 + "js-sys", 4615 + "opentelemetry", 4616 + "opentelemetry_sdk", 4617 + "rustversion", 4618 + "smallvec", 4619 + "thiserror 2.0.12", 4620 + "tracing", 4621 + "tracing-core", 4622 + "tracing-log", 4623 + "tracing-subscriber", 4624 + "web-time", 4625 + ] 4626 + 4627 + [[package]] 4628 + name = "tracing-opentelemetry-instrumentation-sdk" 4629 + version = "0.32.1" 4630 + source = "registry+https://github.com/rust-lang/crates.io-index" 4631 + checksum = "7a1a4dcfb798af2cef9e47c30a14e13c108b4b40e057120401b2025ec622c416" 4632 + dependencies = [ 4633 + "http", 4634 + "opentelemetry", 4635 + "opentelemetry-semantic-conventions", 4636 + "tracing", 4637 + "tracing-opentelemetry", 4638 + ] 4639 + 4640 + [[package]] 4316 4641 name = "tracing-subscriber" 4317 4642 version = "0.3.19" 4318 4643 source = "registry+https://github.com/rust-lang/crates.io-index" 4319 4644 checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" 4320 4645 dependencies = [ 4646 + "matchers", 4321 4647 "nu-ansi-term", 4648 + "once_cell", 4649 + "regex", 4322 4650 "sharded-slab", 4323 4651 "smallvec", 4324 4652 "thread_local", 4653 + "tracing", 4325 4654 "tracing-core", 4326 4655 "tracing-log", 4327 4656 ] ··· 4592 4921 version = "0.3.77" 4593 4922 source = "registry+https://github.com/rust-lang/crates.io-index" 4594 4923 checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2" 4924 + dependencies = [ 4925 + "js-sys", 4926 + "wasm-bindgen", 4927 + ] 4928 + 4929 + [[package]] 4930 + name = "web-time" 4931 + version = "1.1.0" 4932 + source = "registry+https://github.com/rust-lang/crates.io-index" 4933 + checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" 4595 4934 dependencies = [ 4596 4935 "js-sys", 4597 4936 "wasm-bindgen",
+24 -2
parakeet-index/Cargo.toml
··· 10 10 [dependencies] 11 11 tonic = "0.13.0" 12 12 prost = "0.13.5" 13 + tonic-tracing-opentelemetry = { version = "0.32", optional = true } 14 + tower = { version = "0.5", optional = true } 13 15 14 16 eyre = { version = "0.6.12", optional = true } 15 17 figment = { version = "0.10.19", features = ["env", "toml"], optional = true } 16 18 itertools = { version = "0.14.0", optional = true } 19 + opentelemetry = { version = "0.31.0", optional = true } 20 + opentelemetry-otlp = { version = "0.31.0", features = ["reqwest-rustls"], optional = true } 21 + opentelemetry_sdk = { version = "0.31.0", optional = true } 17 22 rocksdb = { version = "0.23", default-features = false, features = ["lz4", "bindgen-runtime"], optional = true } 18 23 serde = { version = "1.0.217", features = ["derive"], optional = true } 19 24 tokio = { version = "1.42.0", features = ["full"], optional = true } 20 25 tonic-health = { version = "0.13.0", optional = true } 21 26 tracing = { version = "0.1.40", optional = true } 22 - tracing-subscriber = { version = "0.3.18", optional = true } 27 + tracing-subscriber = { version = "0.3.18", features = ["env-filter"], optional = true } 28 + tracing-opentelemetry = { version = "0.32", optional = true } 23 29 24 30 [build-dependencies] 25 31 tonic-build = "0.13.0" 26 32 27 33 [features] 28 - server = ["dep:eyre", "dep:figment", "dep:itertools", "dep:rocksdb", "dep:serde", "dep:tokio", "dep:tonic-health", "dep:tracing", "dep:tracing-subscriber"] 34 + otel = ["dep:tonic-tracing-opentelemetry", "dep:tower"] 35 + server = [ 36 + "dep:eyre", 37 + "dep:figment", 38 + "dep:itertools", 39 + "dep:opentelemetry", 40 + "dep:opentelemetry-otlp", 41 + "dep:opentelemetry_sdk", 42 + "dep:rocksdb", 43 + "dep:serde", 44 + "dep:tokio", 45 + "dep:tonic-health", 46 + "otel", 47 + "dep:tracing", 48 + "dep:tracing-subscriber", 49 + "dep:tracing-opentelemetry" 50 + ]
+20 -1
parakeet-index/src/lib.rs
··· 1 + use tonic::transport::Channel; 2 + 1 3 #[allow(clippy::all)] 2 4 pub mod index { 3 5 tonic::include_proto!("parakeet"); 4 6 } 5 7 6 8 pub use index::*; 7 - pub type Client = index_client::IndexClient<tonic::transport::Channel>; 9 + #[cfg(not(feature = "otel"))] 10 + pub type Client = index_client::IndexClient<Channel>; 11 + #[cfg(feature = "otel")] 12 + pub type Client = index_client::IndexClient< 13 + tonic_tracing_opentelemetry::middleware::client::OtelGrpcService<Channel>, 14 + >; 8 15 9 16 #[cfg(feature = "server")] 10 17 pub mod server; 18 + 19 + #[cfg(feature = "otel")] 20 + pub async fn connect_with_otel( 21 + uri: String, 22 + ) -> Result<Client, Box<dyn std::error::Error + Send + Sync>> { 23 + let channel = Channel::from_shared(uri)?.connect().await?; 24 + let channel = tower::ServiceBuilder::new() 25 + .layer(tonic_tracing_opentelemetry::middleware::client::OtelGrpcLayer) 26 + .service(channel); 27 + 28 + Ok(index_client::IndexClient::new(channel)) 29 + }
+9 -3
parakeet-index/src/main.rs
··· 1 1 use parakeet_index::index_server::IndexServer; 2 2 use parakeet_index::server::service::Service; 3 - use parakeet_index::server::{GlobalState, config}; 3 + use parakeet_index::server::{GlobalState, config, instrumentation}; 4 4 use std::sync::Arc; 5 5 use tonic::transport::Server; 6 + use tonic_tracing_opentelemetry::middleware::server::OtelGrpcLayer; 6 7 7 8 #[tokio::main] 8 9 async fn main() -> eyre::Result<()> { 9 - tracing_subscriber::fmt::init(); 10 - 11 10 let conf = config::load_config()?; 12 11 12 + instrumentation::init_instruments(conf.otel_enable); 13 + 13 14 let db_root = conf.index_db_path.parse()?; 14 15 let addr = std::net::SocketAddr::new(conf.server.bind_address.parse()?, conf.server.port); 15 16 let state = Arc::new(GlobalState::new(db_root)?); ··· 18 19 reporter.set_serving::<IndexServer<Service>>().await; 19 20 20 21 let service = Service::new(state.clone()); 22 + 23 + let mw = tower::ServiceBuilder::new() 24 + .option_layer(conf.otel_enable.then(OtelGrpcLayer::default)); 25 + 21 26 Server::builder() 27 + .layer(mw) 22 28 .add_service(health_service) 23 29 .add_service(IndexServer::new(service)) 24 30 .serve(addr)
+2
parakeet-index/src/server/config.rs
··· 13 13 14 14 #[derive(Debug, Deserialize)] 15 15 pub struct Config { 16 + #[serde(default)] 17 + pub otel_enable: bool, 16 18 pub database_url: String, 17 19 pub index_db_path: String, 18 20 #[serde(default)]
+45
parakeet-index/src/server/instrumentation.rs
··· 1 + use opentelemetry::trace::TracerProvider; 2 + use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig}; 3 + use opentelemetry_sdk::trace::{Sampler, SdkTracer, SdkTracerProvider}; 4 + use tracing::Subscriber; 5 + use tracing_opentelemetry::OpenTelemetryLayer; 6 + use tracing_subscriber::filter::Filtered; 7 + use tracing_subscriber::layer::SubscriberExt; 8 + use tracing_subscriber::registry::LookupSpan; 9 + use tracing_subscriber::util::SubscriberInitExt; 10 + use tracing_subscriber::{EnvFilter, Layer}; 11 + 12 + pub fn init_instruments(otel_enable: bool) { 13 + let otel_layer = otel_enable.then(init_otel); 14 + 15 + let stdout_filter = 16 + EnvFilter::from_default_env().add_directive("otel::tracing=off".parse().unwrap()); 17 + 18 + tracing_subscriber::registry() 19 + .with(tracing_subscriber::fmt::layer().with_filter(stdout_filter)) 20 + .with(otel_layer) 21 + .init(); 22 + } 23 + 24 + fn init_otel<S>() -> Filtered<OpenTelemetryLayer<S, SdkTracer>, EnvFilter, S> 25 + where 26 + S: Subscriber + for<'span> LookupSpan<'span>, 27 + { 28 + let span_exporter = SpanExporter::builder() 29 + .with_http() 30 + .with_protocol(Protocol::HttpBinary) 31 + .build() 32 + .unwrap(); 33 + 34 + let tracer_provider = SdkTracerProvider::builder() 35 + .with_batch_exporter(span_exporter) 36 + .with_sampler(Sampler::AlwaysOn) 37 + .build(); 38 + 39 + opentelemetry::global::set_tracer_provider(tracer_provider.clone()); 40 + 41 + let tracer = tracer_provider.tracer("parakeet"); 42 + let otel_filter = EnvFilter::new("info,otel::tracing=trace"); 43 + 44 + OpenTelemetryLayer::new(tracer).with_filter(otel_filter) 45 + }
+1
parakeet-index/src/server/mod.rs
··· 2 2 3 3 pub mod config; 4 4 pub mod db; 5 + pub mod instrumentation; 5 6 pub mod service; 6 7 mod utils; 7 8
+8 -2
parakeet/Cargo.toml
··· 6 6 [dependencies] 7 7 async-recursion = "1.1.1" 8 8 axum = { version = "0.8", features = ["json"] } 9 + axum-tracing-opentelemetry = "0.32" 9 10 axum-extra = { version = "0.10.0", features = ["query", "typed-header"] } 10 11 base64 = "0.22" 11 12 chrono = { version = "0.4.39", features = ["serde"] } ··· 21 22 jsonwebtoken = { git = "https://gitlab.com/parakeet-social/jsonwebtoken", branch = "es256k" } 22 23 lexica = { path = "../lexica" } 23 24 multibase = "0.9.1" 25 + opentelemetry = "0.31.0" 26 + opentelemetry-otlp = "0.31.0" 27 + opentelemetry_sdk = "0.31.0" 24 28 parakeet-db = { path = "../parakeet-db" } 25 - parakeet-index = { path = "../parakeet-index" } 29 + parakeet-index = { path = "../parakeet-index", features = ["otel"] } 26 30 redis = { version = "0.32", features = ["tokio-native-tls-comp"] } 27 31 reqwest = { version = "0.12", features = ["json"] } 28 32 serde = { version = "1.0.217", features = ["derive"] } 29 33 serde_ipld_dagcbor = "0.6.1" 30 34 serde_json = "1.0.134" 31 35 tokio = { version = "1.42.0", features = ["full"] } 36 + tower = "0.5" 32 37 tower-http = { version = "0.6.2", features = ["cors", "trace"] } 33 38 tracing = "0.1.40" 34 - tracing-subscriber = "0.3.18" 39 + tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } 40 + tracing-opentelemetry = "0.32"
+2
parakeet/src/config.rs
··· 13 13 14 14 #[derive(Debug, Deserialize)] 15 15 pub struct Config { 16 + #[serde(default)] 17 + pub otel_enable: bool, 16 18 pub index_uri: String, 17 19 pub database_url: String, 18 20 pub redis_uri: String,
+20
parakeet/src/db.rs
··· 1 1 use diesel::prelude::*; 2 2 use diesel::sql_types::{Array, Bool, Integer, Nullable, Text}; 3 3 use diesel_async::{AsyncPgConnection, RunQueryDsl}; 4 + use tracing::instrument; 4 5 use parakeet_db::{schema, types}; 5 6 use parakeet_db::models::TextArray; 6 7 8 + #[instrument(skip_all)] 7 9 pub async fn get_actor_status( 8 10 conn: &mut AsyncPgConnection, 9 11 did: &str, ··· 38 40 #[diesel(sql_type = Nullable<Text>)] 39 41 pub list_mute: Option<String>, 40 42 } 43 + 44 + #[instrument(skip_all)] 41 45 pub async fn get_profile_state( 42 46 conn: &mut AsyncPgConnection, 43 47 did: &str, ··· 50 54 .await 51 55 .optional() 52 56 } 57 + 58 + #[instrument(skip_all)] 53 59 pub async fn get_profile_states( 54 60 conn: &mut AsyncPgConnection, 55 61 did: &str, ··· 84 90 #[diesel(sql_type = diesel::sql_types::Bool)] 85 91 pub pinned: bool, 86 92 } 93 + 94 + #[instrument(skip_all)] 87 95 pub async fn get_post_state( 88 96 conn: &mut AsyncPgConnection, 89 97 did: &str, ··· 97 105 .optional() 98 106 } 99 107 108 + #[instrument(skip_all)] 100 109 pub async fn get_post_states( 101 110 conn: &mut AsyncPgConnection, 102 111 did: &str, ··· 120 129 pub block: Option<String>, 121 130 } 122 131 132 + #[instrument(skip_all)] 123 133 pub async fn get_list_state( 124 134 conn: &mut AsyncPgConnection, 125 135 did: &str, ··· 133 143 .optional() 134 144 } 135 145 146 + #[instrument(skip_all)] 136 147 pub async fn get_list_states( 137 148 conn: &mut AsyncPgConnection, 138 149 did: &str, ··· 145 156 .await 146 157 } 147 158 159 + #[instrument(skip_all)] 148 160 pub async fn get_like_state( 149 161 conn: &mut AsyncPgConnection, 150 162 did: &str, ··· 162 174 .optional() 163 175 } 164 176 177 + #[instrument(skip_all)] 165 178 pub async fn get_like_states( 166 179 conn: &mut AsyncPgConnection, 167 180 did: &str, ··· 182 195 .await 183 196 } 184 197 198 + #[instrument(skip_all)] 185 199 pub async fn get_pinned_post_uri( 186 200 conn: &mut AsyncPgConnection, 187 201 did: &str, ··· 212 226 pub depth: i32, 213 227 } 214 228 229 + #[instrument(skip_all)] 215 230 pub async fn get_thread_children( 216 231 conn: &mut AsyncPgConnection, 217 232 uri: &str, ··· 224 239 .await 225 240 } 226 241 242 + #[instrument(skip_all)] 227 243 pub async fn get_thread_children_branching( 228 244 conn: &mut AsyncPgConnection, 229 245 uri: &str, ··· 245 261 pub at_uri: String, 246 262 } 247 263 264 + #[instrument(skip_all)] 248 265 pub async fn get_thread_children_hidden( 249 266 conn: &mut AsyncPgConnection, 250 267 uri: &str, ··· 257 274 .await 258 275 } 259 276 277 + #[instrument(skip_all)] 260 278 pub async fn get_thread_parents( 261 279 conn: &mut AsyncPgConnection, 262 280 uri: &str, ··· 269 287 .await 270 288 } 271 289 290 + #[instrument(skip_all)] 272 291 pub async fn get_root_post(conn: &mut AsyncPgConnection, uri: &str) -> QueryResult<Option<String>> { 273 292 schema::posts::table 274 293 .select(schema::posts::root_uri) ··· 279 298 .map(|v| v.flatten()) 280 299 } 281 300 301 + #[instrument(skip_all)] 282 302 pub async fn get_threadgate_hiddens( 283 303 conn: &mut AsyncPgConnection, 284 304 uri: &str,
+3
parakeet/src/hydration/embed.rs
··· 8 8 use lexica::app_bsky::feed::PostView; 9 9 use parakeet_db::models; 10 10 use std::collections::HashMap; 11 + use tracing::instrument; 11 12 12 13 fn build_aspect_ratio(height: Option<i32>, width: Option<i32>) -> Option<AspectRatio> { 13 14 height ··· 176 177 out 177 178 } 178 179 180 + #[instrument(skip_all)] 179 181 pub async fn hydrate_embed(&self, post: String) -> Option<Embed> { 180 182 let (embed, author) = self.loaders.embed.load(post).await?; 181 183 ··· 195 197 } 196 198 } 197 199 200 + #[instrument(skip_all)] 198 201 pub async fn hydrate_embeds(&self, posts: Vec<String>) -> HashMap<String, Embed> { 199 202 let embeds = self.loaders.embed.load_many(posts).await; 200 203
+5
parakeet/src/hydration/feedgen.rs
··· 5 5 use parakeet_db::models; 6 6 use std::collections::HashMap; 7 7 use std::str::FromStr; 8 + use tracing::instrument; 8 9 9 10 fn build_viewer((did, rkey): (String, String)) -> GeneratorViewerState { 10 11 GeneratorViewerState { ··· 49 50 } 50 51 51 52 impl super::StatefulHydrator<'_> { 53 + #[instrument(skip_all)] 52 54 pub async fn hydrate_feedgen(&self, feedgen: String) -> Option<GeneratorView> { 53 55 let labels = self.get_label(&feedgen).await; 54 56 let viewer = self.get_feedgen_viewer_state(&feedgen).await; ··· 61 63 )) 62 64 } 63 65 66 + #[instrument(skip_all)] 64 67 pub async fn hydrate_feedgens(&self, feedgens: Vec<String>) -> HashMap<String, GeneratorView> { 65 68 let labels = self.get_label_many(&feedgens).await; 66 69 let viewers = self.get_feedgen_viewer_states(&feedgens).await; ··· 90 93 .collect() 91 94 } 92 95 96 + #[instrument(skip_all)] 93 97 async fn get_feedgen_viewer_state(&self, subject: &str) -> Option<GeneratorViewerState> { 94 98 if let Some(viewer) = &self.current_actor { 95 99 let data = self.loaders.like_state.get(viewer, subject).await?; ··· 100 104 } 101 105 } 102 106 107 + #[instrument(skip_all)] 103 108 async fn get_feedgen_viewer_states( 104 109 &self, 105 110 subjects: &[String],
+7
parakeet/src/hydration/labeler.rs
··· 8 8 use parakeet_db::models; 9 9 use std::collections::HashMap; 10 10 use std::str::FromStr; 11 + use tracing::instrument; 11 12 12 13 fn build_viewer((did, rkey): (String, String)) -> LabelerViewerState { 13 14 LabelerViewerState { ··· 98 99 } 99 100 100 101 impl StatefulHydrator<'_> { 102 + #[instrument(skip_all)] 101 103 pub async fn hydrate_labeler(&self, labeler: String) -> Option<LabelerView> { 102 104 let labels = self.get_label(&labeler).await; 103 105 let viewer = self.get_labeler_viewer_state(&labeler).await; ··· 108 110 Some(build_view(labeler, creator, labels, viewer, likes)) 109 111 } 110 112 113 + #[instrument(skip_all)] 111 114 pub async fn hydrate_labelers(&self, labelers: Vec<String>) -> HashMap<String, LabelerView> { 112 115 let labels = self.get_label_many(&labelers).await; 113 116 let labelers = self.loaders.labeler.load_many(labelers).await; ··· 133 136 .collect() 134 137 } 135 138 139 + #[instrument(skip_all)] 136 140 pub async fn hydrate_labeler_detailed(&self, labeler: String) -> Option<LabelerViewDetailed> { 137 141 let labels = self.get_label(&labeler).await; 138 142 let viewer = self.get_labeler_viewer_state(&labeler).await; ··· 145 149 )) 146 150 } 147 151 152 + #[instrument(skip_all)] 148 153 pub async fn hydrate_labelers_detailed( 149 154 &self, 150 155 labelers: Vec<String>, ··· 175 180 .collect() 176 181 } 177 182 183 + #[instrument(skip_all)] 178 184 async fn get_labeler_viewer_state(&self, subject: &str) -> Option<LabelerViewerState> { 179 185 if let Some(viewer) = &self.current_actor { 180 186 let data = self ··· 189 195 } 190 196 } 191 197 198 + #[instrument(skip_all)] 192 199 async fn get_labeler_viewer_states( 193 200 &self, 194 201 subjects: &[String],
+7
parakeet/src/hydration/list.rs
··· 6 6 use parakeet_db::models; 7 7 use std::collections::HashMap; 8 8 use std::str::FromStr; 9 + use tracing::instrument; 9 10 10 11 fn build_viewer(data: ListStateRet) -> ListViewerState { 11 12 ListViewerState { ··· 69 70 } 70 71 71 72 impl StatefulHydrator<'_> { 73 + #[instrument(skip_all)] 72 74 pub async fn hydrate_list_basic(&self, list: String) -> Option<ListViewBasic> { 73 75 let labels = self.get_label(&list).await; 74 76 let viewer = self.get_list_viewer_state(&list).await; ··· 77 79 build_basic(list, count, labels, viewer, &self.cdn) 78 80 } 79 81 82 + #[instrument(skip_all)] 80 83 pub async fn hydrate_lists_basic(&self, lists: Vec<String>) -> HashMap<String, ListViewBasic> { 81 84 if lists.is_empty() { 82 85 return HashMap::new(); ··· 97 100 .collect() 98 101 } 99 102 103 + #[instrument(skip_all)] 100 104 pub async fn hydrate_list(&self, list: String) -> Option<ListView> { 101 105 let labels = self.get_label(&list).await; 102 106 let viewer = self.get_list_viewer_state(&list).await; ··· 106 110 build_listview(list, count, profile, labels, viewer, &self.cdn) 107 111 } 108 112 113 + #[instrument(skip_all)] 109 114 pub async fn hydrate_lists(&self, lists: Vec<String>) -> HashMap<String, ListView> { 110 115 if lists.is_empty() { 111 116 return HashMap::new(); ··· 131 136 .collect() 132 137 } 133 138 139 + #[instrument(skip_all)] 134 140 async fn get_list_viewer_state(&self, subject: &str) -> Option<ListViewerState> { 135 141 if let Some(viewer) = &self.current_actor { 136 142 let data = self.loaders.list_state.get(viewer, subject).await?; ··· 141 147 } 142 148 } 143 149 150 + #[instrument(skip_all)] 144 151 async fn get_list_viewer_states( 145 152 &self, 146 153 subjects: &[String],
+4
parakeet/src/hydration/mod.rs
··· 63 63 } 64 64 } 65 65 66 + #[tracing::instrument(skip_all)] 66 67 async fn get_label(&self, uri: &str) -> Vec<parakeet_db::models::Label> { 67 68 self.loaders.label.load(uri, self.accept_labelers).await 68 69 } 69 70 71 + #[tracing::instrument(skip_all)] 70 72 async fn get_profile_label(&self, did: &str) -> Vec<parakeet_db::models::Label> { 71 73 let uris = &[ 72 74 did.to_string(), ··· 80 82 .collect() 81 83 } 82 84 85 + #[tracing::instrument(skip_all)] 83 86 async fn get_label_many( 84 87 &self, 85 88 uris: &[String], ··· 90 93 .await 91 94 } 92 95 96 + #[tracing::instrument(skip_all)] 93 97 async fn get_profile_label_many( 94 98 &self, 95 99 uris: &[String],
+9
parakeet/src/hydration/posts.rs
··· 11 11 use parakeet_db::models; 12 12 use parakeet_index::PostStats; 13 13 use std::collections::HashMap; 14 + use tracing::instrument; 14 15 15 16 fn build_viewer(did: &str, data: PostStateRet) -> PostViewerState { 16 17 let is_me = did == data.did; ··· 82 83 } 83 84 84 85 impl StatefulHydrator<'_> { 86 + #[instrument(skip_all)] 85 87 async fn hydrate_threadgate( 86 88 &self, 87 89 threadgate: Option<models::Threadgate>, ··· 100 102 )) 101 103 } 102 104 105 + #[instrument(skip_all)] 103 106 async fn hydrate_threadgates( 104 107 &self, 105 108 threadgates: Vec<models::Threadgate>, ··· 131 134 .collect() 132 135 } 133 136 137 + #[instrument(skip_all)] 134 138 pub async fn hydrate_post(&self, post: String) -> Option<PostView> { 135 139 let stats = self.loaders.post_stats.load(post.clone()).await; 136 140 let (post, threadgate) = self.loaders.posts.load(post).await?; ··· 145 149 ))) 146 150 } 147 151 152 + #[instrument(skip_all)] 148 153 async fn hydrate_posts_inner(&self, posts: Vec<String>) -> HashMap<String, HydratePostsRet> { 149 154 let stats = self.loaders.post_stats.load_many(posts.clone()).await; 150 155 let posts = self.loaders.posts.load_many(posts).await; ··· 184 189 .collect() 185 190 } 186 191 192 + #[instrument(skip_all)] 187 193 pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> { 188 194 self.hydrate_posts_inner(posts) 189 195 .await ··· 192 198 .collect() 193 199 } 194 200 201 + #[instrument(skip_all)] 195 202 pub async fn hydrate_feed_posts( 196 203 &self, 197 204 posts: Vec<RawFeedItem>, ··· 295 302 .collect() 296 303 } 297 304 305 + #[instrument(skip_all)] 298 306 async fn get_post_viewer_state(&self, subject: &str) -> Option<PostViewerState> { 299 307 if let Some(viewer) = &self.current_actor { 300 308 let data = self.loaders.post_state.get(viewer, subject).await?; ··· 305 313 } 306 314 } 307 315 316 + #[instrument(skip_all)] 308 317 async fn get_post_viewer_states( 309 318 &self, 310 319 subjects: &[String],
+9
parakeet/src/hydration/profile.rs
··· 12 12 use std::collections::HashMap; 13 13 use std::str::FromStr; 14 14 use std::sync::OnceLock; 15 + use tracing::instrument; 15 16 16 17 pub static TRUSTED_VERIFIERS: OnceLock<Vec<String>> = OnceLock::new(); 17 18 ··· 274 275 } 275 276 276 277 impl super::StatefulHydrator<'_> { 278 + #[instrument(skip_all)] 277 279 pub async fn hydrate_profile_basic(&self, did: String) -> Option<ProfileViewBasic> { 278 280 let labels = self.get_profile_label(&did).await; 279 281 let viewer = self.get_profile_viewer_state(&did).await; ··· 291 293 )) 292 294 } 293 295 296 + #[instrument(skip_all)] 294 297 pub async fn hydrate_profiles_basic( 295 298 &self, 296 299 dids: Vec<String>, ··· 315 318 .collect() 316 319 } 317 320 321 + #[instrument(skip_all)] 318 322 pub async fn hydrate_profile(&self, did: String) -> Option<ProfileView> { 319 323 let labels = self.get_profile_label(&did).await; 320 324 let viewer = self.get_profile_viewer_state(&did).await; ··· 332 336 )) 333 337 } 334 338 339 + #[instrument(skip_all)] 335 340 pub async fn hydrate_profiles(&self, dids: Vec<String>) -> HashMap<String, ProfileView> { 336 341 let labels = self.get_profile_label_many(&dids).await; 337 342 let viewers = self.get_profile_viewer_states(&dids).await; ··· 353 358 .collect() 354 359 } 355 360 361 + #[instrument(skip_all)] 356 362 pub async fn hydrate_profile_detailed(&self, did: String) -> Option<ProfileViewDetailed> { 357 363 let labels = self.get_profile_label(&did).await; 358 364 let viewer = self.get_profile_viewer_state(&did).await; ··· 370 376 )) 371 377 } 372 378 379 + #[instrument(skip_all)] 373 380 pub async fn hydrate_profiles_detailed( 374 381 &self, 375 382 dids: Vec<String>, ··· 394 401 .collect() 395 402 } 396 403 404 + #[instrument(skip_all)] 397 405 async fn get_profile_viewer_state(&self, subject: &str) -> Option<ProfileViewerState> { 398 406 if let Some(viewer) = &self.current_actor { 399 407 let data = self.loaders.profile_state.get(viewer, subject).await?; ··· 413 421 } 414 422 } 415 423 424 + #[instrument(skip_all)] 416 425 async fn get_profile_viewer_states( 417 426 &self, 418 427 dids: &[String],
+5
parakeet/src/hydration/starter_packs.rs
··· 4 4 use lexica::app_bsky::graph::{ListViewBasic, StarterPackView, StarterPackViewBasic}; 5 5 use parakeet_db::models; 6 6 use std::collections::HashMap; 7 + use tracing::instrument; 7 8 8 9 fn build_basic( 9 10 starter_pack: models::StaterPack, ··· 50 51 } 51 52 52 53 impl StatefulHydrator<'_> { 54 + #[instrument(skip_all)] 53 55 pub async fn hydrate_starterpack_basic(&self, pack: String) -> Option<StarterPackViewBasic> { 54 56 let labels = self.get_label(&pack).await; 55 57 let sp = self.loaders.starterpacks.load(pack).await?; ··· 59 61 Some(build_basic(sp, creator, labels, list_item_count)) 60 62 } 61 63 64 + #[instrument(skip_all)] 62 65 pub async fn hydrate_starterpacks_basic( 63 66 &self, 64 67 packs: Vec<String>, ··· 86 89 .collect() 87 90 } 88 91 92 + #[instrument(skip_all)] 89 93 pub async fn hydrate_starterpack(&self, pack: String) -> Option<StarterPackView> { 90 94 let labels = self.get_label(&pack).await; 91 95 let sp = self.loaders.starterpacks.load(pack).await?; ··· 102 106 Some(build_spview(sp, creator, labels, list, feeds)) 103 107 } 104 108 109 + #[instrument(skip_all)] 105 110 pub async fn hydrate_starterpacks( 106 111 &self, 107 112 packs: Vec<String>,
+45
parakeet/src/instrumentation.rs
··· 1 + use opentelemetry::trace::TracerProvider; 2 + use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig}; 3 + use opentelemetry_sdk::trace::{Sampler, SdkTracer, SdkTracerProvider}; 4 + use tracing::Subscriber; 5 + use tracing_opentelemetry::OpenTelemetryLayer; 6 + use tracing_subscriber::filter::Filtered; 7 + use tracing_subscriber::layer::SubscriberExt; 8 + use tracing_subscriber::registry::LookupSpan; 9 + use tracing_subscriber::util::SubscriberInitExt; 10 + use tracing_subscriber::{EnvFilter, Layer}; 11 + 12 + pub fn init_instruments(otel_enable: bool) { 13 + let otel_layer = otel_enable.then(init_otel); 14 + 15 + let stdout_filter = 16 + EnvFilter::from_default_env().add_directive("otel::tracing=off".parse().unwrap()); 17 + 18 + tracing_subscriber::registry() 19 + .with(tracing_subscriber::fmt::layer().with_filter(stdout_filter)) 20 + .with(otel_layer) 21 + .init(); 22 + } 23 + 24 + fn init_otel<S>() -> Filtered<OpenTelemetryLayer<S, SdkTracer>, EnvFilter, S> 25 + where 26 + S: Subscriber + for<'span> LookupSpan<'span>, 27 + { 28 + let span_exporter = SpanExporter::builder() 29 + .with_http() 30 + .with_protocol(Protocol::HttpBinary) 31 + .build() 32 + .unwrap(); 33 + 34 + let tracer_provider = SdkTracerProvider::builder() 35 + .with_batch_exporter(span_exporter) 36 + .with_sampler(Sampler::AlwaysOn) 37 + .build(); 38 + 39 + opentelemetry::global::set_tracer_provider(tracer_provider.clone()); 40 + 41 + let tracer = tracer_provider.tracer("parakeet"); 42 + let otel_filter = EnvFilter::new("info,otel::tracing=trace,tower_http=off"); 43 + 44 + OpenTelemetryLayer::new(tracer).with_filter(otel_filter) 45 + }
+23
parakeet/src/loaders.rs
··· 15 15 use serde::{Deserialize, Serialize}; 16 16 use std::collections::HashMap; 17 17 use std::str::FromStr; 18 + use tracing::instrument; 18 19 19 20 type CachingLoader<K, V, L> = Loader<K, V, L, PrefixedLoaderCache<V>>; 20 21 ··· 85 86 86 87 pub struct LikeLoader(parakeet_index::Client); 87 88 impl BatchFn<String, i32> for LikeLoader { 89 + #[instrument(name = "LikeLoader", skip_all)] 88 90 async fn load(&mut self, keys: &[String]) -> HashMap<String, i32> { 89 91 let res = self 90 92 .0 ··· 107 109 108 110 pub struct LikeRecordLoader(Pool<AsyncPgConnection>); 109 111 impl LikeRecordLoader { 112 + #[instrument(name = "LikeRecordLoader::get", skip_all)] 110 113 pub async fn get(&self, did: &str, subject: &str) -> Option<(String, String)> { 111 114 let mut conn = self.0.get().await.unwrap(); 112 115 ··· 118 121 }) 119 122 } 120 123 124 + #[instrument(name = "LikeRecordLoader::get_many", skip_all)] 121 125 pub async fn get_many( 122 126 &self, 123 127 did: &str, ··· 139 143 140 144 pub struct HandleLoader(Pool<AsyncPgConnection>); 141 145 impl BatchFn<String, String> for HandleLoader { 146 + #[instrument(name = "HandleLoader", skip_all)] 142 147 async fn load(&mut self, keys: &[String]) -> HashMap<String, String> { 143 148 let mut conn = self.0.get().await.unwrap(); 144 149 ··· 171 176 Option<ProfileAllowSubscriptions>, 172 177 ); 173 178 impl BatchFn<String, ProfileLoaderRet> for ProfileLoader { 179 + #[instrument(name = "ProfileLoader", skip_all)] 174 180 async fn load(&mut self, keys: &[String]) -> HashMap<String, ProfileLoaderRet> { 175 181 let mut conn = self.0.get().await.unwrap(); 176 182 ··· 232 238 233 239 pub struct ProfileStatsLoader(parakeet_index::Client); 234 240 impl BatchFn<String, parakeet_index::ProfileStats> for ProfileStatsLoader { 241 + #[instrument(name = "ProfileStatsLoader", skip_all)] 235 242 async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::ProfileStats> { 236 243 let stats_req = parakeet_index::GetStatsManyReq { 237 244 uris: keys.to_vec(), ··· 248 255 249 256 pub struct ProfileStateLoader(Pool<AsyncPgConnection>); 250 257 impl ProfileStateLoader { 258 + #[instrument(name = "ProfileStateLoader::get", skip_all)] 251 259 pub async fn get(&self, did: &str, subject: &str) -> Option<db::ProfileStateRet> { 252 260 let mut conn = self.0.get().await.unwrap(); 253 261 ··· 259 267 }) 260 268 } 261 269 270 + #[instrument(name = "ProfileStateLoader::get_many", skip_all)] 262 271 pub async fn get_many( 263 272 &self, 264 273 did: &str, ··· 279 288 pub struct ListLoader(Pool<AsyncPgConnection>); 280 289 type ListLoaderRet = (models::List, i64); 281 290 impl BatchFn<String, ListLoaderRet> for ListLoader { 291 + #[instrument(name = "ListLoaderRet", skip_all)] 282 292 async fn load(&mut self, keys: &[String]) -> HashMap<String, ListLoaderRet> { 283 293 let mut conn = self.0.get().await.unwrap(); 284 294 ··· 310 320 311 321 pub struct ListStateLoader(Pool<AsyncPgConnection>); 312 322 impl ListStateLoader { 323 + #[instrument(name = "ListStateLoader::get", skip_all)] 313 324 pub async fn get(&self, did: &str, subject: &str) -> Option<db::ListStateRet> { 314 325 let mut conn = self.0.get().await.unwrap(); 315 326 ··· 321 332 }) 322 333 } 323 334 335 + #[instrument(name = "ListStateLoader::get_many", skip_all)] 324 336 pub async fn get_many( 325 337 &self, 326 338 did: &str, ··· 340 352 341 353 pub struct FeedGenLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 342 354 impl BatchFn<String, models::FeedGen> for FeedGenLoader { 355 + #[instrument(name = "FeedGenLoader", skip_all)] 343 356 async fn load(&mut self, keys: &[String]) -> HashMap<String, models::FeedGen> { 344 357 let mut conn = self.0.get().await.unwrap(); 345 358 ··· 365 378 pub struct PostLoader(Pool<AsyncPgConnection>); 366 379 type PostLoaderRet = (models::Post, Option<models::Threadgate>); 367 380 impl BatchFn<String, PostLoaderRet> for PostLoader { 381 + #[instrument(name = "PostLoader", skip_all)] 368 382 async fn load(&mut self, keys: &[String]) -> HashMap<String, PostLoaderRet> { 369 383 let mut conn = self.0.get().await.unwrap(); 370 384 ··· 395 409 396 410 pub struct PostStatsLoader(parakeet_index::Client); 397 411 impl BatchFn<String, parakeet_index::PostStats> for PostStatsLoader { 412 + #[instrument(name = "PostStatsLoader", skip_all)] 398 413 async fn load(&mut self, keys: &[String]) -> HashMap<String, parakeet_index::PostStats> { 399 414 let stats_req = parakeet_index::GetStatsManyReq { 400 415 uris: keys.to_vec(), ··· 411 426 412 427 pub struct PostStateLoader(Pool<AsyncPgConnection>); 413 428 impl PostStateLoader { 429 + #[instrument(name = "PostStateLoader::get", skip_all)] 414 430 pub async fn get(&self, did: &str, subject: &str) -> Option<db::PostStateRet> { 415 431 let mut conn = self.0.get().await.unwrap(); 416 432 ··· 422 438 }) 423 439 } 424 440 441 + #[instrument(name = "PostStateLoader::get_many", skip_all)] 425 442 pub async fn get_many( 426 443 &self, 427 444 did: &str, ··· 449 466 RecordWithMedia(models::PostEmbedRecord, Box<EmbedLoaderRet>), 450 467 } 451 468 impl BatchFn<String, (EmbedLoaderRet, String)> for EmbedLoader { 469 + #[instrument(name = "EmbedLoader", skip_all)] 452 470 async fn load(&mut self, keys: &[String]) -> HashMap<String, (EmbedLoaderRet, String)> { 453 471 let mut conn = self.0.get().await.unwrap(); 454 472 ··· 531 549 pub struct StarterPackLoader(Pool<AsyncPgConnection>); 532 550 type StarterPackLoaderRet = models::StaterPack; 533 551 impl BatchFn<String, StarterPackLoaderRet> for StarterPackLoader { 552 + #[instrument(name = "StarterPackLoader", skip_all)] 534 553 async fn load(&mut self, keys: &[String]) -> HashMap<String, StarterPackLoaderRet> { 535 554 let mut conn = self.0.get().await.unwrap(); 536 555 ··· 556 575 pub struct LabelServiceLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 557 576 type LabelServiceLoaderRet = (models::LabelerService, Vec<models::LabelDefinition>); 558 577 impl BatchFn<String, LabelServiceLoaderRet> for LabelServiceLoader { 578 + #[instrument(name = "LabelServiceLoader", skip_all)] 559 579 async fn load(&mut self, keys: &[String]) -> HashMap<String, LabelServiceLoaderRet> { 560 580 let mut conn = self.0.get().await.unwrap(); 561 581 ··· 594 614 // but it should live here anyway 595 615 pub struct LabelLoader(Pool<AsyncPgConnection>); 596 616 impl LabelLoader { 617 + #[instrument(name = "LabelLoader::load", skip_all)] 597 618 pub async fn load(&self, uri: &str, services: &[LabelConfigItem]) -> Vec<models::Label> { 598 619 let mut conn = self.0.get().await.unwrap(); 599 620 ··· 613 634 }) 614 635 } 615 636 637 + #[instrument(name = "LabelLoader::load_many", skip_all)] 616 638 pub async fn load_many( 617 639 &self, 618 640 uris: &[String], ··· 647 669 648 670 pub struct VerificationLoader(Pool<AsyncPgConnection>); 649 671 impl BatchFn<String, Vec<models::VerificationEntry>> for VerificationLoader { 672 + #[instrument(name = "VerificationLoader", skip_all)] 650 673 async fn load(&mut self, keys: &[String]) -> HashMap<String, Vec<models::VerificationEntry>> { 651 674 let mut conn = self.0.get().await.unwrap(); 652 675
+14 -5
parakeet/src/main.rs
··· 1 + use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer}; 1 2 use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; 2 3 use diesel_async::pooled_connection::deadpool::Pool; 3 4 use diesel_async::pooled_connection::AsyncDieselConnectionManager; ··· 14 15 mod config; 15 16 mod db; 16 17 mod hydration; 18 + mod instrumentation; 17 19 mod loaders; 18 20 mod xrpc; 19 21 ··· 31 33 32 34 #[tokio::main] 33 35 async fn main() -> eyre::Result<()> { 34 - tracing_subscriber::fmt::init(); 35 - 36 36 let conf = config::load_config()?; 37 37 38 + instrumentation::init_instruments(conf.otel_enable); 39 + 38 40 let db_mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new(&conf.database_url); 39 41 let pool = Pool::builder(db_mgr).build()?; 40 42 ··· 52 54 let redis_client = redis::Client::open(conf.redis_uri)?; 53 55 let redis_mp = redis_client.get_multiplexed_tokio_connection().await?; 54 56 55 - let index_client = parakeet_index::Client::connect(conf.index_uri).await?; 57 + let index_client = parakeet_index::connect_with_otel(conf.index_uri) 58 + .await 59 + .map_err(|e| eyre::eyre!(e))?; 56 60 57 61 let dataloaders = Arc::new(loaders::Dataloaders::new( 58 62 pool.clone(), ··· 79 83 80 84 let did_doc = did_web_doc(&conf.service); 81 85 86 + let mw = tower::ServiceBuilder::new() 87 + .option_layer(conf.otel_enable.then(OtelInResponseLayer::default)) 88 + .option_layer(conf.otel_enable.then(OtelAxumLayer::default)) 89 + .layer(TraceLayer::new_for_http()) 90 + .layer(cors); 91 + 82 92 let app = axum::Router::new() 83 93 .nest("/xrpc", xrpc::xrpc_routes()) 84 94 .route( 85 95 "/.well-known/did.json", 86 96 axum::routing::get(async || axum::Json(did_doc)), 87 97 ) 88 - .layer(TraceLayer::new_for_http()) 89 - .layer(cors) 98 + .layer(mw) 90 99 .with_state(GlobalState { 91 100 pool, 92 101 redis_mp,
+3
parakeet/src/xrpc/app_bsky/feed/posts.rs
··· 24 24 use reqwest::Url; 25 25 use serde::{Deserialize, Serialize}; 26 26 use std::collections::HashMap; 27 + use tracing::instrument; 27 28 28 29 const FEEDGEN_SERVICE_ID: &str = "#bsky_fg"; 29 30 ··· 612 613 .or(schema::posts::embed_subtype.eq_any(filter)) 613 614 } 614 615 616 + #[instrument(skip_all)] 615 617 async fn get_feed_skeleton( 616 618 feed: &str, 617 619 service: &str, ··· 653 655 } 654 656 } 655 657 658 + #[instrument(skip_all)] 656 659 async fn get_skeleton_repost_data( 657 660 conn: &mut AsyncPgConnection, 658 661 reposts: Vec<String>,
+3
parakeet/src/xrpc/jwt.rs
··· 4 4 use std::collections::HashMap; 5 5 use std::sync::{Arc, LazyLock}; 6 6 use tokio::sync::RwLock; 7 + use tracing::instrument; 7 8 8 9 static DUMMY_KEY: LazyLock<DecodingKey> = LazyLock::new(|| DecodingKey::from_secret(&[])); 9 10 static NO_VERIFY: LazyLock<Validation> = LazyLock::new(|| { ··· 38 39 } 39 40 } 40 41 42 + #[instrument(skip_all)] 41 43 pub async fn resolve_and_verify_jwt(&self, token: &str, aud: Option<&str>) -> Option<Claims> { 42 44 // first we need to decode without verifying, to get iss. 43 45 let unsafe_data = jsonwebtoken::decode::<Claims>(token, &DUMMY_KEY, &NO_VERIFY).ok()?; ··· 56 58 self.verify_jwt_multibase_with_alg(token, &multibase_key, unsafe_data.header.alg, aud) 57 59 } 58 60 61 + #[instrument(skip_all)] 59 62 async fn resolve_key(&self, did: &str) -> Option<String> { 60 63 tracing::trace!("resolving multikey for {did}"); 61 64 let did_doc = self.resolver.resolve_did(did).await.ok()??;