A tool for tailing a labeler's firehose, rehydrating the referenced records, and storing them for future analysis of moderation decisions.
1import { Database } from "duckdb";
2import { logger } from "../logger/index.js";
3
/**
 * Shape of a row in the `blobs` table as written and read by
 * `BlobsRepository`.
 *
 * The pair (`post_uri`, `blob_cid`) is the conflict key used by the upsert in
 * `BlobsRepository.insert`.
 */
export interface Blob {
  /** URI of the post the blob is attached to. */
  post_uri: string;
  /** Content identifier of the blob. */
  blob_cid: string;
  /** Presumably the hex SHA-256 digest of the blob bytes; confirm with the writer. */
  sha256: string;
  /** Optional hash used for lookups via `findByPhash` (presumably perceptual). */
  phash?: string;
  /** Optional location where the blob was stored; format not shown here. */
  storage_path?: string;
  /** Optional MIME type of the blob. */
  mimetype?: string;
}
12
/**
 * Data-access layer for the `blobs` table.
 *
 * Wraps the callback-based DuckDB API in promises. Every failure is logged
 * with its query context and then surfaced to the caller as a rejection.
 */
export class BlobsRepository {
  constructor(private db: Database) {}

  /**
   * Inserts a blob row, or overwrites the non-key columns when a row with the
   * same (`post_uri`, `blob_cid`) already exists.
   *
   * @param blob Row to upsert. Missing or empty optional fields are stored as NULL.
   * @returns A promise that resolves once the statement has run.
   * @throws Rejects with the DuckDB error if preparing or running the statement fails.
   */
  async insert(blob: Blob): Promise<void> {
    return new Promise((resolve, reject) => {
      this.db.prepare(
        `
        INSERT INTO blobs (post_uri, blob_cid, sha256, phash, storage_path, mimetype)
        VALUES ($1, $2, $3, $4, $5, $6)
        ON CONFLICT (post_uri, blob_cid) DO UPDATE SET
          sha256 = EXCLUDED.sha256,
          phash = EXCLUDED.phash,
          storage_path = EXCLUDED.storage_path,
          mimetype = EXCLUDED.mimetype
        `,
        (err, stmt) => {
          if (err) {
            logger.error({ err }, "Failed to prepare blob insert statement");
            reject(err);
            return;
          }

          stmt.run(
            blob.post_uri,
            blob.blob_cid,
            blob.sha256,
            // `||` (not `??`) is deliberate: empty strings are stored as NULL too.
            blob.phash || null,
            blob.storage_path || null,
            blob.mimetype || null,
            (err) => {
              // A new statement is prepared per call, so release it as soon as
              // it has run; otherwise every insert leaks a prepared statement.
              stmt.finalize();

              if (err) {
                logger.error({ err, blob }, "Failed to insert blob");
                reject(err);
                return;
              }
              resolve();
            }
          );
        }
      );
    });
  }

  /**
   * Returns every blob row recorded for the given post.
   *
   * @param postUri Value matched against the `post_uri` column.
   * @returns All matching rows; an empty array when there are none.
   */
  async findByPostUri(postUri: string): Promise<Blob[]> {
    return this.queryBlobs(
      `SELECT * FROM blobs WHERE post_uri = $1`,
      postUri,
      { postUri },
      "Failed to find blobs by post URI"
    );
  }

  /**
   * Returns one blob row with the given SHA-256 value.
   *
   * The query has no ORDER BY, so which row is returned when several match is
   * up to the database.
   *
   * @param sha256 Value matched against the `sha256` column.
   * @returns A matching row, or `null` when none exists.
   */
  async findBySha256(sha256: string): Promise<Blob | null> {
    const rows = await this.queryBlobs(
      `SELECT * FROM blobs WHERE sha256 = $1 LIMIT 1`,
      sha256,
      { sha256 },
      "Failed to find blob by SHA256"
    );
    return rows[0] || null;
  }

  /**
   * Returns every blob row whose `phash` column equals the given value
   * (exact match only).
   *
   * @param phash Value matched against the `phash` column.
   * @returns All matching rows; an empty array when there are none.
   */
  async findByPhash(phash: string): Promise<Blob[]> {
    return this.queryBlobs(
      `SELECT * FROM blobs WHERE phash = $1`,
      phash,
      { phash },
      "Failed to find blobs by pHash"
    );
  }

  /**
   * Returns one blob row with the given CID.
   *
   * The query has no ORDER BY, so which row is returned when the CID appears
   * under several posts is up to the database.
   *
   * @param cid Value matched against the `blob_cid` column.
   * @returns A matching row, or `null` when none exists.
   */
  async findByCid(cid: string): Promise<Blob | null> {
    const rows = await this.queryBlobs(
      `SELECT * FROM blobs WHERE blob_cid = $1 LIMIT 1`,
      cid,
      { cid },
      "Failed to find blob by CID"
    );
    return rows[0] || null;
  }

  /**
   * Runs a single-parameter SELECT against `blobs` and resolves with its rows.
   *
   * @param sql Query text with one `$1` placeholder.
   * @param param Value bound to `$1`.
   * @param logContext Extra fields logged alongside `err` on failure.
   * @param failureMessage Log message used on failure.
   * @returns The result rows; an empty array when the driver yields none.
   * @throws Rejects with the DuckDB error after logging it.
   */
  private queryBlobs(
    sql: string,
    param: string,
    logContext: Record<string, unknown>,
    failureMessage: string
  ): Promise<Blob[]> {
    return new Promise((resolve, reject) => {
      this.db.all(sql, param, (err, rows: Blob[]) => {
        if (err) {
          logger.error({ err, ...logContext }, failureMessage);
          reject(err);
          return;
        }
        resolve(rows || []);
      });
    });
  }
}