An example AT Protocol application, written in Elixir using atex and Drinkup.

feat: store statuses in database, and ingest from firehose

ovyerus.com c28ab4e7 44743ed1

verified
+187 -18
+1
lib/statusphere/application.ex
··· 14 14 repos: Application.fetch_env!(:statusphere, :ecto_repos), skip: skip_migrations?()}, 15 15 {DNSCluster, query: Application.get_env(:statusphere, :dns_cluster_query) || :ignore}, 16 16 {Phoenix.PubSub, name: Statusphere.PubSub}, 17 + {Drinkup, %{consumer: Statusphere.Consumer}}, 17 18 # Start a worker by calling: Statusphere.Worker.start_link(arg) 18 19 # {Statusphere.Worker, arg}, 19 20 # Start to serve requests, typically the last entry
+46
lib/statusphere/consumer.ex
··· 1 + defmodule Statusphere.Consumer do 2 + alias Statusphere.Repo 3 + require Logger 4 + use Drinkup.RecordConsumer, collections: ["xyz.statusphere.status"] 5 + 6 + def handle_create(record), do: upsert(record) 7 + 8 + def handle_update(record), do: upsert(record) 9 + 10 + def handle_delete(record) do 11 + IO.inspect(record, label: "delete") 12 + end 13 + 14 + defp upsert(%{type: "xyz.statusphere.status", record: record} = evt) do 15 + case Xyz.Statusphere.Status.from_json(record) do 16 + {:ok, record} -> 17 + uri = 18 + Atex.AtURI.to_string(%Atex.AtURI{ 19 + authority: evt.did, 20 + collection: evt.type, 21 + rkey: evt.rkey 22 + }) 23 + 24 + status = 25 + %Statusphere.Status{} 26 + |> Statusphere.Status.changeset(%{ 27 + uri: uri, 28 + author_did: evt.did, 29 + status: record.status, 30 + created_at: NaiveDateTime.from_iso8601!(record.createdAt), 31 + indexed_at: NaiveDateTime.utc_now() 32 + }) 33 + |> Repo.insert!( 34 + on_conflict: [set: [status: record.status, indexed_at: NaiveDateTime.utc_now()]], 35 + conflict_target: :uri 36 + ) 37 + 38 + Logger.debug("ingested status: #{inspect(status)}") 39 + 40 + _ -> 41 + nil 42 + end 43 + end 44 + 45 + defp upsert(_), do: nil 46 + end
+21
lib/statusphere/status.ex
··· 1 + defmodule Statusphere.Status do 2 + use Ecto.Schema 3 + import Ecto.Changeset 4 + 5 + @primary_key {:uri, :binary_id, autogenerate: false} 6 + @foreign_key_type :binary_id 7 + schema "status" do 8 + field :author_did, :string 9 + field :status, :string 10 + field :created_at, :utc_datetime 11 + field :indexed_at, :utc_datetime 12 + end 13 + 14 + @doc false 15 + def changeset(status, attrs) do 16 + status 17 + |> cast(attrs, [:uri, :author_did, :status, :created_at, :indexed_at]) 18 + |> validate_required([:uri, :author_did, :status, :created_at, :indexed_at]) 19 + |> unique_constraint(:uri) 20 + end 21 + end
+53 -14
lib/statusphere_web/controllers/page_controller.ex
··· 1 1 defmodule StatusphereWeb.PageController do 2 + alias Statusphere.Repo 3 + alias Statusphere.Status 4 + require Logger 5 + import Ecto.Query, only: [from: 2] 2 6 use StatusphereWeb, :controller 3 7 4 8 def home(conn, _params) do 5 - case Atex.XRPC.OAuthClient.from_conn(conn) do 6 - {:ok, client} -> 7 - {:ok, %{body: %{value: profile}}, client} = 8 - Atex.XRPC.get(client, %Com.Atproto.Repo.GetRecord{ 9 - params: %{ 10 - repo: client.did, 11 - collection: "app.bsky.actor.profile", 12 - rkey: "self" 13 - } 14 - }) 9 + query = 10 + from u in Status, 11 + order_by: [desc: u.indexed_at], 12 + limit: 10 15 13 16 - selected_status = get_session(conn, :selected_status) 14 + statuses = Repo.all(query) 15 + 16 + status_identities = 17 + statuses 18 + |> Enum.map(fn %{author_did: did} -> 19 + case Atex.IdentityResolver.resolve(did) do 20 + {:ok, identity} -> {identity.did, identity.handle} 21 + {:error, _} -> {did, did} 22 + end 23 + end) 24 + |> Enum.into(%{}) 25 + 26 + with {:ok, client} <- Atex.XRPC.OAuthClient.from_conn(conn), 27 + {:ok, %{body: %{value: profile}}, client} <- 28 + Atex.XRPC.get(client, %Com.Atproto.Repo.GetRecord{ 29 + params: %{ 30 + repo: client.did, 31 + collection: "app.bsky.actor.profile", 32 + rkey: "self" 33 + } 34 + }) do 35 + conn 36 + |> Atex.XRPC.OAuthClient.update_plug(client) 37 + |> render(:home, 38 + profile: profile, 39 + did: client.did, 40 + statuses: statuses, 41 + status_identities: status_identities 42 + ) 43 + else 44 + {:error, err, client} -> 45 + Logger.error("Failed to fetch Bluesky profile for #{client.did}: #{inspect(err)}") 17 46 18 47 conn 19 48 |> Atex.XRPC.OAuthClient.update_plug(client) 20 - |> render(:home, profile: profile, conn: conn, selected_status: selected_status) 49 + |> render(:home, 50 + profile: %{}, 51 + did: client.did, 52 + statuses: statuses, 53 + status_identities: status_identities 54 + ) 21 55 22 - :error -> 23 - render(conn, :home, profile: nil, conn: conn, selected_status: nil) 56 + _err -> 57 + render(conn, :home, 58 + profile: nil, 59 + did: nil, 60 + statuses: statuses, 61 + status_identities: status_identities 62 + ) 24 63 end 25 64 end 26 65
+23 -2
lib/statusphere_web/controllers/page_html/home.html.heex
··· 22 22 <% end %> 23 23 </div> 24 24 25 - <div class="mt-4"> 26 - <StatusphereWeb.Components.EmojiForm.emoji_form selected_status={@selected_status} /> 25 + <div class="mt-6"> 26 + <% user_latest_status = 27 + Enum.find(@statuses, %{status: ""}, fn x -> x.author_did == @did end) %> 28 + <StatusphereWeb.Components.EmojiForm.emoji_form selected_status={user_latest_status.status} /> 27 29 </div> 30 + 31 + <ul class="flex flex-col gap-6 mt-6"> 32 + <li 33 + :for={status <- @statuses} 34 + class="relative flex items-center gap-2 first-of-type:before:content-none before:content-[''] before:bg-gray-200 before:w-[2px] before:h-4 before:absolute before:left-6 before:-translate-x-1/2 before:-top-5 " 35 + > 36 + <% user_handle = @status_identities[status.author_did] %> 37 + <div class="size-12 rounded-full bg-white border border-gray-200 flex items-center justify-center text-3xl"> 38 + {status.status} 39 + </div> 40 + 41 + <p> 42 + <.link href={"https://bsky.app/profile/#{user_handle}"} class="font-bold hover:underline"> 43 + @{user_handle} 44 + </.link> 45 + is feeling {status.status} today 46 + </p> 47 + </li> 48 + </ul> 28 49 </Layouts.app>
+30 -2
lib/statusphere_web/controllers/status_controller.ex
··· 1 1 defmodule StatusphereWeb.StatusController do 2 + alias Statusphere.Repo 2 3 require Logger 3 4 use StatusphereWeb, :controller 4 5 5 6 def create(conn, %{"status" => status}) do 7 + rkey = to_string(Atex.TID.now()) 8 + 6 9 with {:ok, client} <- Atex.XRPC.OAuthClient.from_conn(conn), 7 10 {:ok, record} <- 8 11 Xyz.Statusphere.Status.main(%{ ··· 16 19 input: %{ 17 20 repo: client.did, 18 21 collection: "xyz.statusphere.status", 19 - rkey: Atex.TID.now() |> to_string(), 22 + rkey: rkey, 20 23 record: record 21 24 } 22 25 } 23 26 ) do 27 + uri = 28 + Atex.AtURI.to_string(%Atex.AtURI{ 29 + authority: client.did, 30 + collection: "xyz.statusphere.status", 31 + rkey: rkey 32 + }) 33 + 34 + optimistic_insert = 35 + %Statusphere.Status{} 36 + |> Statusphere.Status.changeset(%{ 37 + uri: uri, 38 + author_did: client.did, 39 + status: record.status, 40 + created_at: NaiveDateTime.from_iso8601!(record.createdAt), 41 + indexed_at: NaiveDateTime.utc_now() 42 + }) 43 + |> Repo.insert() 44 + 45 + case optimistic_insert do 46 + {:ok, _} -> 47 + nil 48 + 49 + {:error, changeset} -> 50 + Logger.error("Failed to optimistically insert status: #{inspect(changeset)}") 51 + end 52 + 24 53 conn 25 54 |> Atex.XRPC.OAuthClient.update_plug(client) 26 - |> put_session(:selected_status, status) 27 55 |> redirect(to: ~p"/") 28 56 else 29 57 :error ->
+13
priv/repo/migrations/20251219113112_create_status.exs
··· 1 + defmodule Statusphere.Repo.Migrations.CreateStatus do 2 + use Ecto.Migration 3 + 4 + def change do 5 + create table(:status, primary_key: false) do 6 + add :uri, :binary_id, primary_key: true 7 + add :author_did, :string, null: false 8 + add :status, :string, null: false 9 + add :created_at, :utc_datetime, null: false 10 + add :indexed_at, :utc_datetime, null: false 11 + end 12 + end 13 + end