···1+# Environment Configuration
2+PORT="8080" # The port your server will listen on
3+HOST="127.0.0.1" # Hostname for the server
4+PUBLIC_URL="" # Set when deployed publicly, e.g. "https://mysite.com". Informs OAuth client id.
5+# DB_PATH="./statusphere.sqlite3" # The SQLite database path. Leave commented out to use a temporary in-memory database.
6+7+
···1+MIT License
2+3+Copyright (c) 2025 Bailey Townsend
4+5+Permission is hereby granted, free of charge, to any person obtaining a copy
6+of this software and associated documentation files (the "Software"), to deal
7+in the Software without restriction, including without limitation the rights
8+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+copies of the Software, and to permit persons to whom the Software is
10+furnished to do so, subject to the following conditions:
11+12+The above copyright notice and this permission notice shall be included in all
13+copies or substantial portions of the Software.
14+15+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+SOFTWARE.
···1+2+3+# !!!!!!!!!!!!!!!Squash before going public!!!!!!!!!!
4+5+6+7+8+9+10+11+12+Originally taken
13+from [bluesky-social/atproto-website](https://github.com/bluesky-social/atproto-website/blob/dbcd70ced53078579c7e5b015a26db295b7a7807/src/app/%5Blocale%5D/guides/applications/en.mdx)
14+15+> [!NOTE]
16+> ***This tutorial is based off of the original quick start guide found [here](https://atproto.com/guides/applications).
17+> The goal is to follow as closely to the original as possible, expect for one small change. It's in Rust 🦀.
18+> All credit goes to the maintainers of the original project and tutorial. This was made to help you get started with
19+> using Rust to write applications in the Atmosphere. Parts that stray from the tutorial, or need extra context will be in blocks like this one.***
20+21+# Quick start guide to building applications on AT Protocol
22+23+[Find the source code on GitHub](https://github.com/fatfingers23/rusty_statusphere_example_app)
24+25+In this guide, we're going to build a simple multi-user app that publishes your current "status" as an emoji. Our
26+application will look like this:
27+28+
29+30+We will cover how to:
31+32+- Signin via OAuth
33+- Fetch information about users (profiles)
34+- Listen to the network firehose for new data via the [Jetstream](https://docs.bsky.app/blog/jetstream)
35+- Publish data on the user's account using a custom schema
36+37+We're going to keep this light so you can quickly wrap your head around ATProto. There will be links with more
38+information about each step.
39+40+## Introduction
41+42+Data in the Atmosphere is stored on users' personal repos. It's almost like each user has their own website. Our goal is
43+to aggregate data from the users into our SQLite DB.
44+45+Think of our app like a Google. If Google's job was to say which emoji each website had under `/status.json`, then it
46+would show something like:
47+48+- `nytimes.com` is feeling 📰 according to `https://nytimes.com/status.json`
49+- `bsky.app` is feeling 🦋 according to `https://bsky.app/status.json`
50+- `reddit.com` is feeling 🤓 according to `https://reddit.com/status.json`
51+52+The Atmosphere works the same way, except we're going to check `at://` instead of `https://`. Each user has a data repo
53+under an `at://` URL. We'll crawl all the user data repos in the Atmosphere for all the "status.json" records and
54+aggregate them into our SQLite database.
55+56+> `at://` is the URL scheme of the AT Protocol. Under the hood it uses common tech like HTTP and DNS, but it adds all of
57+> the features we'll be using in this tutorial.
58+59+## Step 1. Starting with our Actix Web app
60+61+Start by cloning the repo and installing packages.
62+63+```bash
64+git clone git@github.com:fatfingers23/rusty_statusphere_example_app.git
65+cd rusty_statusphere_example_app
66+cp .env.template .env
67+cargo run
68+# Navigate to http://127.0.0.1:8080
69+```
70+71+Our repo is a regular Web app. We're rendering our HTML server-side like it's 1999. We also have a SQLite database that
72+we're managing with [async-sqlite](https://crates.io/crates/async-sqlite).
73+74+Our starting stack:
75+76+- [Rust](https://www.rust-lang.org/tools/install)
77+- Rust web server ([Actix Web](https://actix.rs/))
78+- SQLite database ([async-sqlite](https://crates.io/crates/async-sqlite))
79+- HTML Templating ([askama](https://crates.io/crates/askama))
80+81+> [!NOTE]
82+> Along with the above, we are also using a couple of community maintained projects for using rust with the ATProtocol.
83+> Since these are community maintained I have also linked sponsor links for the maintainers and _highly_ recommend you to
84+> think
85+> about sponsoring them.
86+> Thanks to their work and projects, we are able to create Rust applications in the Atmosphere.
87+> - ATProtocol client and OAuth
88+ with [atrium](https://github.com/atrium-rs/atrium) - [sponsor sugyan](https://github.com/sponsors/sugyan)
89+> - Jetstream consumer
90+ with [rocketman](https://crates.io/crates/rocketman)- [buy natalie a coffee](https://ko-fi.com/uxieq)
91+92+With each step we'll explain how our Web app taps into the Atmosphere. Refer to the codebase for more detailed code
93+— again, this tutorial is going to keep it light and quick to digest.
94+95+## Step 2. Signing in with OAuth
96+97+When somebody logs into our app, they'll give us read & write access to their personal `at://` repo. We'll use that to
98+write the status json record.
99+100+We're going to accomplish this using OAuth ([spec](https://github.com/bluesky-social/proposals/tree/main/0004-oauth)).
101+Most of the OAuth flows are going to be handled for us using
102+the [atrium-oauth](https://crates.io/crates/atrium-oauth)
103+crate. This is the arrangement we're aiming toward:
104+105+
106+107+When the user logs in, the OAuth client will create a new session with their repo server and give us read/write access
108+along with basic user info.
109+110+
111+112+Our login page just asks the user for their "handle," which is the domain name associated with their account.
113+For [Bluesky](https://bsky.app) users, these tend to look like `alice.bsky.social`, but they can be any kind of domain (
114+eg `alice.com`).
115+116+```html
117+<!-- templates/login.html -->
118+<form action="/login" method="post" class="login-form">
119+ <input
120+ type="text"
121+ name="handle"
122+ placeholder="Enter your handle (eg alice.bsky.social)"
123+ required
124+ />
125+ <button type="submit">Log in</button>
126+</form>
127+```
128+129+When they submit the form, we tell our OAuth client to initiate the authorization flow and then redirect the user to
130+their server to complete the process.
131+132+```rust
133+/** ./src/main.rs **/
134+/// Login endpoint
135+#[post("/login")]
136+async fn login_post(
137+ request: HttpRequest,
138+ params: web::Form<LoginForm>,
139+ oauth_client: web::Data<OAuthClientType>,
140+) -> HttpResponse {
141+ // This will act the same as the js method isValidHandle
142+ match atrium_api::types::string::Handle::new(params.handle.clone()) {
143+ Ok(handle) => {
144+ // Initiates the OAuth flow
145+ let oauth_url = oauth_client
146+ .authorize(
147+ &handle,
148+ AuthorizeOptions {
149+ scopes: vec![
150+ Scope::Known(KnownScope::Atproto),
151+ Scope::Known(KnownScope::TransitionGeneric),
152+ ],
153+ ..Default::default()
154+ },
155+ )
156+ .await;
157+ match oauth_url {
158+ Ok(url) => Redirect::to(url)
159+ .see_other()
160+ .respond_to(&request)
161+ .map_into_boxed_body(),
162+ Err(err) => {
163+ log::error!("Error: {err}");
164+ let html = LoginTemplate {
165+ title: "Log in",
166+ error: Some("OAuth error"),
167+ };
168+ HttpResponse::Ok().body(html.render().expect("template should be valid"))
169+ }
170+ }
171+ }
172+ Err(err) => {
173+ let html: LoginTemplate<'_> = LoginTemplate {
174+ title: "Log in",
175+ error: Some(err),
176+ };
177+ HttpResponse::Ok().body(html.render().expect("template should be valid"))
178+ }
179+ }
180+}
181+```
182+183+This is the same kind of SSO flow that Google or GitHub uses. The user will be asked for their password, then asked to
184+confirm the session with your application.
185+186+When that finishes, the user will be sent back to `/oauth/callback` on our Web app. The OAuth client will store the
187+access tokens for the user's server, and then we attach their account's [DID](https://atproto.com/specs/did) to the
188+cookie-session.
189+190+```rust
191+/** ./src/main.rs **/
192+/// OAuth callback endpoint to complete session creation
193+#[get("/oauth/callback")]
194+async fn oauth_callback(
195+ request: HttpRequest,
196+ params: web::Query<CallbackParams>,
197+ oauth_client: web::Data<OAuthClientType>,
198+ session: Session,
199+) -> HttpResponse {
200+ // Store the credentials
201+ match oauth_client.callback(params.into_inner()).await {
202+ Ok((bsky_session, _)) => {
203+ let agent = Agent::new(bsky_session);
204+ match agent.did().await {
205+ Some(did) => {
206+ //Attach the account DID to our user via a cookie
207+ session.insert("did", did).unwrap();
208+ Redirect::to("/")
209+ .see_other()
210+ .respond_to(&request)
211+ .map_into_boxed_body()
212+ }
213+ None => {
214+ let html = ErrorTemplate {
215+ title: "Log in",
216+ error: "The OAuth agent did not return a DID. My try relogging in.",
217+ };
218+ HttpResponse::Ok().body(html.render().expect("template should be valid"))
219+ }
220+ }
221+ }
222+ Err(err) => {
223+ log::error!("Error: {err}");
224+ let html = ErrorTemplate {
225+ title: "Log in",
226+ error: "OAuth error, check the logs",
227+ };
228+ HttpResponse::Ok().body(html.render().expect("template should be valid"))
229+ }
230+ }
231+}
232+```
233+234+With that, we're in business! We now have a session with the user's repo server and can use that to access their data.
235+236+## Step 3. Fetching the user's profile
237+238+Why don't we learn something about our user? In [Bluesky](https://bsky.app), users publish a "profile" record which
239+looks like this:
240+241+```rust
242+pub struct ProfileViewDetailedData {
243+ pub display_name: Option<String>, // a human friendly name
244+ pub description: Option<String>, // a short bio
245+ pub avatar: Option<String>, // small profile picture
246+ pub banner: Option<String>, // banner image to put on profiles
247+ pub created_at: Option<String> // declared time this profile data was added
248+ // ...
249+}
250+```
251+252+You can examine this record directly using [atproto-browser.vercel.app](https://atproto-browser.vercel.app). For
253+instance, [this is the profile record for @bsky.app](https://atproto-browser.vercel.app/at?u=at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.actor.profile/self).
254+255+> [!NOTE]
256+> In the original tutorial `agent.com.atproto.repo.getRecord` is used, which is
257+> this [method](https://docs.rs/atrium-api/latest/atrium_api/com/atproto/repo/get_record/index.html) in atrium-api.
258+> For simplicity we are
259+> using [agent.api.app.bsky.actor.get_profile](https://docs.rs/atrium-api/latest/atrium_api/app/bsky/actor/get_profile/index.html).
260+> The original text found here has been moved to [Step 4. Reading & writing records](#step-4-reading--writing-records)
261+> since it makes more sense in that context.
262+263+We're going to use the [Agent](https://crates.io/crates/atrium-oauth) associated with the
264+user's OAuth session to fetch this record.
265+266+Let's update our homepage to fetch this profile record:
267+268+```rust
269+/** ./src/main.rs **/
270+/// Homepage
271+#[get("/")]
272+async fn home(
273+ _req: HttpRequest,
274+ session: Session,
275+ oauth_client: web::Data<OAuthClientType>,
276+ db_pool: web::Data<Pool>,
277+ handle_resolver: web::Data<HandleResolver>,
278+) -> Result<impl Responder> {
279+ const TITLE: &str = "Home";
280+281+ // If the user is signed in, get an agent which communicates with their server
282+ match session.get::<String>("did").unwrap_or(None) {
283+ Some(did) => {
284+ let did = Did::new(did).expect("failed to parse did");
285+ match oauth_client.restore(&did).await {
286+ Ok(session) => {
287+ let agent = Agent::new(session);
288+289+ // Fetch additional information about the logged-in user
290+ let profile = agent
291+ .api
292+ .app
293+ .bsky
294+ .actor
295+ .get_profile(
296+ atrium_api::app::bsky::actor::get_profile::ParametersData {
297+ actor: atrium_api::types::string::AtIdentifier::Did(did),
298+ }.into(),
299+ )
300+ .await;
301+302+ // Serve the logged-in view
303+ let html = HomeTemplate {
304+ title: TITLE,
305+ status_options: &STATUS_OPTIONS,
306+ profile: match profile {
307+ Ok(profile) => {
308+ let profile_data = Profile {
309+ did: profile.did.to_string(),
310+ display_name: profile.display_name.clone(),
311+ };
312+ Some(profile_data)
313+ }
314+ Err(err) => {
315+ log::error!("Error accessing profile: {err}");
316+ None
317+ }
318+ },
319+ }.render().expect("template should be valid");
320+321+ Ok(web::Html::new(html))
322+ }
323+ Err(err) => {
324+ //Unset the session
325+ session.remove("did");
326+ log::error!("Error restoring session: {err}");
327+ let error_html = ErrorTemplate {
328+ title: TITLE,
329+ error: "Was an error resuming the session, please check the logs.",
330+ }.render().expect("template should be valid");
331+332+ Ok(web::Html::new(error_html))
333+ }
334+ }
335+ }
336+ None => {
337+ // Serve the logged-out view
338+ let html = HomeTemplate {
339+ title: TITLE,
340+ status_options: &STATUS_OPTIONS,
341+ profile: None,
342+ }.render().expect("template should be valid");
343+344+ Ok(web::Html::new(html))
345+ }
346+ }
347+}
348+```
349+350+With that data, we can give a nice personalized welcome banner for our user:
351+352+
353+354+```html
355+<!-- templates/home.html -->
356+<div class="card">
357+ {% if let Some(Profile {did, display_name}) = profile %}
358+ <form action="/logout" method="post" class="session-form">
359+ <div>
360+ Hi,
361+ {% if let Some(display_name) = display_name %}
362+ <strong>{{display_name}}</strong>
363+ {% else %}
364+ <strong>friend</strong>
365+ {% endif %}.
366+ What's your status today??
367+ </div>
368+ <div>
369+ <button type="submit">Log out</button>
370+ </div>
371+ </form>
372+ {% else %}
373+ <div class="session-form">
374+ <div><a href="/login">Log in</a> to set your status!</div>
375+ <div>
376+ <a href="/login" class="button">Log in</a>
377+ </div>
378+ </div>
379+ {% endif %}
380+</div>
381+```
382+383+## Step 4. Reading & writing records
384+385+You can think of the user repositories as collections of JSON records:
386+387+
388+389+When asking for a record, we provide three pieces of information.
390+391+- **repo** The [DID](https://atproto.com/specs/did) which identifies the user,
392+- **collection** The collection name, and
393+- **rkey** The record key
394+395+We'll explain the collection name shortly. Record keys are strings
396+with [some restrictions](https://atproto.com/specs/record-key#record-key-syntax) and a couple of common patterns. The
397+`"self"` pattern is used when a collection is expected to only contain one record which describes the user.
398+399+Let's look again at how we read the "profile" record:
400+401+```rust
402+fn example_get_record() {
403+ let get_result = agent
404+ .api
405+ .com
406+ .atproto
407+ .repo
408+ .get_record(
409+ atrium_api::com::atproto::repo::get_record::ParametersData {
410+ cid: None,
411+ collection: "app.bsky.actor.profile" // The collection
412+ .parse()
413+ .unwrap(),
414+ repo: did.into(), // The user
415+ rkey: "self".parse().unwrap(), // The record key
416+ }
417+ .into(),
418+ )
419+ .await;
420+}
421+422+```
423+424+We write records using a similar API. Since our goal is to write "status" records, let's look at how that will happen:
425+426+```rust
427+fn example_create_record() {
428+ let did = atrium_api::types::string::Did::new(did_string.clone()).unwrap();
429+ let agent = Agent::new(session);
430+431+ let status: Unknown = serde_json::from_str(
432+ format!(
433+ r#"{{"$type":"xyz.statusphere.status","status":"{}","createdAt":"{}"}}"#,
434+ form.status,
435+ Datetime::now().as_str()
436+ )
437+ .as_str(),
438+ ).unwrap();
439+440+ let create_result = agent
441+ .api
442+ .com
443+ .atproto
444+ .repo
445+ .create_record(
446+ atrium_api::com::atproto::repo::create_record::InputData {
447+ collection: Status::NSID.parse().unwrap(), // The collection
448+ repo: did.clone().into(), // The user
449+ rkey: None, // The record key, auto creates with None
450+ record: status, // The record from a strong type
451+ swap_commit: None,
452+ validate: None,
453+ }
454+ .into(),
455+ )
456+ .await;
457+}
458+```
459+460+Our `POST /status` route is going to use this API to publish the user's status to their repo.
461+462+```rust
463+/// "Set status" Endpoint
464+#[post("/status")]
465+async fn status(
466+ request: HttpRequest,
467+ session: Session,
468+ oauth_client: web::Data<OAuthClientType>,
469+ db_pool: web::Data<Pool>,
470+ form: web::Form<StatusForm>,
471+) -> HttpResponse {
472+ const TITLE: &str = "Home";
473+474+ // If the user is signed in, get an agent which communicates with their server
475+ match session.get::<String>("did").unwrap_or(None) {
476+ Some(did_string) => {
477+ let did = atrium_api::types::string::Did::new(did_string.clone())
478+ .expect("failed to parse did");
479+ match oauth_client.restore(&did).await {
480+ Ok(session) => {
481+ let agent = Agent::new(session);
482+483+ // Construct their status record
484+ let status: Unknown = serde_json::from_str(
485+ format!(
486+ r#"{{"$type":"xyz.statusphere.status","status":"{}","createdAt":"{}"}}"#,
487+ form.status,
488+ Datetime::now().as_str()
489+ )
490+ .as_str(),
491+ ).unwrap();
492+493+ // Write the status record to the user's repository
494+ let create_result = agent
495+ .api
496+ .com
497+ .atproto
498+ .repo
499+ .create_record(
500+ atrium_api::com::atproto::repo::create_record::InputData {
501+ collection: "xyz.statusphere.status".parse().unwrap(),
502+ repo: did.clone().into(),
503+ rkey: None,
504+ record: status,
505+ swap_commit: None,
506+ validate: None,
507+ }
508+ .into(),
509+ )
510+ .await;
511+512+ match create_result {
513+ Ok(_) => Redirect::to("/")
514+ .see_other()
515+ .respond_to(&request)
516+ .map_into_boxed_body(),
517+ Err(err) => {
518+ log::error!("Error creating status: {err}");
519+ let error_html = ErrorTemplate {
520+ title: TITLE,
521+ error: "Was an error creating the status, please check the logs.",
522+ }
523+ .render()
524+ .expect("template should be valid");
525+ HttpResponse::Ok().body(error_html)
526+ }
527+ }
528+ }
529+ Err(err) => {
530+ //Unset the session
531+ session.remove("did");
532+ log::error!(
533+ "Error restoring session, we are removing the session from the cookie: {err}"
534+ );
535+ let error_html = ErrorTemplate {
536+ title: TITLE,
537+ error: "Was an error resuming the session, please check the logs.",
538+ }
539+ .render()
540+ .expect("template should be valid");
541+ HttpResponse::Ok().body(error_html)
542+ }
543+ }
544+ }
545+ None => {
546+ let error_template = ErrorTemplate {
547+ title: "Error",
548+ error: "You must be logged in to create a status.",
549+ }
550+ .render()
551+ .expect("template should be valid");
552+ HttpResponse::Ok().body(error_template)
553+ }
554+ }
555+}
556+```
557+558+Now in our homepage we can list out the status buttons:
559+560+```html
561+<!-- templates/home.html -->
562+<form action="/status" method="post" class="status-options">
563+ {% for status in status_options %}
564+ <button
565+ class="{% if let Some(my_status) = my_status %} {%if my_status == status %} status-option selected {% else %} status-option {% endif %} {% else %} status-option {%endif%} "
566+ name="status" value="{{status}}">
567+ {{status}}
568+ </button>
569+ {% endfor %}
570+</form>
571+```
572+573+And here we are!
574+575+
576+577+## Step 5. Creating a custom "status" schema
578+579+Repo collections are typed, meaning that they have a defined schema. The `app.bsky.actor.profile` type
580+definition [can be found here](https://github.com/bluesky-social/atproto/blob/main/lexicons/app/bsky/actor/profile.json).
581+582+Anybody can create a new schema using the [Lexicon](https://atproto.com/specs/lexicon) language, which is very similar
583+to [JSON-Schema](http://json-schema.org/). The schemas use [reverse-DNS IDs](https://atproto.com/specs/nsid) which
584+indicate ownership. In this demo app we're going to use `xyz.statusphere` which we registered specifically for this
585+project (aka statusphere.xyz).
586+587+> ### Why create a schema?
588+>
589+> Schemas help other applications understand the data your app is creating. By publishing your schemas, you make it
590+> easier for other application authors to publish data in a format your app will recognize and handle.
591+592+Let's create our schema in the `/lexicons` folder of our codebase. You
593+can [read more about how to define schemas here](https://atproto.com/guides/lexicon).
594+595+```json
596+/** lexicons/status.json **/
597+{
598+ "lexicon": 1,
599+ "id": "xyz.statusphere.status",
600+ "defs": {
601+ "main": {
602+ "type": "record",
603+ "key": "tid",
604+ "record": {
605+ "type": "object",
606+ "required": [
607+ "status",
608+ "createdAt"
609+ ],
610+ "properties": {
611+ "status": {
612+ "type": "string",
613+ "minLength": 1,
614+ "maxGraphemes": 1,
615+ "maxLength": 32
616+ },
617+ "createdAt": {
618+ "type": "string",
619+ "format": "datetime"
620+ }
621+ }
622+ }
623+ }
624+ }
625+}
626+```
627+628+Now let's run some code-generation using our schema:
629+630+> [!NOTE]
631+> For generating schemas, we are going to
632+> use [esquema-cli](https://github.com/fatfingers23/esquema?tab=readme-ov-file)
633+> (Which is a tool I've created from a fork of atrium's codegen).
634+> This can be installed by running this command
635+`cargo install esquema-cli --git https://github.com/fatfingers23/esquema.git`
636+> This is a WIP tool with bugs and missing features. But it's good enough for us to generate Rust types from the lexicon
637+> schema.
638+639+```bash
640+esquema-cli generate -l ./lexicons/ -o ./src/lexicons/
641+```
642+643+644+645+This will produce Rust structs. Here's what that generated code looks like:
646+647+```rust
648+/** ./src/lexicons/xyz/statusphere/status.rs **/
649+// @generated - This file is generated by esquema-codegen (forked from atrium-codegen). DO NOT EDIT.
650+//!Definitions for the `xyz.statusphere.status` namespace.
651+use atrium_api::types::TryFromUnknown;
652+#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
653+#[serde(rename_all = "camelCase")]
654+pub struct RecordData {
655+ pub created_at: atrium_api::types::string::Datetime,
656+ pub status: String,
657+}
658+pub type Record = atrium_api::types::Object<RecordData>;
659+impl From<atrium_api::types::Unknown> for RecordData {
660+ fn from(value: atrium_api::types::Unknown) -> Self {
661+ Self::try_from_unknown(value).unwrap()
662+ }
663+}
664+665+```
666+667+> [!NOTE]
668+> You may have noticed we do not cover the validation part like in the TypeScript version.
669+> Esquema can validate to a point such as the data structure and if a field is there or not.
670+> But validation of the data itself is not possible, yet.
671+> There are plans to add it.
672+> Maybe you would like to add it?
673+> https://github.com/fatfingers23/esquema/issues/3
674+675+Let's use that code to improve the `POST /status` route:
676+677+```rust
678+/// "Set status" Endpoint
679+#[post("/status")]
680+async fn status(
681+ request: HttpRequest,
682+ session: Session,
683+ oauth_client: web::Data<OAuthClientType>,
684+ db_pool: web::Data<Pool>,
685+ form: web::Form<StatusForm>,
686+) -> HttpResponse {
687+ // ...
688+ let agent = Agent::new(session);
689+ //We use the new status type we generated with esquema
690+ let status: KnownRecord = lexicons::xyz::statusphere::status::RecordData {
691+ created_at: Datetime::now(),
692+ status: form.status.clone(),
693+ }
694+ .into();
695+696+ // TODO no validation yet from esquema
697+ // Maybe you'd like to add it? https://github.com/fatfingers23/esquema/issues/3
698+699+ let create_result = agent
700+ .api
701+ .com
702+ .atproto
703+ .repo
704+ .create_record(
705+ atrium_api::com::atproto::repo::create_record::InputData {
706+ collection: Status::NSID.parse().unwrap(),
707+ repo: did.into(),
708+ rkey: None,
709+ record: status.into(),
710+ swap_commit: None,
711+ validate: None,
712+ }
713+ .into(),
714+ )
715+ .await;
716+ // ...
717+}
718+```
719+> [!NOTE]
720+> You will notice the first example used a string to serialize to Unknown, you could do something similar with
721+> a struct you create, then serialize.But I created esquema to make that easier.
722+> With esquema you can use other provided lexicons
723+> or ones you create to build out the data structure for your ATProtocol application.
724+> As well as in future updates it will honor the
725+> validation you have in the Lexicon.
726+> Things like string should be 10 long, etc.
727+728+## Step 6. Listening to the firehose
729+730+> [!IMPORTANT]
731+> It is important to note that the original tutorial they connect directly to the firehose, but in this one we use
732+> [rocketman](https://crates.io/crates/rocketman) to connect to the Jetstream instead.
733+> For most use cases this is fine and usually easier when using other clients than the Bluesky provided ones.
734+> But it is important to note there are some differences that can
735+> be found in their introduction to Jetstream article.
736+> https://docs.bsky.app/blog/jetstream#tradeoffs-and-use-cases
737+738+So far, we have:
739+740+- Logged in via OAuth
741+- Created a custom schema
742+- Read & written records for the logged in user
743+744+Now we want to fetch the status records from other users.
745+746+Remember how we referred to our app as being like Google, crawling around the repos to get their records? One advantage
747+we have in the AT Protocol is that each repo publishes an event log of their updates.
748+749+
750+751+Using a [~~Relay~~ Jetstream service](https://docs.bsky.app/blog/jetstream) we can listen to an
752+aggregated firehose of these events across all users in the network. In our case what we're looking for are valid
753+`xyz.statusphere.status` records.
754+755+```rust
756+/** ./src/ingester.rs **/
757+#[async_trait]
758+impl LexiconIngestor for StatusSphereIngester {
759+ async fn ingest(&self, message: Event<Value>) -> anyhow::Result<()> {
760+ if let Some(commit) = &message.commit {
761+ //We manually construct the uri since jetstream does not provide it
762+ //at://{users did}/{collection: xyz.statusphere.status}{records key}
763+ let record_uri = format!("at://{}/{}/{}", message.did, commit.collection, commit.rkey);
764+ match commit.operation {
765+ Operation::Create | Operation::Update => {
766+ if let Some(record) = &commit.record {
767+ //We deserialize the record into our Rust struct
768+ let status_at_proto_record = serde_json::from_value::<
769+ lexicons::xyz::statusphere::status::RecordData,
770+ >(record.clone())?;
771+772+ if let Some(ref _cid) = commit.cid {
773+ // Although esquema does not have full validation yet,
774+ // if you get to this point,
775+ // You know the data structure is the same
776+777+ // Store the status
778+ // TODO
779+ }
780+ }
781+ }
782+ Operation::Delete => {},
783+ }
784+ } else {
785+ return Err(anyhow!("Message has no commit"));
786+ }
787+ Ok(())
788+}
789+}
790+```
791+792+Let's create a SQLite table to store these statuses:
793+794+```rust
795+/** ./src/db.rs **/
796+// Create our statuses table
797+pub async fn create_tables_in_database(pool: &Pool) -> Result<(), async_sqlite::Error> {
798+ pool.conn(move |conn| {
799+ conn.execute("PRAGMA foreign_keys = ON", []).unwrap();
800+801+ // status
802+ conn.execute(
803+ "CREATE TABLE IF NOT EXISTS status (
804+ uri TEXT PRIMARY KEY,
805+ authorDid TEXT NOT NULL,
806+ status TEXT NOT NULL,
807+ createdAt INTEGER NOT NULL,
808+ indexedAt INTEGER NOT NULL
809+ )",
810+ [],
811+ )
812+ .unwrap();
813+814+// ...
815+```
816+817+Now we can write these statuses into our database as they arrive from the firehose:
818+819+```rust
820+/** ./src/ingester.rs **/
821+// If the write is a valid status update
822+if let Some(record) = &commit.record {
823+ let status_at_proto_record = serde_json::from_value::<
824+ lexicons::xyz::statusphere::status::RecordData,
825+ >(record.clone())?;
826+827+ if let Some(ref _cid) = commit.cid {
828+ // Although esquema does not have full validation yet,
829+ // if you get to this point,
830+ // You know the data structure is the same
831+ let created = status_at_proto_record.created_at.as_ref();
832+ let right_now = chrono::Utc::now();
833+ // We save or update the record in the db
834+ StatusFromDb {
835+ uri: record_uri,
836+ author_did: message.did.clone(),
837+ status: status_at_proto_record.status.clone(),
838+ created_at: created.to_utc(),
839+ indexed_at: right_now,
840+ handle: None,
841+ }
842+ .save_or_update(&self.db_pool)
843+ .await?;
844+ }
845+}
846+```
847+848+You can almost think of information flowing in a loop:
849+850+
851+852+Applications write to the repo. The write events are then emitted on the firehose where they're caught by the apps and
853+ingested into their databases.
854+855+Why sync from the event log like this? Because there are other apps in the network that will write the records we're
856+interested in. By subscribing to the event log (via the Jetstream), we ensure that we catch all the data we're interested in —
857+including data published by other apps!
858859+## Step 7. Listing the latest statuses
0860861+Now that we have statuses populating our SQLite, we can produce a timeline of status updates by users. We also use
862+a [DID](https://atproto.com/specs/did)-to-handle resolver so we can show a nice username with the statuses:
863+```rust
864+/** ./src/main.rs **/
865+// Homepage
866+/// Home
867+#[get("/")]
868+async fn home(
869+ session: Session,
870+ oauth_client: web::Data<OAuthClientType>,
871+ db_pool: web::Data<Arc<Pool>>,
872+ handle_resolver: web::Data<HandleResolver>,
873+) -> Result<impl Responder> {
874+ const TITLE: &str = "Home";
875+ // Fetch data stored in our SQLite
876+ let mut statuses = StatusFromDb::load_latest_statuses(&db_pool)
877+ .await
878+ .unwrap_or_else(|err| {
879+ log::error!("Error loading statuses: {err}");
880+ vec![]
881+ });
882+883+ // We resolve the handles to the DID. This is a bit messy atm,
884+ // and there are hopes to find a cleaner way
885+ // to handle resolving the DIDs and formating the handles,
886+ // But it gets the job done for the purpose of this tutorial.
887+ // PRs are welcomed!
888+889+ //Simple way to cut down on resolve calls if we already know the handle for the did
890+ let mut quick_resolve_map: HashMap<Did, String> = HashMap::new();
891+ for db_status in &mut statuses {
892+ let authors_did = Did::new(db_status.author_did.clone()).expect("failed to parse did");
893+ //Check to see if we already resolved it to cut down on resolve requests
894+ match quick_resolve_map.get(&authors_did) {
895+ None => {}
896+ Some(found_handle) => {
897+ db_status.handle = Some(found_handle.clone());
898+ continue;
899+ }
900+ }
901+ //Attempts to resolve the DID to a handle
902+ db_status.handle = match handle_resolver.resolve(&authors_did).await {
903+ Ok(did_doc) => {
904+ match did_doc.also_known_as {
905+ None => None,
906+ Some(also_known_as) => {
907+ match also_known_as.is_empty() {
908+ true => None,
909+ false => {
910+ //also_known as a list starts the array with the highest priority handle
911+ let formatted_handle =
912+ format!("@{}", also_known_as[0]).replace("at://", "");
913+ quick_resolve_map.insert(authors_did, formatted_handle.clone());
914+ Some(formatted_handle)
915+ }
916+ }
917+ }
918+ }
919+ }
920+ Err(err) => {
921+ log::error!("Error resolving did: {err}");
922+ None
923+ }
924+ };
925+ }
926+ // ...
927+```
928+>[!NOTE]
929+> We use a newly released handle resolver from atrium.
930+> Can see
931+> how it is set up in [./src/main.rs](https://github.com/fatfingers23/rusty_statusphere_example_app/blob/a13ab7eb8fcba901a483468f7fd7c56b2948972d/src/main.rs#L508)
932+933+934+Our HTML can now list these status records:
935+936+```html
937+<!-- ./templates/home.html -->
938+{% for status in statuses %}
939+<div class="{% if loop.first %} status-line no-line {% else %} status-line {% endif %} ">
940+ <div>
941+ <div class="status">{{status.status}}</div>
942+ </div>
943+ <div class="desc">
944+ <a class="author"
945+ href="https://bsky.app/profile/{{status.author_did}}">{{status.author_display_name()}}</a>
946+ {% if status.is_today() %}
947+ is feeling {{status.status}} today
948+ {% else %}
949+ was feeling {{status.status}} on {{status.created_at}}
950+ {% endif %}
951+ </div>
952+</div>
953+{% endfor %}
954+`
955+})}
956+```
957+958+
959+960+## Step 8. Optimistic updates
961+962+As a final optimization, let's introduce "optimistic updates."
963+964+Remember the information flow loop with the repo write and the event log?
965+966+
967+968+Since we're updating our users' repos locally, we can short-circuit that flow to our own database:
969+970+
971+972+This is an important optimization to make, because it ensures that the user sees their own changes while using your app.
973+When the event eventually arrives from the firehose, we just discard it since we already have it saved locally.
974+975+To do this, we just update `POST /status` to include an additional write to our SQLite DB:
976+977+```rust
978+/** ./src/main.rs **/
979+/// Creates a new status
980+#[post("/status")]
981+async fn status(
982+ request: HttpRequest,
983+ session: Session,
984+ oauth_client: web::Data<OAuthClientType>,
985+ db_pool: web::Data<Arc<Pool>>,
986+ form: web::Form<StatusForm>,
987+) -> HttpResponse {
988+ //...
989+ let create_result = agent
990+ .api
991+ .com
992+ .atproto
993+ .repo
994+ .create_record(
995+ atrium_api::com::atproto::repo::create_record::InputData {
996+ collection: Status::NSID.parse().unwrap(),
997+ repo: did.into(),
998+ rkey: None,
999+ record: status.into(),
1000+ swap_commit: None,
1001+ validate: None,
1002+ }
1003+ .into(),
1004+ )
1005+ .await;
1006+1007+ match create_result {
1008+ Ok(record) => {
1009+ let status = StatusFromDb::new(
1010+ record.uri.clone(),
1011+ did_string,
1012+ form.status.clone(),
1013+ );
1014+1015+ let _ = status.save(db_pool).await;
1016+ Redirect::to("/")
1017+ .see_other()
1018+ .respond_to(&request)
1019+ .map_into_boxed_body()
1020+ }
1021+ Err(err) => {
1022+ log::error!("Error creating status: {err}");
1023+ let error_html = ErrorTemplate {
1024+ title: "Error",
1025+ error: "Was an error creating the status, please check the logs.",
1026+ }
1027+ .render()
1028+ .expect("template should be valid");
1029+ HttpResponse::Ok().body(error_html)
1030+ }
1031+ }
1032+ //...
1033+}
1034+```
1035+1036+You'll notice this code looks almost exactly like what we're doing in `ingester.rs`.
1037+1038+## Thinking in AT Proto
1039+1040+In this tutorial we've covered the key steps to building an atproto app. Data is published in its canonical form on
1041+users' `at://` repos and then aggregated into apps' databases to produce views of the network.
1042+1043+When building your app, think in these four key steps:
1044+1045+- Design the [Lexicon](#) schemas for the records you'll publish into the Atmosphere.
1046+- Create a database for aggregating the records into useful views.
1047+- Build your application to write the records on your users' repos.
1048+- Listen to the firehose to aggregate data across the network.
1049+1050+Remember this flow of information throughout:
1051+1052+
1053+1054+This is how every app in the Atmosphere works, including the [Bluesky social app](https://bsky.app).
1055+1056+## Next steps
1057+1058+If you want to practice what you've learned, here are some additional challenges you could try:
1059+1060+- Sync the profile records of all users so that you can show their display names instead of their handles.
1061+- Count the number of each status used and display the total counts.
1062+- Fetch the authed user's `app.bsky.graph.follow` follows and show statuses from them.
1063+- Create a different kind of schema, like a way to post links to websites and rate them 1 through 4 stars.
1064+1065+[Ready to learn more? Specs, guides, and SDKs can be found here.](https://atproto.com/)
1066+1067+>[!NOTE]
1068+> Thank you for checking out my version of the Statusphere example project!
1069+> There are parts of this I feel can be improved on and made more efficient,
1070+> but I think it does a good job for providing you with a starting point to start building Rust applications in the Atmosphere.
1071+> See something you think could be done better? Then please submit a PR!
1072+> [@baileytownsend.dev](https://bsky.app/profile/baileytownsend.dev)
images/cover.png
This is a binary file and will not be displayed.
images/emojis.png
This is a binary file and will not be displayed.
+2
justfile
···00
···1+watch:
2+ watchexec -w src -w templates -r cargo run
···1+use crate::db::StatusFromDb;
2+use crate::lexicons;
3+use crate::lexicons::xyz::statusphere::Status;
4+use anyhow::anyhow;
5+use async_sqlite::Pool;
6+use async_trait::async_trait;
7+use atrium_api::types::Collection;
8+use log::error;
9+use rocketman::{
10+ connection::JetstreamConnection,
11+ handler,
12+ ingestion::LexiconIngestor,
13+ options::JetstreamOptions,
14+ types::event::{Event, Operation},
15+};
16+use serde_json::Value;
17+use std::{
18+ collections::HashMap,
19+ sync::{Arc, Mutex},
20+};
21+22+#[async_trait]
23+impl LexiconIngestor for StatusSphereIngester {
24+ async fn ingest(&self, message: Event<Value>) -> anyhow::Result<()> {
25+ if let Some(commit) = &message.commit {
26+ //We manually construct the uri since Jetstream does not provide it
27+ //at://{users did}/{collection: xyz.statusphere.status}{records key}
28+ let record_uri = format!("at://{}/{}/{}", message.did, commit.collection, commit.rkey);
29+ match commit.operation {
30+ Operation::Create | Operation::Update => {
31+ if let Some(record) = &commit.record {
32+ let status_at_proto_record = serde_json::from_value::<
33+ lexicons::xyz::statusphere::status::RecordData,
34+ >(record.clone())?;
35+36+ if let Some(ref _cid) = commit.cid {
37+ // Although esquema does not have full validation yet,
38+ // if you get to this point,
39+ // You know the data structure is the same
40+ let created = status_at_proto_record.created_at.as_ref();
41+ let right_now = chrono::Utc::now();
42+ // We save or update the record in the db
43+ StatusFromDb {
44+ uri: record_uri,
45+ author_did: message.did.clone(),
46+ status: status_at_proto_record.status.clone(),
47+ created_at: created.to_utc(),
48+ indexed_at: right_now,
49+ handle: None,
50+ }
51+ .save_or_update(&self.db_pool)
52+ .await?;
53+ }
54+ }
55+ }
56+ Operation::Delete => StatusFromDb::delete_by_uri(&self.db_pool, record_uri).await?,
57+ }
58+ } else {
59+ return Err(anyhow!("Message has no commit"));
60+ }
61+ Ok(())
62+ }
63+}
64+pub struct StatusSphereIngester {
65+ db_pool: Arc<Pool>,
66+}
67+68+pub async fn start_ingester(db_pool: Arc<Pool>) {
69+ // init the builder
70+ let opts = JetstreamOptions::builder()
71+ // your EXACT nsids
72+ // Which in this case is xyz.statusphere.status
73+ .wanted_collections(vec![Status::NSID.parse().unwrap()])
74+ .build();
75+ // create the jetstream connector
76+ let jetstream = JetstreamConnection::new(opts);
77+78+ // create your ingesters
79+ // Which in this case is xyz.statusphere.status
80+ let mut ingesters: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
81+ ingesters.insert(
82+ // your EXACT nsid
83+ Status::NSID.parse().unwrap(),
84+ Box::new(StatusSphereIngester { db_pool }),
85+ );
86+87+ // tracks the last message we've processed
88+ let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
89+90+ // get channels
91+ let msg_rx = jetstream.get_msg_rx();
92+ let reconnect_tx = jetstream.get_reconnect_tx();
93+94+ // spawn a task to process messages from the queue.
95+ // this is a simple implementation, you can use a more complex one based on needs.
96+ let c_cursor = cursor.clone();
97+ tokio::spawn(async move {
98+ while let Ok(message) = msg_rx.recv_async().await {
99+ if let Err(e) =
100+ handler::handle_message(message, &ingesters, reconnect_tx.clone(), c_cursor.clone())
101+ .await
102+ {
103+ error!("Error processing message: {}", e);
104+ };
105+ }
106+ });
107+108+ // connect to jetstream
109+ // retries internally, but may fail if there is an extreme error.
110+ if let Err(e) = jetstream.connect(cursor.clone()).await {
111+ error!("Failed to connect to Jetstream: {}", e);
112+ std::process::exit(1);
113+ }
114+}
+3
src/lexicons/mod.rs
···000
···1+// @generated - This file is generated by esquema-codegen (forked from atrium-codegen). DO NOT EDIT.
2+pub mod record;
3+pub mod xyz;
+23
src/lexicons/record.rs
···00000000000000000000000
···1+// @generated - This file is generated by esquema-codegen (forked from atrium-codegen). DO NOT EDIT.
2+//!A collection of known record types.
3+#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
4+#[serde(tag = "$type")]
5+pub enum KnownRecord {
6+ #[serde(rename = "xyz.statusphere.status")]
7+ LexiconsXyzStatusphereStatus(Box<crate::lexicons::xyz::statusphere::status::Record>),
8+}
9+impl From<crate::lexicons::xyz::statusphere::status::Record> for KnownRecord {
10+ fn from(record: crate::lexicons::xyz::statusphere::status::Record) -> Self {
11+ KnownRecord::LexiconsXyzStatusphereStatus(Box::new(record))
12+ }
13+}
14+impl From<crate::lexicons::xyz::statusphere::status::RecordData> for KnownRecord {
15+ fn from(record_data: crate::lexicons::xyz::statusphere::status::RecordData) -> Self {
16+ KnownRecord::LexiconsXyzStatusphereStatus(Box::new(record_data.into()))
17+ }
18+}
19+impl Into<atrium_api::types::Unknown> for KnownRecord {
20+ fn into(self) -> atrium_api::types::Unknown {
21+ atrium_api::types::TryIntoUnknown::try_into_unknown(&self).unwrap()
22+ }
23+}
+3
src/lexicons/xyz.rs
···000
···1+// @generated - This file is generated by esquema-codegen (forked from atrium-codegen). DO NOT EDIT.
2+//!Definitions for the `xyz` namespace.
3+pub mod statusphere;
+9
src/lexicons/xyz/statusphere.rs
···000000000
···1+// @generated - This file is generated by esquema-codegen (forked from atrium-codegen). DO NOT EDIT.
2+//!Definitions for the `xyz.statusphere` namespace.
3+pub mod status;
4+#[derive(Debug)]
5+pub struct Status;
6+impl atrium_api::types::Collection for Status {
7+ const NSID: &'static str = "xyz.statusphere.status";
8+ type Record = status::Record;
9+}
+15
src/lexicons/xyz/statusphere/status.rs
···000000000000000
···1+// @generated - This file is generated by esquema-codegen (forked from atrium-codegen). DO NOT EDIT.
2+//!Definitions for the `xyz.statusphere.status` namespace.
3+use atrium_api::types::TryFromUnknown;
4+#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
5+#[serde(rename_all = "camelCase")]
6+pub struct RecordData {
7+ pub created_at: atrium_api::types::string::Datetime,
8+ pub status: String,
9+}
10+pub type Record = atrium_api::types::Object<RecordData>;
11+impl From<atrium_api::types::Unknown> for RecordData {
12+ fn from(value: atrium_api::types::Unknown) -> Self {
13+ Self::try_from_unknown(value).unwrap()
14+ }
15+}