A tool for tailing a labeler's firehose, rehydrating, and storing records for future analysis of moderation decisions.

feat: add timestamp to profile_blobs for change tracking

- Add captured_at timestamp to track when blobs were observed
- Change PK to (did, blob_type, captured_at) to allow history
- Add findLatestByDidAndType method to check current state
- Only insert a new row if the CID differs from the latest captured row
- Enables tracking when users change avatars/banners

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+28 -14
+23 -9
src/database/profile-blobs.repository.ts
··· 9 9 phash?: string; 10 10 storage_path?: string; 11 11 mimetype?: string; 12 + captured_at?: Date; 12 13 } 13 14 14 15 export class ProfileBlobsRepository { ··· 18 19 return new Promise((resolve, reject) => { 19 20 this.db.prepare( 20 21 ` 21 - INSERT INTO profile_blobs (did, blob_type, blob_cid, sha256, phash, storage_path, mimetype) 22 - VALUES ($1, $2, $3, $4, $5, $6, $7) 23 - ON CONFLICT (did, blob_type) DO UPDATE SET 24 - blob_cid = EXCLUDED.blob_cid, 25 - sha256 = EXCLUDED.sha256, 26 - phash = EXCLUDED.phash, 27 - storage_path = EXCLUDED.storage_path, 28 - mimetype = EXCLUDED.mimetype 22 + INSERT INTO profile_blobs (did, blob_type, blob_cid, sha256, phash, storage_path, mimetype, captured_at) 23 + VALUES ($1, $2, $3, $4, $5, $6, $7, COALESCE($8, CURRENT_TIMESTAMP)) 29 24 `, 30 25 (err, stmt) => { 31 26 if (err) { ··· 42 37 blob.phash || null, 43 38 blob.storage_path || null, 44 39 blob.mimetype || null, 40 + blob.captured_at || null, 45 41 (err) => { 46 42 if (err) { 47 43 logger.error({ err, blob }, "Failed to insert profile blob"); ··· 59 55 async findByDid(did: string): Promise<ProfileBlob[]> { 60 56 return new Promise((resolve, reject) => { 61 57 this.db.all( 62 - `SELECT * FROM profile_blobs WHERE did = $1`, 58 + `SELECT * FROM profile_blobs WHERE did = $1 ORDER BY captured_at DESC`, 63 59 did, 64 60 (err, rows: ProfileBlob[]) => { 65 61 if (err) { ··· 68 64 return; 69 65 } 70 66 resolve(rows || []); 67 + } 68 + ); 69 + }); 70 + } 71 + 72 + async findLatestByDidAndType(did: string, blobType: "avatar" | "banner"): Promise<ProfileBlob | null> { 73 + return new Promise((resolve, reject) => { 74 + this.db.all( 75 + `SELECT * FROM profile_blobs WHERE did = $1 AND blob_type = $2 ORDER BY captured_at DESC LIMIT 1`, 76 + did, 77 + blobType, 78 + (err, rows: ProfileBlob[]) => { 79 + if (err) { 80 + logger.error({ err, did, blobType }, "Failed to find latest profile blob"); 81 + reject(err); 82 + return; 83 + } 84 + resolve(rows?.[0] || null); 71 85 } 72 86 ); 
73 87 });
+2 -1
src/database/schema.ts
··· 60 60 phash TEXT, 61 61 storage_path TEXT, 62 62 mimetype TEXT, 63 - PRIMARY KEY (did, blob_type), 63 + captured_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 64 + PRIMARY KEY (did, blob_type, captured_at), 64 65 FOREIGN KEY (did) REFERENCES profiles(did) 65 66 ); 66 67
+3 -4
src/hydration/profiles.service.ts
··· 187 187 cid: string, 188 188 type: "avatar" | "banner" 189 189 ): Promise<void> { 190 - const existing = await this.profileBlobsRepo.findByDid(did); 191 - const existingBlob = existing.find(b => b.blob_type === type && b.blob_cid === cid); 190 + const latestBlob = await this.profileBlobsRepo.findLatestByDidAndType(did, type); 192 191 193 - if (existingBlob) { 194 - logger.debug({ did, cid, type }, "Blob already processed, skipping"); 192 + if (latestBlob && latestBlob.blob_cid === cid) { 193 + logger.debug({ did, cid, type }, "Latest blob already has same CID, skipping"); 195 194 return; 196 195 } 197 196