A tool for tailing a labeler's firehose, then rehydrating and storing records for future analysis of moderation decisions.

Refactor to use p-ratelimit instead of custom implementation

- Replace custom RateLimiter with p-ratelimit library
- Configure the limiter: 3000 requests per 5-minute interval, concurrency of 48, 60-second max delay
- Wrap API calls with p-ratelimit + retry logic
- All tests passing

Uses an existing dependency instead of reinventing the wheel.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+73 -71
+28 -27
src/hydration/posts.service.ts
··· 2 2 import { Database } from "duckdb"; 3 3 import { PostsRepository } from "../database/posts.repository.js"; 4 4 import { BlobProcessor } from "../blobs/processor.js"; 5 - import { RateLimiter } from "../utils/rate-limit.js"; 5 + import { pRateLimit } from "p-ratelimit"; 6 6 import { withRetry, isRateLimitError, isNetworkError, isServerError } from "../utils/retry.js"; 7 7 import { logger } from "../logger/index.js"; 8 8 import { config } from "../config/index.js"; ··· 11 11 private agent: AtpAgent; 12 12 private postsRepo: PostsRepository; 13 13 private blobProcessor: BlobProcessor; 14 - private rateLimiter: RateLimiter; 14 + private limit: ReturnType<typeof pRateLimit>; 15 15 16 16 constructor(db: Database) { 17 17 this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` }); 18 18 this.postsRepo = new PostsRepository(db); 19 19 this.blobProcessor = new BlobProcessor(db, this.agent); 20 - this.rateLimiter = new RateLimiter({ 21 - maxTokens: 3000, 22 - refillRate: 10, 23 - refillInterval: 100, 20 + this.limit = pRateLimit({ 21 + interval: 300000, 22 + rate: 3000, 23 + concurrency: 48, 24 + maxDelay: 60000, 24 25 }); 25 26 } 26 27 ··· 53 54 54 55 const [did, collection, rkey] = uriParts; 55 56 56 - await this.rateLimiter.acquire(1); 57 - 58 - const response = await withRetry( 59 - async () => { 60 - return await this.agent.com.atproto.repo.getRecord({ 61 - repo: did, 62 - collection, 63 - rkey, 64 - }); 65 - }, 66 - { 67 - maxAttempts: 3, 68 - initialDelay: 1000, 69 - maxDelay: 10000, 70 - backoffMultiplier: 2, 71 - retryableErrors: [ 72 - isRateLimitError, 73 - isNetworkError, 74 - isServerError, 75 - ], 76 - } 57 + const response = await this.limit(() => 58 + withRetry( 59 + async () => { 60 + return await this.agent.com.atproto.repo.getRecord({ 61 + repo: did, 62 + collection, 63 + rkey, 64 + }); 65 + }, 66 + { 67 + maxAttempts: 3, 68 + initialDelay: 1000, 69 + maxDelay: 10000, 70 + backoffMultiplier: 2, 71 + retryableErrors: [ 72 + 
isRateLimitError, 73 + isNetworkError, 74 + isServerError, 75 + ], 76 + } 77 + ) 77 78 ); 78 79 79 80 if (!response.success || !response.data.value) {
+45 -44
src/hydration/profiles.service.ts
··· 1 1 import { AtpAgent } from "@atproto/api"; 2 2 import { Database } from "duckdb"; 3 3 import { ProfilesRepository } from "../database/profiles.repository.js"; 4 - import { RateLimiter } from "../utils/rate-limit.js"; 4 + import { pRateLimit } from "p-ratelimit"; 5 5 import { withRetry, isRateLimitError, isNetworkError, isServerError } from "../utils/retry.js"; 6 6 import { logger } from "../logger/index.js"; 7 7 import { config } from "../config/index.js"; ··· 9 9 export class ProfileHydrationService { 10 10 private agent: AtpAgent; 11 11 private profilesRepo: ProfilesRepository; 12 - private rateLimiter: RateLimiter; 12 + private limit: ReturnType<typeof pRateLimit>; 13 13 14 14 constructor(db: Database) { 15 15 this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` }); 16 16 this.profilesRepo = new ProfilesRepository(db); 17 - this.rateLimiter = new RateLimiter({ 18 - maxTokens: 3000, 19 - refillRate: 10, 20 - refillInterval: 100, 17 + this.limit = pRateLimit({ 18 + interval: 300000, 19 + rate: 3000, 20 + concurrency: 48, 21 + maxDelay: 60000, 21 22 }); 22 23 } 23 24 ··· 42 43 return; 43 44 } 44 45 45 - await this.rateLimiter.acquire(1); 46 - 47 - const profileResponse = await withRetry( 48 - async () => { 49 - return await this.agent.com.atproto.repo.getRecord({ 50 - repo: did, 51 - collection: "app.bsky.actor.profile", 52 - rkey: "self", 53 - }); 54 - }, 55 - { 56 - maxAttempts: 3, 57 - initialDelay: 1000, 58 - maxDelay: 10000, 59 - backoffMultiplier: 2, 60 - retryableErrors: [ 61 - isRateLimitError, 62 - isNetworkError, 63 - isServerError, 64 - ], 65 - } 46 + const profileResponse = await this.limit(() => 47 + withRetry( 48 + async () => { 49 + return await this.agent.com.atproto.repo.getRecord({ 50 + repo: did, 51 + collection: "app.bsky.actor.profile", 52 + rkey: "self", 53 + }); 54 + }, 55 + { 56 + maxAttempts: 3, 57 + initialDelay: 1000, 58 + maxDelay: 10000, 59 + backoffMultiplier: 2, 60 + retryableErrors: [ 61 + isRateLimitError, 62 + 
isNetworkError, 63 + isServerError, 64 + ], 65 + } 66 + ) 66 67 ); 67 68 68 69 let displayName: string | undefined; ··· 74 75 description = record.description; 75 76 } 76 77 77 - await this.rateLimiter.acquire(1); 78 - 79 - const profileLookup = await withRetry( 80 - async () => { 81 - return await this.agent.getProfile({ actor: did }); 82 - }, 83 - { 84 - maxAttempts: 3, 85 - initialDelay: 1000, 86 - maxDelay: 10000, 87 - backoffMultiplier: 2, 88 - retryableErrors: [ 89 - isRateLimitError, 90 - isNetworkError, 91 - isServerError, 92 - ], 93 - } 78 + const profileLookup = await this.limit(() => 79 + withRetry( 80 + async () => { 81 + return await this.agent.getProfile({ actor: did }); 82 + }, 83 + { 84 + maxAttempts: 3, 85 + initialDelay: 1000, 86 + maxDelay: 10000, 87 + backoffMultiplier: 2, 88 + retryableErrors: [ 89 + isRateLimitError, 90 + isNetworkError, 91 + isServerError, 92 + ], 93 + } 94 + ) 94 95 ); 95 96 96 97 let handle: string | undefined;