Bluesky app fork with some witchin' additions 💫
at post-text-option 352 lines 9.5 kB view raw
import {
  AtpAgent,
  CredentialSession,
  type ToolsOzoneSafelinkDefs,
  type ToolsOzoneSafelinkQueryEvents,
} from '@atproto/api'
import {ExpiredTokenError} from '@atproto/api/dist/client/types/com/atproto/server/confirmEmail.js'
import {MINUTE} from '@atproto/common'
import {LRUCache} from 'lru-cache'

import {type ServiceConfig} from '../config.js'
import type Database from '../db/index.js'
import {type SafelinkRule} from '../db/schema.js'
import {redirectLogger} from '../logger.js'

// Delay (ms) before the next poll when the previous poll returned events.
const SAFELINK_MIN_FETCH_INTERVAL = 1_000
// Delay (ms) before the next poll when the previous poll was empty or failed.
const SAFELINK_MAX_FETCH_INTERVAL = 10_000
// Matches a leading URL scheme such as `https://`.
const SCHEME_REGEX = /^[a-zA-Z][a-zA-Z0-9+.-]*:\/\//

/**
 * Resolves safelink rules for links, backed by the `safelink_rule` table and
 * two in-memory LRU caches (one keyed by normalized URL, one by normalized
 * domain). Rules are kept up to date by polling Ozone safelink events via
 * {@link SafelinkClient.runFetchEvents}.
 */
export class SafelinkClient {
  private domainCache: LRUCache<string, SafelinkRule | 'ok'>
  private urlCache: LRUCache<string, SafelinkRule | 'ok'>

  private db: Database

  private ozoneAgent: OzoneAgent

  // Last event cursor successfully persisted; `undefined` until loaded or set.
  private cursor?: string

  constructor({cfg, db}: {cfg: ServiceConfig; db: Database}) {
    this.domainCache = new LRUCache<string, SafelinkRule | 'ok'>({
      max: 10000,
    })

    this.urlCache = new LRUCache<string, SafelinkRule | 'ok'>({
      max: 25000,
    })

    this.db = db

    this.ozoneAgent = new OzoneAgent(
      cfg.safelinkPdsUrl!,
      cfg.safelinkAgentIdentifier!,
      cfg.safelinkAgentPass!,
    )
  }

  /**
   * Finds the rule that applies to `link`.
   *
   * Lookup order: cached URL rule, cached domain rule, database URL rule,
   * database domain rule. Fails open: returns `'ok'` when the link cannot be
   * normalized or when no rule is found.
   *
   * @param link - The link to check; a missing scheme is tolerated.
   * @returns The matching rule, or `'ok'` when nothing applies.
   */
  public async tryFindRule(link: string): Promise<SafelinkRule | 'ok'> {
    let url: string
    let domain: string
    try {
      url = SafelinkClient.normalizeUrl(link)
      domain = SafelinkClient.normalizeDomain(link)
    } catch (e) {
      redirectLogger.error(
        {error: e, inputUrl: link},
        'failed to normalize looked up link',
      )
      // fail open
      return 'ok'
    }

    // First, check if there is an existing URL rule. Note that even if the rule is 'ok', we still
    // want to check for a blocking domain rule, so we will only return here if the url rule exists
    // _and_ it is not 'ok'.
    const urlRule = this.urlCache.get(url)
    if (urlRule && urlRule !== 'ok') {
      return urlRule
    }

    // If we find a domain rule of _any_ kind, including 'ok', we can now return that rule.
    const domainRule = this.domainCache.get(domain)
    if (domainRule) {
      return domainRule
    }

    // `getRule` rejects when no row matches (executeTakeFirstOrThrow), so the
    // catch branches below are the "no rule" path; 'ok' is cached so the next
    // lookup does not have to hit the database again.
    try {
      const maybeUrlRule = await this.getRule(this.db, url, 'url')
      this.urlCache.set(url, maybeUrlRule)
      return maybeUrlRule
    } catch (e) {
      this.urlCache.set(url, 'ok')
    }

    try {
      const maybeDomainRule = await this.getRule(this.db, domain, 'domain')
      this.domainCache.set(domain, maybeDomainRule)
      return maybeDomainRule
    } catch (e) {
      this.domainCache.set(domain, 'ok')
    }

    return 'ok'
  }

  /**
   * Loads the most recently created rule for the given normalized URL/domain
   * and pattern. Rejects when no matching row exists.
   */
  private async getRule(
    db: Database,
    url: string,
    pattern: ToolsOzoneSafelinkDefs.PatternType,
  ): Promise<SafelinkRule> {
    return db.db
      .selectFrom('safelink_rule')
      .selectAll()
      .where('url', '=', url)
      .where('pattern', '=', pattern)
      .orderBy('createdAt', 'desc')
      .executeTakeFirstOrThrow()
  }

  /**
   * Normalizes a rule's URL according to its pattern without mutating the
   * rule. Returns `undefined` (after logging) when normalization fails; rules
   * with any other pattern keep their URL unchanged.
   */
  private static normalizeRuleUrl(rule: SafelinkRule): string | undefined {
    try {
      if (rule.pattern === 'url') {
        return SafelinkClient.normalizeUrl(rule.url)
      }
      if (rule.pattern === 'domain') {
        return SafelinkClient.normalizeDomain(rule.url)
      }
      return rule.url
    } catch (e) {
      redirectLogger.error(
        {error: e, inputUrl: rule.url},
        'failed to normalize rule input URL',
      )
      return undefined
    }
  }

  /** Drops the cached entry for `url` from the cache matching `pattern`. */
  private invalidateCache(pattern: SafelinkRule['pattern'], url: string) {
    if (pattern === 'domain') {
      this.domainCache.delete(url)
    } else {
      this.urlCache.delete(url)
    }
  }

  /**
   * Persists a rule and invalidates its cache entry. Database failures are
   * logged rather than thrown (best effort).
   */
  private async addRule(db: Database, rule: SafelinkRule) {
    const url = SafelinkClient.normalizeRuleUrl(rule)
    if (url === undefined) {
      return
    }

    try {
      // Awaited so the insert runs while the caller's transaction callback is
      // still active, and so the cache is only invalidated after the write.
      await db.db
        .insertInto('safelink_rule')
        .values({
          id: rule.id,
          eventType: rule.eventType,
          url,
          pattern: rule.pattern,
          action: rule.action,
          createdAt: rule.createdAt,
        })
        .execute()
    } catch (err) {
      redirectLogger.error({error: err, rule}, 'failed to add rule to database')
    }

    this.invalidateCache(rule.pattern, url)
  }

  /**
   * Deletes all stored rules matching the rule's pattern and normalized URL,
   * then invalidates its cache entry. Database failures are logged rather
   * than thrown (best effort).
   */
  private async removeRule(db: Database, rule: SafelinkRule) {
    const url = SafelinkClient.normalizeRuleUrl(rule)
    if (url === undefined) {
      return
    }

    try {
      await db.db
        .deleteFrom('safelink_rule')
        // Match the rule's own pattern; a hard-coded 'domain' would leave URL
        // rules in the table forever.
        .where('pattern', '=', rule.pattern)
        .where('url', '=', url)
        .execute()
    } catch (err) {
      redirectLogger.error(
        {error: err, rule},
        'failed to remove rule from database',
      )
    }

    this.invalidateCache(rule.pattern, url)
  }

  /** Schedules the next poll; a rejected poll is logged, never left floating. */
  private scheduleNextFetch(delayMs: number) {
    setTimeout(() => {
      this.runFetchEvents().catch(err => {
        redirectLogger.error({error: err}, 'unexpected error in safelink poll')
      })
    }, delayMs)
  }

  /**
   * Runs one poll of Ozone safelink events, applies them to the database, and
   * schedules the next poll. Every exit path reschedules, so calling this once
   * starts a self-sustaining loop.
   */
  public async runFetchEvents(): Promise<void> {
    let agent: AtpAgent
    try {
      agent = await this.ozoneAgent.getAgent()
    } catch (err) {
      redirectLogger.error({error: err}, 'error getting Ozone agent')
      this.scheduleNextFetch(SAFELINK_MAX_FETCH_INTERVAL)
      return
    }

    let res: ToolsOzoneSafelinkQueryEvents.Response
    try {
      const cursor = await this.getCursor()
      res = await agent.tools.ozone.safelink.queryEvents({
        cursor,
        limit: 100,
        sortDirection: 'asc',
      })
    } catch (err) {
      if (err instanceof ExpiredTokenError) {
        redirectLogger.info('ozone agent had expired session, refreshing...')
        await this.ozoneAgent.refreshSession()
        this.scheduleNextFetch(SAFELINK_MIN_FETCH_INTERVAL)
        return
      }

      redirectLogger.error(
        {error: err},
        'error fetching safelink events from Ozone',
      )
      this.scheduleNextFetch(SAFELINK_MAX_FETCH_INTERVAL)
      return
    }

    if (res.data.events.length === 0) {
      redirectLogger.info('received no new safelink events from ozone')
      this.scheduleNextFetch(SAFELINK_MAX_FETCH_INTERVAL)
      return
    }

    try {
      await this.db.transaction(async tx => {
        for (const rule of res.data.events) {
          switch (rule.eventType) {
            case 'removeRule':
              await this.removeRule(tx, rule)
              break
            case 'addRule':
            case 'updateRule':
              await this.addRule(tx, rule)
              break
            default:
              redirectLogger.warn({rule}, 'received unknown rule event type')
          }
        }
      })
    } catch (err) {
      // Keep the loop alive and do NOT advance the cursor, so the same batch
      // of events is retried on the next poll.
      redirectLogger.error({error: err}, 'failed to apply safelink events')
      this.scheduleNextFetch(SAFELINK_MAX_FETCH_INTERVAL)
      return
    }

    if (res.data.cursor) {
      redirectLogger.info(
        {cursor: res.data.cursor},
        'received new safelink events from Ozone',
      )
      await this.setCursor(res.data.cursor)
    }
    this.scheduleNextFetch(SAFELINK_MIN_FETCH_INTERVAL)
  }

  /**
   * Returns the event cursor to resume from, loading it from the
   * `safelink_cursor` table the first time it is needed.
   *
   * @returns The cursor, or `undefined` when none has been stored yet.
   */
  private async getCursor(): Promise<string | undefined> {
    // The field starts out `undefined` (not ''), so test for "not loaded yet"
    // rather than for the empty string.
    if (!this.cursor) {
      const res = await this.db.db
        .selectFrom('safelink_cursor')
        .selectAll()
        .where('id', '=', 1)
        .executeTakeFirst()
      if (res?.cursor) {
        this.cursor = res.cursor
      }
    }
    return this.cursor
  }

  /**
   * Upserts the single cursor row (id 1) and, on success, updates the
   * in-memory cursor. Failures are logged and leave the in-memory cursor
   * unchanged.
   */
  private async setCursor(cursor: string) {
    const updatedAt = new Date()
    try {
      await this.db.db
        .insertInto('safelink_cursor')
        .values({
          id: 1,
          cursor,
          updatedAt,
        })
        .onConflict(oc => oc.column('id').doUpdateSet({cursor, updatedAt}))
        .execute()
      this.cursor = cursor
    } catch (err) {
      redirectLogger.error({error: err}, 'failed to update safelink cursor')
    }
  }

  /**
   * Normalizes a URL for rule matching: assumes `https://` when no scheme is
   * present, drops the fragment, strips the scheme, lowercases the whole
   * string and removes a single trailing slash.
   *
   * @throws When `input` cannot be parsed by the `URL` constructor.
   */
  private static normalizeUrl(input: string) {
    if (!SCHEME_REGEX.test(input)) {
      input = `https://${input}`
    }
    const u = new URL(input)
    u.hash = ''
    let normalized = u.href.replace(SCHEME_REGEX, '').toLowerCase()
    if (normalized.endsWith('/')) {
      normalized = normalized.substring(0, normalized.length - 1)
    }
    return normalized
  }

  /**
   * Extracts the lowercased host (including any port) from a link, assuming
   * `https://` when no scheme is present.
   *
   * @throws When `input` cannot be parsed by the `URL` constructor.
   */
  private static normalizeDomain(input: string) {
    if (!SCHEME_REGEX.test(input)) {
      input = `https://${input}`
    }
    const u = new URL(input)
    return u.host.toLowerCase()
  }
}

/**
 * Wraps an `AtpAgent` backed by a `CredentialSession`, logging in lazily and
 * refreshing the session once the refresh deadline has passed.
 */
export class OzoneAgent {
  private identifier: string
  private password: string

  private session: CredentialSession
  private agent: AtpAgent

  // Epoch milliseconds after which the session should be refreshed.
  private refreshAt = 0

  constructor(pdsHost: string, identifier: string, password: string) {
    this.identifier = identifier
    this.password = password

    this.session = new CredentialSession(new URL(pdsHost))
    this.agent = new AtpAgent(this.session)
  }

  /**
   * Returns an agent with an active session, logging in first if there is no
   * session and refreshing it if the refresh deadline has passed.
   *
   * @throws When the identifier or password is missing, or when login fails.
   */
  public async getAgent() {
    // Both credentials are required, so a single missing one is an error.
    if (!this.identifier || !this.password) {
      throw new Error(
        'OZONE_AGENT_HANDLE and OZONE_AGENT_PASS environment variables must be set',
      )
    }

    if (!this.session.hasSession) {
      redirectLogger.info('creating Ozone session')
      await this.session.login({
        identifier: this.identifier,
        password: this.password,
      })
      redirectLogger.info('ozone session created successfully')
      this.refreshAt = Date.now() + 50 * MINUTE
    }

    // Refresh only once the deadline has been reached, not before it.
    if (Date.now() >= this.refreshAt) {
      await this.refreshSession()
    }

    return this.agent
  }

  /**
   * Refreshes the session and pushes the refresh deadline 50 minutes out.
   * Failures are logged, not thrown.
   */
  public async refreshSession() {
    try {
      await this.session.refreshSession()
      this.refreshAt = Date.now() + 50 * MINUTE
    } catch (e) {
      redirectLogger.error({error: e}, 'error refreshing session')
    }
  }
}