A tool for tailing a labeler's firehose, rehydrating the records it emits, and storing them for future analysis of moderation decisions.
at main 123 lines 3.2 kB view raw
import { Database } from "duckdb";
import { logger } from "../logger/index.js";

/**
 * One row of the `blobs` table: a blob (identified by CID) attached to a post,
 * plus the hashes and storage metadata recorded for it.
 *
 * NOTE(review): rows read back with `SELECT *` presumably carry `null` (not
 * `undefined`) for NULL columns, so the optional fields may be `null` at
 * runtime despite this type — confirm and consider widening them to
 * `string | null`.
 */
export interface Blob {
  post_uri: string;
  blob_cid: string;
  sha256: string;
  phash?: string;
  storage_path?: string;
  mimetype?: string;
}

/**
 * Data-access helpers for the `blobs` table, wrapping DuckDB's callback API in
 * promises. Every failure is logged through `logger.error` and the returned
 * promise then rejects with the original DuckDB error.
 */
export class BlobsRepository {
  constructor(private db: Database) {}

  /**
   * Inserts a blob row, or updates the existing row for the same
   * `(post_uri, blob_cid)` pair (upsert).
   *
   * Optional fields that are missing or falsy (including `""`) are written as
   * SQL NULL. The `ON CONFLICT` target relies on a unique constraint over
   * `(post_uri, blob_cid)` existing in the schema (defined elsewhere).
   *
   * @param blob - Row to write.
   * @returns Resolves once the statement has run.
   * @throws Rejects with the DuckDB error if preparing or running the statement fails.
   */
  async insert(blob: Blob): Promise<void> {
    return new Promise((resolve, reject) => {
      this.db.prepare(
        `
      INSERT INTO blobs (post_uri, blob_cid, sha256, phash, storage_path, mimetype)
      VALUES ($1, $2, $3, $4, $5, $6)
      ON CONFLICT (post_uri, blob_cid) DO UPDATE SET
        sha256 = EXCLUDED.sha256,
        phash = EXCLUDED.phash,
        storage_path = EXCLUDED.storage_path,
        mimetype = EXCLUDED.mimetype
    `,
        (err, stmt) => {
          if (err) {
            logger.error({ err }, "Failed to prepare blob insert statement");
            reject(err);
            return;
          }

          stmt.run(
            blob.post_uri,
            blob.blob_cid,
            blob.sha256,
            blob.phash || null,
            blob.storage_path || null,
            blob.mimetype || null,
            (runErr) => {
              // A fresh statement is prepared on every call; release it once it
              // has run (success or failure) so prepared-statement handles are
              // not left open indefinitely.
              stmt.finalize();
              if (runErr) {
                logger.error({ err: runErr, blob }, "Failed to insert blob");
                reject(runErr);
                return;
              }
              resolve();
            }
          );
        }
      );
    });
  }

  /**
   * Returns every blob attached to the given post.
   *
   * @param postUri - Value matched against `post_uri`.
   * @returns Matching rows; `[]` when none match.
   * @throws Rejects with the DuckDB error if the query fails.
   */
  async findByPostUri(postUri: string): Promise<Blob[]> {
    return this.queryAll(
      `SELECT * FROM blobs WHERE post_uri = $1`,
      postUri,
      { postUri },
      "Failed to find blobs by post URI"
    );
  }

  /**
   * Returns one blob whose `sha256` matches, or `null` if there is none.
   * If several rows match, which one is returned is unspecified (the query
   * uses `LIMIT 1` without `ORDER BY`).
   *
   * @param sha256 - Value matched against `sha256`.
   * @throws Rejects with the DuckDB error if the query fails.
   */
  async findBySha256(sha256: string): Promise<Blob | null> {
    return this.queryFirst(
      `SELECT * FROM blobs WHERE sha256 = $1 LIMIT 1`,
      sha256,
      { sha256 },
      "Failed to find blob by SHA256"
    );
  }

  /**
   * Returns every blob whose `phash` equals the given value exactly.
   *
   * @param phash - Value matched against `phash`.
   * @returns Matching rows; `[]` when none match.
   * @throws Rejects with the DuckDB error if the query fails.
   */
  async findByPhash(phash: string): Promise<Blob[]> {
    return this.queryAll(
      `SELECT * FROM blobs WHERE phash = $1`,
      phash,
      { phash },
      "Failed to find blobs by pHash"
    );
  }

  /**
   * Returns one blob whose `blob_cid` matches, or `null` if there is none.
   * If several rows match (for example, the same blob on different posts),
   * which one is returned is unspecified (`LIMIT 1` without `ORDER BY`).
   *
   * @param cid - Value matched against `blob_cid`.
   * @throws Rejects with the DuckDB error if the query fails.
   */
  async findByCid(cid: string): Promise<Blob | null> {
    return this.queryFirst(
      `SELECT * FROM blobs WHERE blob_cid = $1 LIMIT 1`,
      cid,
      { cid },
      "Failed to find blob by CID"
    );
  }

  /**
   * Runs a single-parameter SELECT and resolves with all returned rows.
   * On failure, logs `{ err, ...context }` under `message`, then rejects.
   */
  private queryAll(
    sql: string,
    param: string,
    context: Record<string, unknown>,
    message: string
  ): Promise<Blob[]> {
    return new Promise((resolve, reject) => {
      this.db.all(sql, param, (err, rows: Blob[]) => {
        if (err) {
          logger.error({ err, ...context }, message);
          reject(err);
          return;
        }
        resolve(rows || []);
      });
    });
  }

  /** Like `queryAll`, but resolves with the first row or `null` when there are none. */
  private async queryFirst(
    sql: string,
    param: string,
    context: Record<string, unknown>,
    message: string
  ): Promise<Blob | null> {
    const rows = await this.queryAll(sql, param, context, message);
    return rows[0] ?? null;
  }
}