audio streaming app plyr.fm

feat: store audio blobs on user's PDS for data ownership (#823)

* feat: store audio blobs on user's PDS for data ownership

embraces ATProto's data ownership model by uploading audio to the user's
PDS while keeping R2 copies for CDN streaming performance.

- upload audio blob to PDS during track creation (non-gated only)
- store BlobRef in track record with ref, mimeType, size
- add migration endpoint POST /tracks/{id}/migrate-to-pds
- track audio_storage ("r2" | "pds" | "both"), pds_blob_cid, pds_blob_size
- handle PayloadTooLargeError gracefully (fall back to R2-only)
- add frontend indicator for PDS-stored tracks

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* refactor: move imports to module level

imports were unnecessarily deferred inside the migrate_track_to_pds function.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: resolve type warning in search result iteration

ArtistSearchResult doesn't have an `id` field, so use getattr for safe access.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* feat(frontend): add PDS indicator to track detail page

show PDS storage status in track-stats section, consistent with TrackItem.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* docs: update STATUS.md with PDS blob storage

- add PR #823 to recent work with implementation details
- note fast follow needed: UI for migrating existing tracks
- note: verify via staging integration tests after merge
- update current focus and what's working sections

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>

authored by zzstoatzz.io, co-authored by Claude Opus 4.5, committed via GitHub (commits a5269fec, eb550eee)

+746 -16 total

STATUS.md (+28 -2)

```diff
 ### January 2026

+#### PDS blob storage for audio (PR #823, Jan 29)
+
+**audio files now stored on user's PDS** - embraces ATProto's data ownership model by uploading audio to the user's PDS while keeping R2 copies for CDN streaming.
+
+**how it works**:
+- new uploads: audio blob uploaded to PDS, BlobRef stored in track record
+- dual-write: R2 copy kept for streaming performance (PDS `getBlob` isn't CDN-optimized)
+- graceful fallback: if PDS rejects blob (size limit), track stays R2-only
+- gated tracks skip PDS (need auth-protected access)
+
+**database changes**:
+- `audio_storage`: "r2" | "pds" | "both"
+- `pds_blob_cid`: CID of blob on user's PDS
+- `pds_blob_size`: size in bytes
+
+**migration endpoint**: `POST /tracks/{id}/migrate-to-pds` lets owners migrate existing tracks
+
+**frontend**: PDS indicator shown on track cards and detail page when `audio_storage` is "pds" or "both"
+
+**fast follow needed**: UI button for migrating existing tracks (endpoint exists, no frontend yet)
+
+**verification**: run staging integration tests after merging to main
+
+---
+
 #### PDS-based account creation (PRs #813-815, Jan 27)

 **create ATProto accounts directly from plyr.fm** - users without an existing ATProto identity can now create one during sign-up by selecting a PDS host.
@@
 ### current focus

-listen receipts shipped - share links now track who clicked and played. legal foundation complete with terms of service and privacy policy. responsive embed layout handles any container size.
+PDS blob storage shipped - new uploads store audio on user's PDS for true data ownership. fast follow: add UI button for migrating existing tracks.

 **end-of-year sprint [#625](https://github.com/zzstoatzz/plyr.fm/issues/625) shipped:**
 - moderation consolidation: sensitive images moved to moderation service (#644)
@@
 - ✅ track upload with streaming
 - ✅ audio streaming via 307 redirects to R2 CDN
 - ✅ lossless audio (AIFF/FLAC) with automatic transcoding for browser compatibility
+- ✅ PDS blob storage for audio (user data ownership)
 - ✅ play count tracking, likes, queue management
 - ✅ unified search with Cmd/Ctrl+K
 - ✅ teal.fm scrobbling
@@
 ---

-this is a living document. last updated 2026-01-27.
+this is a living document. last updated 2026-01-29.
```
backend/alembic/versions/2026_01_28_232100_e88dbd481272_add_pds_blob_storage_columns.py (+36, new file)

```python
"""add pds blob storage columns

Revision ID: e88dbd481272
Revises: 38dd0d1af1b7
Create Date: 2026-01-28 23:21:00.640289

"""

from collections.abc import Sequence

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "e88dbd481272"
down_revision: str | Sequence[str] | None = "38dd0d1af1b7"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
    """Upgrade schema."""
    op.add_column(
        "tracks",
        sa.Column("audio_storage", sa.String(), server_default="r2", nullable=False),
    )
    op.add_column("tracks", sa.Column("pds_blob_cid", sa.String(), nullable=True))
    op.add_column("tracks", sa.Column("pds_blob_size", sa.Integer(), nullable=True))


def downgrade() -> None:
    """Downgrade schema."""
    op.drop_column("tracks", "pds_blob_size")
    op.drop_column("tracks", "pds_blob_cid")
    op.drop_column("tracks", "audio_storage")
```
backend/src/backend/_internal/atproto/__init__.py (+4)

```diff
 """ATProto integration for relay."""

+from backend._internal.atproto.client import BlobRef, PayloadTooLargeError, upload_blob
 from backend._internal.atproto.profile import (
     fetch_user_avatar,
     fetch_user_profile,
@@
 from backend._internal.atproto.sync import sync_atproto_records

 __all__ = [
+    "BlobRef",
+    "PayloadTooLargeError",
     "create_comment_record",
     "create_like_record",
     "create_list_record",
@@
     "update_comment_record",
     "update_list_record",
     "update_record",
+    "upload_blob",
     "upsert_album_list_record",
     "upsert_liked_list_record",
     "upsert_profile_record",
```
backend/src/backend/_internal/atproto/client.py (+79 -1)

```diff
 import json
 import logging
 from datetime import UTC, datetime, timedelta
-from typing import Any
+from typing import Any, BinaryIO

 from atproto_oauth.models import OAuthSession
 from cachetools import LRUCache
@@
 )

 logger = logging.getLogger(__name__)
+
+
+class PayloadTooLargeError(Exception):
+    """raised when PDS rejects a blob due to size limits."""
+
+
+# BlobRef uses ATProto's JSON structure with $type, $link keys.
+# TypedDict can't express $ in field names, so we use dict[str, Any] with documentation.
+# Structure: {"$type": "blob", "ref": {"$link": "<CID>"}, "mimeType": str, "size": int}
+BlobRef = dict[str, Any]

 # per-session locks for token refresh to prevent concurrent refresh races.
 # uses LRUCache (not TTLCache) to bound memory - LRU eviction is safe because:
@@
             pass

     raise Exception(f"PDS request failed: {response.status_code} {response.text}")
+
+
+async def upload_blob(
+    auth_session: AuthSession,
+    data: bytes | BinaryIO,
+    content_type: str,
+) -> BlobRef:
+    """upload a blob to the user's PDS.
+
+    args:
+        auth_session: authenticated user session
+        data: binary data or file-like object to upload
+        content_type: MIME type (e.g., audio/mpeg, audio/wav)
+
+    returns:
+        blob reference dict: {"$type": "blob", "ref": {"$link": CID}, "mimeType": str, "size": int}
+
+    raises:
+        PayloadTooLargeError: if PDS rejects due to size limit (413)
+        ValueError: if session is invalid
+        Exception: if upload fails after retry
+    """
+    oauth_data = auth_session.oauth_session
+    if not oauth_data or "access_token" not in oauth_data:
+        raise ValueError(
+            f"OAuth session data missing or invalid for {auth_session.did}"
+        )
+
+    oauth_session = reconstruct_oauth_session(oauth_data)
+    url = f"{oauth_data['pds_url']}/xrpc/com.atproto.repo.uploadBlob"
+
+    # read data if it's a file-like object
+    blob_data = data if isinstance(data, bytes) else data.read()
+
+    for attempt in range(2):
+        response = await get_oauth_client().make_authenticated_request(
+            session=oauth_session,
+            method="POST",
+            url=url,
+            content=blob_data,
+            headers={"Content-Type": content_type},
+        )
+
+        if response.status_code == 200:
+            return response.json()["blob"]
+
+        # payload too large - PDS rejects due to size limit
+        if response.status_code == 413:
+            raise PayloadTooLargeError(
+                f"blob too large for PDS (limit exceeded): {response.text}"
+            )
+
+        # token expired - refresh and retry
+        if response.status_code == 401 and attempt == 0:
+            try:
+                error_data = response.json()
+                if "exp" in error_data.get("message", ""):
+                    logger.info(
+                        f"access token expired for {auth_session.did}, attempting refresh"
+                    )
+                    oauth_session = await _refresh_session_tokens(
+                        auth_session, oauth_session
+                    )
+                    continue
+            except (json.JSONDecodeError, KeyError):
+                pass
+
+        raise Exception(f"blob upload failed: {response.status_code} {response.text}")


 def parse_at_uri(uri: str) -> tuple[str, str, str]:
```
backend/src/backend/_internal/atproto/records/fm_plyr/track.py (+12 -5)

```diff
 from typing import Any

 from backend._internal import Session as AuthSession
-from backend._internal.atproto.client import make_pds_request, parse_at_uri
+from backend._internal.atproto.client import BlobRef, make_pds_request, parse_at_uri
 from backend.config import settings

 logger = logging.getLogger(__name__)
@@
     file_type: str,
     album: str | None = None,
     duration: int | None = None,
-    features: list[dict] | None = None,
+    features: list[dict[str, Any]] | None = None,
     image_url: str | None = None,
-    support_gate: dict | None = None,
+    support_gate: dict[str, Any] | None = None,
+    audio_blob: BlobRef | None = None,
 ) -> dict[str, Any]:
     """Build a track record dict for ATProto.
@@
         features: optional list of featured artists [{did, handle, display_name, avatar_url}]
         image_url: optional cover art image URL
         support_gate: optional gating config (e.g., {"type": "any"})
+        audio_blob: optional blob reference from PDS upload (canonical source when present)

     returns:
         record dict ready for ATProto
@@
         record["imageUrl"] = image_url
     if support_gate:
         record["supportGate"] = support_gate
+    if audio_blob:
+        record["audioBlob"] = audio_blob

     return record
@@
     file_type: str,
     album: str | None = None,
     duration: int | None = None,
-    features: list[dict] | None = None,
+    features: list[dict[str, Any]] | None = None,
     image_url: str | None = None,
-    support_gate: dict | None = None,
+    support_gate: dict[str, Any] | None = None,
+    audio_blob: BlobRef | None = None,
 ) -> tuple[str, str]:
     """Create a track record on the user's PDS using the configured collection.
@@
         features: optional list of featured artists [{did, handle, display_name, avatar_url}]
         image_url: optional cover art image URL
         support_gate: optional gating config (e.g., {"type": "any"})
+        audio_blob: optional blob reference from PDS upload (canonical source when present)

     returns:
         tuple of (record_uri, record_cid)
@@
         features=features,
         image_url=image_url,
         support_gate=support_gate,
+        audio_blob=audio_blob,
     )

     payload = {
```
backend/src/backend/api/tracks/mutations.py (+174 -1)

```diff
 from backend._internal import Session as AuthSession
 from backend._internal import get_oauth_client, require_auth
-from backend._internal.atproto import delete_record_by_uri
+from backend._internal.atproto import (
+    PayloadTooLargeError,
+    delete_record_by_uri,
+    upload_blob,
+)
 from backend._internal.atproto.records import (
     _reconstruct_oauth_session,
     _refresh_session_tokens,
@@
     update_record,
 )
 from backend._internal.atproto.tid import datetime_to_tid
+from backend._internal.audio import AudioFormat
 from backend._internal.background_tasks import (
     schedule_album_list_sync,
     schedule_move_track_audio,
```

new endpoint appended after the restore endpoint:

```python
class MigrateToPdsResponse(BaseModel):
    """response for PDS migration endpoint."""

    success: bool
    track_id: int
    pds_blob_cid: str | None = None
    message: str


@router.post("/{track_id}/migrate-to-pds")
async def migrate_track_to_pds(
    track_id: int,
    db: Annotated[AsyncSession, Depends(get_db)],
    auth_session: AuthSession = Depends(require_auth),
) -> MigrateToPdsResponse:
    """migrate an existing track's audio to the user's PDS.

    this uploads the audio blob to the user's PDS and updates the ATProto record
    to reference it. the R2 copy is kept for CDN streaming performance.

    requires:
    - active OAuth session (user must be logged in)
    - track must belong to the authenticated user
    - track must not already have a PDS blob

    note: this may fail if the audio file is too large for the PDS blob limit.
    """
    # fetch track with artist
    result = await db.execute(
        select(Track).options(selectinload(Track.artist)).where(Track.id == track_id)
    )
    track = result.scalar_one_or_none()

    if not track:
        raise HTTPException(status_code=404, detail="track not found")

    # verify ownership
    if track.artist_did != auth_session.did:
        raise HTTPException(
            status_code=403,
            detail="you can only migrate your own tracks",
        )

    # check if already migrated
    if track.pds_blob_cid:
        return MigrateToPdsResponse(
            success=True,
            track_id=track_id,
            pds_blob_cid=track.pds_blob_cid,
            message="track already has PDS blob",
        )

    # gated tracks can't be migrated (they need auth-protected access)
    if track.support_gate:
        raise HTTPException(
            status_code=400,
            detail="supporter-gated tracks cannot be migrated to PDS",
        )

    # get the audio file from R2
    try:
        audio_data = await storage.get_file_data(track.file_id, track.file_type)
        if not audio_data:
            raise HTTPException(
                status_code=404,
                detail="audio file not found in storage",
            )
    except Exception as e:
        logger.error(f"failed to fetch audio for track {track_id}: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"failed to fetch audio file: {e}",
        ) from e

    # determine content type from file type
    audio_format = AudioFormat.from_extension(track.file_type)
    if not audio_format:
        raise HTTPException(
            status_code=400,
            detail=f"unsupported audio format: {track.file_type}",
        )
    content_type = audio_format.media_type

    # upload blob to PDS
    try:
        blob_ref = await upload_blob(auth_session, audio_data, content_type)
        blob_cid = blob_ref.get("ref", {}).get("$link")
        blob_size = blob_ref.get("size")

        logfire.info(
            "pds blob migration succeeded",
            track_id=track_id,
            cid=blob_cid,
            size=blob_size,
            did=auth_session.did,
        )
    except PayloadTooLargeError as e:
        logfire.info(
            "pds blob migration failed: file too large",
            track_id=track_id,
            error=str(e),
            did=auth_session.did,
        )
        raise HTTPException(
            status_code=413,
            detail="audio file too large for PDS blob upload",
        ) from e
    except Exception as e:
        logger.error(
            f"pds blob migration failed for track {track_id}: {e}", exc_info=True
        )
        raise HTTPException(
            status_code=500,
            detail=f"PDS blob upload failed: {e}",
        ) from e

    # update the ATProto record with the blob reference
    new_record_cid: str | None = None
    if track.atproto_record_uri and track.r2_url:
        try:
            # build updated record with blob
            track_record = build_track_record(
                title=track.title,
                artist=track.artist.display_name,
                audio_url=track.r2_url,
                file_type=track.file_type,
                album=track.album,
                duration=track.duration,
                features=track.features if track.features else None,
                image_url=await track.get_image_url(),
                support_gate=track.support_gate,
                audio_blob=blob_ref,
            )
            track_record["createdAt"] = track.created_at.isoformat()

            _, new_record_cid = await update_record(
                auth_session, track.atproto_record_uri, track_record
            )
            logfire.info(
                "updated ATProto record with blob",
                track_id=track_id,
                record_uri=track.atproto_record_uri,
                new_cid=new_record_cid,
            )
        except Exception as e:
            # log but don't fail - the blob was uploaded successfully
            # the record can be updated later
            logger.warning(
                f"failed to update ATProto record for track {track_id}: {e}",
                exc_info=True,
            )

    # update database
    track.audio_storage = "both"
    track.pds_blob_cid = blob_cid
    track.pds_blob_size = blob_size
    if new_record_cid:
        track.atproto_record_cid = new_record_cid
    await db.commit()

    return MigrateToPdsResponse(
        success=True,
        track_id=track_id,
        pds_blob_cid=blob_cid,
        message="track audio migrated to PDS successfully",
    )
```
backend/src/backend/api/tracks/uploads.py (+126 -1)

```diff
 from pathlib import Path
 from typing import Annotated

+import aiofiles
 import logfire
 from fastapi import (
     BackgroundTasks,
@@
 from backend._internal import Session as AuthSession
 from backend._internal import has_flag, require_artist_profile
-from backend._internal.atproto import create_track_record
+from backend._internal.atproto import (
+    BlobRef,
+    PayloadTooLargeError,
+    create_track_record,
+    upload_blob,
+)
 from backend._internal.atproto.handles import resolve_featured_artists
 from backend._internal.audio import AudioFormat
 from backend._internal.background_tasks import (
```

new helper added after the TranscodeInfo dataclass:

```python
@dataclass
class PdsBlobResult:
    """result of attempting to upload a blob to user's PDS."""

    blob_ref: BlobRef | None
    cid: str | None
    size: int | None


async def _try_upload_to_pds(
    upload_id: str,
    auth_session: AuthSession,
    file_data: bytes,
    content_type: str,
) -> PdsBlobResult:
    """attempt to upload audio blob to user's PDS.

    this is a best-effort operation - if it fails due to size limits or other
    errors, we fall back to R2-only storage. the canonical audio data lives on
    the PDS when possible (embracing ATProto's data ownership ideals).

    args:
        upload_id: job tracking ID
        auth_session: authenticated user session
        file_data: audio file bytes (should match content_type)
        content_type: MIME type (e.g., audio/mpeg)

    returns:
        PdsBlobResult with blob_ref, cid, and size if successful, all None otherwise
    """
    await job_service.update_progress(
        upload_id,
        JobStatus.PROCESSING,
        "uploading to your PDS...",
        phase="pds_upload",
        progress_pct=0.0,
    )

    try:
        blob_ref = await upload_blob(auth_session, file_data, content_type)

        # extract CID from blob ref: {"ref": {"$link": "<CID>"}, ...}
        cid = blob_ref.get("ref", {}).get("$link")
        size = blob_ref.get("size")

        await job_service.update_progress(
            upload_id,
            JobStatus.PROCESSING,
            "uploaded to PDS",
            phase="pds_upload",
            progress_pct=100.0,
        )
        logfire.info(
            "pds blob upload succeeded",
            cid=cid,
            size=size,
            did=auth_session.did,
        )
        return PdsBlobResult(blob_ref=blob_ref, cid=cid, size=size)

    except PayloadTooLargeError as e:
        logfire.info(
            "pds blob upload skipped: file too large",
            error=str(e),
            did=auth_session.did,
        )
        await job_service.update_progress(
            upload_id,
            JobStatus.PROCESSING,
            "file too large for PDS, using CDN only",
            phase="pds_upload",
        )
        return PdsBlobResult(blob_ref=None, cid=None, size=None)

    except Exception as e:
        logfire.warning(
            "pds blob upload failed",
            error=str(e),
            did=auth_session.did,
            exc_info=True,
        )
        await job_service.update_progress(
            upload_id,
            JobStatus.PROCESSING,
            "PDS upload failed, using CDN only",
            phase="pds_upload",
        )
        return PdsBlobResult(blob_ref=None, cid=None, size=None)
```

upload handler changes:

```diff
     image_id: str | None = None
     audio_format: AudioFormat | None = None
     playable_format: AudioFormat | None = None
+    pds_blob_result: PdsBlobResult | None = None
@@
     # for non-web-playable formats, transcode first
+    transcode_info: TranscodeInfo | None = None
     if not audio_format.is_web_playable:
         # gated tracks don't support transcoding yet
         if is_gated:
@@
+    # try uploading blob to user's PDS (best-effort, falls back to R2-only)
+    # gated tracks skip PDS blob upload since they need auth-protected access
+    if not is_gated and playable_format:
+        content_type = playable_format.media_type
+        # use transcoded bytes if available, otherwise read original file
+        if transcode_info:
+            pds_file_data = transcode_info.transcoded_data
+        else:
+            async with aiofiles.open(ctx.file_path, "rb") as f:
+                pds_file_data = await f.read()
+        pds_blob_result = await _try_upload_to_pds(
+            ctx.upload_id,
+            ctx.auth_session,
+            pds_file_data,
+            content_type,
+        )
+
     # save image if provided
     image_url = None
     if ctx.image_path and ctx.image_filename:
@@
             features=featured_artists or None,
             image_url=image_url,
             support_gate=ctx.support_gate,
+            audio_blob=pds_blob_result.blob_ref if pds_blob_result else None,
         )
         if not atproto_result:
             raise ValueError("PDS returned no record data")
@@
+    # determine audio storage type
+    has_pds_blob = pds_blob_result and pds_blob_result.cid is not None
+    audio_storage = "both" if has_pds_blob else "r2"
+
     track = Track(
         title=ctx.title,
         file_id=file_id,
@@
         image_id=image_id,
         image_url=image_url,
         support_gate=ctx.support_gate,
+        audio_storage=audio_storage,
+        pds_blob_cid=pds_blob_result.cid if pds_blob_result else None,
+        pds_blob_size=pds_blob_result.size if pds_blob_result else None,
     )

     db.add(track)
```
backend/src/backend/models/track.py (+7)

```diff
     atproto_record_uri: Mapped[str | None] = mapped_column(String, nullable=True)
     atproto_record_cid: Mapped[str | None] = mapped_column(String, nullable=True)

+    # PDS blob storage (for audio stored on user's PDS)
+    audio_storage: Mapped[str] = mapped_column(
+        String, nullable=False, default="r2", server_default="r2"
+    )  # "r2" | "pds" | "both"
+    pds_blob_cid: Mapped[str | None] = mapped_column(String, nullable=True)
+    pds_blob_size: Mapped[int | None] = mapped_column(Integer, nullable=True)
+
     # engagement metrics
     play_count: Mapped[int] = mapped_column(
         Integer, nullable=False, default=0, server_default="0"
```
backend/src/backend/schemas.py (+4)

```diff
     original_file_type: str | None = (
         None  # original format if transcoded (e.g., aiff, flac)
     )
+    audio_storage: str = "r2"  # "r2" | "pds" | "both"
+    pds_blob_cid: str | None = None  # CID if stored on user's PDS

     @classmethod
     async def from_track(
@@
             gated=gated,
             original_file_id=track.original_file_id,
             original_file_type=track.original_file_type,
+            audio_storage=track.audio_storage,
+            pds_blob_cid=track.pds_blob_cid,
         )
```
backend/src/backend/storage/r2.py (+58)

new method added before `delete`:

```python
    async def get_file_data(
        self,
        file_id: str,
        file_type: str,
    ) -> bytes | None:
        """fetch raw file data from R2.

        args:
            file_id: the file identifier hash
            file_type: file extension without dot (e.g., "mp3", "wav")

        returns:
            file bytes if found, None otherwise
        """
        with logfire.span("R2 get_file_data", file_id=file_id, file_type=file_type):
            async with self.async_session.client(
                "s3",
                endpoint_url=self.endpoint_url,
                aws_access_key_id=self.aws_access_key_id,
                aws_secret_access_key=self.aws_secret_access_key,
            ) as client:
                # build key from file_id and file_type
                audio_format = AudioFormat.from_extension(f".{file_type.lower()}")
                if not audio_format:
                    logfire.warning(
                        "unsupported file type for get_file_data",
                        file_id=file_id,
                        file_type=file_type,
                    )
                    return None

                key = f"audio/{file_id}{audio_format.extension}"

                try:
                    response = await client.get_object(
                        Bucket=self.audio_bucket_name, Key=key
                    )
                    body = response["Body"]
                    data = await body.read()
                    logfire.info(
                        "R2 file data fetched",
                        file_id=file_id,
                        key=key,
                        size=len(data),
                    )
                    return data
                except client.exceptions.NoSuchKey:
                    logfire.warning(
                        "R2 file not found",
                        file_id=file_id,
                        key=key,
                    )
                    return None
                except ClientError as e:
                    if e.response.get("Error", {}).get("Code") == "404":
                        return None
                    raise
```
backend/tests/integration/test_track_lifecycle.py (+1 -1)

```diff
         search_result = await client.search(unique_title, type="tracks")
         # search results are in search_result.results, filter for tracks
         for item in search_result.results:
-            if item.type == "track" and item.id == track_id:
+            if item.type == "track" and getattr(item, "id", None) == track_id:
                 found = True
                 break
         if found:
```
docs/research/2026-01-29-pds-blob-storage.md (+137, new file)

# PDS blob storage research

storing audio on user's PDS instead of (or in addition to) R2.

## motivation

user feedback: storing audio on R2 diverges from ATProto ideals. users should own their data on their PDS.

proposed pattern:
1. user's PDS: store audio blob + user-owned record
2. service repo: record with CDN URL that references user's PDS blob via at-uri/strongRef
3. CDN: still use for streaming performance

benefits:
- data ownership (canonical audio in user's PDS)
- takedown capability (remove service record, audio stops appearing)
- CDN flexibility (can migrate without breaking tracks)
- protocol-native (explicit verifiable relationship)

## bluesky PDS implementation

repo: `bluesky-social/atproto` (packages/pds)

### blob upload limit

| source | value |
|--------|-------|
| code default | 5MB (`5 * 1024 * 1024`) |
| sample.env recommendation | 100MB (`104857600`) |
| bluesky.social hosted | unknown (likely 100MB) |

config: `PDS_BLOB_UPLOAD_LIMIT` env var

location: `packages/pds/src/config/config.ts`
```typescript
blobUploadLimit: env.blobUploadLimit ?? 5 * 1024 * 1024, // 5mb
```

### limit discovery

**there is no API to discover the blob limit.**

`com.atproto.server.describeServer` returns:
- did
- availableUserDomains
- inviteCodeRequired
- links (privacyPolicy, termsOfService)
- contact email

it does NOT expose blobUploadLimit.

location: `packages/pds/src/api/com/atproto/server/describeServer.ts`

### error when limit exceeded

error type: `ResponseType.PayloadTooLarge`
message: `"request entity too large"`

location: `packages/xrpc-server/src/util.ts`

checks both:
1. content-length header (early rejection)
2. streaming body size (MaxSizeChecker)

## custom PDS implementations

(to be documented as we encounter them)

### tangled.sh PDS

TODO: investigate limits

### other implementations

TODO

## upstream issues (bluesky-social/atproto)

### #1555 → discussion #1582 - Extend describeServer

requested adding blob limits to describeServer. converted to discussion.

**bnewbold's response (Sept 2023):**
- application-specific constraints belong in **Lexicons**, not describeServer
- blob sizes and mime types should be defined at the application level
- rate limits should use HTTP headers
- future endpoint for account-specific resource quotas: "we don't have a plan/proposal for this yet"

**implication for plyr.fm:** we can define max audio size in our lexicon, but we can't discover the PDS's actual blob limit. the only option is try-and-catch.

link: https://github.com/bluesky-social/atproto/discussions/1582

### #1737 - Consider raising the size limit of blobs (CLOSED)

from 2023, complained about ~1MB limit. closed as "COMPLETED" - limit was raised (now 100MB in sample.env).

### #3392 - Add restrictions on blobstore consumption (OPEN)

requesting total blob quota per account (not just per-blob limit). mentions:
- ATFile using PDS as file storage
- GrayHaze video streaming experiment
- growing interest in media-heavy apps

### #4009 - bump PDS JSON request limit to 1MB (OPEN PR)

about record size, not blob size. but shows active work on limits.

## production file size analysis (2026-01-29)

bucket: `audio-prod`, 570 files total

| threshold | count | % of total |
|-----------|-------|------------|
| > 100MB | 10 | 1.8% |
| > 50MB | 26 | 4.6% |
| > 25MB | 45 | 7.9% |
| > 10MB | 218 | 38% |
| > 5MB | 447 | 78% |

largest file: 775MB (WAV)

**implications:**
- with 100MB limit (sample.env recommendation): 10 files need R2 fallback
- with 5MB limit (code default): 447 files (78%) need R2 fallback
- WAV files tend to be largest (lossless uploads for export)

## open questions

1. should we try PDS upload and catch PayloadTooLarge, or use a conservative threshold?
2. what's the actual limit on bluesky.social hosted PDSes? (likely 100MB based on sample.env)
3. should we propose re-opening #1555 to add blobUploadLimit to describeServer?

## related

- issue #614: S3-compatible blob sidecar discussion
- issue #146: content-addressable storage
- ATProto discourse: [Media PDS/Service](https://discourse.atprotocol.community/t/media-pds-service/297)
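
since no API exposes the blob limit, the research points at a try-and-catch strategy. a minimal sketch of that strategy, with hypothetical names and a synchronous shape for clarity (`upload_with_fallback`, `fake_pds_upload`, and the 5MB limit are stand-ins, not real plyr.fm or PDS code):

```python
# sketch of "try PDS, fall back to R2" under an unknown blob limit.
# all names here are hypothetical stand-ins for illustration only.
class PayloadTooLargeError(Exception):
    """stand-in for an HTTP 413 from the PDS."""


def upload_with_fallback(upload, data: bytes) -> str:
    """attempt a PDS upload; return the resulting storage mode."""
    try:
        upload(data)  # may raise PayloadTooLargeError (413)
        return "both"  # PDS blob stored, R2 copy kept for CDN
    except PayloadTooLargeError:
        return "r2"  # over the (undiscoverable) PDS limit: R2 only


def fake_pds_upload(data: bytes, limit: int = 5 * 1024 * 1024) -> None:
    """simulates a PDS running the 5MB code-default limit."""
    if len(data) > limit:
        raise PayloadTooLargeError("request entity too large")
```

this avoids hardcoding any threshold: the PDS itself is the source of truth for its limit, whatever the operator configured.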
+35
frontend/src/lib/components/TrackItem.svelte
···
368 368       {/if}
369 369     </span>
370 370   {/if}
    371 + {#if track.audio_storage === 'both' || track.audio_storage === 'pds'}
    372 +   <span class="meta-separator">•</span>
    373 +   <span class="pds-indicator" title="stored on PDS">
    374 +     <svg width="12" height="12" viewBox="0 0 16 16" fill="none" xmlns="http://www.w3.org/2000/svg">
    375 +       <circle cx="8" cy="8" r="6" stroke="currentColor" stroke-width="1.5" fill="none"/>
    376 +       <path d="M5 8l2 2 4-4" stroke="currentColor" stroke-width="1.5" stroke-linecap="round" stroke-linejoin="round"/>
    377 +     </svg>
    378 +     <span class="pds-label">pds</span>
    379 +   </span>
    380 + {/if}
371 381 </div>
372 382 </div>
373 383 </button>
···
915 925
916 926 .comment-count {
917 927   font-family: inherit;
    928 + }
    929 +
    930 + .pds-indicator {
    931 +   display: inline-flex;
    932 +   align-items: center;
    933 +   gap: 0.25rem;
    934 +   color: var(--text-tertiary);
    935 +   cursor: help;
    936 +   transition: color 0.2s;
    937 + }
    938 +
    939 + .pds-indicator:hover {
    940 +   color: var(--accent);
    941 + }
    942 +
    943 + .pds-indicator svg {
    944 +   width: 12px;
    945 +   height: 12px;
    946 +   flex-shrink: 0;
    947 + }
    948 +
    949 + .pds-label {
    950 +   font-family: inherit;
    951 +   font-size: inherit;
    952 +   text-transform: lowercase;
918 953 }
919 954
920 955 .action-button {
+2
frontend/src/lib/types.ts
···
59 59   gated?: boolean; // true if track is gated AND viewer lacks access
60 60   original_file_id?: string | null; // original file hash if transcoded
61 61   original_file_type?: string | null; // original format if transcoded (e.g., aiff, flac)
   62 + audio_storage?: 'r2' | 'pds' | 'both'; // where audio is stored
   63 + pds_blob_cid?: string | null; // CID if stored on user's PDS
62 64 }
63 65
64 66 export interface LinkedAccount {
+32
frontend/src/routes/track/[id]/+page.svelte
···
646 646       {/if}
647 647     </span>
648 648   {/if}
    649 + {#if track.audio_storage === 'both' || track.audio_storage === 'pds'}
    650 +   <span class="separator">•</span>
    651 +   <span class="pds-indicator" title="stored on PDS">
    652 +     <svg width="14" height="14" viewBox="0 0 16 16" fill="none" xmlns="http://www.w3.org/2000/svg">
    653 +       <circle cx="8" cy="8" r="6" stroke="currentColor" stroke-width="1.5" fill="none"/>
    654 +       <path d="M5 8l2 2 4-4" stroke="currentColor" stroke-width="1.5" stroke-linecap="round" stroke-linejoin="round"/>
    655 +     </svg>
    656 +     <span class="pds-label">pds</span>
    657 +   </span>
    658 + {/if}
649 659 </div>
650 660
651 661 <div class="side-buttons">
···
1050 1060   background: color-mix(in srgb, var(--accent) 15%, transparent);
1051 1061   color: var(--accent);
1052 1062   outline: none;
     1063 + }
     1064 +
     1065 + .track-stats .pds-indicator {
     1066 +   display: inline-flex;
     1067 +   align-items: center;
     1068 +   gap: 0.25rem;
     1069 +   cursor: help;
     1070 +   transition: color 0.2s;
     1071 + }
     1072 +
     1073 + .track-stats .pds-indicator:hover {
     1074 +   color: var(--accent);
     1075 + }
     1076 +
     1077 + .track-stats .pds-indicator svg {
     1078 +   flex-shrink: 0;
     1079 + }
     1080 +
     1081 + .track-stats .pds-label {
     1082 +   font-family: inherit;
     1083 +   font-size: inherit;
     1084 +   text-transform: lowercase;
1053 1085 }
1054 1086
1055 1087 .track-tags {
+6
lexicons/track.json
···
66 66       "type": "ref",
67 67       "ref": "#supportGate",
68 68       "description": "If set, this track requires viewer to be a supporter of the artist via atprotofans."
   69 +   },
   70 +   "audioBlob": {
   71 +     "type": "blob",
   72 +     "description": "Audio file stored on the user's PDS. When present, this is the canonical source; audioUrl is the CDN fallback.",
   73 +     "accept": ["audio/*"],
   74 +     "maxSize": 104857600
69 75   }
70 76 }
71 77 }
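The lexicon's `maxSize` of 104857600 is exactly 100 MiB, matching the 100MB sample.env figure discussed in the notes above. Per bnewbold's guidance in discussion #1582, this is the application-level constraint; a client can pre-check a file against it before attempting the upload. A minimal sketch, with illustrative names (`fits_lexicon` is not part of this PR):

```python
# constraints copied from the audioBlob field in lexicons/track.json
LEXICON_MAX_SIZE = 104_857_600       # "maxSize": 104857600 == 100 * 1024 * 1024
LEXICON_ACCEPT_PREFIX = "audio/"     # "accept": ["audio/*"]


def fits_lexicon(size: int, mime_type: str) -> bool:
    """Check a candidate blob against the track lexicon's audioBlob constraints.

    Note this only validates the application-level limit; the PDS may still
    enforce a smaller limit of its own, which remains undiscoverable.
    """
    return size <= LEXICON_MAX_SIZE and mime_type.startswith(LEXICON_ACCEPT_PREFIX)
```

This pre-check answers a different question than the try-and-catch in the upload path: it rejects files the lexicon forbids outright, while the catch handles PDSes whose own limit is lower than the lexicon's.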
+5 -5
loq.toml
···
43 43
44 44 [[rules]]
45 45 path = "backend/src/backend/api/tracks/mutations.py"
46    - max_lines = 664
   46 + max_lines = 850
47 47
48 48 [[rules]]
49 49 path = "backend/src/backend/api/tracks/uploads.py"
50    - max_lines = 976
   50 + max_lines = 1100
51 51
52 52 [[rules]]
53 53 path = "backend/src/backend/config.py"
···
55 55
56 56 [[rules]]
57 57 path = "backend/src/backend/storage/r2.py"
58    - max_lines = 661
   58 + max_lines = 725
59 59
60 60 [[rules]]
61 61 path = "backend/tests/api/test_audio.py"
···
119 119
120 120 [[rules]]
121 121 path = "frontend/src/lib/components/TrackItem.svelte"
122     - max_lines = 1105
    122 + max_lines = 1145
123 123
124 124 [[rules]]
125 125 path = "frontend/src/lib/components/UserMenu.svelte"
···
167 167
168 168 [[rules]]
169 169 path = "frontend/src/routes/track/\\[id\\]/+page.svelte"
170     - max_lines = 1667
    170 + max_lines = 1700
171 171
172 172 [[rules]]
173 173 path = "frontend/src/routes/u/\\[handle\\]/+page.svelte"