forked from
jollywhoppers.com/witchsky.app
Bluesky app fork with some witchin' additions 💫
1import {
2 AtpAgent,
3 CredentialSession,
4 type ToolsOzoneSafelinkDefs,
5 type ToolsOzoneSafelinkQueryEvents,
6} from '@atproto/api'
7import {ExpiredTokenError} from '@atproto/api/dist/client/types/com/atproto/server/confirmEmail.js'
8import {MINUTE} from '@atproto/common'
9import {LRUCache} from 'lru-cache'
10
11import {type ServiceConfig} from '../config.js'
12import type Database from '../db/index.js'
13import {type SafelinkRule} from '../db/schema.js'
14import {redirectLogger} from '../logger.js'
15
/**
 * Delay in milliseconds (passed to `setTimeout`) before the next poll when
 * the previous poll returned events, or after an expired session was refreshed.
 */
const SAFELINK_MIN_FETCH_INTERVAL = 1_000
/**
 * Delay in milliseconds (passed to `setTimeout`) before the next poll when
 * the previous poll returned no events or failed.
 */
const SAFELINK_MAX_FETCH_INTERVAL = 10_000
/** Matches a leading URI scheme followed by `://` (for example `https://`). */
const SCHEME_REGEX = /^[a-zA-Z][a-zA-Z0-9+.-]*:\/\//
19
/**
 * Resolves links against the safelink rule set and keeps that rule set in
 * sync with the rule events served by Ozone.
 *
 * Rules are stored in the `safelink_rule` table. Two in-memory LRU caches, one
 * keyed by normalized URL and one by normalized domain, sit in front of the
 * table; a cached value of `'ok'` records that no rule was found for the key.
 * `runFetchEvents` polls Ozone for rule events, applies them to the table and
 * persists the pagination cursor in the `safelink_cursor` table.
 */
export class SafelinkClient {
  private domainCache: LRUCache<string, SafelinkRule | 'ok'>
  private urlCache: LRUCache<string, SafelinkRule | 'ok'>

  private db: Database

  private ozoneAgent: OzoneAgent

  // Last cursor successfully persisted by `setCursor`, or loaded by
  // `getCursor`. Stays `undefined` until one of those has happened.
  private cursor?: string

  /**
   * @param cfg service configuration; the safelink PDS URL, agent identifier
   *   and agent password are read from it
   * @param db database handle used for rule and cursor storage
   */
  constructor({cfg, db}: {cfg: ServiceConfig; db: Database}) {
    this.domainCache = new LRUCache<string, SafelinkRule | 'ok'>({
      max: 10000,
    })

    this.urlCache = new LRUCache<string, SafelinkRule | 'ok'>({
      max: 25000,
    })

    this.db = db

    // NOTE(review): the config values are asserted non-null without a runtime
    // check here; confirm they are validated before this class is constructed.
    this.ozoneAgent = new OzoneAgent(
      cfg.safelinkPdsUrl!,
      cfg.safelinkAgentIdentifier!,
      cfg.safelinkAgentPass!,
    )
  }

  /**
   * Finds the rule that applies to `link`.
   *
   * Lookup order: cached URL rule (only if it is an actual rule), cached
   * domain entry (rule or `'ok'`), URL rule from the database, domain rule
   * from the database. Database misses are cached as `'ok'`.
   *
   * @param link the link to check, with or without a scheme
   * @returns the matching rule, or `'ok'` when none was found or the link
   *   could not be normalized (fail open)
   */
  public async tryFindRule(link: string): Promise<SafelinkRule | 'ok'> {
    let url: string
    let domain: string
    try {
      url = SafelinkClient.normalizeUrl(link)
      domain = SafelinkClient.normalizeDomain(link)
    } catch (e) {
      redirectLogger.error(
        {error: e, inputUrl: link},
        'failed to normalize looked up link',
      )
      // fail open
      return 'ok'
    }

    // First, check if there is an existing URL rule. Note that even if the rule is 'ok', we still
    // want to check for a blocking domain rule, so we will only return here if the url rule exists
    // _and_ it is not 'ok'.
    const urlRule = this.urlCache.get(url)
    if (urlRule && urlRule !== 'ok') {
      return urlRule
    }

    // If we find a domain rule of _any_ kind, including 'ok', we can now return that rule.
    const domainRule = this.domainCache.get(domain)
    if (domainRule) {
      return domainRule
    }

    // `getRule` throws when no row matches, so the catch branches below are
    // the normal "no rule" path. Any other lookup failure is treated the same
    // way (fail open).
    try {
      const maybeUrlRule = await this.getRule(this.db, url, 'url')
      this.urlCache.set(url, maybeUrlRule)
      return maybeUrlRule
    } catch (e) {
      this.urlCache.set(url, 'ok')
    }

    try {
      const maybeDomainRule = await this.getRule(this.db, domain, 'domain')
      this.domainCache.set(domain, maybeDomainRule)
      return maybeDomainRule
    } catch (e) {
      this.domainCache.set(domain, 'ok')
    }

    return 'ok'
  }

  /**
   * Loads the most recently created rule for the given normalized URL or
   * domain and pattern type.
   *
   * @throws when no matching row exists (via `executeTakeFirstOrThrow`)
   */
  private async getRule(
    db: Database,
    url: string,
    pattern: ToolsOzoneSafelinkDefs.PatternType,
  ): Promise<SafelinkRule> {
    return db.db
      .selectFrom('safelink_rule')
      .selectAll()
      .where('url', '=', url)
      .where('pattern', '=', pattern)
      .orderBy('createdAt', 'desc')
      .executeTakeFirstOrThrow()
  }

  /**
   * Stores a rule and drops the cache entry for its key so the next lookup
   * reads the new rule from the database.
   *
   * `rule.url` is normalized in place. Normalization and insert failures are
   * logged and not rethrown.
   */
  private async addRule(db: Database, rule: SafelinkRule) {
    try {
      if (rule.pattern === 'url') {
        rule.url = SafelinkClient.normalizeUrl(rule.url)
      } else if (rule.pattern === 'domain') {
        rule.url = SafelinkClient.normalizeDomain(rule.url)
      }
    } catch (e) {
      redirectLogger.error(
        {error: e, inputUrl: rule.url},
        'failed to normalize rule input URL',
      )
      return
    }

    // Await the insert so it completes before this method returns. The caller
    // runs this inside a transaction callback, and an un-awaited statement
    // could otherwise execute after that callback has already finished.
    await db.db
      .insertInto('safelink_rule')
      .values({
        id: rule.id,
        eventType: rule.eventType,
        url: rule.url,
        pattern: rule.pattern,
        action: rule.action,
        createdAt: rule.createdAt,
      })
      .execute()
      .catch(err => {
        redirectLogger.error(
          {error: err, rule},
          'failed to add rule to database',
        )
      })

    if (rule.pattern === 'domain') {
      this.domainCache.delete(rule.url)
    } else {
      this.urlCache.delete(rule.url)
    }
  }

  /**
   * Deletes the stored rules that match the event's pattern and normalized
   * URL, then drops the corresponding cache entry.
   *
   * `rule.url` is normalized in place. Normalization and delete failures are
   * logged and not rethrown.
   */
  private async removeRule(db: Database, rule: SafelinkRule) {
    try {
      if (rule.pattern === 'url') {
        rule.url = SafelinkClient.normalizeUrl(rule.url)
      } else if (rule.pattern === 'domain') {
        rule.url = SafelinkClient.normalizeDomain(rule.url)
      }
    } catch (e) {
      redirectLogger.error(
        {error: e, inputUrl: rule.url},
        'failed to normalize rule input URL',
      )
      return
    }

    // Match on the rule's own pattern. Filtering on a fixed 'domain' pattern
    // would leave URL rules in the table forever.
    await db.db
      .deleteFrom('safelink_rule')
      .where('pattern', '=', rule.pattern)
      .where('url', '=', rule.url)
      .execute()
      .catch(err => {
        redirectLogger.error(
          {error: err, rule},
          'failed to remove rule from database',
        )
      })

    if (rule.pattern === 'domain') {
      this.domainCache.delete(rule.url)
    } else {
      this.urlCache.delete(rule.url)
    }
  }

  /**
   * Runs one polling iteration against Ozone and schedules the next one.
   *
   * Every exit path re-arms the timer: the short interval after events were
   * applied or after an expired session was refreshed, the long interval
   * after an empty result or a failure.
   */
  public async runFetchEvents() {
    let agent: AtpAgent
    try {
      agent = await this.ozoneAgent.getAgent()
    } catch (err) {
      redirectLogger.error({error: err}, 'error getting Ozone agent')
      setTimeout(() => this.runFetchEvents(), SAFELINK_MAX_FETCH_INTERVAL)
      return
    }

    let res: ToolsOzoneSafelinkQueryEvents.Response
    try {
      const cursor = await this.getCursor()
      res = await agent.tools.ozone.safelink.queryEvents({
        cursor,
        limit: 100,
        sortDirection: 'asc',
      })
    } catch (err) {
      if (err instanceof ExpiredTokenError) {
        redirectLogger.info('ozone agent had expired session, refreshing...')
        await this.ozoneAgent.refreshSession()
        setTimeout(() => this.runFetchEvents(), SAFELINK_MIN_FETCH_INTERVAL)
        return
      }

      redirectLogger.error(
        {error: err},
        'error fetching safelink events from Ozone',
      )
      setTimeout(() => this.runFetchEvents(), SAFELINK_MAX_FETCH_INTERVAL)
      return
    }

    if (res.data.events.length === 0) {
      redirectLogger.info('received no new safelink events from ozone')
      setTimeout(() => this.runFetchEvents(), SAFELINK_MAX_FETCH_INTERVAL)
      return
    }

    try {
      await this.db.transaction(async db => {
        for (const rule of res.data.events) {
          switch (rule.eventType) {
            case 'removeRule':
              await this.removeRule(db, rule)
              break
            case 'addRule':
            case 'updateRule':
              await this.addRule(db, rule)
              break
            default:
              redirectLogger.warn({rule}, 'received unknown rule event type')
          }
        }
      })
    } catch (err) {
      // Without this guard a failed transaction would reject the promise
      // created inside the timer callback and the polling loop would stop.
      // The cursor is left untouched so the same events are fetched again.
      redirectLogger.error(
        {error: err},
        'error applying safelink events to database',
      )
      setTimeout(() => this.runFetchEvents(), SAFELINK_MAX_FETCH_INTERVAL)
      return
    }

    if (res.data.cursor) {
      redirectLogger.info(
        {cursor: res.data.cursor},
        'received new safelink events from Ozone',
      )
      await this.setCursor(res.data.cursor)
    }
    setTimeout(() => this.runFetchEvents(), SAFELINK_MIN_FETCH_INTERVAL)
  }

  /**
   * Returns the cursor to resume from.
   *
   * The in-memory value is used when present; otherwise the persisted cursor
   * (row `id = 1` of `safelink_cursor`) is loaded and remembered.
   *
   * @returns the cursor, or `undefined` when none has been stored yet, in
   *   which case no cursor value is passed to `queryEvents`
   */
  private async getCursor() {
    // The field starts out as `undefined`, never as '', so test for "no
    // value" rather than comparing against the empty string.
    if (!this.cursor) {
      const res = await this.db.db
        .selectFrom('safelink_cursor')
        .selectAll()
        .where('id', '=', 1)
        .executeTakeFirst()
      if (!res) {
        return undefined
      }
      this.cursor = res.cursor
    }
    return this.cursor
  }

  /**
   * Upserts the cursor into row `id = 1` of `safelink_cursor`. The in-memory
   * cursor is only updated after the write succeeded; failures are logged and
   * not rethrown.
   */
  private async setCursor(cursor: string) {
    const updatedAt = new Date()
    try {
      await this.db.db
        .insertInto('safelink_cursor')
        .values({
          id: 1,
          cursor,
          updatedAt,
        })
        .onConflict(oc => oc.column('id').doUpdateSet({cursor, updatedAt}))
        .execute()
      this.cursor = cursor
    } catch (err) {
      redirectLogger.error({error: err}, 'failed to update safelink cursor')
    }
  }

  /**
   * Produces the lookup key for a URL rule: `https://` is assumed when the
   * input has no scheme, the fragment is dropped, the scheme is stripped, the
   * result is lower-cased and one trailing `/` is removed.
   *
   * @throws when the input cannot be parsed by `URL`
   */
  private static normalizeUrl(input: string) {
    if (!SCHEME_REGEX.test(input)) {
      input = `https://${input}`
    }
    const u = new URL(input)
    u.hash = ''
    let normalized = u.href.replace(SCHEME_REGEX, '').toLowerCase()
    if (normalized.endsWith('/')) {
      normalized = normalized.substring(0, normalized.length - 1)
    }
    return normalized
  }

  /**
   * Produces the lookup key for a domain rule: the lower-cased `host` of the
   * input (which includes a non-default port), assuming `https://` when the
   * input has no scheme.
   *
   * @throws when the input cannot be parsed by `URL`
   */
  private static normalizeDomain(input: string) {
    if (!SCHEME_REGEX.test(input)) {
      input = `https://${input}`
    }
    const u = new URL(input)
    return u.host.toLowerCase()
  }
}
302
/**
 * Owns the credential session used to talk to Ozone and hands out an
 * `AtpAgent` bound to that session, logging in on first use and refreshing
 * the session once its refresh deadline has passed.
 */
export class OzoneAgent {
  private identifier: string
  private password: string

  private session: CredentialSession
  private agent: AtpAgent

  // Epoch milliseconds after which the session should be refreshed. Set to
  // `Date.now() + 50 * MINUTE` after each login and each successful refresh.
  private refreshAt = 0

  /**
   * @param pdsHost URL of the PDS the credential session is created against
   * @param identifier account identifier used to log in
   * @param password account password used to log in
   * @throws when `pdsHost` cannot be parsed by `URL`
   */
  constructor(pdsHost: string, identifier: string, password: string) {
    this.identifier = identifier
    this.password = password

    this.session = new CredentialSession(new URL(pdsHost))
    this.agent = new AtpAgent(this.session)
  }

  /**
   * Returns the agent, logging in first when there is no session and
   * refreshing the session when the refresh deadline has been reached.
   *
   * @throws when the identifier or the password is missing, or when the
   *   login call fails
   */
  public async getAgent() {
    // Both credentials are required for login, so reject when either one is
    // missing rather than only when both are.
    if (!this.identifier || !this.password) {
      throw new Error(
        'OZONE_AGENT_HANDLE and OZONE_AGENT_PASS environment variables must be set',
      )
    }

    if (!this.session.hasSession) {
      redirectLogger.info('creating Ozone session')
      await this.session.login({
        identifier: this.identifier,
        password: this.password,
      })
      redirectLogger.info('ozone session created successfully')
      this.refreshAt = Date.now() + 50 * MINUTE
    }

    // Refresh only once the deadline has passed. A fresh login pushes the
    // deadline into the future, so it is not followed by an immediate refresh.
    if (Date.now() >= this.refreshAt) {
      await this.refreshSession()
    }

    return this.agent
  }

  /**
   * Refreshes the session and moves the refresh deadline forward. Failures
   * are logged and not rethrown; the deadline is left unchanged in that case,
   * so the next `getAgent` call attempts the refresh again.
   */
  public async refreshSession() {
    try {
      await this.session.refreshSession()
      this.refreshAt = Date.now() + 50 * MINUTE
    } catch (e) {
      redirectLogger.error({error: e}, 'error refreshing session')
    }
  }
}