title: "background tasks"#
plyr.fm uses pydocket for durable background task execution, backed by Redis.
overview#
background tasks handle operations that shouldn't block the request/response cycle:
- copyright scanning - analyzes uploaded tracks for potential copyright matches
- media export - downloads all tracks, zips them, and uploads to R2
- ATProto sync - syncs records to user's PDS on login
- teal scrobbling - scrobbles plays to user's PDS
- album list sync - updates ATProto list records when album metadata changes
- PDS like/unlike - syncs like records to user's PDS asynchronously
- PDS comment create/update/delete - syncs comment records to user's PDS asynchronously
- genre classification - classifies tracks via Replicate effnet-discogs, stores predictions in
track.extra. ifauto_tagflag is set, applies top genre tags automatically after classification - move track audio - moves files between public/private R2 buckets when support_gate is toggled
- sync copyright resolutions (perpetual) - syncs copyright label resolutions from ATProto labeler every 5 minutes
architecture#
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ FastAPI │────▶│ Redis │◀────│ Worker │
│ (add task)│ │ (queue) │ │ (process) │
└─────────────┘ └─────────────┘ └─────────────┘
- docket schedules tasks to Redis
- worker runs in-process alongside FastAPI, processing tasks from the queue
- tasks are durable - if the worker crashes, tasks are retried on restart
configuration#
environment variables#
# redis URL (required to enable docket)
DOCKET_URL=redis://localhost:6379
# optional settings (have sensible defaults)
DOCKET_NAME=plyr # queue namespace
DOCKET_WORKER_CONCURRENCY=10 # concurrent task limit
⚠️ worker settings - do not modify#
the worker is initialized in backend/_internal/background.py using tasks registered via backend/_internal/tasks/__init__.py. do not change these settings without extensive testing:
| setting | default | why it matters |
|---|---|---|
heartbeat_interval |
2s | changing this broke all task execution (2025-12-30 incident) |
minimum_check_interval |
1s | affects how quickly tasks are picked up |
scheduling_resolution |
1s | affects scheduled task precision |
2025-12-30 incident: setting heartbeat_interval=30s caused all scheduled tasks (likes, comments, exports) to silently fail while perpetual tasks continued running. root cause unclear - correlation was definitive but mechanism wasn't found in pydocket source. reverted in PR #669.
if you need to tune worker settings:
- test extensively in staging with real task volume
- verify ALL task types execute (not just perpetual tasks)
- check logfire for task execution spans
when DOCKET_URL is not set, docket is disabled and tasks fall back to asyncio.create_task() (fire-and-forget).
local development#
# start redis + backend + frontend
just dev
# or manually:
docker compose up -d # starts redis on localhost:6379
DOCKET_URL=redis://localhost:6379 just backend run
production/staging#
Redis instances are self-hosted on Fly.io (redis:7-alpine):
| environment | fly app | region |
|---|---|---|
| production | plyr-redis |
iad |
| staging | plyr-redis-stg |
iad |
set DOCKET_URL in fly.io secrets:
flyctl secrets set DOCKET_URL=redis://plyr-redis.internal:6379 -a relay-api
flyctl secrets set DOCKET_URL=redis://plyr-redis-stg.internal:6379 -a relay-api-staging
note: uses Fly internal networking (.internal domain), no TLS needed within private network.
usage#
scheduling a task#
from backend._internal.tasks import schedule_copyright_scan
from backend._internal.tasks import schedule_genre_classification, schedule_embedding_generation
# automatically uses docket if enabled, else asyncio.create_task
await schedule_copyright_scan(track_id, audio_url)
await schedule_genre_classification(track_id)
await schedule_embedding_generation(track_id, audio_url)
adding new tasks#
- define the task function in a module under
backend/_internal/tasks/(organized by domain):tasks/copyright.py- copyright scanning and resolution synctasks/ml.py- genre classification, CLAP embeddingstasks/pds.py- PDS record sync (likes, comments)tasks/storage.py- R2 file operationstasks/sync.py- ATProto sync, scrobbling, album lists
# e.g., backend/_internal/tasks/ml.py
async def my_new_task(arg1: str, arg2: int) -> None:
"""task functions must be async and JSON-serializable args only."""
# do work here
pass
- export it from
backend/_internal/tasks/__init__.pyand add it to thebackground_taskslist:
from backend._internal.tasks.ml import my_new_task
background_tasks = [
# ... existing tasks
my_new_task,
]
- create a scheduler helper in the same module:
async def schedule_my_task(arg1: str, arg2: int) -> None:
"""schedule with docket if enabled, else asyncio."""
if is_docket_enabled():
try:
docket = get_docket()
await docket.add(my_new_task)(arg1, arg2)
return
except Exception:
pass # fall through to asyncio
asyncio.create_task(my_new_task(arg1, arg2))
costs#
self-hosted Redis on Fly.io (fixed monthly):
- ~$2/month per instance (256MB shared-cpu VM)
- ~$4/month total for prod + staging
this replaced Upstash pay-per-command pricing which was costing ~$75/month at scale (37M commands/month).
fallback behavior#
when docket is disabled (DOCKET_URL not set):
schedule_copyright_scan()usesasyncio.create_task()instead- tasks are fire-and-forget (no retries, no durability)
- suitable for local dev without Redis
monitoring#
background task execution is traced in Logfire:
- span:
scheduled copyright scan via docket - span:
docket scheduling failed, falling back to asyncio
query recent background task activity:
SELECT start_timestamp, message, span_name, duration
FROM records
WHERE span_name LIKE '%copyright%'
AND start_timestamp > NOW() - INTERVAL '1 hour'
ORDER BY start_timestamp DESC