A community based topic aggregation platform built on atproto

feat(aggregators): add Reddit Highlights aggregator core implementation

Add a new aggregator that pulls posts with streamable.com video links
from Reddit RSS feeds and posts them to Coves communities.

Core components:
- RSS feed fetcher with retry logic and exponential backoff
- Link extractor for detecting streamable.com URLs
- Coves API client with proper error handling
- State manager with atomic writes and corruption recovery
- Configuration loader with validation

Features:
- Configurable subreddit-to-community mappings
- Deduplication via persistent state
- Video embed metadata (embedType, provider, domain)
- Anti-detection jitter for Reddit rate limits

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+1713
+41
aggregators/reddit-highlights/.gitignore
··· 1 + # Environment and config 2 + .env 3 + config.yaml 4 + venv/ 5 + 6 + # State files 7 + data/*.json 8 + data/world.xml 9 + 10 + # Python 11 + __pycache__/ 12 + *.py[cod] 13 + *$py.class 14 + *.so 15 + .Python 16 + build/ 17 + develop-eggs/ 18 + dist/ 19 + downloads/ 20 + eggs/ 21 + .eggs/ 22 + lib/ 23 + lib64/ 24 + parts/ 25 + sdist/ 26 + var/ 27 + wheels/ 28 + *.egg-info/ 29 + .installed.cfg 30 + *.egg 31 + 32 + # Testing 33 + .pytest_cache/ 34 + .coverage 35 + htmlcov/ 36 + 37 + # IDE 38 + .vscode/ 39 + .idea/ 40 + *.swp 41 + *.swo
+140
aggregators/reddit-highlights/README.md
··· 1 + # Reddit Highlights Aggregator 2 + 3 + Aggregates video highlights from Reddit subreddits (e.g., r/nba) and posts them to Coves communities. 4 + 5 + ## Features 6 + 7 + - Fetches posts from Reddit via RSS (no API key required) 8 + - Extracts streamable.com video links 9 + - Posts to configured Coves communities with proper attribution 10 + - Anti-detection jitter (randomized polling intervals) 11 + - State tracking for deduplication 12 + - Docker deployment with cron scheduler 13 + 14 + ## Quick Start 15 + 16 + 1. **Copy environment file:** 17 + ```bash 18 + cp .env.example .env 19 + ``` 20 + 21 + 2. **Configure your Coves API key:** 22 + ```bash 23 + # Edit .env and set your API key 24 + COVES_API_KEY=ckapi_your_key_here 25 + ``` 26 + 27 + 3. **Build and run:** 28 + ```bash 29 + docker-compose up -d 30 + ``` 31 + 32 + 4. **View logs:** 33 + ```bash 34 + docker-compose logs -f 35 + ``` 36 + 37 + ## Configuration 38 + 39 + ### config.yaml 40 + 41 + ```yaml 42 + coves_api_url: "https://coves.social" 43 + 44 + subreddits: 45 + - name: "nba" 46 + community_handle: "nba.coves.social" 47 + enabled: true 48 + 49 + allowed_domains: 50 + - streamable.com 51 + ``` 52 + 53 + ### Adding More Subreddits 54 + 55 + 1. Add entry to `config.yaml`: 56 + ```yaml 57 + - name: "soccer" 58 + community_handle: "soccer.coves.social" 59 + enabled: true 60 + ``` 61 + 62 + 2. Authorize the aggregator for the new community in Coves 63 + 64 + 3. Restart the container: 65 + ```bash 66 + docker-compose restart 67 + ``` 68 + 69 + ## Polling Schedule 70 + 71 + - Cron runs every **10 minutes** 72 + - Python script adds **0-10 minutes random jitter** 73 + - Effective polling interval: **10-20 minutes** (varies each run) 74 + 75 + This randomization helps avoid bot detection patterns. 76 + 77 + ## Development 78 + 79 + ### Setup 80 + 81 + ```bash 82 + # Create virtual environment 83 + python -m venv venv 84 + source venv/bin/activate 85 + 86 + # Install dependencies 87 + pip install -r requirements.txt 88 + ``` 89 + 90 + ### Run Tests 91 + 92 + ```bash 93 + pytest 94 + ``` 95 + 96 + ### Run Manually 97 + 98 + ```bash 99 + # Set environment variables 100 + export COVES_API_KEY=ckapi_your_key 101 + export SKIP_JITTER=true # Skip delay for testing 102 + 103 + # Run 104 + python -m src.main 105 + ``` 106 + 107 + ## Architecture 108 + 109 + ``` 110 + src/ 111 + ├── main.py # Orchestration (CRON entry point) 112 + ├── rss_fetcher.py # RSS feed fetching with retry 113 + ├── link_extractor.py # Streamable URL detection 114 + ├── coves_client.py # Coves API client 115 + ├── state_manager.py # Deduplication state tracking 116 + ├── config.py # YAML config loader 117 + └── models.py # Data models 118 + ``` 119 + 120 + ## Post Format 121 + 122 + Posts are created with: 123 + - **Title**: Reddit post title 124 + - **Embed**: Streamable video link with metadata 125 + - **Sources**: Link back to original Reddit post 126 + 127 + Example embed: 128 + ```json 129 + { 130 + "$type": "social.coves.embed.external", 131 + "external": { 132 + "uri": "https://streamable.com/abc123", 133 + "title": "LeBron with the chase-down block!", 134 + "description": "From r/nba", 135 + "sources": [ 136 + {"uri": "https://reddit.com/...", "title": "r/nba", "domain": "reddit.com"} 137 + ] 138 + } 139 + } 140 + ```
+39
aggregators/reddit-highlights/config.example.yaml
··· 1 + # Reddit Highlights Aggregator Configuration 2 + # 3 + # Copy this file to config.yaml and customize for your setup. 4 + # 5 + # This file configures which subreddits to fetch video highlights from 6 + # and which Coves communities to post them to. 7 + 8 + # Coves API endpoint (can be overridden with COVES_API_URL env var) 9 + coves_api_url: "https://coves.social" 10 + 11 + # Subreddit-to-community mappings 12 + # Add entries here for each subreddit you want to aggregate 13 + subreddits: 14 + # NBA highlights 15 + - name: "nba" 16 + community_handle: "nba.coves.social" 17 + enabled: true 18 + 19 + # Example: Soccer/football highlights (disabled by default) 20 + # - name: "soccer" 21 + # community_handle: "soccer.coves.social" 22 + # enabled: false 23 + 24 + # Example: NHL highlights (disabled by default) 25 + # - name: "hockey" 26 + # community_handle: "hockey.coves.social" 27 + # enabled: false 28 + 29 + # Allowed video hosting domains 30 + # Only posts with links to these domains will be imported 31 + # Add more domains here as needed 32 + allowed_domains: 33 + - streamable.com 34 + # Future options: 35 + # - streamff.com 36 + # - streamja.com 37 + 38 + # Logging level (debug, info, warning, error) 39 + log_level: "info"
+6
aggregators/reddit-highlights/pytest.ini
··· 1 + [pytest] 2 + testpaths = tests 3 + python_files = test_*.py 4 + python_classes = Test* 5 + python_functions = test_* 6 + addopts = -v --tb=short
+15
aggregators/reddit-highlights/requirements.txt
··· 1 + # Core dependencies 2 + feedparser==6.0.11 3 + requests==2.31.0 4 + pyyaml==6.0.1 5 + 6 + # Testing 7 + pytest==8.1.1 8 + pytest-cov==5.0.0 9 + responses==0.25.0 10 + 11 + # Development 12 + black==24.3.0 13 + mypy==1.9.0 14 + types-PyYAML==6.0.12.12 15 + types-requests==2.31.0.20240311
+1
aggregators/reddit-highlights/src/__init__.py
··· 1 + """Reddit Highlights Aggregator for Coves."""
+181
aggregators/reddit-highlights/src/config.py
··· 1 + """ 2 + Configuration Loader for Reddit Highlights Aggregator. 3 + 4 + Loads and validates configuration from YAML files. 5 + """ 6 + import os 7 + import logging 8 + from pathlib import Path 9 + from typing import Dict, Any, List 10 + import yaml 11 + from urllib.parse import urlparse 12 + 13 + from src.models import AggregatorConfig, SubredditConfig, LogLevel 14 + 15 + logger = logging.getLogger(__name__) 16 + 17 + 18 + class ConfigError(Exception): 19 + """Configuration error.""" 20 + 21 + pass 22 + 23 + 24 + class ConfigLoader: 25 + """ 26 + Loads and validates aggregator configuration. 27 + 28 + Supports: 29 + - Loading from YAML file 30 + - Environment variable overrides 31 + - Validation of required fields 32 + """ 33 + 34 + def __init__(self, config_path: Path): 35 + """ 36 + Initialize config loader. 37 + 38 + Args: 39 + config_path: Path to config.yaml file 40 + """ 41 + self.config_path = Path(config_path) 42 + 43 + def load(self) -> AggregatorConfig: 44 + """ 45 + Load and validate configuration. 46 + 47 + Returns: 48 + AggregatorConfig object 49 + 50 + Raises: 51 + ConfigError: If config is invalid or missing 52 + """ 53 + if not self.config_path.exists(): 54 + raise ConfigError(f"Configuration file not found: {self.config_path}") 55 + 56 + try: 57 + with open(self.config_path, "r") as f: 58 + config_data = yaml.safe_load(f) 59 + except yaml.YAMLError as e: 60 + raise ConfigError(f"Failed to parse YAML: {e}") 61 + 62 + if not config_data: 63 + raise ConfigError("Configuration file is empty") 64 + 65 + try: 66 + return self._parse_config(config_data) 67 + except ConfigError: 68 + raise 69 + except Exception as e: 70 + raise ConfigError(f"Invalid configuration: {e}") 71 + 72 + def _parse_config(self, data: Dict[str, Any]) -> AggregatorConfig: 73 + """ 74 + Parse and validate configuration data. 75 + 76 + Args: 77 + data: Parsed YAML data 78 + 79 + Returns: 80 + AggregatorConfig object 81 + 82 + Raises: 83 + ConfigError: If validation fails 84 + """ 85 + coves_api_url = os.getenv("COVES_API_URL", data.get("coves_api_url")) 86 + if not coves_api_url: 87 + raise ConfigError("Missing required field: coves_api_url") 88 + 89 + if not self._is_valid_url(coves_api_url): 90 + raise ConfigError(f"Invalid URL for coves_api_url: {coves_api_url}") 91 + 92 + # Parse log level with validation 93 + log_level_str = data.get("log_level", "info").lower() 94 + try: 95 + log_level = LogLevel(log_level_str) 96 + except ValueError: 97 + valid_levels = [level.value for level in LogLevel] 98 + raise ConfigError(f"Invalid log_level '{log_level_str}'. Valid values: {valid_levels}") 99 + 100 + subreddits_data = data.get("subreddits", []) 101 + if not subreddits_data: 102 + raise ConfigError("Configuration must include at least one subreddit") 103 + 104 + subreddits = [] 105 + for sub_data in subreddits_data: 106 + subreddit = self._parse_subreddit(sub_data) 107 + subreddits.append(subreddit) 108 + 109 + allowed_domains = tuple(data.get("allowed_domains", ["streamable.com"])) 110 + 111 + enabled_count = sum(1 for s in subreddits if s.enabled) 112 + logger.info( 113 + f"Loaded configuration with {len(subreddits)} subreddits ({enabled_count} enabled)" 114 + ) 115 + 116 + return AggregatorConfig( 117 + coves_api_url=coves_api_url, 118 + subreddits=tuple(subreddits), # Convert to tuple for immutability 119 + allowed_domains=allowed_domains, 120 + log_level=log_level, 121 + ) 122 + 123 + def _parse_subreddit(self, data: Dict[str, Any]) -> SubredditConfig: 124 + """ 125 + Parse and validate a single subreddit configuration. 126 + 127 + Args: 128 + data: Subreddit configuration data 129 + 130 + Returns: 131 + SubredditConfig object 132 + 133 + Raises: 134 + ConfigError: If validation fails 135 + """ 136 + required_fields = ["name", "community_handle"] 137 + for field in required_fields: 138 + if field not in data: 139 + raise ConfigError( 140 + f"Missing required field in subreddit config: {field}" 141 + ) 142 + 143 + name = data["name"] 144 + community_handle = data["community_handle"] 145 + enabled = data.get("enabled", True) 146 + 147 + if not name or not name.strip(): 148 + raise ConfigError("Subreddit name cannot be empty") 149 + 150 + if not community_handle or not community_handle.strip(): 151 + raise ConfigError(f"Community handle cannot be empty for subreddit '{name}'") 152 + 153 + return SubredditConfig( 154 + name=name.strip().lower(), 155 + community_handle=community_handle.strip(), 156 + enabled=enabled, 157 + ) 158 + 159 + def _is_valid_url(self, url: str) -> bool: 160 + """ 161 + Validate URL format. 162 + 163 + Only allows http and https schemes to prevent dangerous schemes 164 + like file://, javascript://, or data:// URIs. 165 + 166 + Args: 167 + url: URL to validate 168 + 169 + Returns: 170 + True if valid HTTP/HTTPS URL, False otherwise 171 + """ 172 + try: 173 + result = urlparse(url) 174 + # Only allow http and https schemes 175 + if result.scheme not in ("http", "https"): 176 + logger.warning(f"URL has invalid scheme '{result.scheme}': {url}") 177 + return False 178 + return bool(result.netloc) 179 + except ValueError as e: 180 + logger.warning(f"Failed to parse URL '{url}': {e}") 181 + return False
+285
aggregators/reddit-highlights/src/coves_client.py
··· 1 + """ 2 + Coves API Client for posting to communities. 3 + 4 + Handles API key authentication and posting via XRPC. 5 + """ 6 + import logging 7 + import requests 8 + from typing import Dict, List, Optional 9 + 10 + logger = logging.getLogger(__name__) 11 + 12 + 13 + class CovesAPIError(Exception): 14 + """Base exception for Coves API errors.""" 15 + 16 + def __init__(self, message: str, status_code: int = None, response_body: str = None): 17 + super().__init__(message) 18 + self.status_code = status_code 19 + self.response_body = response_body 20 + 21 + 22 + class CovesAuthenticationError(CovesAPIError): 23 + """Raised when authentication fails (401 Unauthorized).""" 24 + pass 25 + 26 + 27 + class CovesNotFoundError(CovesAPIError): 28 + """Raised when a resource is not found (404 Not Found).""" 29 + pass 30 + 31 + 32 + class CovesRateLimitError(CovesAPIError): 33 + """Raised when rate limit is exceeded (429 Too Many Requests).""" 34 + pass 35 + 36 + 37 + class CovesForbiddenError(CovesAPIError): 38 + """Raised when access is forbidden (403 Forbidden).""" 39 + pass 40 + 41 + 42 + class CovesClient: 43 + """ 44 + Client for posting to Coves communities via XRPC. 45 + 46 + Handles: 47 + - API key authentication 48 + - Creating posts in communities (social.coves.community.post.create) 49 + - External embed formatting 50 + """ 51 + 52 + # API key format constants (must match Go constants in apikey_service.go) 53 + API_KEY_PREFIX = "ckapi_" 54 + API_KEY_TOTAL_LENGTH = 70 # 6 (prefix) + 64 (32 bytes hex-encoded) 55 + 56 + def __init__(self, api_url: str, api_key: str): 57 + """ 58 + Initialize Coves client with API key authentication. 59 + 60 + Args: 61 + api_url: Coves API URL for posting (e.g., "https://coves.social") 62 + api_key: Coves API key, 70 characters total (6-char prefix + 64-char hex token) 63 + 64 + Raises: 65 + ValueError: If api_key is empty, has wrong prefix, or wrong length 66 + """ 67 + # Validate API key format for early failure with clear error 68 + if not api_key: 69 + raise ValueError("API key cannot be empty") 70 + if not api_key.startswith(self.API_KEY_PREFIX): 71 + raise ValueError(f"API key must start with '{self.API_KEY_PREFIX}'") 72 + if len(api_key) != self.API_KEY_TOTAL_LENGTH: 73 + raise ValueError( 74 + f"API key must be {self.API_KEY_TOTAL_LENGTH} characters " 75 + f"(got {len(api_key)})" 76 + ) 77 + 78 + self.api_url = api_url.rstrip('/') 79 + self.api_key = api_key 80 + self.session = requests.Session() 81 + self.session.headers['Authorization'] = f'Bearer {api_key}' 82 + self.session.headers['Content-Type'] = 'application/json' 83 + 84 + def authenticate(self): 85 + """ 86 + No-op for API key authentication. 87 + 88 + API key is set in the session headers during initialization. 89 + This method is kept for backward compatibility with existing code 90 + that calls authenticate() before making requests. 91 + """ 92 + logger.info("Using API key authentication (no session creation needed)") 93 + 94 + def create_post( 95 + self, 96 + community_handle: str, 97 + content: str, 98 + facets: List[Dict], 99 + title: Optional[str] = None, 100 + embed: Optional[Dict] = None, 101 + thumbnail_url: Optional[str] = None 102 + ) -> str: 103 + """ 104 + Create a post in a community. 105 + 106 + Args: 107 + community_handle: Community handle (e.g., "world-news.coves.social") 108 + content: Post content (rich text) 109 + facets: Rich text facets (formatting, links) 110 + title: Optional post title 111 + embed: Optional external embed 112 + thumbnail_url: Optional thumbnail URL (for trusted aggregators only) 113 + 114 + Returns: 115 + AT Proto URI of created post (e.g., "at://did:plc:.../social.coves.post/...") 116 + 117 + Raises: 118 + CovesAuthenticationError: If authentication fails (401) 119 + CovesForbiddenError: If access is denied (403) 120 + CovesNotFoundError: If community not found (404) 121 + CovesRateLimitError: If rate limit exceeded (429) 122 + CovesAPIError: For other API errors or invalid responses 123 + requests.RequestException: For network-level errors 124 + """ 125 + try: 126 + # Prepare post data for social.coves.community.post.create endpoint 127 + post_data = { 128 + "community": community_handle, 129 + "content": content, 130 + "facets": facets 131 + } 132 + 133 + # Add title if provided 134 + if title: 135 + post_data["title"] = title 136 + 137 + # Add embed if provided 138 + if embed: 139 + post_data["embed"] = embed 140 + 141 + # Add thumbnail URL at top level if provided (for trusted aggregators) 142 + if thumbnail_url: 143 + post_data["thumbnailUrl"] = thumbnail_url 144 + 145 + # Use Coves-specific endpoint (not direct PDS write) 146 + # This provides validation, authorization, and business logic 147 + logger.info(f"Creating post in community: {community_handle}") 148 + 149 + # Make HTTP request to XRPC endpoint using session with API key 150 + url = f"{self.api_url}/xrpc/social.coves.community.post.create" 151 + response = self.session.post(url, json=post_data, timeout=30) 152 + 153 + # Handle specific error cases 154 + if not response.ok: 155 + # Log status code but not full response body (may contain sensitive data) 156 + logger.error(f"Post creation failed with status {response.status_code}") 157 + self._raise_for_status(response) 158 + 159 + try: 160 + result = response.json() 161 + post_uri = result["uri"] 162 + except (ValueError, KeyError) as e: 163 + # ValueError for invalid JSON, KeyError for missing 'uri' field 164 + logger.error(f"Failed to parse post creation response: {e}") 165 + raise CovesAPIError( 166 + f"Invalid response from server: {e}", 167 + status_code=response.status_code, 168 + response_body=response.text 169 + ) 170 + 171 + logger.info(f"Post created: {post_uri}") 172 + return post_uri 173 + 174 + except requests.RequestException as e: 175 + # Network errors, timeouts, etc. 176 + logger.error(f"Network error creating post: {e}") 177 + raise 178 + except CovesAPIError: 179 + # Re-raise our custom exceptions as-is 180 + raise 181 + 182 + def create_external_embed( 183 + self, 184 + uri: str, 185 + title: str, 186 + description: str, 187 + sources: Optional[List[Dict]] = None, 188 + embed_type: Optional[str] = None, 189 + provider: Optional[str] = None, 190 + domain: Optional[str] = None 191 + ) -> Dict: 192 + """ 193 + Create external embed object for hot-linked content. 194 + 195 + Args: 196 + uri: URL of the external content 197 + title: Title of the content 198 + description: Description/summary 199 + sources: Optional list of source dicts with uri, title, domain 200 + embed_type: Type hint for rendering (article, image, video, website) 201 + provider: Service provider name (e.g., streamable, imgur) 202 + domain: Domain of the linked content (e.g., streamable.com) 203 + 204 + Returns: 205 + Embed dictionary ready for post creation 206 + """ 207 + external = { 208 + "uri": uri, 209 + "title": title, 210 + "description": description 211 + } 212 + 213 + if sources: 214 + external["sources"] = sources 215 + 216 + if embed_type: 217 + external["embedType"] = embed_type 218 + 219 + if provider: 220 + external["provider"] = provider 221 + 222 + if domain: 223 + external["domain"] = domain 224 + 225 + return { 226 + "$type": "social.coves.embed.external", 227 + "external": external 228 + } 229 + 230 + def _raise_for_status(self, response: requests.Response) -> None: 231 + """ 232 + Raise specific exceptions based on HTTP status code. 233 + 234 + Args: 235 + response: The HTTP response object 236 + 237 + Raises: 238 + CovesAuthenticationError: For 401 Unauthorized 239 + CovesNotFoundError: For 404 Not Found 240 + CovesRateLimitError: For 429 Too Many Requests 241 + CovesAPIError: For other 4xx/5xx errors 242 + """ 243 + status_code = response.status_code 244 + error_body = response.text 245 + 246 + if status_code == 401: 247 + raise CovesAuthenticationError( 248 + f"Authentication failed: {error_body}", 249 + status_code=status_code, 250 + response_body=error_body 251 + ) 252 + elif status_code == 403: 253 + raise CovesForbiddenError( 254 + f"Access forbidden: {error_body}", 255 + status_code=status_code, 256 + response_body=error_body 257 + ) 258 + elif status_code == 404: 259 + raise CovesNotFoundError( 260 + f"Resource not found: {error_body}", 261 + status_code=status_code, 262 + response_body=error_body 263 + ) 264 + elif status_code == 429: 265 + raise CovesRateLimitError( 266 + f"Rate limit exceeded: {error_body}", 267 + status_code=status_code, 268 + response_body=error_body 269 + ) 270 + else: 271 + raise CovesAPIError( 272 + f"API request failed ({status_code}): {error_body}", 273 + status_code=status_code, 274 + response_body=error_body 275 + ) 276 + 277 + def _get_timestamp(self) -> str: 278 + """ 279 + Get current timestamp in ISO 8601 format. 280 + 281 + Returns: 282 + ISO timestamp string 283 + """ 284 + from datetime import datetime, timezone 285 + return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
+364
aggregators/reddit-highlights/src/main.py
··· 1 + """ 2 + Main Orchestration Script for Reddit Highlights Aggregator. 3 + 4 + Coordinates all components to: 5 + 1. Apply anti-detection jitter delay 6 + 2. Fetch Reddit RSS feeds 7 + 3. Extract streamable video links 8 + 4. Deduplicate via state tracking 9 + 5. Post to Coves communities 10 + 11 + Designed to run via CRON (single execution, then exit). 12 + """ 13 + import os 14 + import re 15 + import sys 16 + import time 17 + import random 18 + import logging 19 + from pathlib import Path 20 + from datetime import datetime 21 + from typing import Optional 22 + 23 + from src.config import ConfigLoader 24 + from src.rss_fetcher import RSSFetcher 25 + from src.link_extractor import LinkExtractor 26 + from src.state_manager import StateManager 27 + from src.coves_client import CovesClient 28 + from src.models import RedditPost 29 + 30 + # Setup logging 31 + logging.basicConfig( 32 + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 33 + ) 34 + logger = logging.getLogger(__name__) 35 + 36 + # Reddit RSS URL template 37 + REDDIT_RSS_URL = "https://www.reddit.com/r/{subreddit}/.rss" 38 + 39 + # Anti-detection jitter range (0-10 minutes in seconds) 40 + JITTER_MIN_SECONDS = 0 41 + JITTER_MAX_SECONDS = 600 42 + 43 + 44 + class Aggregator: 45 + """ 46 + Main aggregator orchestration. 47 + 48 + Coordinates all components to fetch, filter, and post video highlights. 49 + """ 50 + 51 + def __init__( 52 + self, 53 + config_path: Path, 54 + state_file: Path, 55 + coves_client: Optional[CovesClient] = None, 56 + skip_jitter: bool = False, 57 + ): 58 + """ 59 + Initialize aggregator. 60 + 61 + Args: 62 + config_path: Path to config.yaml 63 + state_file: Path to state.json 64 + coves_client: Optional CovesClient (for testing) 65 + skip_jitter: Skip anti-detection delay (for testing) 66 + """ 67 + self.skip_jitter = skip_jitter 68 + 69 + # Load configuration 70 + logger.info("Loading configuration...") 71 + config_loader = ConfigLoader(config_path) 72 + self.config = config_loader.load() 73 + 74 + # Initialize components 75 + logger.info("Initializing components...") 76 + self.rss_fetcher = RSSFetcher() 77 + self.link_extractor = LinkExtractor( 78 + allowed_domains=self.config.allowed_domains 79 + ) 80 + self.state_manager = StateManager(state_file) 81 + self.state_file = state_file 82 + 83 + # Initialize Coves client (or use provided one for testing) 84 + if coves_client: 85 + self.coves_client = coves_client 86 + else: 87 + api_key = os.getenv("COVES_API_KEY") 88 + if not api_key: 89 + raise ValueError("COVES_API_KEY environment variable required") 90 + 91 + self.coves_client = CovesClient( 92 + api_url=self.config.coves_api_url, api_key=api_key 93 + ) 94 + 95 + def run(self): 96 + """ 97 + Run aggregator: apply jitter, fetch, filter, post, and update state. 98 + 99 + This is the main entry point for CRON execution. 100 + """ 101 + logger.info("=" * 60) 102 + logger.info("Starting Reddit Highlights Aggregator") 103 + logger.info("=" * 60) 104 + 105 + # Anti-detection jitter: random delay before starting 106 + if not self.skip_jitter: 107 + jitter_seconds = random.uniform(JITTER_MIN_SECONDS, JITTER_MAX_SECONDS) 108 + logger.info( 109 + f"Applying anti-detection jitter: sleeping for {jitter_seconds:.1f} seconds " 110 + f"({jitter_seconds/60:.1f} minutes)" 111 + ) 112 + time.sleep(jitter_seconds) 113 + 114 + # Get enabled subreddits only 115 + enabled_subreddits = [s for s in self.config.subreddits if s.enabled] 116 + logger.info(f"Processing {len(enabled_subreddits)} enabled subreddits") 117 + 118 + # Authenticate once at the start 119 + try: 120 + self.coves_client.authenticate() 121 + except Exception as e: 122 + logger.error(f"Failed to authenticate: {e}") 123 + logger.error("Cannot continue without authentication") 124 + raise RuntimeError("Authentication failed") from e 125 + 126 + # Process each subreddit 127 + for subreddit_config in enabled_subreddits: 128 + try: 129 + self._process_subreddit(subreddit_config) 130 + except KeyboardInterrupt: 131 + # Re-raise interrupt signals - don't suppress user abort 132 + logger.info("Received interrupt signal, stopping...") 133 + raise 134 + except Exception as e: 135 + # Log error but continue with other subreddits 136 + logger.error( 137 + f"Error processing subreddit '{subreddit_config.name}': {e}", 138 + exc_info=True, 139 + ) 140 + continue 141 + 142 + logger.info("=" * 60) 143 + logger.info("Aggregator run completed") 144 + logger.info("=" * 60) 145 + 146 + def _process_subreddit(self, subreddit_config): 147 + """ 148 + Process a single subreddit. 149 + 150 + Args: 151 + subreddit_config: SubredditConfig object 152 + """ 153 + subreddit_name = subreddit_config.name 154 + community_handle = subreddit_config.community_handle 155 + 156 + # Sanitize subreddit name to prevent URL injection 157 + # Only allow alphanumeric, underscores, and hyphens 158 + if not re.match(r'^[a-zA-Z0-9_-]+$', subreddit_name): 159 + raise ValueError(f"Invalid subreddit name: {subreddit_name}") 160 + 161 + logger.info(f"Processing subreddit: r/{subreddit_name} -> {community_handle}") 162 + 163 + # Build RSS URL 164 + rss_url = REDDIT_RSS_URL.format(subreddit=subreddit_name) 165 + 166 + # Fetch RSS feed 167 + try: 168 + feed = self.rss_fetcher.fetch_feed(rss_url) 169 + except Exception as e: 170 + logger.error(f"Failed to fetch feed for r/{subreddit_name}: {e}") 171 + raise 172 + 173 + # Check for feed errors 174 + if feed.bozo: 175 + bozo_exception = getattr(feed, 'bozo_exception', None) 176 + logger.warning( 177 + f"Feed for r/{subreddit_name} has parsing issues (bozo flag set): {bozo_exception}" 178 + ) 179 + 180 + # Process entries 181 + new_posts = 0 182 + skipped_posts = 0 183 + no_video_count = 0 184 + 185 + for entry in feed.entries: 186 + try: 187 + # Extract video URL 188 + video_url = self.link_extractor.extract_video_url(entry) 189 + if not video_url: 190 + no_video_count += 1 191 + continue # Skip posts without video links 192 + 193 + # Get entry ID for deduplication 194 + entry_id = self._get_entry_id(entry) 195 + if self.state_manager.is_posted(subreddit_name, entry_id): 196 + skipped_posts += 1 197 + logger.debug(f"Skipping already-posted entry: {entry_id}") 198 + continue 199 + 200 + # Parse entry into RedditPost 201 + reddit_post = self._parse_entry(entry, subreddit_name, video_url) 202 + 203 + # Create embed with sources and video metadata 204 + # Note: Thumbnail is fetched by backend via unfurl service 205 + embed = self.coves_client.create_external_embed( 206 + uri=reddit_post.streamable_url, 207 + title=reddit_post.title, 208 + description=f"From r/{subreddit_name}", 209 + sources=[ 210 + { 211 + "uri": reddit_post.reddit_url, 212 + "title": f"r/{subreddit_name}", 213 + "domain": "reddit.com", 214 + } 215 + ], 216 + embed_type="video", 217 + provider="streamable", 218 + domain="streamable.com", 219 + ) 220 + 221 + # Post to community 222 + try: 223 + post_uri = self.coves_client.create_post( 224 + community_handle=community_handle, 225 + title=reddit_post.title, 226 + content="", # No additional content needed 227 + facets=[], 228 + embed=embed, 229 + ) 230 + 231 + # Mark as posted (only if successful) 232 + self.state_manager.mark_posted(subreddit_name, entry_id, post_uri) 233 + new_posts += 1 234 + logger.info(f"Posted: {reddit_post.title[:50]}... -> {post_uri}") 235 + 236 + except Exception as e: 237 + # Don't update state if posting failed 238 + logger.error(f"Failed to post '{reddit_post.title}': {e}") 239 + continue 240 + 241 + except Exception as e: 242 + # Log error but continue with other entries 243 + logger.error(f"Error processing entry: {e}", exc_info=True) 244 + continue 245 + 246 + # Update last run timestamp 247 + self.state_manager.update_last_run(subreddit_name, datetime.now()) 248 + 249 + logger.info( 250 + f"r/{subreddit_name}: {new_posts} new posts, {skipped_posts} duplicates, " 251 + f"{no_video_count} without video" 252 + ) 253 + 254 + def _get_entry_id(self, entry) -> str: 255 + """ 256 + Get unique identifier for RSS entry. 257 + 258 + Args: 259 + entry: feedparser entry 260 + 261 + Returns: 262 + Unique ID string 263 + """ 264 + # Reddit RSS uses 'id' field with format like 't3_abc123' 265 + if hasattr(entry, "id") and entry.id: 266 + return entry.id 267 + 268 + # Fallback to link 269 + if hasattr(entry, "link") and entry.link: 270 + return entry.link 271 + 272 + # Last resort: title hash (using SHA-256 for security) 273 + if hasattr(entry, "title") and entry.title: 274 + import hashlib 275 + 276 + logger.warning(f"Using fallback hash for entry ID (no id or link found)") 277 + return hashlib.sha256(entry.title.encode()).hexdigest() 278 + 279 + raise ValueError("Cannot determine entry ID") 280 + 281 + def _parse_entry(self, entry, subreddit: str, video_url: str) -> RedditPost: 282 + """ 283 + Parse RSS entry into RedditPost object. 284 + 285 + Args: 286 + entry: feedparser entry 287 + subreddit: Subreddit name 288 + video_url: Extracted video URL 289 + 290 + Returns: 291 + RedditPost object 292 + """ 293 + # Get entry ID 294 + entry_id = self._get_entry_id(entry) 295 + 296 + # Get title 297 + title = entry.title if hasattr(entry, "title") else "Untitled" 298 + 299 + # Get Reddit permalink 300 + reddit_url = entry.link if hasattr(entry, "link") else "" 301 + 302 + # Get author (Reddit RSS uses 'author' field) 303 + author = "" 304 + if hasattr(entry, "author"): 305 + author = entry.author 306 + elif hasattr(entry, "author_detail") and hasattr(entry.author_detail, "name"): 307 + author = entry.author_detail.name 308 + 309 + # Get published date 310 + published = None 311 + if hasattr(entry, "published_parsed") and entry.published_parsed: 312 + try: 313 + published = datetime(*entry.published_parsed[:6]) 314 + except (TypeError, ValueError) as e: 315 + logger.warning(f"Failed to parse published date for entry: {e}") 316 + 317 + return RedditPost( 318 + id=entry_id, 319 + title=title, 320 + link=entry.link if hasattr(entry, "link") else "", 321 + reddit_url=reddit_url, 322 + subreddit=subreddit, 323 + author=author, 324 + published=published, 325 + streamable_url=video_url, 326 + ) 327 + 328 + 329 + def main(): 330 + """ 331 + Main entry point for command-line execution. 332 + 333 + Usage: 334 + python -m src.main 335 + """ 336 + # Get paths from environment or use defaults 337 + config_path = Path(os.getenv("CONFIG_PATH", "config.yaml")) 338 + state_file = Path(os.getenv("STATE_FILE", "data/state.json")) 339 + 340 + # Check for skip jitter flag (for testing) 341 + skip_jitter = os.getenv("SKIP_JITTER", "").lower() in ("true", "1", "yes") 342 + 343 + # Validate config file exists 344 + if not config_path.exists(): 345 + logger.error(f"Configuration file not found: {config_path}") 346 + logger.error("Please create config.yaml (see config.example.yaml)") 347 + sys.exit(1) 348 + 349 + # Create aggregator and run 350 + try: 351 + aggregator = Aggregator( 352 + config_path=config_path, 353 + state_file=state_file, 354 + skip_jitter=skip_jitter, 355 + ) 356 + aggregator.run() 357 + sys.exit(0) 358 + except Exception as e: 359 + logger.error(f"Aggregator failed: {e}", exc_info=True) 360 + sys.exit(1) 361 + 362 + 363 + if __name__ == "__main__": 364 + main()
+90
aggregators/reddit-highlights/src/models.py
··· 1 + """ 2 + Data models for Reddit Highlights Aggregator. 3 + """ 4 + from dataclasses import dataclass, field 5 + from enum import Enum 6 + from typing import List, Optional, Tuple 7 + from datetime import datetime 8 + import re 9 + 10 + 11 + class LogLevel(Enum): 12 + """Valid log levels for aggregator configuration.""" 13 + DEBUG = "debug" 14 + INFO = "info" 15 + WARNING = "warning" 16 + ERROR = "error" 17 + CRITICAL = "critical" 18 + 19 + 20 + @dataclass 21 + class RedditPost: 22 + """ 23 + Represents a Reddit post with video content. 24 + 25 + Parsed from Reddit RSS feed entries. 26 + """ 27 + 28 + id: str # Reddit post ID (e.g., "t3_1abc123" or just the rkey) 29 + title: str # Post title 30 + link: str # Direct link to content (may be streamable URL) 31 + reddit_url: str # Permalink to Reddit post 32 + subreddit: str # Subreddit name (without r/) 33 + author: str # Reddit username 34 + published: Optional[datetime] = None # Post publication time 35 + streamable_url: Optional[str] = None # Extracted streamable URL (if found) 36 + 37 + def __post_init__(self): 38 + """Validate required fields.""" 39 + if not self.id: 40 + raise ValueError("RedditPost.id cannot be empty") 41 + if not self.title: 42 + raise ValueError("RedditPost.title cannot be empty") 43 + if not self.subreddit: 44 + raise ValueError("RedditPost.subreddit cannot be empty") 45 + 46 + 47 + @dataclass(frozen=True) 48 + class SubredditConfig: 49 + """ 50 + Configuration for a single subreddit source. 51 + 52 + Maps a subreddit to a Coves community. 53 + Immutable (frozen) to prevent accidental modification. 54 + """ 55 + 56 + name: str # Subreddit name (e.g., "nba") 57 + community_handle: str # Coves community (e.g., "nba.coves.social") 58 + enabled: bool = True # Whether to fetch from this subreddit 59 + 60 + def __post_init__(self): 61 + """Validate configuration fields.""" 62 + if not self.name or not self.name.strip(): 63 + raise ValueError("SubredditConfig.name cannot be empty") 64 + if not self.community_handle or not self.community_handle.strip(): 65 + raise ValueError("SubredditConfig.community_handle cannot be empty") 66 + # Validate subreddit name format (alphanumeric, underscores, hyphens only) 67 + if not re.match(r'^[a-zA-Z0-9_-]+$', self.name): 68 + raise ValueError(f"Invalid subreddit name format: {self.name}") 69 + 70 + 71 + @dataclass(frozen=True) 72 + class AggregatorConfig: 73 + """ 74 + Full aggregator configuration. 75 + 76 + Loaded from config.yaml. 77 + Immutable (frozen) to prevent accidental modification after loading. 78 + """ 79 + 80 + coves_api_url: str 81 + subreddits: Tuple[SubredditConfig, ...] # Use tuple for immutability 82 + allowed_domains: Tuple[str, ...] = ("streamable.com",) # Default tuple 83 + log_level: LogLevel = LogLevel.INFO 84 + 85 + def __post_init__(self): 86 + """Validate configuration.""" 87 + if not self.coves_api_url: 88 + raise ValueError("AggregatorConfig.coves_api_url cannot be empty") 89 + if not self.subreddits: 90 + raise ValueError("AggregatorConfig.subreddits cannot be empty")
+84
aggregators/reddit-highlights/src/rss_fetcher.py
··· 1 + """ 2 + RSS feed fetcher with retry logic and error handling. 3 + """ 4 + import time 5 + import logging 6 + import requests 7 + import feedparser 8 + from typing import Optional 9 + 10 + logger = logging.getLogger(__name__) 11 + 12 + 13 + class RSSFetcher: 14 + """ 15 + Fetches and parses RSS feeds with retry logic and error handling. 16 + 17 + Features: 18 + - Configurable timeout and retry count 19 + - Exponential backoff on failures 20 + - Custom User-Agent header (required by Reddit) 21 + - Automatic HTTP to HTTPS upgrade handling 22 + """ 23 + 24 + DEFAULT_USER_AGENT = "Coves-Reddit-Aggregator/1.0 (https://coves.social; contact@coves.social)" 25 + 26 + def __init__(self, timeout: int = 30, max_retries: int = 3, user_agent: Optional[str] = None): 27 + """ 28 + Initialize RSS fetcher. 29 + 30 + Args: 31 + timeout: Request timeout in seconds 32 + max_retries: Maximum number of retry attempts 33 + user_agent: Custom User-Agent string (Reddit requires this) 34 + """ 35 + self.timeout = timeout 36 + self.max_retries = max_retries 37 + self.user_agent = user_agent or self.DEFAULT_USER_AGENT 38 + 39 + def fetch_feed(self, url: str) -> feedparser.FeedParserDict: 40 + """ 41 + Fetch and parse an RSS feed. 42 + 43 + Args: 44 + url: RSS feed URL 45 + 46 + Returns: 47 + Parsed feed object 48 + 49 + Raises: 50 + ValueError: If URL is empty 51 + requests.RequestException: If all retry attempts fail 52 + """ 53 + if not url: 54 + raise ValueError("URL cannot be empty") 55 + 56 + last_error = None 57 + 58 + for attempt in range(self.max_retries): 59 + try: 60 + logger.info(f"Fetching feed from {url} (attempt {attempt + 1}/{self.max_retries})") 61 + 62 + headers = {"User-Agent": self.user_agent} 63 + response = requests.get(url, timeout=self.timeout, headers=headers) 64 + response.raise_for_status() 65 + 66 + # Parse with feedparser 67 + feed = feedparser.parse(response.content) 68 + 69 + logger.info(f"Successfully fetched feed: {feed.feed.get('title', 'Unknown')}") 70 + return feed 71 + 72 + except requests.RequestException as e: 73 + last_error = e 74 + logger.warning(f"Fetch attempt {attempt + 1} failed: {e}") 75 + 76 + if attempt < self.max_retries - 1: 77 + # Exponential backoff 78 + sleep_time = 2 ** attempt 79 + logger.info(f"Retrying in {sleep_time} seconds...") 80 + time.sleep(sleep_time) 81 + 82 + # All retries exhausted 83 + logger.error(f"Failed to fetch feed after {self.max_retries} attempts") 84 + raise last_error
+255
aggregators/reddit-highlights/src/state_manager.py
··· 1 + """ 2 + State Manager for tracking posted stories. 3 + 4 + Handles deduplication by tracking which stories have already been posted. 5 + Uses JSON file for persistence. 6 + """ 7 + import json 8 + import logging 9 + from pathlib import Path 10 + from datetime import datetime, timedelta 11 + from typing import Optional, Dict, List 12 + 13 + logger = logging.getLogger(__name__) 14 + 15 + 16 + class StateManager: 17 + """ 18 + Manages aggregator state for deduplication. 19 + 20 + Tracks posted Reddit entries per subreddit to prevent duplicate posting. 21 + 22 + Attributes tracked per subreddit: 23 + - Posted GUIDs (with timestamps and Coves post URIs) 24 + - Last successful run timestamp 25 + - Automatic cleanup of old entries to prevent state file bloat 26 + 27 + Note: The 'feed_url' parameter in methods refers to the subreddit name 28 + (e.g., 'nba'), not a full RSS URL. This naming is historical but the 29 + functionality uses subreddit names as keys. 30 + """ 31 + 32 + def __init__(self, state_file: Path, max_guids_per_feed: int = 100, max_age_days: int = 30): 33 + """ 34 + Initialize state manager. 35 + 36 + Args: 37 + state_file: Path to JSON state file 38 + max_guids_per_feed: Maximum GUIDs to keep per feed (default: 100) 39 + max_age_days: Maximum age in days for GUIDs (default: 30) 40 + """ 41 + self.state_file = Path(state_file) 42 + self.max_guids_per_feed = max_guids_per_feed 43 + self.max_age_days = max_age_days 44 + self.state = self._load_state() 45 + 46 + def _load_state(self) -> Dict: 47 + """Load state from file, or create new state if file doesn't exist.""" 48 + if not self.state_file.exists(): 49 + logger.info(f"Creating new state file at {self.state_file}") 50 + state = {'feeds': {}} 51 + self._save_state(state) 52 + return state 53 + 54 + try: 55 + with open(self.state_file, 'r') as f: 56 + state = json.load(f) 57 + logger.info(f"Loaded state from {self.state_file}") 58 + return state 59 + except json.JSONDecodeError as e: 60 + # Backup corrupted file before overwriting 61 + backup_path = self.state_file.with_suffix('.json.corrupted') 62 + logger.error(f"State file corrupted: {e}. Backing up to {backup_path}") 63 + try: 64 + import shutil 65 + shutil.copy2(self.state_file, backup_path) 66 + logger.info(f"Corrupted state file backed up to {backup_path}") 67 + except OSError as backup_error: 68 + logger.warning(f"Failed to backup corrupted state file: {backup_error}") 69 + state = {'feeds': {}} 70 + self._save_state(state) 71 + return state 72 + 73 + def _save_state(self, state: Optional[Dict] = None): 74 + """ 75 + Save state to file atomically. 76 + 77 + Uses write-to-temp-then-rename pattern to prevent corruption 78 + if the process is interrupted during write. 79 + 80 + Raises: 81 + OSError: If write fails (after logging the error) 82 + """ 83 + if state is None: 84 + state = self.state 85 + 86 + # Ensure parent directory exists 87 + self.state_file.parent.mkdir(parents=True, exist_ok=True) 88 + 89 + # Write to temp file first for atomic update 90 + temp_file = self.state_file.with_suffix('.json.tmp') 91 + try: 92 + with open(temp_file, 'w') as f: 93 + json.dump(state, f, indent=2) 94 + # Atomic rename (on POSIX systems) 95 + temp_file.rename(self.state_file) 96 + except OSError as e: 97 + logger.error(f"Failed to save state file: {e}") 98 + # Clean up temp file if it exists 99 + if temp_file.exists(): 100 + try: 101 + temp_file.unlink() 102 + except OSError: 103 + pass 104 + raise 105 + 106 + def _ensure_feed_exists(self, feed_url: str): 107 + """Ensure feed entry exists in state.""" 108 + if feed_url not in self.state['feeds']: 109 + self.state['feeds'][feed_url] = { 110 + 'posted_guids': [], 111 + 'last_successful_run': None 112 + } 113 + 114 + def is_posted(self, feed_url: str, guid: str) -> bool: 115 + """ 116 + Check if a story has already been posted. 117 + 118 + Args: 119 + feed_url: RSS feed URL 120 + guid: Story GUID 121 + 122 + Returns: 123 + True if already posted, False otherwise 124 + """ 125 + self._ensure_feed_exists(feed_url) 126 + 127 + posted_guids = self.state['feeds'][feed_url]['posted_guids'] 128 + return any(entry['guid'] == guid for entry in posted_guids) 129 + 130 + def mark_posted(self, feed_url: str, guid: str, post_uri: str): 131 + """ 132 + Mark a story as posted. 133 + 134 + Args: 135 + feed_url: RSS feed URL 136 + guid: Story GUID 137 + post_uri: AT Proto URI of created post 138 + """ 139 + self._ensure_feed_exists(feed_url) 140 + 141 + # Add to posted list 142 + entry = { 143 + 'guid': guid, 144 + 'post_uri': post_uri, 145 + 'posted_at': datetime.now().isoformat() 146 + } 147 + self.state['feeds'][feed_url]['posted_guids'].append(entry) 148 + 149 + # Auto-cleanup to keep state file manageable 150 + self.cleanup_old_entries(feed_url) 151 + 152 + # Save state 153 + self._save_state() 154 + 155 + logger.info(f"Marked as posted: {guid} -> {post_uri}") 156 + 157 + def get_last_run(self, feed_url: str) -> Optional[datetime]: 158 + """ 159 + Get last successful run timestamp for a feed. 160 + 161 + Args: 162 + feed_url: RSS feed URL 163 + 164 + Returns: 165 + Datetime of last run, or None if never run 166 + """ 167 + self._ensure_feed_exists(feed_url) 168 + 169 + timestamp_str = self.state['feeds'][feed_url]['last_successful_run'] 170 + if timestamp_str is None: 171 + return None 172 + 173 + return datetime.fromisoformat(timestamp_str) 174 + 175 + def update_last_run(self, feed_url: str, timestamp: datetime): 176 + """ 177 + Update last successful run timestamp. 178 + 179 + Args: 180 + feed_url: RSS feed URL 181 + timestamp: Timestamp of successful run 182 + """ 183 + self._ensure_feed_exists(feed_url) 184 + 185 + self.state['feeds'][feed_url]['last_successful_run'] = timestamp.isoformat() 186 + self._save_state() 187 + 188 + logger.info(f"Updated last run for {feed_url}: {timestamp}") 189 + 190 + def cleanup_old_entries(self, feed_url: str): 191 + """ 192 + Remove old entries from state. 193 + 194 + Removes entries that are: 195 + - Older than max_age_days 196 + - Beyond max_guids_per_feed limit (keeps most recent) 197 + 198 + Args: 199 + feed_url: RSS feed URL 200 + """ 201 + self._ensure_feed_exists(feed_url) 202 + 203 + posted_guids = self.state['feeds'][feed_url]['posted_guids'] 204 + 205 + # Filter out entries older than max_age_days 206 + cutoff_date = datetime.now() - timedelta(days=self.max_age_days) 207 + filtered = [] 208 + for entry in posted_guids: 209 + try: 210 + posted_at = datetime.fromisoformat(entry['posted_at']) 211 + if posted_at > cutoff_date: 212 + filtered.append(entry) 213 + except (KeyError, ValueError) as e: 214 + # Skip entries with malformed or missing timestamps 215 + logger.warning(f"Skipping entry with invalid timestamp: {e}") 216 + continue 217 + 218 + # Keep only most recent max_guids_per_feed entries 219 + # Sort by posted_at (most recent first) 220 + filtered.sort(key=lambda x: x['posted_at'], reverse=True) 221 + filtered = filtered[:self.max_guids_per_feed] 222 + 223 + # Update state 224 + old_count = len(posted_guids) 225 + new_count = len(filtered) 226 + self.state['feeds'][feed_url]['posted_guids'] = filtered 227 + 228 + if old_count != new_count: 229 + logger.info(f"Cleaned up {old_count - new_count} old entries for {feed_url}") 230 + 231 + def get_posted_count(self, feed_url: str) -> int: 232 + """ 233 + Get count of posted items for a feed. 234 + 235 + Args: 236 + feed_url: RSS feed URL 237 + 238 + Returns: 239 + Number of posted items 240 + """ 241 + self._ensure_feed_exists(feed_url) 242 + return len(self.state['feeds'][feed_url]['posted_guids']) 243 + 244 + def get_all_posted_guids(self, feed_url: str) -> List[str]: 245 + """ 246 + Get all posted GUIDs for a feed. 247 + 248 + Args: 249 + feed_url: RSS feed URL 250 + 251 + Returns: 252 + List of GUIDs 253 + """ 254 + self._ensure_feed_exists(feed_url) 255 + return [entry['guid'] for entry in self.state['feeds'][feed_url]['posted_guids']]