The 1st decentralized social network for sharing when you're on the toilet. Post a "flush" today! Powered by the AT Protocol.
1// jetstream-consumer.js
2// Script to consume Bluesky firehose via Jetstream and save records to Supabase
3
4import WebSocket from 'ws';
5import { createClient } from '@supabase/supabase-js';
6import dotenv from 'dotenv';
7import fs from 'fs';
8import path from 'path';
9import https from 'https';
10import { promisify } from 'util';
11
// Populate process.env from a local .env file first, so the settings read
// below can come from either the real environment or that file.
dotenv.config();

// Shared Supabase client used by every database call in this script.
const { SUPABASE_URL: supabaseUrl, SUPABASE_KEY: supabaseKey } = process.env;
const supabase = createClient(supabaseUrl, supabaseKey);

// Jetstream endpoint, the single record collection we mirror, and the file
// where the last processed event's time_us is stored between runs.
const JETSTREAM_URL = 'wss://jetstream2.us-east.bsky.network/subscribe';
const WANTED_COLLECTION = 'im.flushing.right.now';
const CURSOR_FILE_PATH = path.join(process.cwd(), 'cursor.txt');
24
/**
 * Read the persisted Jetstream cursor (an event `time_us` value) from disk.
 *
 * @param {string} [filePath=CURSOR_FILE_PATH] - File to read; defaults to the
 *   consumer's cursor file.
 * @returns {string|null} The cursor as a string of decimal digits. Returns null
 *   in any of these cases:
 *   - the file is missing or unreadable;
 *   - the file is empty;
 *   - the file holds anything other than a plain non-negative integer (e.g.
 *     after a torn write).
 *   The caller feeds the value to parseInt, so rejecting junk here keeps
 *   `cursor=NaN` out of the Jetstream URL.
 */
function loadCursor(filePath = CURSOR_FILE_PATH) {
  try {
    if (!fs.existsSync(filePath)) {
      return null;
    }
    const cursor = fs.readFileSync(filePath, 'utf8').trim();
    if (!/^\d+$/.test(cursor)) {
      console.warn(`Ignoring invalid cursor in ${filePath}: ${JSON.stringify(cursor)}`);
      return null;
    }
    console.log(`Loaded cursor: ${cursor}`);
    return cursor;
  } catch (error) {
    console.error('Error loading cursor:', error);
    return null;
  }
}
38
/**
 * Persist the latest Jetstream cursor so a restart can resume from it.
 *
 * The value is first written to a sibling `.tmp` file, which is then renamed
 * over the target. A crash mid-write therefore cannot leave a truncated cursor
 * behind, because rename(2) replaces the target in one step on POSIX
 * filesystems.
 *
 * Failures are logged rather than thrown. Losing one cursor update only means
 * a few events are replayed on restart.
 *
 * @param {number|string|null|undefined} cursor - Event `time_us`. null and
 *   undefined are ignored instead of being written.
 * @param {string} [filePath=CURSOR_FILE_PATH] - Destination file.
 */
function saveCursor(cursor, filePath = CURSOR_FILE_PATH) {
  if (cursor == null) {
    return;
  }
  const tmpPath = `${filePath}.tmp`;
  try {
    fs.writeFileSync(tmpPath, String(cursor));
    fs.renameSync(tmpPath, filePath);
  } catch (error) {
    console.error('Error saving cursor:', error);
  }
}
47
/**
 * Build the options object passed to `https.get` for a JSON GET request.
 *
 * Adds an identifying User-Agent and an `Accept: application/json` *request*
 * header. It also sets a 10-second socket timeout; callers react to it via the
 * request's 'timeout' event.
 *
 * @param {string} url - Absolute https URL, including any query string.
 * @returns {{hostname: string, path: string, headers: Object<string, string>, timeout: number}}
 */
function getRequestOptions(url) {
  const { hostname, pathname, search } = new URL(url);
  const headers = {
    'User-Agent': 'FlushingRecorder/1.0 (https://example.com/)',
    'Accept': 'application/json',
  };
  return { hostname, path: `${pathname}${search}`, headers, timeout: 10000 };
}
61
/**
 * Resolve a DID to its current handle by trying several lookups in order.
 *
 * By default it tries the Bluesky AppView (`resolveDIDWithBskyAPI`) and then
 * the PLC directory (`resolveDIDWithPLC`). The handle-resolver step
 * (`resolveDIDWithHandleResolver`) is not in the default list.
 * `com.atproto.identity.resolveHandle` maps handle -> DID, and for any
 * `did:` input every path of that helper resolves to null. So it only added an
 * HTTP round-trip to each failed lookup.
 *
 * @param {string} did - The DID to resolve, e.g. `did:plc:...`.
 * @param {Array<{name: string, resolve: (did: string) => Promise<string|null>}>} [resolvers]
 *   Lookups to try in order. The first truthy result wins. A resolver that
 *   throws is logged and skipped.
 * @returns {Promise<string|null>} The handle. Returns null if `did` is not a
 *   string starting with `did:`, or if every resolver came back empty.
 */
async function resolveDIDToHandle(did, resolvers = [
  { name: 'Bluesky API', resolve: resolveDIDWithBskyAPI },
  { name: 'PLC directory', resolve: resolveDIDWithPLC },
]) {
  console.log(`Attempting to resolve DID: ${did}`);

  if (typeof did !== 'string' || !did.startsWith('did:')) {
    console.error(`Invalid DID format: ${did}`);
    return null;
  }

  for (const { name, resolve } of resolvers) {
    try {
      console.log(`Trying ${name} method for ${did}`);
      const handle = await resolve(did);
      if (handle) {
        console.log(`${name} resolved ${did} to ${handle}`);
        return handle;
      }
    } catch (error) {
      console.error(`${name} method failed for ${did}:`, error);
    }
  }

  console.log(`All resolution methods failed for ${did}`);
  return null;
}
111
/**
 * Pull a handle out of a PLC directory response body.
 *
 * Prefers the first string `at://` entry of the JSON `alsoKnownAs` array, with
 * the scheme stripped. If the body is not JSON, or has no such entry, it scans
 * the raw text for `at://<handle>`.
 *
 * @param {string} did - DID being resolved (used for logging only).
 * @param {string} body - Raw response body.
 * @returns {string|null} The handle, or null when neither approach finds one.
 */
function extractHandleFromPlcResponse(did, body) {
  try {
    const didDoc = JSON.parse(body);
    const aliases = didDoc?.alsoKnownAs;

    if (Array.isArray(aliases) && aliases.length > 0) {
      console.log(`Found alsoKnownAs entries: ${JSON.stringify(aliases)}`);

      const atValue = aliases.find((value) => typeof value === 'string' && value.startsWith('at://'));
      if (atValue) {
        const handle = atValue.slice('at://'.length);
        console.log(`Successfully resolved ${did} to handle: ${handle}`);
        return handle;
      }
      console.warn(`No 'at://' prefix found in alsoKnownAs for ${did}`);
    } else {
      console.warn(`No alsoKnownAs property found in DID document for ${did}`);
    }
  } catch (jsonError) {
    console.log(`JSON parsing failed, trying regex: ${jsonError.message}`);
  }

  // `\s` must be the whitespace class here. Written as `\\s` inside a regex
  // literal, it would instead exclude the characters `\` and `s`, cutting
  // "alice.bsky.social" down to "alice.b".
  const atMatch = body.match(/at:\/\/([^"'\s]+)/);
  if (atMatch) {
    const handle = atMatch[1];
    console.log(`Regex extracted handle for ${did}: ${handle}`);
    return handle;
  }

  return null;
}

/**
 * Resolve a DID to a handle via the PLC directory (https://plc.directory).
 * Presumably only useful for `did:plc:` identifiers — other methods return a
 * non-200 status here and resolve to null.
 *
 * Never rejects. HTTP errors, non-200 responses, timeouts, dropped connections
 * and parse failures all resolve to null, so the caller can try another method.
 *
 * @param {string} did - DID to look up.
 * @returns {Promise<string|null>} The handle, or null.
 */
async function resolveDIDWithPLC(did) {
  return new Promise((resolve) => {
    const url = `https://plc.directory/${encodeURIComponent(did)}`;
    console.log(`Making PLC directory request to: ${url}`);

    const req = https.get(getRequestOptions(url), (res) => {
      console.log(`PLC Directory response status: ${res.statusCode}`);

      // Decode at the stream level. Coercing each Buffer chunk to a string on
      // its own would garble multi-byte UTF-8 characters split across chunks.
      res.setEncoding('utf8');
      let data = '';
      res.on('data', (chunk) => {
        data += chunk;
      });

      res.on('end', () => {
        console.log(`PLC raw response for ${did}: ${data.substring(0, 300)}...`);

        if (res.statusCode !== 200) {
          console.warn(`Failed to resolve DID ${did} with PLC: HTTP ${res.statusCode}`);
          resolve(null);
          return;
        }

        resolve(extractHandleFromPlcResponse(did, data));
      });

      // If the connection drops before the body is complete, 'end' may never
      // fire. Settle anyway so the caller is not left awaiting forever.
      res.on('close', () => {
        if (!res.complete) {
          resolve(null);
        }
      });
    });

    req.on('error', (error) => {
      console.error(`Error fetching PLC document for ${did}:`, error);
      resolve(null);
    });

    req.on('timeout', () => {
      console.error(`PLC request timeout for ${did}`);
      req.destroy();
      resolve(null);
    });
  });
}
194
/**
 * Resolve a DID to a handle via the public Bluesky AppView, using
 * `com.atproto.repo.describeRepo` and its `handle` field. `resolveDIDToHandle`
 * tries this method first.
 *
 * Never rejects. HTTP errors, non-200 responses, timeouts, dropped connections,
 * unparseable bodies and a missing or non-string `handle` all resolve to null.
 *
 * @param {string} did - DID (repo identifier) to describe.
 * @returns {Promise<string|null>} The handle, or null.
 */
async function resolveDIDWithBskyAPI(did) {
  return new Promise((resolve) => {
    const url = `https://public.api.bsky.app/xrpc/com.atproto.repo.describeRepo?repo=${encodeURIComponent(did)}`;
    console.log(`Making Bluesky API request to: ${url}`);

    const req = https.get(getRequestOptions(url), (res) => {
      console.log(`Bluesky API response status: ${res.statusCode}`);

      // Decode at the stream level so multi-byte characters split across
      // chunks survive intact.
      res.setEncoding('utf8');
      let data = '';
      res.on('data', (chunk) => {
        data += chunk;
      });

      res.on('end', () => {
        if (res.statusCode !== 200) {
          console.warn(`Failed to resolve DID ${did} with Bluesky API: HTTP ${res.statusCode}`);
          resolve(null);
          return;
        }

        try {
          const handle = JSON.parse(data)?.handle;
          if (typeof handle === 'string' && handle.length > 0) {
            console.log(`Successfully resolved ${did} to handle: ${handle} using Bluesky API`);
            resolve(handle);
            return;
          }
        } catch (error) {
          console.error(`Error parsing Bluesky API response for ${did}:`, error);
        }

        resolve(null); // No usable handle in the response.
      });

      // If the connection drops before the body is complete, 'end' may never
      // fire. Settle anyway so the caller is not left awaiting forever.
      res.on('close', () => {
        if (!res.complete) {
          resolve(null);
        }
      });
    });

    req.on('error', (error) => {
      console.error(`Error fetching from Bluesky API for ${did}:`, error);
      resolve(null);
    });

    req.on('timeout', () => {
      console.error(`Bluesky API request timeout for ${did}`);
      req.destroy();
      resolve(null);
    });
  });
}
251
/**
 * Last-resort "resolver" that only recognises input which is already a handle.
 *
 * Calling `com.atproto.identity.resolveHandle` with a DID cannot help.
 * - That endpoint maps a handle to a DID, not the reverse.
 * - Its result could only ever be discarded here.
 * So no HTTP request is made. The return values match a request-making
 * implementation whose every post-request path yields null.
 *
 * @param {string} did - A DID, or possibly a handle already.
 * @returns {Promise<string|null>} `did` unchanged when it looks like a handle
 *   (a string containing a dot that does not start with `did:`); otherwise
 *   null. Never rejects.
 */
async function resolveDIDWithHandleResolver(did) {
  if (typeof did === 'string' && did.includes('.') && !did.startsWith('did:')) {
    console.log(`Input appears to be a handle already: ${did}`);
    return did;
  }
  return null;
}
317
/**
 * Delete the `flushing_records` row whose `uri` matches, logging the outcome.
 * Never throws.
 *
 * @param {string} uri - `at://<did>/<collection>/<rkey>` of the deleted record.
 */
async function deleteFlushingRecord(uri) {
  try {
    const { error } = await supabase
      .from('flushing_records')
      .delete()
      .eq('uri', uri);

    if (error) {
      console.error(`Error deleting record: ${error.message}`);
    } else {
      console.log(`Successfully deleted record: ${uri}`);
    }
  } catch (deleteError) {
    console.error(`Exception while deleting record: ${deleteError.message}`);
  }
}

/**
 * Decide which handle to store for `did`.
 *
 * Order: live DID resolution, then the most recent non-'unknown' handle
 * already stored for that DID, and finally 'unknown'.
 *
 * A `handle` field inside the record itself is deliberately not consulted. The
 * record is content from the author's own repo, so that value is self-reported
 * and would let anyone display any handle.
 *
 * @param {string} did - Author DID of the record.
 * @returns {Promise<string>} A handle, or 'unknown'; never null.
 */
async function resolveHandleForRecord(did) {
  console.log(`Resolving handle for DID: ${did}`);
  const resolved = await resolveDIDToHandle(did);
  if (resolved) {
    console.log(`Successfully resolved handle: ${resolved}`);
    return resolved;
  }
  console.log(`Failed to resolve handle for DID: ${did}`);

  try {
    const { data, error } = await supabase
      .from('flushing_records')
      .select('handle')
      .eq('did', did)
      .not('handle', 'is', null)
      .not('handle', 'eq', 'unknown')
      .order('indexed_at', { ascending: false })
      .limit(1);

    if (error) {
      console.error(`Error checking database for existing handle: ${error.message}`);
    } else if (data?.[0]?.handle) {
      console.log(`Found handle in database for DID ${did}: ${data[0].handle}`);
      return data[0].handle;
    } else {
      console.log(`No existing handle found in database for DID: ${did}`);
    }
  } catch (dbError) {
    console.error(`Error checking database for existing handle: ${dbError.message}`);
  }

  return 'unknown';
}

/**
 * Insert `recordData`, or update the existing row with the same `uri`.
 *
 * When updating, an existing real handle is kept if the new one is only
 * 'unknown'. The caller's object is not mutated.
 * Exceptions from the Supabase client propagate to the caller.
 *
 * @param {object} recordData - Row to write, keyed by `flushing_records` columns.
 */
async function saveFlushingRecord(recordData) {
  console.log(`Preparing to insert/update record with handle '${recordData.handle}'`);

  const { data: existingData, error: checkError } = await supabase
    .from('flushing_records')
    .select('id, handle')
    .eq('uri', recordData.uri)
    .limit(1);

  if (checkError) {
    console.error(`Error checking if record exists: ${checkError.message}`);
    return;
  }

  let row = recordData;
  let error;

  if (existingData && existingData.length > 0) {
    console.log(`Record with URI ${row.uri} already exists, updating`);

    const existingHandle = existingData[0].handle;
    if (existingHandle && existingHandle !== 'unknown' && row.handle === 'unknown') {
      console.log(`Keeping existing handle '${existingHandle}' instead of replacing with 'unknown'`);
      row = { ...row, handle: existingHandle };
    }

    ({ error } = await supabase
      .from('flushing_records')
      .update(row)
      .eq('uri', row.uri));
  } else {
    console.log(`Record with URI ${row.uri} doesn't exist, inserting with handle: ${row.handle}`);
    ({ error } = await supabase
      .from('flushing_records')
      .insert(row));
  }

  if (error) {
    console.error(`Error saving record to Supabase: ${error.message}`);
    console.error(`Failed record data: ${JSON.stringify(row)}`);
  } else {
    console.log(`Successfully saved record: ${row.uri} (handle: ${row.handle})`);
  }
}

/**
 * Handle one Jetstream event.
 *
 * First it persists the event's cursor. Then it mirrors create, update and
 * delete commits for WANTED_COLLECTION into the `flushing_records` table.
 *
 * Every failure is logged rather than thrown, so one bad event cannot break
 * the WebSocket message handler.
 *
 * @param {object} event - Parsed Jetstream message. Fields read here: `time_us`,
 *   `kind`, `did`, and `commit.{operation, collection, rkey, record, cid}`.
 * @returns {Promise<void>}
 */
async function processEvent(event) {
  try {
    // Saved before filtering so the cursor also advances past ignored events.
    saveCursor(event.time_us);

    // Skip non-commits and other collections silently to keep logs quiet.
    if (event.kind !== 'commit' || event.commit?.collection !== WANTED_COLLECTION) {
      return;
    }

    console.log(`Processing event: ${JSON.stringify(event).substring(0, 500)}...`);

    const { did } = event;
    const { operation, collection, rkey, record, cid } = event.commit;
    const uri = `at://${did}/${collection}/${rkey}`;

    console.log(`Processing ${operation} operation for DID: ${did}, collection: ${collection}, rkey: ${rkey}`);

    if (operation === 'delete') {
      console.log(`Processing delete operation for URI: ${uri}`);
      await deleteFlushingRecord(uri);
      return;
    }

    if (operation === 'update') {
      // Updates share the create path. saveFlushingRecord chooses UPDATE or
      // INSERT depending on whether the URI is already stored.
      console.log(`Processing update operation for URI: ${uri}`);
    }

    const handle = await resolveHandleForRecord(did);

    // No `id` field: presumably generated by the database — confirm before
    // adding one.
    const recordData = {
      did,
      collection,
      type: record?.$type,
      created_at: record?.createdAt || new Date().toISOString(),
      emoji: record?.emoji,
      text: record?.text,
      cid,
      uri,
      indexed_at: new Date().toISOString(),
      handle,
    };

    await saveFlushingRecord(recordData);
  } catch (error) {
    console.error(`Error processing event: ${error.message}`);
    console.error(error.stack);
  }
}
497
/**
 * Back-fill handles from a Jetstream identity event.
 *
 * Rows for the event's DID whose handle is still 'unknown' are updated to the
 * handle carried by the event. Rows that already have a handle are left alone.
 *
 * @param {object} event - Parsed Jetstream message. Only events with
 *   `kind === 'identity'` and both `identity.did` and `identity.handle` set are
 *   acted on.
 * @returns {Promise<void>} Never rejects; failures are logged.
 */
async function processIdentityEvent(event) {
  try {
    if (event.kind !== 'identity' || !event.identity) {
      return;
    }

    const { did, handle } = event.identity;

    // atproto #identity events carry the sentinel 'handle.invalid' when the
    // handle failed validation. Storing it would also hide these rows from
    // future back-fills, which only look for 'unknown'.
    if (!did || !handle || handle === 'handle.invalid') {
      return;
    }

    try {
      const { data, error } = await supabase
        .from('flushing_records')
        .select('uri')
        .eq('did', did)
        .eq('handle', 'unknown');

      if (error) {
        console.error(`Error looking up unknown handles for DID ${did}: ${error.message}`);
        return;
      }
      if (!data || data.length === 0) {
        return;
      }

      console.log(`Found ${data.length} records with DID ${did} and unknown handle. Updating to ${handle}...`);

      // supabase-js resolves to `{ data, error }`, so the failure must be read
      // from `error`; any other property name would always be undefined.
      const { error: updateError } = await supabase
        .from('flushing_records')
        .update({ handle })
        .eq('did', did)
        .eq('handle', 'unknown');

      if (updateError) {
        console.error(`Error updating records with DID ${did}: ${updateError.message}`);
      } else {
        console.log(`Successfully updated handle for records with DID ${did} to ${handle}`);
      }
    } catch (dbError) {
      console.error(`Error updating unknown handles: ${dbError.message}`);
    }
  } catch (error) {
    console.error(`Error processing identity event: ${error.message}`);
  }
}
540
/**
 * Open a Jetstream WebSocket subscription filtered to WANTED_COLLECTION and
 * hand each parsed message to the identity and commit handlers.
 *
 * When a cursor was persisted, it resumes 5 seconds before it so no events are
 * lost across restarts.
 *
 * Reconnects once, 5 seconds after the socket errors or closes. Both 'error'
 * and 'close' can fire for the same failure, so a per-connection flag ensures
 * only one replacement socket is opened. Scheduling from both handlers
 * unguarded would double the number of sockets on every failure.
 *
 * NOTE(review): the message handler is async and not serialized, so events may
 * be processed concurrently and out of order.
 */
function connectToJetstream() {
  const cursor = loadCursor();

  // Per the Jetstream docs, identity/account events are delivered regardless
  // of wantedCollections, so processIdentityEvent still receives them.
  let url = `${JETSTREAM_URL}?wantedCollections=${encodeURIComponent(WANTED_COLLECTION)}`;

  const cursorValue = cursor == null ? Number.NaN : Number.parseInt(cursor, 10);
  if (Number.isFinite(cursorValue)) {
    // Rewind 5 seconds (in microseconds) to ensure no gaps.
    url += `&cursor=${cursorValue - 5000000}`;
  }

  console.log(`Connecting to Jetstream: ${url}`);

  const ws = new WebSocket(url);
  let heartbeat = null;
  let reconnectScheduled = false;

  const scheduleReconnect = () => {
    clearInterval(heartbeat);
    if (reconnectScheduled) {
      return;
    }
    reconnectScheduled = true;
    setTimeout(connectToJetstream, 5000); // Reconnect after 5 seconds
  };

  ws.on('open', () => {
    console.log('Connected to Jetstream');
    // Started on open (not at construction) so a slow handshake cannot
    // cancel the heartbeat before the connection is established.
    heartbeat = setInterval(() => {
      if (ws.readyState === WebSocket.OPEN) {
        ws.ping();
      }
    }, 30000);
  });

  ws.on('message', async (data) => {
    try {
      const event = JSON.parse(data.toString());

      // Keep stored DID-to-handle mappings current.
      if (event.kind === 'identity') {
        await processIdentityEvent(event);
      }

      // Every event (including identity) goes through processEvent, which
      // saves the cursor and filters for relevant commits.
      await processEvent(event);
    } catch (error) {
      console.error('Error parsing message:', error);
    }
  });

  ws.on('error', (error) => {
    console.error('WebSocket error:', error);
    scheduleReconnect();
  });

  ws.on('close', () => {
    console.log('Connection closed. Attempting to reconnect...');
    scheduleReconnect();
  });
}
598
/**
 * Entry point: begin consuming Jetstream, then make SIGINT (e.g. Ctrl+C) log a
 * message and exit with status 0.
 */
function start() {
  const exitOnInterrupt = () => {
    console.log('Process terminated. Exiting...');
    process.exit(0);
  };

  console.log('Starting Jetstream consumer...');
  connectToJetstream();

  process.on('SIGINT', exitOnInterrupt);
}

start();