audio streaming app plyr.fm

perf: fix slow homepage load times (#1025)

* perf: fix slow homepage load times with SWR caching and pool warmup

follow graph: stale-while-revalidate pattern (TTL 60min, stale at 8min)
returns cached data immediately, schedules background re-warm when stale.
removes redundant login-time cache warming from auth paths.

track listing: cache anonymous first-page discovery feed in Redis (60s TTL)
with invalidation on upload, delete, and edit.

connection pool: warm one connection at startup to eliminate cold connect
penalty on first request after deploy.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: remove stale schedule_follow_graph_warm mock from test

the import was removed from auth.py, so the test mock path no longer exists.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: narrow bare except Exception to specific types

- Redis operations: catch (RuntimeError, RedisError) instead of Exception
- DB pool warmup: catch (OSError, SQLAlchemyError) instead of Exception
- Move deferred imports in main.py to top level
- Update tests to use redis.exceptions.ConnectionError

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

Authored by zzstoatzz.io and Claude Opus 4.6; committed via GitHub.
Commits: 5153d514, 4dd6ed45

+315 -43
+44 -8
backend/src/backend/_internal/follow_graph.py
··· 1 - """bluesky follow graph with Redis read-through caching.""" 2 3 import json 4 import logging 5 from dataclasses import asdict, dataclass 6 7 import httpx 8 import logfire 9 10 from backend._internal.atproto.profile import BSKY_API_BASE, normalize_avatar_url 11 from backend.utilities.redis import get_async_redis_client ··· 13 logger = logging.getLogger(__name__) 14 15 FOLLOWS_CACHE_PREFIX = "plyr:follows:" 16 - FOLLOWS_CACHE_TTL_SECONDS = 600 # 10 minutes 17 18 19 @dataclass(frozen=True, slots=True) ··· 27 async def get_follows(did: str) -> dict[str, FollowInfo]: 28 """get all DIDs a user follows on bluesky, with Redis read-through cache. 29 30 - checks Redis first, falls back to live Bluesky API on miss, 31 - then writes back to cache. fails silently on Redis errors. 32 """ 33 cache_key = f"{FOLLOWS_CACHE_PREFIX}{did}" 34 35 # try cache 36 try: 37 redis = get_async_redis_client() 38 if cached := await redis.get(cache_key): 39 - return _deserialize_follows(cached) 40 - except Exception: 41 logger.debug("redis cache read failed for follows %s", did) 42 43 # cache miss — fetch live 44 follows = await _fetch_follows_from_bsky(did) 45 46 - # write back 47 try: 48 redis = get_async_redis_client() 49 await redis.set( 50 cache_key, _serialize_follows(follows), ex=FOLLOWS_CACHE_TTL_SECONDS 51 ) 52 - except Exception: 53 logger.debug("redis cache write failed for follows %s", did) 54 55 return follows 56 57 58 async def warm_follows_cache(did: str) -> None: 59 """always fetch from bluesky and write to Redis. called from background task.""" 60 follows = await _fetch_follows_from_bsky(did) 61 cache_key = f"{FOLLOWS_CACHE_PREFIX}{did}" 62 try: 63 redis = get_async_redis_client() 64 await redis.set( 65 cache_key, _serialize_follows(follows), ex=FOLLOWS_CACHE_TTL_SECONDS 66 ) 67 logfire.info("warmed follows cache", did=did, count=len(follows)) 68 except Exception: 69 logger.debug("redis cache write failed warming follows for %s", did)
··· 1 + """bluesky follow graph with Redis read-through caching and stale-while-revalidate.""" 2 3 import json 4 import logging 5 + import time 6 from dataclasses import asdict, dataclass 7 8 import httpx 9 import logfire 10 + from redis.exceptions import RedisError 11 12 from backend._internal.atproto.profile import BSKY_API_BASE, normalize_avatar_url 13 from backend.utilities.redis import get_async_redis_client ··· 15 logger = logging.getLogger(__name__) 16 17 FOLLOWS_CACHE_PREFIX = "plyr:follows:" 18 + FOLLOWS_TIMESTAMP_PREFIX = "plyr:follows:ts:" 19 + FOLLOWS_REVALIDATING_PREFIX = "plyr:follows:revalidating:" 20 + FOLLOWS_CACHE_TTL_SECONDS = 3600 # 60 minutes 21 + FOLLOWS_STALE_AFTER_SECONDS = 480 # 8 minutes 22 23 24 @dataclass(frozen=True, slots=True) ··· 32 async def get_follows(did: str) -> dict[str, FollowInfo]: 33 """get all DIDs a user follows on bluesky, with Redis read-through cache. 34 35 + uses stale-while-revalidate: returns cached data immediately even if stale, 36 + and schedules a background re-warm when data is older than FOLLOWS_STALE_AFTER_SECONDS. 37 + on cache miss, fetches live from Bluesky and writes back. fails silently on Redis errors. 
38 """ 39 cache_key = f"{FOLLOWS_CACHE_PREFIX}{did}" 40 + ts_key = f"{FOLLOWS_TIMESTAMP_PREFIX}{did}" 41 42 # try cache 43 try: 44 redis = get_async_redis_client() 45 if cached := await redis.get(cache_key): 46 + follows = _deserialize_follows(cached) 47 + 48 + # check staleness — schedule background re-warm if stale 49 + try: 50 + ts_raw = await redis.get(ts_key) 51 + if ts_raw and time.time() - float(ts_raw) > FOLLOWS_STALE_AFTER_SECONDS: 52 + await _maybe_schedule_revalidation(did) 53 + except (RuntimeError, RedisError): 54 + logger.debug("redis staleness check failed for follows %s", did) 55 + 56 + return follows 57 + except (RuntimeError, RedisError): 58 logger.debug("redis cache read failed for follows %s", did) 59 60 # cache miss — fetch live 61 follows = await _fetch_follows_from_bsky(did) 62 63 + # write back with timestamp 64 try: 65 redis = get_async_redis_client() 66 await redis.set( 67 cache_key, _serialize_follows(follows), ex=FOLLOWS_CACHE_TTL_SECONDS 68 ) 69 + await redis.set(ts_key, str(time.time()), ex=FOLLOWS_CACHE_TTL_SECONDS) 70 + except (RuntimeError, RedisError): 71 logger.debug("redis cache write failed for follows %s", did) 72 73 return follows 74 75 76 + async def _maybe_schedule_revalidation(did: str) -> None: 77 + """schedule a background re-warm if no revalidation is already in progress. 78 + 79 + uses SET NX on a revalidating key to dedup concurrent requests. 80 + """ 81 + revalidating_key = f"{FOLLOWS_REVALIDATING_PREFIX}{did}" 82 + try: 83 + redis = get_async_redis_client() 84 + if await redis.set(revalidating_key, "1", nx=True, ex=60): 85 + from backend._internal.tasks import schedule_follow_graph_warm 86 + 87 + await schedule_follow_graph_warm(did) 88 + except (RuntimeError, RedisError): 89 + logger.debug("failed to schedule revalidation for follows %s", did) 90 + 91 + 92 async def warm_follows_cache(did: str) -> None: 93 """always fetch from bluesky and write to Redis. 
called from background task.""" 94 follows = await _fetch_follows_from_bsky(did) 95 cache_key = f"{FOLLOWS_CACHE_PREFIX}{did}" 96 + ts_key = f"{FOLLOWS_TIMESTAMP_PREFIX}{did}" 97 try: 98 redis = get_async_redis_client() 99 await redis.set( 100 cache_key, _serialize_follows(follows), ex=FOLLOWS_CACHE_TTL_SECONDS 101 ) 102 + await redis.set(ts_key, str(time.time()), ex=FOLLOWS_CACHE_TTL_SECONDS) 103 logfire.info("warmed follows cache", did=did, count=len(follows)) 104 except Exception: 105 logger.debug("redis cache write failed warming follows for %s", did)
+1 -4
backend/src/backend/api/auth.py
··· 39 switch_active_account, 40 ) 41 from backend._internal.auth import get_refresh_token_lifetime_days 42 - from backend._internal.tasks import schedule_atproto_sync, schedule_follow_graph_warm 43 from backend.config import settings 44 from backend.models import Artist, get_db 45 from backend.utilities.rate_limit import limiter ··· 249 250 # schedule ATProto sync (via docket if enabled, else asyncio) 251 await schedule_atproto_sync(session_id, did) 252 - await schedule_follow_graph_warm(did) 253 254 return RedirectResponse( 255 url=f"{settings.frontend.url}/settings?exchange_token={exchange_token}&scope_upgraded=true", ··· 276 277 # schedule ATProto sync 278 await schedule_atproto_sync(session_id, did) 279 - await schedule_follow_graph_warm(did) 280 281 return RedirectResponse( 282 url=f"{settings.frontend.url}/portal?exchange_token={exchange_token}&account_added=true", ··· 294 295 # schedule ATProto sync (via docket if enabled, else asyncio) 296 await schedule_atproto_sync(session_id, did) 297 - await schedule_follow_graph_warm(did) 298 299 # redirect to profile setup if needed, otherwise to portal 300 redirect_path = "/portal" if has_profile else "/profile/setup"
··· 39 switch_active_account, 40 ) 41 from backend._internal.auth import get_refresh_token_lifetime_days 42 + from backend._internal.tasks import schedule_atproto_sync 43 from backend.config import settings 44 from backend.models import Artist, get_db 45 from backend.utilities.rate_limit import limiter ··· 249 250 # schedule ATProto sync (via docket if enabled, else asyncio) 251 await schedule_atproto_sync(session_id, did) 252 253 return RedirectResponse( 254 url=f"{settings.frontend.url}/settings?exchange_token={exchange_token}&scope_upgraded=true", ··· 275 276 # schedule ATProto sync 277 await schedule_atproto_sync(session_id, did) 278 279 return RedirectResponse( 280 url=f"{settings.frontend.url}/portal?exchange_token={exchange_token}&account_added=true", ··· 292 293 # schedule ATProto sync (via docket if enabled, else asyncio) 294 await schedule_atproto_sync(session_id, did) 295 296 # redirect to profile setup if needed, otherwise to portal 297 redirect_path = "/portal" if has_profile else "/profile/setup"
+2 -1
backend/src/backend/api/tracks/constants.py
··· 2 3 MAX_FEATURES = 5 4 5 - __all__ = ["MAX_FEATURES"]
··· 2 3 MAX_FEATURES = 5 4 5 + DISCOVERY_CACHE_KEY = "plyr:tracks:discovery" 6 + DISCOVERY_CACHE_TTL_SECONDS = 60
+41 -1
backend/src/backend/api/tracks/listing.py
··· 1 """Read-only track listing endpoints.""" 2 3 import asyncio 4 from datetime import datetime 5 from typing import TYPE_CHECKING, Annotated, cast 6 ··· 8 from botocore.exceptions import ClientError 9 from fastapi import Depends, HTTPException, Query 10 from pydantic import BaseModel 11 from sqlalchemy import func, select 12 from sqlalchemy.ext.asyncio import AsyncSession 13 from sqlalchemy.orm import selectinload ··· 34 get_top_tracks_with_counts, 35 get_track_tags, 36 ) 37 from backend.utilities.tags import DEFAULT_HIDDEN_TAGS 38 39 from .router import router 40 41 if TYPE_CHECKING: 42 from backend.storage.r2 import R2Storage 43 ··· 80 Pass this to get the next page of results. 81 limit: Maximum number of tracks to return (default from settings, max 100). 82 """ 83 # use settings default if not provided, clamp to reasonable bounds 84 if limit is None: 85 limit = settings.app.default_page_size ··· 280 ] 281 ) 282 283 - return TracksListResponse( 284 tracks=list(track_responses), 285 next_cursor=next_cursor, 286 has_more=has_more, 287 ) 288 289 290 @router.get("/top")
··· 1 """Read-only track listing endpoints.""" 2 3 import asyncio 4 + import logging 5 from datetime import datetime 6 from typing import TYPE_CHECKING, Annotated, cast 7 ··· 9 from botocore.exceptions import ClientError 10 from fastapi import Depends, HTTPException, Query 11 from pydantic import BaseModel 12 + from redis.exceptions import RedisError 13 from sqlalchemy import func, select 14 from sqlalchemy.ext.asyncio import AsyncSession 15 from sqlalchemy.orm import selectinload ··· 36 get_top_tracks_with_counts, 37 get_track_tags, 38 ) 39 + from backend.utilities.redis import get_async_redis_client 40 from backend.utilities.tags import DEFAULT_HIDDEN_TAGS 41 42 + from .constants import DISCOVERY_CACHE_KEY, DISCOVERY_CACHE_TTL_SECONDS 43 from .router import router 44 45 + logger = logging.getLogger(__name__) 46 + 47 + 48 + async def invalidate_tracks_discovery_cache() -> None: 49 + """delete the anonymous discovery feed cache key.""" 50 + try: 51 + redis = get_async_redis_client() 52 + await redis.delete(DISCOVERY_CACHE_KEY) 53 + except (RuntimeError, RedisError): 54 + logger.debug("failed to invalidate discovery cache") 55 + 56 + 57 if TYPE_CHECKING: 58 from backend.storage.r2 import R2Storage 59 ··· 96 Pass this to get the next page of results. 97 limit: Maximum number of tracks to return (default from settings, max 100). 
98 """ 99 + # anonymous first-page discovery feed — serve from cache if available 100 + is_cacheable = session is None and artist_did is None and cursor is None 101 + if is_cacheable: 102 + try: 103 + redis = get_async_redis_client() 104 + if cached := await redis.get(DISCOVERY_CACHE_KEY): 105 + return TracksListResponse.model_validate_json(cached) 106 + except (RuntimeError, RedisError): 107 + logger.debug("discovery cache read failed") 108 + 109 # use settings default if not provided, clamp to reasonable bounds 110 if limit is None: 111 limit = settings.app.default_page_size ··· 306 ] 307 ) 308 309 + response = TracksListResponse( 310 tracks=list(track_responses), 311 next_cursor=next_cursor, 312 has_more=has_more, 313 ) 314 + 315 + # write back to cache for anonymous first-page requests 316 + if is_cacheable: 317 + try: 318 + redis = get_async_redis_client() 319 + await redis.set( 320 + DISCOVERY_CACHE_KEY, 321 + response.model_dump_json(), 322 + ex=DISCOVERY_CACHE_TTL_SECONDS, 323 + ) 324 + except (RuntimeError, RedisError): 325 + logger.debug("discovery cache write failed") 326 + 327 + return response 328 329 330 @router.get("/top")
+7
backend/src/backend/api/tracks/mutations.py
··· 38 from backend.storage import storage 39 from backend.utilities.tags import get_or_create_tag, parse_tags_json 40 41 from .metadata_service import ( 42 apply_album_update, 43 resolve_feature_handles, ··· 125 # delete track record 126 await db.delete(track) 127 await db.commit() 128 129 # sync album list record if track was in an album 130 if album_id_to_sync: ··· 307 308 await db.commit() 309 await db.refresh(track) 310 311 # invalidate album cache if track metadata changed within an album 312 from backend.api.albums import invalidate_album_cache_by_id
··· 38 from backend.storage import storage 39 from backend.utilities.tags import get_or_create_tag, parse_tags_json 40 41 + from .listing import invalidate_tracks_discovery_cache 42 from .metadata_service import ( 43 apply_album_update, 44 resolve_feature_handles, ··· 126 # delete track record 127 await db.delete(track) 128 await db.commit() 129 + 130 + # invalidate anonymous discovery feed cache 131 + await invalidate_tracks_discovery_cache() 132 133 # sync album list record if track was in an album 134 if album_id_to_sync: ··· 311 312 await db.commit() 313 await db.refresh(track) 314 + 315 + # invalidate anonymous discovery feed cache 316 + await invalidate_tracks_discovery_cache() 317 318 # invalidate album cache if track metadata changed within an album 319 from backend.api.albums import invalidate_album_cache_by_id
+4
backend/src/backend/api/tracks/uploads.py
··· 59 from backend.utilities.rate_limit import limiter 60 from backend.utilities.tags import add_tags_to_track, parse_tags_json 61 62 from .router import router 63 from .services import get_or_create_album 64 ··· 816 817 # phase 7: post-upload tasks (tags, notifications, background jobs) 818 await _schedule_post_upload(ctx, sr, track) 819 820 await job_service.update_progress( 821 ctx.upload_id,
··· 59 from backend.utilities.rate_limit import limiter 60 from backend.utilities.tags import add_tags_to_track, parse_tags_json 61 62 + from .listing import invalidate_tracks_discovery_cache 63 from .router import router 64 from .services import get_or_create_album 65 ··· 817 818 # phase 7: post-upload tasks (tags, notifications, background jobs) 819 await _schedule_post_upload(ctx, sr, track) 820 + 821 + # invalidate anonymous discovery feed cache 822 + await invalidate_tracks_discovery_cache() 823 824 await job_service.update_progress( 825 ctx.upload_id,
+12
backend/src/backend/main.py
··· 11 from slowapi import _rate_limit_exceeded_handler 12 from slowapi.errors import RateLimitExceeded 13 from slowapi.middleware import SlowAPIMiddleware 14 15 from backend._internal import jam_service, notification_service, queue_service 16 from backend._internal.background import background_worker_lifespan ··· 39 from backend.api.lists import router as lists_router 40 from backend.api.migration import router as migration_router 41 from backend.config import settings 42 from backend.utilities.middleware import SecurityHeadersMiddleware 43 from backend.utilities.observability import ( 44 configure_observability, ··· 63 await notification_service.setup() 64 await queue_service.setup() 65 await jam_service.setup() 66 67 # start background task worker (docket) 68 async with background_worker_lifespan() as docket:
··· 11 from slowapi import _rate_limit_exceeded_handler 12 from slowapi.errors import RateLimitExceeded 13 from slowapi.middleware import SlowAPIMiddleware 14 + from sqlalchemy import text 15 + from sqlalchemy.exc import SQLAlchemyError 16 17 from backend._internal import jam_service, notification_service, queue_service 18 from backend._internal.background import background_worker_lifespan ··· 41 from backend.api.lists import router as lists_router 42 from backend.api.migration import router as migration_router 43 from backend.config import settings 44 + from backend.utilities.database import get_engine 45 from backend.utilities.middleware import SecurityHeadersMiddleware 46 from backend.utilities.observability import ( 47 configure_observability, ··· 66 await notification_service.setup() 67 await queue_service.setup() 68 await jam_service.setup() 69 + 70 + # warm the database connection pool so the first request avoids cold connect 71 + try: 72 + engine = get_engine() 73 + async with engine.connect() as conn: 74 + await conn.execute(text("SELECT 1")) 75 + logger.info("database connection pool warmed") 76 + except (OSError, SQLAlchemyError): 77 + logger.warning("failed to warm database connection pool") 78 79 # start background task worker (docket) 80 async with background_worker_lifespan() as docket:
+156 -27
backend/tests/_internal/test_follow_graph.py
··· 1 """tests for bluesky follow graph caching.""" 2 3 from unittest.mock import AsyncMock, patch 4 5 import pytest 6 7 from backend._internal.follow_graph import ( 8 FOLLOWS_CACHE_PREFIX, 9 FOLLOWS_CACHE_TTL_SECONDS, 10 FollowInfo, 11 _deserialize_follows, 12 _serialize_follows, ··· 19 "did:plc:bob": FollowInfo(index=1, avatar_url=None), 20 } 21 22 23 def test_serialization_roundtrip() -> None: 24 """serialize -> deserialize preserves data.""" ··· 36 def mock_redis() -> AsyncMock: 37 redis = AsyncMock() 38 redis.get = AsyncMock(return_value=None) 39 - redis.set = AsyncMock() 40 return redis 41 42 ··· 45 return AsyncMock(return_value=SAMPLE_FOLLOWS) 46 47 48 async def test_cache_hit(mock_redis: AsyncMock) -> None: 49 - """returns cached data without calling bluesky.""" 50 - mock_redis.get.return_value = _serialize_follows(SAMPLE_FOLLOWS) 51 52 with ( 53 patch( ··· 56 ), 57 patch( 58 "backend._internal.follow_graph._fetch_follows_from_bsky", 59 - ) as mock_fetch, 60 ): 61 - result = await get_follows("did:plc:test") 62 63 assert result == SAMPLE_FOLLOWS 64 - mock_fetch.assert_not_called() 65 66 67 async def test_cache_miss_fetches_and_writes( 68 mock_redis: AsyncMock, mock_fetch: AsyncMock 69 ) -> None: 70 - """on miss, fetches from bluesky and writes back to redis.""" 71 with ( 72 patch( 73 "backend._internal.follow_graph.get_async_redis_client", ··· 78 mock_fetch, 79 ), 80 ): 81 - result = await get_follows("did:plc:test") 82 83 assert result == SAMPLE_FOLLOWS 84 - mock_fetch.assert_awaited_once_with("did:plc:test") 85 86 - # verify cache write 87 - mock_redis.set.assert_awaited_once() 88 - call_args = mock_redis.set.call_args 89 - assert call_args[0][0] == f"{FOLLOWS_CACHE_PREFIX}did:plc:test" 90 - assert call_args[1]["ex"] == FOLLOWS_CACHE_TTL_SECONDS 91 92 - # verify written data roundtrips 93 - written = _deserialize_follows(call_args[0][1]) 94 - assert written == SAMPLE_FOLLOWS 95 96 97 async def test_redis_error_falls_back_to_live(mock_fetch: AsyncMock) -> 
None: 98 """redis errors fall back to live fetch without raising.""" 99 broken_redis = AsyncMock() 100 - broken_redis.get.side_effect = ConnectionError("redis down") 101 - broken_redis.set.side_effect = ConnectionError("redis down") 102 103 with ( 104 patch( ··· 110 mock_fetch, 111 ), 112 ): 113 - result = await get_follows("did:plc:test") 114 115 assert result == SAMPLE_FOLLOWS 116 mock_fetch.assert_awaited_once() ··· 119 async def test_warm_writes_to_redis( 120 mock_redis: AsyncMock, mock_fetch: AsyncMock 121 ) -> None: 122 - """warm_follows_cache always fetches and writes to redis.""" 123 with ( 124 patch( 125 "backend._internal.follow_graph.get_async_redis_client", ··· 130 mock_fetch, 131 ), 132 ): 133 - await warm_follows_cache("did:plc:test") 134 135 - mock_fetch.assert_awaited_once_with("did:plc:test") 136 - mock_redis.set.assert_awaited_once() 137 - call_args = mock_redis.set.call_args 138 - assert call_args[0][0] == f"{FOLLOWS_CACHE_PREFIX}did:plc:test" 139 - assert call_args[1]["ex"] == FOLLOWS_CACHE_TTL_SECONDS
··· 1 """tests for bluesky follow graph caching.""" 2 3 + import time 4 from unittest.mock import AsyncMock, patch 5 6 import pytest 7 + from redis.exceptions import ConnectionError as RedisConnectionError 8 9 from backend._internal.follow_graph import ( 10 FOLLOWS_CACHE_PREFIX, 11 FOLLOWS_CACHE_TTL_SECONDS, 12 + FOLLOWS_REVALIDATING_PREFIX, 13 + FOLLOWS_STALE_AFTER_SECONDS, 14 + FOLLOWS_TIMESTAMP_PREFIX, 15 FollowInfo, 16 _deserialize_follows, 17 _serialize_follows, ··· 24 "did:plc:bob": FollowInfo(index=1, avatar_url=None), 25 } 26 27 + TEST_DID = "did:plc:test" 28 + 29 30 def test_serialization_roundtrip() -> None: 31 """serialize -> deserialize preserves data.""" ··· 43 def mock_redis() -> AsyncMock: 44 redis = AsyncMock() 45 redis.get = AsyncMock(return_value=None) 46 + redis.set = AsyncMock(return_value=True) 47 return redis 48 49 ··· 52 return AsyncMock(return_value=SAMPLE_FOLLOWS) 53 54 55 + def _make_redis_getter(cache_data: str | None = None, timestamp: str | None = None): 56 + """create a side_effect for redis.get that returns different values per key.""" 57 + cache_key = f"{FOLLOWS_CACHE_PREFIX}{TEST_DID}" 58 + ts_key = f"{FOLLOWS_TIMESTAMP_PREFIX}{TEST_DID}" 59 + 60 + async def _get(key: str) -> str | None: 61 + if key == cache_key: 62 + return cache_data 63 + if key == ts_key: 64 + return timestamp 65 + return None 66 + 67 + return _get 68 + 69 + 70 async def test_cache_hit(mock_redis: AsyncMock) -> None: 71 + """returns cached data without calling bluesky (fresh cache).""" 72 + mock_redis.get.side_effect = _make_redis_getter( 73 + cache_data=_serialize_follows(SAMPLE_FOLLOWS), 74 + timestamp=str(time.time()), # fresh 75 + ) 76 77 with ( 78 patch( ··· 81 ), 82 patch( 83 "backend._internal.follow_graph._fetch_follows_from_bsky", 84 + ) as mock_bsky, 85 ): 86 + result = await get_follows(TEST_DID) 87 88 assert result == SAMPLE_FOLLOWS 89 + mock_bsky.assert_not_called() 90 91 92 async def test_cache_miss_fetches_and_writes( 93 mock_redis: AsyncMock, 
mock_fetch: AsyncMock 94 ) -> None: 95 + """on miss, fetches from bluesky and writes data + timestamp to redis.""" 96 with ( 97 patch( 98 "backend._internal.follow_graph.get_async_redis_client", ··· 103 mock_fetch, 104 ), 105 ): 106 + result = await get_follows(TEST_DID) 107 108 assert result == SAMPLE_FOLLOWS 109 + mock_fetch.assert_awaited_once_with(TEST_DID) 110 111 + # verify both data and timestamp were written 112 + assert mock_redis.set.await_count == 2 113 114 + data_call = mock_redis.set.call_args_list[0] 115 + assert data_call[0][0] == f"{FOLLOWS_CACHE_PREFIX}{TEST_DID}" 116 + assert data_call[1]["ex"] == FOLLOWS_CACHE_TTL_SECONDS 117 + assert _deserialize_follows(data_call[0][1]) == SAMPLE_FOLLOWS 118 + 119 + ts_call = mock_redis.set.call_args_list[1] 120 + assert ts_call[0][0] == f"{FOLLOWS_TIMESTAMP_PREFIX}{TEST_DID}" 121 + assert ts_call[1]["ex"] == FOLLOWS_CACHE_TTL_SECONDS 122 123 124 async def test_redis_error_falls_back_to_live(mock_fetch: AsyncMock) -> None: 125 """redis errors fall back to live fetch without raising.""" 126 broken_redis = AsyncMock() 127 + broken_redis.get.side_effect = RedisConnectionError("redis down") 128 + broken_redis.set.side_effect = RedisConnectionError("redis down") 129 130 with ( 131 patch( ··· 137 mock_fetch, 138 ), 139 ): 140 + result = await get_follows(TEST_DID) 141 142 assert result == SAMPLE_FOLLOWS 143 mock_fetch.assert_awaited_once() ··· 146 async def test_warm_writes_to_redis( 147 mock_redis: AsyncMock, mock_fetch: AsyncMock 148 ) -> None: 149 + """warm_follows_cache always fetches and writes data + timestamp to redis.""" 150 with ( 151 patch( 152 "backend._internal.follow_graph.get_async_redis_client", ··· 157 mock_fetch, 158 ), 159 ): 160 + await warm_follows_cache(TEST_DID) 161 + 162 + mock_fetch.assert_awaited_once_with(TEST_DID) 163 + 164 + # data + timestamp = 2 set calls 165 + assert mock_redis.set.await_count == 2 166 + data_call = mock_redis.set.call_args_list[0] 167 + assert data_call[0][0] == 
f"{FOLLOWS_CACHE_PREFIX}{TEST_DID}" 168 + assert data_call[1]["ex"] == FOLLOWS_CACHE_TTL_SECONDS 169 + 170 + 171 + # --- stale-while-revalidate tests --- 172 + 173 + 174 + async def test_stale_cache_triggers_revalidation(mock_redis: AsyncMock) -> None: 175 + """cache >8min old returns data immediately and schedules a background re-warm.""" 176 + stale_ts = str(time.time() - FOLLOWS_STALE_AFTER_SECONDS - 10) 177 + mock_redis.get.side_effect = _make_redis_getter( 178 + cache_data=_serialize_follows(SAMPLE_FOLLOWS), 179 + timestamp=stale_ts, 180 + ) 181 + 182 + with ( 183 + patch( 184 + "backend._internal.follow_graph.get_async_redis_client", 185 + return_value=mock_redis, 186 + ), 187 + patch( 188 + "backend._internal.follow_graph._fetch_follows_from_bsky", 189 + ) as mock_bsky, 190 + patch( 191 + "backend._internal.tasks.schedule_follow_graph_warm", 192 + ) as mock_schedule, 193 + ): 194 + result = await get_follows(TEST_DID) 195 + 196 + # returns cached data without live fetch 197 + assert result == SAMPLE_FOLLOWS 198 + mock_bsky.assert_not_called() 199 + 200 + # acquired revalidation lock and scheduled re-warm 201 + mock_redis.set.assert_awaited_once_with( 202 + f"{FOLLOWS_REVALIDATING_PREFIX}{TEST_DID}", "1", nx=True, ex=60 203 + ) 204 + mock_schedule.assert_awaited_once_with(TEST_DID) 205 + 206 + 207 + async def test_fresh_cache_does_not_revalidate(mock_redis: AsyncMock) -> None: 208 + """cache <8min old does not trigger any background re-warm.""" 209 + fresh_ts = str(time.time() - 60) # 1 minute old 210 + mock_redis.get.side_effect = _make_redis_getter( 211 + cache_data=_serialize_follows(SAMPLE_FOLLOWS), 212 + timestamp=fresh_ts, 213 + ) 214 + 215 + with ( 216 + patch( 217 + "backend._internal.follow_graph.get_async_redis_client", 218 + return_value=mock_redis, 219 + ), 220 + patch( 221 + "backend._internal.follow_graph._fetch_follows_from_bsky", 222 + ) as mock_bsky, 223 + patch( 224 + "backend._internal.tasks.schedule_follow_graph_warm", 225 + ) as 
mock_schedule, 226 + ): 227 + result = await get_follows(TEST_DID) 228 + 229 + assert result == SAMPLE_FOLLOWS 230 + mock_bsky.assert_not_called() 231 + 232 + # no revalidation lock acquired, no re-warm scheduled 233 + mock_redis.set.assert_not_awaited() 234 + mock_schedule.assert_not_awaited() 235 + 236 + 237 + async def test_concurrent_revalidation_deduped(mock_redis: AsyncMock) -> None: 238 + """two stale requests only trigger one re-warm (SET NX dedup).""" 239 + stale_ts = str(time.time() - FOLLOWS_STALE_AFTER_SECONDS - 10) 240 + mock_redis.get.side_effect = _make_redis_getter( 241 + cache_data=_serialize_follows(SAMPLE_FOLLOWS), 242 + timestamp=stale_ts, 243 + ) 244 + 245 + # first call acquires lock, second is rejected 246 + mock_redis.set.side_effect = [True, False] 247 + 248 + with ( 249 + patch( 250 + "backend._internal.follow_graph.get_async_redis_client", 251 + return_value=mock_redis, 252 + ), 253 + patch( 254 + "backend._internal.follow_graph._fetch_follows_from_bsky", 255 + ) as mock_bsky, 256 + patch( 257 + "backend._internal.tasks.schedule_follow_graph_warm", 258 + ) as mock_schedule, 259 + ): 260 + result1 = await get_follows(TEST_DID) 261 + result2 = await get_follows(TEST_DID) 262 263 + assert result1 == SAMPLE_FOLLOWS 264 + assert result2 == SAMPLE_FOLLOWS 265 + mock_bsky.assert_not_called() 266 + 267 + # only one schedule call despite two stale reads 268 + mock_schedule.assert_awaited_once_with(TEST_DID)
-1
backend/tests/api/test_list_record_sync.py
··· 530 patch( 531 "backend.api.auth.schedule_atproto_sync", new_callable=AsyncMock 532 ) as mock_schedule_sync, 533 - patch("backend.api.auth.schedule_follow_graph_warm", new_callable=AsyncMock), 534 ): 535 async with AsyncClient( 536 transport=ASGITransport(app=test_app), base_url="http://test"
··· 530 patch( 531 "backend.api.auth.schedule_atproto_sync", new_callable=AsyncMock 532 ) as mock_schedule_sync, 533 ): 534 async with AsyncClient( 535 transport=ASGITransport(app=test_app), base_url="http://test"
+47
backend/tests/api/test_tracks_cache.py
···
··· 1 + """tests for anonymous discovery feed caching.""" 2 + 3 + from unittest.mock import AsyncMock, patch 4 + 5 + from redis.exceptions import ConnectionError as RedisConnectionError 6 + 7 + from backend.api.tracks.constants import DISCOVERY_CACHE_KEY 8 + from backend.api.tracks.listing import ( 9 + TracksListResponse, 10 + invalidate_tracks_discovery_cache, 11 + ) 12 + 13 + SAMPLE_RESPONSE = TracksListResponse(tracks=[], next_cursor=None, has_more=False) 14 + 15 + 16 + async def test_anonymous_discovery_cache_hit() -> None: 17 + """cached response deserializes correctly from Redis.""" 18 + cached_json = SAMPLE_RESPONSE.model_dump_json() 19 + result = TracksListResponse.model_validate_json(cached_json) 20 + assert result == SAMPLE_RESPONSE 21 + 22 + 23 + async def test_invalidate_clears_cache() -> None: 24 + """invalidate_tracks_discovery_cache deletes the cache key.""" 25 + mock_redis = AsyncMock() 26 + mock_redis.delete = AsyncMock() 27 + 28 + with patch( 29 + "backend.api.tracks.listing.get_async_redis_client", 30 + return_value=mock_redis, 31 + ): 32 + await invalidate_tracks_discovery_cache() 33 + 34 + mock_redis.delete.assert_awaited_once_with(DISCOVERY_CACHE_KEY) 35 + 36 + 37 + async def test_invalidate_handles_redis_error() -> None: 38 + """invalidation silently handles Redis errors.""" 39 + mock_redis = AsyncMock() 40 + mock_redis.delete.side_effect = RedisConnectionError("redis down") 41 + 42 + with patch( 43 + "backend.api.tracks.listing.get_async_redis_client", 44 + return_value=mock_redis, 45 + ): 46 + # should not raise 47 + await invalidate_tracks_discovery_cache()
+1 -1
loq.toml
··· 39 40 [[rules]] 41 path = "backend/src/backend/api/tracks/listing.py" 42 - max_lines = 523 43 44 [[rules]] 45 path = "backend/src/backend/api/tracks/mutations.py"
··· 39 40 [[rules]] 41 path = "backend/src/backend/api/tracks/listing.py" 42 + max_lines = 562 43 44 [[rules]] 45 path = "backend/src/backend/api/tracks/mutations.py"