A third-party ATProto appview
1#!/usr/bin/env python3
2"""
3Label Service for AT Protocol
4
5Applies moderation labels to content.
6Python port of server/services/label.ts
7"""
8
9import logging
10from typing import Optional, List, Dict, Any
11from datetime import datetime, timezone
12import asyncpg
13
14logger = logging.getLogger(__name__)
15
16
class LabelService:
    """Service for managing moderation labels.

    Reads and writes the ``labels``, ``label_events`` and
    ``label_definitions`` tables through the connection pool handed to the
    constructor. The pool is used as ``async with pool.acquire() as conn``
    and the connection is expected to offer ``execute``, ``fetch``,
    ``fetchrow`` and ``transaction`` (presumably an asyncpg pool -- the
    queries use ``$n`` placeholders).
    """

    def __init__(self, db_pool):
        """Store the connection pool used by every query method."""
        self.db_pool = db_pool

    async def apply_label(
        self,
        src: str,
        subject: str,
        val: str,
        neg: bool = False,
        created_at: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Apply a label to a subject.

        Inserts a row into ``labels`` and a matching ``'created'`` row into
        ``label_events``. Both statements run in a single transaction so a
        failure in the second one does not leave a label without its event.

        Args:
            src: Identifier of the label source; embedded in the label URI.
            subject: Subject the label is attached to.
            val: Label value.
            neg: ``True`` to record the label as a negation.
            created_at: Creation timestamp. Defaults to the current UTC time
                as a naive ``datetime``.

        Returns:
            Dict with keys ``uri``, ``src``, ``subject``, ``val``, ``neg``
            and ``createdAt``.

        Raises:
            Exception: Any error raised by the database calls is logged and
                re-raised unchanged.
        """
        # One clock reading feeds both the URI and the default timestamp.
        now = datetime.now(timezone.utc)
        # NOTE(review): the record key has millisecond resolution, so two
        # labels from the same src within one millisecond share a URI and the
        # second insert is dropped by ON CONFLICT DO NOTHING -- confirm this
        # is acceptable.
        uri = f"at://{src}/app.bsky.labeler.label/{int(now.timestamp() * 1000)}"
        if created_at is None:
            created_at = now.replace(tzinfo=None)

        async with self.db_pool.acquire() as conn:
            try:
                # Label row and audit event must succeed or fail together.
                async with conn.transaction():
                    await conn.execute(
                        """
                        INSERT INTO labels (uri, src, subject, val, neg, created_at)
                        VALUES ($1, $2, $3, $4, $5, $6)
                        ON CONFLICT (uri) DO NOTHING
                        """,
                        uri, src, subject, val, neg, created_at
                    )
                    await conn.execute(
                        """
                        INSERT INTO label_events (label_uri, action, created_at)
                        VALUES ($1, $2, NOW())
                        """,
                        uri, 'created'
                    )

                logger.info(f"[LABEL_SERVICE] Applied label {val} to {subject} from {src}")
            except Exception as e:
                logger.error(f"[LABEL_SERVICE] Error applying label: {str(e)}")
                raise

        return {
            'uri': uri,
            'src': src,
            'subject': subject,
            'val': val,
            'neg': neg,
            'createdAt': created_at
        }

    async def negate_label(self, src: str, subject: str, val: str) -> Dict[str, Any]:
        """Negate a label by applying it with ``neg=True``.

        Returns:
            The dict returned by :meth:`apply_label`.
        """
        return await self.apply_label(src, subject, val, neg=True)

    async def remove_label(self, uri: str):
        """Remove a label and record a ``'deleted'`` event for it.

        When a row with the given URI exists, the event insert and the
        delete run in one transaction. When no such row exists nothing is
        written.

        Args:
            uri: URI of the label to delete.

        Raises:
            Exception: Any error raised by the database calls is logged and
                re-raised unchanged.
        """
        async with self.db_pool.acquire() as conn:
            try:
                async with conn.transaction():
                    # Existence check only; the row contents are not used.
                    existing = await conn.fetchrow(
                        "SELECT uri FROM labels WHERE uri = $1",
                        uri
                    )
                    if existing is None:
                        logger.info(f"[LABEL_SERVICE] Label {uri} not found, nothing to remove")
                        return

                    # Event is written before the delete, as in the original flow.
                    await conn.execute(
                        """
                        INSERT INTO label_events (label_uri, action, created_at)
                        VALUES ($1, $2, NOW())
                        """,
                        uri, 'deleted'
                    )
                    await conn.execute(
                        "DELETE FROM labels WHERE uri = $1",
                        uri
                    )

                logger.info(f"[LABEL_SERVICE] Removed label {uri}")
            except Exception as e:
                logger.error(f"[LABEL_SERVICE] Error removing label: {str(e)}")
                raise

    async def get_labels_for_subject(self, subject: str) -> List[Dict[str, Any]]:
        """Get all labels for a subject, oldest first.

        Returns:
            One dict per row with keys ``uri``, ``src``, ``subject``,
            ``val``, ``neg`` and ``created_at``.
        """
        async with self.db_pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT uri, src, subject, val, neg, created_at
                FROM labels
                WHERE subject = $1
                ORDER BY created_at ASC
                """,
                subject
            )

        return [dict(row) for row in rows]

    async def get_labels_for_subjects(self, subjects: List[str]) -> Dict[str, List[Dict[str, Any]]]:
        """Get all labels for multiple subjects, grouped by subject.

        Args:
            subjects: Subjects to look up. An empty list returns ``{}``
                without touching the database.

        Returns:
            Mapping of subject to its label dicts (oldest first). Subjects
            without labels are absent from the mapping.
        """
        if not subjects:
            return {}

        async with self.db_pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT uri, src, subject, val, neg, created_at
                FROM labels
                WHERE subject = ANY($1::text[])
                ORDER BY created_at ASC
                """,
                subjects
            )

        grouped: Dict[str, List[Dict[str, Any]]] = {}
        for row in rows:
            grouped.setdefault(row['subject'], []).append(dict(row))
        return grouped

    async def query_labels(
        self,
        sources: Optional[List[str]] = None,
        subjects: Optional[List[str]] = None,
        values: Optional[List[str]] = None,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """Query labels with optional filters, newest first.

        Args:
            sources: Restrict to these ``src`` values; ``None`` or empty
                means no restriction.
            subjects: Restrict to these subjects; ``None`` or empty means no
                restriction.
            values: Restrict to these label values; ``None`` or empty means
                no restriction.
            limit: Maximum number of rows returned.

        Returns:
            One dict per row with keys ``uri``, ``src``, ``subject``,
            ``val``, ``neg`` and ``created_at``.
        """
        params: List[Any] = []
        conditions: List[str] = []

        # Column names are fixed literals; only the values are bound as
        # parameters, so the generated SQL never contains caller input.
        for column, wanted in (("src", sources), ("subject", subjects), ("val", values)):
            if wanted:
                params.append(wanted)
                conditions.append(f"{column} = ANY(${len(params)}::text[])")

        where_clause = " AND ".join(conditions) if conditions else "TRUE"
        params.append(limit)

        query = f"""
            SELECT uri, src, subject, val, neg, created_at
            FROM labels
            WHERE {where_clause}
            ORDER BY created_at DESC
            LIMIT ${len(params)}
        """

        async with self.db_pool.acquire() as conn:
            rows = await conn.fetch(query, *params)
        return [dict(row) for row in rows]

    async def get_active_labels_for_subject(self, subject: str) -> List[Dict[str, Any]]:
        """Get active (non-negated) labels for a subject."""
        labels = await self.get_labels_for_subject(subject)
        return self.filter_negated_labels(labels)

    async def get_active_labels_for_subjects(self, subjects: List[str]) -> Dict[str, List[Dict[str, Any]]]:
        """Get active (non-negated) labels for multiple subjects.

        Returns:
            Mapping of subject to its active labels. Only subjects that have
            at least one stored label appear as keys.
        """
        all_labels = await self.get_labels_for_subjects(subjects)
        return {
            subject: self.filter_negated_labels(labels)
            for subject, labels in all_labels.items()
        }

    @staticmethod
    def _label_created_at(label: Dict[str, Any]) -> Any:
        """Return a label's creation time from either key spelling.

        Rows read from the database use ``created_at``; dicts returned by
        ``apply_label`` use ``createdAt``.
        """
        if 'created_at' in label:
            return label['created_at']
        return label['createdAt']

    def filter_negated_labels(self, labels: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Filter out negated labels.

        Labels are replayed in creation order. A non-negated label is
        recorded under its ``subject``/``val`` pair (replacing an earlier
        one); a negated label removes whatever is recorded under that pair.

        Args:
            labels: Label dicts carrying ``subject``, ``val``, ``neg`` and a
                creation time under ``created_at`` or ``createdAt``.

        Returns:
            The labels still recorded after the replay.

        Raises:
            KeyError: If a label has neither ``created_at`` nor ``createdAt``.
        """
        active: Dict[str, Dict[str, Any]] = {}

        # sorted() is stable, so labels with equal timestamps keep input order.
        for label in sorted(labels, key=self._label_created_at):
            # NOTE(review): the key ignores 'src', so a negation from any
            # source cancels a matching label from every source -- confirm
            # this is intended.
            key = f"{label['subject']}:{label['val']}"
            if label['neg']:
                active.pop(key, None)
            else:
                active[key] = label

        return list(active.values())

    async def create_label_definition(
        self,
        value: str,
        description: Optional[str] = None,
        severity: str = 'warn',
        localized_strings: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Create a label definition, or update it if ``value`` already exists.

        Args:
            value: Label value the definition describes (conflict key).
            description: Optional description.
            severity: Severity string; defaults to ``'warn'``.
            localized_strings: Optional mapping stored as JSON; ``None`` is
                stored as an empty object.

        Returns:
            Dict with keys ``value``, ``description``, ``severity`` and
            ``localizedStrings``.
        """
        import json

        strings = localized_strings or {}

        async with self.db_pool.acquire() as conn:
            # The JSON is bound as text and cast in SQL, so the insert works
            # whether or not a custom jsonb codec is registered on the
            # connection (the default codec only accepts str).
            await conn.execute(
                """
                INSERT INTO label_definitions (value, description, severity, localized_strings)
                VALUES ($1, $2, $3, $4::text::jsonb)
                ON CONFLICT (value) DO UPDATE SET
                    description = EXCLUDED.description,
                    severity = EXCLUDED.severity,
                    localized_strings = EXCLUDED.localized_strings
                """,
                value, description, severity, json.dumps(strings)
            )

        return {
            'value': value,
            'description': description,
            'severity': severity,
            'localizedStrings': strings
        }

    async def get_label_definition(self, value: str) -> Optional[Dict[str, Any]]:
        """Get a label definition.

        Returns:
            Dict with keys ``value``, ``description``, ``severity`` and
            ``localized_strings``, or ``None`` when no row matches.
        """
        # NOTE(review): localized_strings may come back as a JSON string
        # rather than a dict if no jsonb codec is registered -- confirm.
        async with self.db_pool.acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT value, description, severity, localized_strings
                FROM label_definitions
                WHERE value = $1
                """,
                value
            )

        return dict(row) if row else None

    async def get_all_label_definitions(self) -> List[Dict[str, Any]]:
        """Get all label definitions, ordered by ``value``."""
        async with self.db_pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT value, description, severity, localized_strings
                FROM label_definitions
                ORDER BY value
                """
            )

        return [dict(row) for row in rows]