A third party ATProto appview
at main 271 lines 9.7 kB view raw
#!/usr/bin/env python3
"""
Label Service for AT Protocol

Applies moderation labels to content.
Python port of server/services/label.ts
"""

import logging
from typing import TYPE_CHECKING, Optional, List, Dict, Any
from datetime import datetime, timezone

if TYPE_CHECKING:
    # The pool is injected by the caller and this module never touches the
    # asyncpg namespace at runtime, so the driver is only needed by type
    # checkers. This keeps the module importable (e.g. for unit tests with a
    # fake pool) when asyncpg is not installed.
    import asyncpg

logger = logging.getLogger(__name__)


def _label_timestamp(label: Dict[str, Any]) -> Any:
    """Return the creation timestamp of a label dict.

    Rows read from the ``labels`` table use the ``created_at`` key, while the
    dict returned by ``LabelService.apply_label`` uses ``createdAt``; accept
    either spelling.
    """
    if 'created_at' in label:
        return label['created_at']
    return label['createdAt']


class LabelService:
    """Service for managing moderation labels"""

    def __init__(self, db_pool: "asyncpg.Pool"):
        """Store the connection pool used by every query of this service.

        Args:
            db_pool: Pool whose ``acquire()`` is used as an async context
                manager yielding a connection with ``execute`` / ``fetch`` /
                ``fetchrow`` / ``transaction``.
        """
        self.db_pool = db_pool

    async def apply_label(
        self,
        src: str,
        subject: str,
        val: str,
        neg: bool = False,
        created_at: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Apply a label to a subject and record a 'created' label event.

        Args:
            src: Identifier of the labeler; embedded in the generated URI.
            subject: What the label is attached to.
            val: Label value.
            neg: True when this entry negates an earlier label.
            created_at: Timestamp stored with the label. Defaults to the
                current UTC time as a naive datetime.

        Returns:
            Dict with keys ``uri``, ``src``, ``subject``, ``val``, ``neg`` and
            ``createdAt`` describing the label.

        Raises:
            Exception: Any database error is logged and re-raised.
        """
        # Take a single clock reading so the URI and the default created_at
        # describe the same instant.
        now = datetime.now(timezone.utc)
        uri = f"at://{src}/app.bsky.labeler.label/{int(now.timestamp() * 1000)}"
        # NOTE(review): a caller-supplied created_at is stored as given; the
        # default is naive UTC, so callers presumably pass naive UTC too.
        created_at = created_at or now.replace(tzinfo=None)

        async with self.db_pool.acquire() as conn:
            try:
                # Label row and its event are written atomically so a failure
                # cannot leave one without the other.
                async with conn.transaction():
                    # Create label
                    await conn.execute(
                        """
                        INSERT INTO labels (uri, src, subject, val, neg, created_at)
                        VALUES ($1, $2, $3, $4, $5, $6)
                        ON CONFLICT (uri) DO NOTHING
                        """,
                        uri, src, subject, val, neg, created_at
                    )

                    # Create label event
                    await conn.execute(
                        """
                        INSERT INTO label_events (label_uri, action, created_at)
                        VALUES ($1, $2, NOW())
                        """,
                        uri, 'created'
                    )
            except Exception as e:
                logger.error("[LABEL_SERVICE] Error applying label: %s", e)
                raise

        logger.info("[LABEL_SERVICE] Applied label %s to %s from %s", val, subject, src)

        return {
            'uri': uri,
            'src': src,
            'subject': subject,
            'val': val,
            'neg': neg,
            'createdAt': created_at
        }

    async def negate_label(self, src: str, subject: str, val: str) -> Dict[str, Any]:
        """Negate a label by applying the same value with ``neg=True``."""
        return await self.apply_label(src, subject, val, neg=True)

    async def remove_label(self, uri: str) -> None:
        """Delete the label with the given URI and record a 'deleted' event.

        Does nothing when no label with that URI exists.

        Raises:
            Exception: Any database error is logged and re-raised.
        """
        async with self.db_pool.acquire() as conn:
            try:
                # Lookup, event insert and delete share one transaction so the
                # event log and the labels table cannot diverge.
                async with conn.transaction():
                    # Get label before deleting
                    label = await conn.fetchrow(
                        "SELECT * FROM labels WHERE uri = $1",
                        uri
                    )

                    if label:
                        # Create label event
                        await conn.execute(
                            """
                            INSERT INTO label_events (label_uri, action, created_at)
                            VALUES ($1, $2, NOW())
                            """,
                            uri, 'deleted'
                        )

                        # Delete label
                        await conn.execute(
                            "DELETE FROM labels WHERE uri = $1",
                            uri
                        )

                        logger.info("[LABEL_SERVICE] Removed label %s", uri)
            except Exception as e:
                logger.error("[LABEL_SERVICE] Error removing label: %s", e)
                raise

    async def get_labels_for_subject(self, subject: str) -> List[Dict[str, Any]]:
        """Get all labels for a subject, oldest first.

        Returns:
            One dict per row with keys ``uri``, ``src``, ``subject``, ``val``,
            ``neg`` and ``created_at``.
        """
        async with self.db_pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT uri, src, subject, val, neg, created_at
                FROM labels
                WHERE subject = $1
                ORDER BY created_at ASC
                """,
                subject
            )

        return [dict(row) for row in rows]

    async def get_labels_for_subjects(self, subjects: List[str]) -> Dict[str, List[Dict[str, Any]]]:
        """Get all labels for multiple subjects, grouped by subject.

        Returns:
            Mapping of subject to its label dicts (oldest first). Subjects
            without any label are absent from the mapping.
        """
        if not subjects:
            # Nothing can match an empty list; skip the round-trip.
            return {}

        async with self.db_pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT uri, src, subject, val, neg, created_at
                FROM labels
                WHERE subject = ANY($1::text[])
                ORDER BY created_at ASC
                """,
                subjects
            )

        label_map: Dict[str, List[Dict[str, Any]]] = {}
        for row in rows:
            label_map.setdefault(row['subject'], []).append(dict(row))

        return label_map

    async def query_labels(
        self,
        sources: Optional[List[str]] = None,
        subjects: Optional[List[str]] = None,
        values: Optional[List[str]] = None,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """Query labels with optional filters, newest first.

        Args:
            sources: Keep only labels whose ``src`` is in this list.
            subjects: Keep only labels whose ``subject`` is in this list.
            values: Keep only labels whose ``val`` is in this list.
            limit: Maximum number of rows returned.

        A filter that is ``None`` or empty is not applied.
        """
        # Column names are fixed literals; only the values are user data and
        # they always travel as bound parameters.
        filters = (("src", sources), ("subject", subjects), ("val", values))

        conditions: List[str] = []
        params: List[Any] = []
        for column, wanted in filters:
            if wanted:
                params.append(wanted)
                conditions.append(f"{column} = ANY(${len(params)}::text[])")

        where_clause = " AND ".join(conditions) if conditions else "1=1"
        params.append(limit)

        query = f"""
            SELECT uri, src, subject, val, neg, created_at
            FROM labels
            WHERE {where_clause}
            ORDER BY created_at DESC
            LIMIT ${len(params)}
        """

        async with self.db_pool.acquire() as conn:
            rows = await conn.fetch(query, *params)

        return [dict(row) for row in rows]

    async def get_active_labels_for_subject(self, subject: str) -> List[Dict[str, Any]]:
        """Get active (non-negated) labels for a subject"""
        labels = await self.get_labels_for_subject(subject)
        return self.filter_negated_labels(labels)

    async def get_active_labels_for_subjects(self, subjects: List[str]) -> Dict[str, List[Dict[str, Any]]]:
        """Get active (non-negated) labels for multiple subjects"""
        all_labels = await self.get_labels_for_subjects(subjects)
        return {
            subject: self.filter_negated_labels(labels)
            for subject, labels in all_labels.items()
        }

    def filter_negated_labels(self, labels: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Filter out negated labels.

        Labels are replayed in creation order, keyed by ``subject`` and
        ``val``: a negation drops the entry for that key and a non-negated
        label sets (or replaces) it. What is left at the end is returned.

        Args:
            labels: Label dicts carrying ``subject``, ``val``, ``neg`` and a
                timestamp under ``created_at`` (database rows) or
                ``createdAt`` (dicts returned by ``apply_label``).
        """
        label_map: Dict[str, Dict[str, Any]] = {}

        # sorted() is stable, so labels with equal timestamps keep their
        # incoming order.
        for label in sorted(labels, key=_label_timestamp):
            key = f"{label['subject']}:{label['val']}"

            if label['neg']:
                # Negation removes the label
                label_map.pop(key, None)
            else:
                # Add or update label
                label_map[key] = label

        return list(label_map.values())

    async def create_label_definition(
        self,
        value: str,
        description: Optional[str] = None,
        severity: str = 'warn',
        localized_strings: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Create a label definition, or overwrite the one with the same value.

        Returns:
            Dict with keys ``value``, ``description``, ``severity`` and
            ``localizedStrings`` echoing what was written.
        """
        strings = localized_strings or {}

        async with self.db_pool.acquire() as conn:
            # NOTE(review): a dict is bound to a jsonb parameter here. This
            # presumably relies on a JSON type codec registered on the pool's
            # connections - confirm, otherwise the value must be serialized
            # with json.dumps first.
            await conn.execute(
                """
                INSERT INTO label_definitions (value, description, severity, localized_strings)
                VALUES ($1, $2, $3, $4::jsonb)
                ON CONFLICT (value) DO UPDATE SET
                    description = EXCLUDED.description,
                    severity = EXCLUDED.severity,
                    localized_strings = EXCLUDED.localized_strings
                """,
                value, description, severity, strings
            )

        return {
            'value': value,
            'description': description,
            'severity': severity,
            'localizedStrings': strings
        }

    async def get_label_definition(self, value: str) -> Optional[Dict[str, Any]]:
        """Get a label definition, or ``None`` when the value is not defined."""
        async with self.db_pool.acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT value, description, severity, localized_strings
                FROM label_definitions
                WHERE value = $1
                """,
                value
            )

        return dict(row) if row else None

    async def get_all_label_definitions(self) -> List[Dict[str, Any]]:
        """Get all label definitions, ordered by value."""
        async with self.db_pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT value, description, severity, localized_strings
                FROM label_definitions
                ORDER BY value
                """
            )

        return [dict(row) for row in rows]