A tool for tailing a labelers' firehose, rehydrating, and storing records for future analysis of moderation decisions.

feat: create separate profile_blobs table for avatars and banners

- New profile_blobs table with FK to profiles(did)
- Restored FK constraint on blobs table to posts(uri)
- Created ProfileBlobsRepository for profile blob operations
- Primary key on (did, blob_type) ensures one avatar and one banner per profile
- Proper relational model for later analysis and queries

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+136 -54
+109
src/database/profile-blobs.repository.ts
··· 1 + import { Database } from "duckdb"; 2 + import { logger } from "../logger/index.js"; 3 + 4 + export interface ProfileBlob { 5 + did: string; 6 + blob_type: "avatar" | "banner"; 7 + blob_cid: string; 8 + sha256: string; 9 + phash?: string; 10 + storage_path?: string; 11 + mimetype?: string; 12 + } 13 + 14 + export class ProfileBlobsRepository { 15 + constructor(private db: Database) {} 16 + 17 + async insert(blob: ProfileBlob): Promise<void> { 18 + return new Promise((resolve, reject) => { 19 + this.db.prepare( 20 + ` 21 + INSERT INTO profile_blobs (did, blob_type, blob_cid, sha256, phash, storage_path, mimetype) 22 + VALUES ($1, $2, $3, $4, $5, $6, $7) 23 + ON CONFLICT (did, blob_type) DO UPDATE SET 24 + blob_cid = EXCLUDED.blob_cid, 25 + sha256 = EXCLUDED.sha256, 26 + phash = EXCLUDED.phash, 27 + storage_path = EXCLUDED.storage_path, 28 + mimetype = EXCLUDED.mimetype 29 + `, 30 + (err, stmt) => { 31 + if (err) { 32 + logger.error({ err }, "Failed to prepare profile blob insert statement"); 33 + reject(err); 34 + return; 35 + } 36 + 37 + stmt.run( 38 + blob.did, 39 + blob.blob_type, 40 + blob.blob_cid, 41 + blob.sha256, 42 + blob.phash || null, 43 + blob.storage_path || null, 44 + blob.mimetype || null, 45 + (err) => { 46 + if (err) { 47 + logger.error({ err, blob }, "Failed to insert profile blob"); 48 + reject(err); 49 + return; 50 + } 51 + resolve(); 52 + } 53 + ); 54 + } 55 + ); 56 + }); 57 + } 58 + 59 + async findByDid(did: string): Promise<ProfileBlob[]> { 60 + return new Promise((resolve, reject) => { 61 + this.db.all( 62 + `SELECT * FROM profile_blobs WHERE did = $1`, 63 + did, 64 + (err, rows: ProfileBlob[]) => { 65 + if (err) { 66 + logger.error({ err, did }, "Failed to find profile blobs by DID"); 67 + reject(err); 68 + return; 69 + } 70 + resolve(rows || []); 71 + } 72 + ); 73 + }); 74 + } 75 + 76 + async findBySha256(sha256: string): Promise<ProfileBlob | null> { 77 + return new Promise((resolve, reject) => { 78 + this.db.all( 79 + `SELECT * FROM profile_blobs WHERE sha256 = $1 LIMIT 1`, 80 + sha256, 81 + (err, rows: ProfileBlob[]) => { 82 + if (err) { 83 + logger.error({ err, sha256 }, "Failed to find profile blob by SHA256"); 84 + reject(err); 85 + return; 86 + } 87 + resolve(rows?.[0] || null); 88 + } 89 + ); 90 + }); 91 + } 92 + 93 + async findByPhash(phash: string): Promise<ProfileBlob[]> { 94 + return new Promise((resolve, reject) => { 95 + this.db.all( 96 + `SELECT * FROM profile_blobs WHERE phash = $1`, 97 + phash, 98 + (err, rows: ProfileBlob[]) => { 99 + if (err) { 100 + logger.error({ err, phash }, "Failed to find profile blobs by pHash"); 101 + reject(err); 102 + return; 103 + } 104 + resolve(rows || []); 105 + } 106 + ); 107 + }); 108 + } 109 + }
+18 -46
src/database/schema.ts
··· 39 39 banner_cid TEXT 40 40 ); 41 41 42 - -- Blobs table: stores information about image blobs found in posts and profiles 42 + -- Blobs table: stores information about image blobs found in posts 43 43 CREATE TABLE IF NOT EXISTS blobs ( 44 44 post_uri TEXT NOT NULL, 45 45 blob_cid TEXT NOT NULL, ··· 47 47 phash TEXT, 48 48 storage_path TEXT, 49 49 mimetype TEXT, 50 - PRIMARY KEY (post_uri, blob_cid) 50 + PRIMARY KEY (post_uri, blob_cid), 51 + FOREIGN KEY (post_uri) REFERENCES posts(uri) 52 + ); 53 + 54 + -- Profile blobs table: stores avatar and banner blobs for profiles 55 + CREATE TABLE IF NOT EXISTS profile_blobs ( 56 + did TEXT NOT NULL, 57 + blob_type TEXT NOT NULL CHECK (blob_type IN ('avatar', 'banner')), 58 + blob_cid TEXT NOT NULL, 59 + sha256 TEXT NOT NULL, 60 + phash TEXT, 61 + storage_path TEXT, 62 + mimetype TEXT, 63 + PRIMARY KEY (did, blob_type), 64 + FOREIGN KEY (did) REFERENCES profiles(did) 51 65 ); 52 66 53 67 -- Indexes for performance ··· 57 71 CREATE INDEX IF NOT EXISTS idx_posts_did ON posts(did); 58 72 CREATE INDEX IF NOT EXISTS idx_blobs_sha256 ON blobs(sha256); 59 73 CREATE INDEX IF NOT EXISTS idx_blobs_phash ON blobs(phash); 74 + CREATE INDEX IF NOT EXISTS idx_profile_blobs_sha256 ON profile_blobs(sha256); 75 + CREATE INDEX IF NOT EXISTS idx_profile_blobs_phash ON profile_blobs(phash); 60 76 `; 61 77 62 78 async function migrateProfilesTable(): Promise<void> { ··· 105 121 }); 106 122 } 107 123 108 - async function migrateBlobsTableConstraint(): Promise<void> { 109 - const db = getDatabase(); 110 - 111 - return new Promise((resolve, reject) => { 112 - db.all( 113 - `SELECT constraint_name FROM information_schema.table_constraints 114 - WHERE table_name = 'blobs' AND constraint_type = 'FOREIGN KEY'`, 115 - (err, rows: any[]) => { 116 - if (err) { 117 - logger.error({ err }, "Failed to check blobs table constraints"); 118 - reject(err); 119 - return; 120 - } 121 - 122 - if (rows && rows.length > 0) { 123 - logger.info("Migrating blobs table to remove foreign key constraint"); 124 - 125 - const migration = ` 126 - CREATE TABLE blobs_new AS SELECT * FROM blobs; 127 - DROP TABLE blobs; 128 - ALTER TABLE blobs_new RENAME TO blobs; 129 - CREATE INDEX IF NOT EXISTS idx_blobs_sha256 ON blobs(sha256); 130 - CREATE INDEX IF NOT EXISTS idx_blobs_phash ON blobs(phash); 131 - `; 132 - 133 - db.exec(migration, (err) => { 134 - if (err) { 135 - logger.error({ err }, "Failed to migrate blobs table"); 136 - reject(err); 137 - return; 138 - } 139 - logger.info("Blobs table migration completed"); 140 - resolve(); 141 - }); 142 - } else { 143 - logger.debug("Blobs table already has no foreign key constraint"); 144 - resolve(); 145 - } 146 - } 147 - ); 148 - }); 149 - } 150 - 151 124 export async function initializeSchema(): Promise<void> { 152 125 const db = getDatabase(); 153 126 ··· 162 135 163 136 try { 164 137 await migrateProfilesTable(); 165 - await migrateBlobsTableConstraint(); 166 138 resolve(); 167 139 } catch (migrationErr) { 168 140 reject(migrationErr);
+9 -8
src/hydration/profiles.service.ts
··· 1 1 import { AtpAgent } from "@atproto/api"; 2 2 import { Database } from "duckdb"; 3 3 import { ProfilesRepository } from "../database/profiles.repository.js"; 4 - import { BlobsRepository } from "../database/blobs.repository.js"; 4 + import { ProfileBlobsRepository } from "../database/profile-blobs.repository.js"; 5 5 import { computeBlobHashes } from "../blobs/hasher.js"; 6 6 import { pRateLimit } from "p-ratelimit"; 7 7 import { withRetry, isRateLimitError, isNetworkError, isServerError } from "../utils/retry.js"; ··· 11 11 export class ProfileHydrationService { 12 12 private agent: AtpAgent; 13 13 private profilesRepo: ProfilesRepository; 14 - private blobsRepo: BlobsRepository; 14 + private profileBlobsRepo: ProfileBlobsRepository; 15 15 private limit: ReturnType<typeof pRateLimit>; 16 16 17 17 constructor(db: Database) { 18 18 this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` }); 19 19 this.profilesRepo = new ProfilesRepository(db); 20 - this.blobsRepo = new BlobsRepository(db); 20 + this.profileBlobsRepo = new ProfileBlobsRepository(db); 21 21 this.limit = pRateLimit({ 22 22 interval: 300000, 23 23 rate: 3000, ··· 187 187 cid: string, 188 188 type: "avatar" | "banner" 189 189 ): Promise<void> { 190 - const postUri = `profile://${did}/${type}`; 191 - const existing = await this.blobsRepo.findByPostUri(postUri); 190 + const existing = await this.profileBlobsRepo.findByDid(did); 191 + const existingBlob = existing.find(b => b.blob_type === type && b.blob_cid === cid); 192 192 193 - if (existing.length > 0 && existing.some(b => b.blob_cid === cid)) { 193 + if (existingBlob) { 194 194 logger.debug({ did, cid, type }, "Blob already processed, skipping"); 195 195 return; 196 196 } ··· 212 212 const blobData = Buffer.from(await blobResponse.arrayBuffer()); 213 213 const hashes = await computeBlobHashes(blobData, "image/jpeg"); 214 214 215 - await this.blobsRepo.insert({ 216 - post_uri: postUri, 215 + await this.profileBlobsRepo.insert({ 216 + did, 217 + blob_type: type, 217 218 blob_cid: cid, 218 219 sha256: hashes.sha256, 219 220 phash: hashes.phash,