A tool for tailing a labelers' firehose, rehydrating, and storing records for future analysis of moderation decisions.

feat: hydrate profile avatars and banners

Extends profile hydration to capture and process avatar and banner images
from app.bsky.actor.profile records. Profile blobs are processed using the
existing blob infrastructure for deduplication, hashing, and storage.

Changes:
- Add avatar_cid and banner_cid columns to profiles table
- Update Profile interface and repository to support new fields
- Extract avatar/banner blob references from profile records
- Process profile blobs using BlobProcessor with special URI format
(profile://{did}/avatar and profile://{did}/banner)
- Add test coverage for profiles with avatar/banner CIDs
- Reuse existing blob deduplication and storage logic

Profile blobs are treated the same as post blobs, respecting the
HYDRATE_BLOBS configuration and benefiting from cross-entity deduplication.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+86 -6
+9 -3
src/database/profiles.repository.ts
··· 6 6 handle?: string; 7 7 display_name?: string; 8 8 description?: string; 9 + avatar_cid?: string; 10 + banner_cid?: string; 9 11 } 10 12 11 13 export class ProfilesRepository { ··· 15 17 return new Promise((resolve, reject) => { 16 18 this.db.prepare( 17 19 ` 18 - INSERT INTO profiles (did, handle, display_name, description) 19 - VALUES ($1, $2, $3, $4) 20 + INSERT INTO profiles (did, handle, display_name, description, avatar_cid, banner_cid) 21 + VALUES ($1, $2, $3, $4, $5, $6) 20 22 ON CONFLICT (did) DO UPDATE SET 21 23 handle = EXCLUDED.handle, 22 24 display_name = EXCLUDED.display_name, 23 - description = EXCLUDED.description 25 + description = EXCLUDED.description, 26 + avatar_cid = EXCLUDED.avatar_cid, 27 + banner_cid = EXCLUDED.banner_cid 24 28 `, 25 29 (err, stmt) => { 26 30 if (err) { ··· 34 38 profile.handle || null, 35 39 profile.display_name || null, 36 40 profile.description || null, 41 + profile.avatar_cid || null, 42 + profile.banner_cid || null, 37 43 (err) => { 38 44 if (err) { 39 45 logger.error({ err, profile }, "Failed to insert profile");
+3 -1
src/database/schema.ts
··· 34 34 did TEXT PRIMARY KEY, 35 35 handle TEXT, 36 36 display_name TEXT, 37 - description TEXT 37 + description TEXT, 38 + avatar_cid TEXT, 39 + banner_cid TEXT 38 40 ); 39 41 40 42 -- Blobs table: stores information about image blobs found in posts
+53 -1
src/hydration/profiles.service.ts
··· 1 1 import { AtpAgent } from "@atproto/api"; 2 2 import { Database } from "duckdb"; 3 3 import { ProfilesRepository } from "../database/profiles.repository.js"; 4 + import { BlobProcessor } from "../blobs/processor.js"; 4 5 import { pRateLimit } from "p-ratelimit"; 5 6 import { withRetry, isRateLimitError, isNetworkError, isServerError } from "../utils/retry.js"; 6 7 import { logger } from "../logger/index.js"; ··· 9 10 export class ProfileHydrationService { 10 11 private agent: AtpAgent; 11 12 private profilesRepo: ProfilesRepository; 13 + private blobProcessor: BlobProcessor; 12 14 private limit: ReturnType<typeof pRateLimit>; 13 15 14 16 constructor(db: Database) { 15 17 this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` }); 16 18 this.profilesRepo = new ProfilesRepository(db); 19 + this.blobProcessor = new BlobProcessor(db, this.agent); 17 20 this.limit = pRateLimit({ 18 21 interval: 300000, 19 22 rate: 3000, ··· 68 71 69 72 let displayName: string | undefined; 70 73 let description: string | undefined; 74 + let avatarCid: string | undefined; 75 + let bannerCid: string | undefined; 71 76 72 77 if (profileResponse.success && profileResponse.data.value) { 73 78 const record = profileResponse.data.value as any; 74 79 displayName = record.displayName; 75 80 description = record.description; 81 + 82 + if (record.avatar?.ref?.$link) { 83 + avatarCid = record.avatar.ref.$link; 84 + } 85 + if (record.banner?.ref?.$link) { 86 + bannerCid = record.banner.ref.$link; 87 + } 76 88 } 77 89 78 90 const profileLookup = await this.limit(() => ··· 104 116 handle, 105 117 display_name: displayName, 106 118 description, 119 + avatar_cid: avatarCid, 120 + banner_cid: bannerCid, 107 121 }); 108 122 109 - logger.info({ did, handle }, "Profile hydrated successfully"); 123 + if (avatarCid) { 124 + try { 125 + await this.blobProcessor.processBlobs(`profile://${did}/avatar`, [ 126 + { 127 + images: [ 128 + { 129 + image: { 130 + ref: { $link: avatarCid }, 131 + mimeType: "image/jpeg", 132 + }, 133 + }, 134 + ], 135 + }, 136 + ]); 137 + } catch (error) { 138 + logger.warn({ error, did }, "Failed to process avatar blob"); 139 + } 140 + } 141 + 142 + if (bannerCid) { 143 + try { 144 + await this.blobProcessor.processBlobs(`profile://${did}/banner`, [ 145 + { 146 + images: [ 147 + { 148 + image: { 149 + ref: { $link: bannerCid }, 150 + mimeType: "image/jpeg", 151 + }, 152 + }, 153 + ], 154 + }, 155 + ]); 156 + } catch (error) { 157 + logger.warn({ error, did }, "Failed to process banner blob"); 158 + } 159 + } 160 + 161 + logger.info({ did, handle, avatarCid, bannerCid }, "Profile hydrated successfully"); 110 162 } catch (error) { 111 163 logger.error({ error, did }, "Failed to hydrate profile"); 112 164 throw error;
+21 -1
tests/integration/database.test.ts
··· 48 48 did TEXT PRIMARY KEY, 49 49 handle TEXT, 50 50 display_name TEXT, 51 - description TEXT 51 + description TEXT, 52 + avatar_cid TEXT, 53 + banner_cid TEXT 52 54 ); 53 55 54 56 CREATE TABLE IF NOT EXISTS blobs ( ··· 145 147 const found = await profilesRepo.findByHandle("testuser.bsky.social"); 146 148 expect(found).not.toBeNull(); 147 149 expect(found?.did).toBe("did:plc:testuser"); 150 + }); 151 + 152 + test("should insert and retrieve profile with avatar and banner", async () => { 153 + const profile = { 154 + did: "did:plc:testuser2", 155 + handle: "testuser2.bsky.social", 156 + display_name: "Test User 2", 157 + description: "A test user with avatar", 158 + avatar_cid: "bafyavatartest", 159 + banner_cid: "bafybannertest", 160 + }; 161 + 162 + await profilesRepo.insert(profile); 163 + const found = await profilesRepo.findByDid(profile.did); 164 + 165 + expect(found).not.toBeNull(); 166 + expect(found?.avatar_cid).toBe("bafyavatartest"); 167 + expect(found?.banner_cid).toBe("bafybannertest"); 148 168 }); 149 169 }); 150 170