···11# Drinkup
2233-An Elixir library for listening to events from an ATProtocol relay
44-(firehose/`com.atproto.sync.subscribeRepos`). Eventually aiming to support any
55-ATProtocol subscription.
33+An Elixir library for consuming various AT Protocol sync services.
6477-## TODO
55+Drinkup provides a unified interface for connecting to various AT Protocol data
66+streams, handling reconnection logic, sequence tracking, and event dispatch.
77+Choose the sync service that fits your needs:
8899-- Support for different subscriptions other than
1010- `com.atproto.sync.subscribeRepo'.
1111-- Validation (signatures, making sure to only track handle active accounts,
1212- etc.) (see
1313- [Firehose Validation Best Practices](https://atproto.com/specs/sync#firehose-validation-best-practices))
1414-- Look into backfilling? See if there's better ways to do it.
1515-- Built-in solutions for tracking resumption? (probably a pluggable solution to
1616- allow for different things like Mnesia, Postgres, etc.)
1717-- Testing of multi-node/distribution.
1818-- Tests
1919-- Documentation
99+- **Firehose** - Raw event stream from the full AT Protocol network.
1010+- **Jetstream** - Lightweight, cherry-picked event stream with filtering by
1111+ record collections and DIDs.
1212+- **Tap** - Managed backfill and indexing solution.
20132114## Installation
22152323-Add `drinkup` to your `mix.exs`.
1616+Add `drinkup` to your `mix.exs`:
24172518```elixir
2619def deps do
···32253326Documentation can be found on HexDocs at https://hexdocs.pm/drinkup.
34273535-## Example Usage
2828+## Quick Start
36293737-First, create a module implementing the `Drinkup.Consumer` behaviour (only
3838-requires a `handle_event/1` function):
3030+### Firehose
39314032```elixir
4141-defmodule ExampleConsumer do
4242- @behaviour Drinkup.Consumer
3333+defmodule MyApp.FirehoseConsumer do
3434+ @behaviour Drinkup.Firehose.Consumer
43354444- def handle_event(%Drinkup.Event.Commit{} = event) do
4545- IO.inspect(event, label: "Got commit event")
3636+ def handle_event(%Drinkup.Firehose.Event.Commit{} = event) do
3737+ IO.inspect(event, label: "Commit")
4638 end
47394840 def handle_event(_), do: :noop
4941end
4242+4343+# In your supervision tree:
4444+children = [{Drinkup.Firehose, %{consumer: MyApp.FirehoseConsumer}}]
5045```
51465252-Then add Drinkup and your consumer to your application's supervision tree:
4747+### Jetstream
53485449```elixir
5555-defmodule MyApp.Application do
5656- use Application
5050+defmodule MyApp.JetstreamConsumer do
5151+ @behaviour Drinkup.Jetstream.Consumer
57525858- def start(_type, _args) do
5959- children = [{Drinkup, %{consumer: ExampleConsumer}}]
6060- Supervisor.start_link(children, strategy: :one_for_one)
5353+ def handle_event(%Drinkup.Jetstream.Event.Commit{} = event) do
5454+ IO.inspect(event, label: "Commit")
6155 end
5656+5757+ def handle_event(_), do: :noop
6258end
6363-```
64596565-You should then be able to start your application and start seeing
6666-`Got commit event: ...` in the terminal.
6767-6868-### Record Consumer
6060+# In your supervision tree:
6161+children = [
6262+ {Drinkup.Jetstream, %{
6363+ consumer: MyApp.JetstreamConsumer,
6464+ wanted_collections: ["app.bsky.feed.post"]
6565+ }}
6666+]
6767+```
69687070-One of the main reasons for listening to an ATProto relay is to synchronise a
7171-database with records. As a result, Drinkup provides a light extension around a
7272-basic consumer, the `RecordConsumer`, which only listens to commit events, and
7373-transforms them into a slightly nicer structure to work around, calling your
7474-`handle_create/1`, `handle_update/1`, and `handle_delete/1` functions for each
7575-record it comes across. It also allows for filtering of specific types of
7676-records either by full name or with a
7777-[Regex](https://hexdocs.pm/elixir/1.18.4/Regex.html) match.
6969+### Tap
78707971```elixir
8080-defmodule ExampleRecordConsumer do
8181- # Will respond to any events either `app.bsky.feed.post` records, or anything under `app.bsky.graph`.
8282- use Drinkup.RecordConsumer, collections: [~r/app\.bsky\.graph\..+/, "app.bsky.feed.post"]
8383- alias Drinkup.RecordConsumer.Record
7272+defmodule MyApp.TapConsumer do
7373+ @behaviour Drinkup.Tap.Consumer
84748585- def handle_create(%Record{type: "app.bsky.feed.post"} = record) do
8686- IO.inspect(record, label: "Bluesky post created")
7575+ def handle_event(%Drinkup.Tap.Event.Record{} = event) do
7676+ IO.inspect(event, label: "Record")
8777 end
88788989- def handle_create(%Record{type: "app.bsky.graph" <> _} = record) do
9090- IO.inspect(record, label: "Bluesky graph updated")
9191- end
7979+ def handle_event(_), do: :noop
8080+end
92819393- def handle_update(record) do
9494- # ...
9595- end
8282+# In your supervision tree:
8383+children = [
8484+ {Drinkup.Tap, %{
8585+ consumer: MyApp.TapConsumer,
8686+ host: "http://localhost:2480"
8787+ }}
8888+]
96899797- def handle_delete(record) do
9898- # ...
9999- end
100100-end
9090+# Track specific repos:
9191+Drinkup.Tap.add_repos(Drinkup.Tap, ["did:plc:abc123"])
10192```
9393+9494+See [the examples](./examples) for some more complete samples.
9595+9696+## TODO
9797+9898+- Validation for Firehose events (signatures, active account tracking) — see
9999+ [Firehose Validation Best Practices](https://atproto.com/specs/sync#firehose-validation-best-practices)
100100+- Pluggable cursor persistence (Mnesia, Postgres, etc.)
101101+- Multi-node/distribution testing
102102+- More comprehensive test coverage
103103+- Additional documentation
102104103105## Special thanks
104106