The 1st decentralized social network for sharing when you're on the toilet. Post a "flush" today! Powered by the AT Protocol.
at main 611 lines 20 kB view raw
// jetstream-consumer.js
// Script to consume Bluesky firehose via Jetstream and save records to Supabase

import WebSocket from 'ws';
import { createClient } from '@supabase/supabase-js';
import dotenv from 'dotenv';
import fs from 'fs';
import path from 'path';
import https from 'https';
import { promisify } from 'util';

// Load environment variables
dotenv.config();

// Configure Supabase client
const supabaseUrl = process.env.SUPABASE_URL;
const supabaseKey = process.env.SUPABASE_KEY;
const supabase = createClient(supabaseUrl, supabaseKey);

// Configure Jetstream connection
const JETSTREAM_URL = 'wss://jetstream2.us-east.bsky.network/subscribe';
const WANTED_COLLECTION = 'im.flushing.right.now';
const CURSOR_FILE_PATH = path.join(process.cwd(), 'cursor.txt');

// Table that stores one row per flush record
const RECORDS_TABLE = 'flushing_records';
// Placeholder stored when no handle could be determined for a DID
const UNKNOWN_HANDLE = 'unknown';
// Delay before reconnecting after the socket errors or closes
const RECONNECT_DELAY_MS = 5000;
// Interval between keep-alive pings
const HEARTBEAT_INTERVAL_MS = 30000;
// How far to rewind the saved cursor on reconnect (5 seconds, in microseconds)
const CURSOR_REWIND_US = 5000000;

/**
 * Read the last saved cursor from CURSOR_FILE_PATH.
 *
 * @returns {string|null} The trimmed file contents, or null when the file is
 *   missing or cannot be read.
 */
function loadCursor() {
  try {
    if (fs.existsSync(CURSOR_FILE_PATH)) {
      const cursor = fs.readFileSync(CURSOR_FILE_PATH, 'utf8').trim();
      console.log(`Loaded cursor: ${cursor}`);
      return cursor;
    }
  } catch (error) {
    console.error('Error loading cursor:', error);
  }
  return null;
}

/**
 * Persist the cursor to CURSOR_FILE_PATH. Failures are logged, never thrown.
 *
 * @param {number|string} cursor - Value to store (the event's `time_us`).
 */
function saveCursor(cursor) {
  // Events without a timestamp would otherwise overwrite a good cursor attempt
  // with a logged TypeError; skip them instead.
  if (cursor == null) {
    return;
  }
  try {
    fs.writeFileSync(CURSOR_FILE_PATH, String(cursor));
  } catch (error) {
    console.error('Error saving cursor:', error);
  }
}

/**
 * Build the `https.get` options for a URL: identifying request headers plus a
 * 10 second socket timeout.
 *
 * @param {string} url - Absolute URL to request.
 * @returns {{hostname: string, path: string, headers: object, timeout: number}}
 */
function getRequestOptions(url) {
  const parsedUrl = new URL(url);
  return {
    hostname: parsedUrl.hostname,
    path: parsedUrl.pathname + parsedUrl.search,
    headers: {
      'User-Agent': 'FlushingRecorder/1.0 (https://example.com/)',
      'Accept': 'application/json',
    },
    timeout: 10000,
  };
}

/**
 * Perform an HTTPS GET and collect the response body as text.
 * Never rejects: network errors and timeouts are logged and resolve to null.
 *
 * @param {string} url - Absolute URL to fetch.
 * @param {string} label - Service name used in log messages.
 * @param {string} did - DID being resolved, used in log messages.
 * @returns {Promise<{statusCode: number, body: string}|null>}
 */
function httpGetText(url, label, did) {
  return new Promise((resolve) => {
    const req = https.get(getRequestOptions(url), (res) => {
      let body = '';

      // Decode as a stream so multi-byte characters split across chunks
      // are not corrupted by per-chunk Buffer-to-string conversion.
      res.setEncoding('utf8');

      console.log(`${label} response status: ${res.statusCode}`);

      res.on('data', (chunk) => {
        body += chunk;
      });

      res.on('end', () => {
        resolve({ statusCode: res.statusCode, body });
      });

      // An aborted response never emits 'end'; settle the promise anyway.
      res.on('error', (error) => {
        console.error(`${label} response error for ${did}:`, error);
        resolve(null);
      });
    });

    req.on('error', (error) => {
      console.error(`${label} request error for ${did}:`, error);
      resolve(null);
    });

    req.on('timeout', () => {
      console.error(`${label} request timeout for ${did}`);
      req.destroy();
      resolve(null);
    });
  });
}

/**
 * Resolve a DID to a handle by trying each resolver in order and returning
 * the first non-empty result.
 *
 * @param {string} did - DID to resolve; must start with `did:`.
 * @returns {Promise<string|null>} The handle, or null when the input is not a
 *   DID or every resolver failed.
 */
async function resolveDIDToHandle(did) {
  console.log(`Attempting to resolve DID: ${did}`);

  // Make sure the DID is properly formatted
  if (typeof did !== 'string' || !did.startsWith('did:')) {
    console.error(`Invalid DID format: ${did}`);
    return null;
  }

  // Order matters: the Bluesky API is tried first, then the PLC directory,
  // then the handle resolver.
  const resolvers = [
    ['Bluesky API', resolveDIDWithBskyAPI],
    ['PLC directory', resolveDIDWithPLC],
    ['Handle resolver', resolveDIDWithHandleResolver],
  ];

  for (const [label, resolver] of resolvers) {
    try {
      console.log(`Trying ${label} method for ${did}`);
      const handle = await resolver(did);
      if (handle) {
        console.log(`${label} resolved ${did} to ${handle}`);
        return handle;
      }
    } catch (error) {
      console.error(`${label} method failed for ${did}:`, error);
    }
  }

  console.log(`All resolution methods failed for ${did}`);
  return null;
}

/**
 * Extract a handle from the raw text of a DID document.
 *
 * First parses the text as JSON and looks for an `alsoKnownAs` entry starting
 * with `at://`; if that yields nothing, falls back to a regex scan of the raw
 * text.
 *
 * @param {string} raw - Raw response body.
 * @param {string} did - DID being resolved, used in log messages.
 * @returns {string|null} The handle without the `at://` prefix, or null.
 */
function extractHandleFromDidDocument(raw, did) {
  try {
    const didDoc = JSON.parse(raw);
    const aliases = didDoc?.alsoKnownAs;

    if (Array.isArray(aliases) && aliases.length > 0) {
      console.log(`Found alsoKnownAs entries: ${JSON.stringify(aliases)}`);

      const atValue = aliases.find(
        (value) => typeof value === 'string' && value.startsWith('at://'),
      );
      if (atValue) {
        return atValue.slice('at://'.length);
      }
      console.warn(`No 'at://' prefix found in alsoKnownAs for ${did}`);
    } else {
      console.warn(`No alsoKnownAs property found in DID document for ${did}`);
    }
  } catch (jsonError) {
    console.log(`JSON parsing failed, trying regex: ${jsonError.message}`);
  }

  // Fallback: take everything after "at://" up to a quote or whitespace.
  // (`\s` here, not `\\s`, which would stop at the letter "s".)
  const atMatch = raw.match(/at:\/\/([^"'\s]+)/);
  if (atMatch && atMatch[1]) {
    console.log(`Regex extracted handle for ${did}: ${atMatch[1]}`);
    return atMatch[1];
  }

  return null;
}

/**
 * Resolve a DID by fetching its document from the PLC directory.
 *
 * @param {string} did - DID to resolve.
 * @returns {Promise<string|null>} The handle, or null when the request fails,
 *   returns a non-200 status, or the document contains no handle.
 */
async function resolveDIDWithPLC(did) {
  const url = `https://plc.directory/${encodeURIComponent(did)}`;
  console.log(`Making PLC directory request to: ${url}`);

  const response = await httpGetText(url, 'PLC Directory', did);
  if (!response) {
    return null;
  }

  console.log(`PLC raw response for ${did}: ${response.body.substring(0, 300)}...`);

  if (response.statusCode !== 200) {
    console.warn(`Failed to resolve DID ${did} with PLC: HTTP ${response.statusCode}`);
    return null;
  }

  const handle = extractHandleFromDidDocument(response.body, did);
  if (handle) {
    console.log(`Successfully resolved ${did} to handle: ${handle}`);
  }
  return handle;
}

/**
 * Resolve a DID via the `com.atproto.repo.describeRepo` endpoint on
 * public.api.bsky.app, reading the `handle` field of the JSON response.
 *
 * @param {string} did - DID to resolve.
 * @returns {Promise<string|null>} The handle, or null on any failure.
 */
async function resolveDIDWithBskyAPI(did) {
  const url = `https://public.api.bsky.app/xrpc/com.atproto.repo.describeRepo?repo=${encodeURIComponent(did)}`;
  console.log(`Making Bluesky API request to: ${url}`);

  const response = await httpGetText(url, 'Bluesky API', did);
  if (!response) {
    return null;
  }

  if (response.statusCode !== 200) {
    console.warn(`Failed to resolve DID ${did} with Bluesky API: HTTP ${response.statusCode}`);
    return null;
  }

  try {
    const repoInfo = JSON.parse(response.body);
    if (repoInfo?.handle) {
      console.log(`Successfully resolved ${did} to handle: ${repoInfo.handle} using Bluesky API`);
      return repoInfo.handle;
    }
  } catch (error) {
    console.error(`Error parsing Bluesky API response for ${did}:`, error);
  }

  return null; // No handle found
}

/**
 * Last-resort resolver: returns the input unchanged when it already looks
 * like a handle (contains a dot and does not start with `did:`).
 *
 * For actual DIDs this always returns null. The previous implementation sent
 * the DID to `com.atproto.identity.resolveHandle`, but every outcome of that
 * request resolved to null (the endpoint maps handle -> DID, the opposite
 * direction), so the request has been dropped.
 *
 * @param {string} did - DID or handle.
 * @returns {Promise<string|null>}
 */
async function resolveDIDWithHandleResolver(did) {
  if (typeof did === 'string' && did.includes('.') && !did.startsWith('did:')) {
    console.log(`Input appears to be a handle already: ${did}`);
    return did;
  }
  return null;
}

/**
 * Delete the stored row for a record URI. Failures are logged, never thrown.
 *
 * @param {string} recordUri - `at://` URI of the record to delete.
 */
async function deleteRecord(recordUri) {
  console.log(`Processing delete operation for URI: ${recordUri}`);

  try {
    const { error } = await supabase
      .from(RECORDS_TABLE)
      .delete()
      .eq('uri', recordUri);

    if (error) {
      console.error(`Error deleting record: ${error.message}`);
    } else {
      console.log(`Successfully deleted record: ${recordUri}`);
    }
  } catch (deleteError) {
    console.error(`Exception while deleting record: ${deleteError.message}`);
  }
}

/**
 * Look up the most recently indexed non-unknown handle stored for a DID.
 *
 * @param {string} did - DID to look up.
 * @returns {Promise<string|null>} The stored handle, or null when none exists
 *   or the query fails.
 */
async function findStoredHandle(did) {
  try {
    const { data, error } = await supabase
      .from(RECORDS_TABLE)
      .select('handle')
      .eq('did', did)
      .not('handle', 'is', null)
      .not('handle', 'eq', UNKNOWN_HANDLE)
      .order('indexed_at', { ascending: false })
      .limit(1);

    if (error) {
      console.error(`Error checking database for existing handle: ${error.message}`);
      return null;
    }

    const stored = data?.[0]?.handle;
    if (stored) {
      console.log(`Found handle in database for DID ${did}: ${stored}`);
      return stored;
    }

    console.log(`No existing handle found in database for DID: ${did}`);
  } catch (dbError) {
    console.error(`Error checking database for existing handle: ${dbError.message}`);
  }
  return null;
}

/**
 * Insert a record row, or update the existing row with the same URI.
 * May replace `recordData.handle` with the stored handle when the new one is
 * the unknown placeholder.
 *
 * @param {object} recordData - Row to write; must contain `uri` and `handle`.
 */
async function persistRecord(recordData) {
  // First check if the record already exists
  const { data: existingData, error: checkError } = await supabase
    .from(RECORDS_TABLE)
    .select('id, handle')
    .eq('uri', recordData.uri)
    .limit(1);

  if (checkError) {
    console.error(`Error checking if record exists: ${checkError.message}`);
    return;
  }

  let error;
  const existing = existingData?.[0];

  if (existing) {
    console.log(`Record with URI ${recordData.uri} already exists, updating`);

    // Never downgrade a known handle to the placeholder.
    if (existing.handle && existing.handle !== UNKNOWN_HANDLE && recordData.handle === UNKNOWN_HANDLE) {
      console.log(`Keeping existing handle '${existing.handle}' instead of replacing with 'unknown'`);
      recordData.handle = existing.handle;
    }

    ({ error } = await supabase
      .from(RECORDS_TABLE)
      .update(recordData)
      .eq('uri', recordData.uri));
  } else {
    console.log(`Record with URI ${recordData.uri} doesn't exist, inserting with handle: ${recordData.handle}`);
    ({ error } = await supabase
      .from(RECORDS_TABLE)
      .insert(recordData));
  }

  if (error) {
    console.error(`Error saving record to Supabase: ${error.message}`);
    console.error(`Failed record data: ${JSON.stringify(recordData)}`);
  } else {
    console.log(`Successfully saved record: ${recordData.uri} (handle: ${recordData.handle})`);
  }
}

/**
 * Process one Jetstream event.
 *
 * Saves the event's `time_us` as the cursor, then, for commit events in
 * WANTED_COLLECTION, deletes the stored row (delete operations) or
 * inserts/updates it (all other operations). Errors are logged, never thrown.
 *
 * @param {object} event - Parsed Jetstream event.
 */
async function processEvent(event) {
  try {
    // Save the cursor for each event we process
    saveCursor(event.time_us);

    // Only process commit events for our target collection
    if (event.kind !== 'commit' || !event.commit) {
      return;
    }
    if (event.commit.collection !== WANTED_COLLECTION) {
      return;
    }

    // Now we can log since we know it's relevant
    console.log(`Processing event: ${JSON.stringify(event).substring(0, 500)}...`);

    const { did } = event;
    const { operation, collection, rkey, record, cid } = event.commit;

    console.log(`Processing ${operation} operation for DID: ${did}, collection: ${collection}, rkey: ${rkey}`);

    const recordUri = `at://${did}/${collection}/${rkey}`;

    if (operation === 'delete') {
      await deleteRecord(recordUri);
      return;
    }

    if (operation === 'update') {
      // Updates go through the same insert-or-update path as creates.
      console.log(`Processing update operation for URI: ${recordUri}`);
    }

    // Approach 1: use a handle embedded in the record itself.
    // NOTE(review): this value is written by the record's author and is not
    // verified against the DID here — confirm that trusting it is intended.
    let handle = null;
    if (record && record.handle) {
      console.log(`Found handle in record: ${record.handle}`);
      handle = record.handle;
    }

    // Approach 2: resolve via APIs, then fall back to previously stored rows.
    if (!handle) {
      console.log(`Resolving handle for DID: ${did}`);
      handle = await resolveDIDToHandle(did);

      if (handle) {
        console.log(`Successfully resolved handle: ${handle}`);
      } else {
        console.log(`Failed to resolve handle for DID: ${did}`);
        handle = await findStoredHandle(did);
      }
    }

    if (!handle) {
      console.log(`No handle could be resolved for DID ${did}, using 'unknown'`);
      handle = UNKNOWN_HANDLE;
    }

    // Prepare data for insertion - DO NOT include id field at all
    const recordData = {
      did,
      collection,
      type: record?.$type,
      created_at: record?.createdAt || new Date().toISOString(),
      emoji: record?.emoji,
      text: record?.text,
      cid,
      uri: recordUri,
      indexed_at: new Date().toISOString(),
      handle,
    };

    console.log(`Preparing to insert/update record with handle '${recordData.handle}'`);

    await persistRecord(recordData);
  } catch (error) {
    console.error(`Error processing event: ${error.message}`);
    console.error(error.stack);
  }
}

/**
 * Process an identity event: when it carries both a DID and a handle, replace
 * the unknown placeholder on every stored row for that DID with the handle.
 * Errors are logged, never thrown.
 *
 * @param {object} event - Parsed Jetstream event.
 */
async function processIdentityEvent(event) {
  try {
    if (event.kind !== 'identity' || !event.identity) {
      return;
    }

    const { did, handle } = event.identity;
    if (!did || !handle) {
      return;
    }

    try {
      // Check if we have any records with this DID that have 'unknown' handles
      const { data, error } = await supabase
        .from(RECORDS_TABLE)
        .select('uri')
        .eq('did', did)
        .eq('handle', UNKNOWN_HANDLE);

      if (error) {
        console.error(`Error looking up records with DID ${did}: ${error.message}`);
        return;
      }
      if (!data || data.length === 0) {
        return;
      }

      console.log(`Found ${data.length} records with DID ${did} and unknown handle. Updating to ${handle}...`);

      // The client resolves to { data, error }; rename on destructure so a
      // failed update is actually detected.
      const { error: updateError } = await supabase
        .from(RECORDS_TABLE)
        .update({ handle })
        .eq('did', did)
        .eq('handle', UNKNOWN_HANDLE);

      if (updateError) {
        console.error(`Error updating records with DID ${did}: ${updateError.message}`);
      } else {
        console.log(`Successfully updated handle for records with DID ${did} to ${handle}`);
      }
    } catch (dbError) {
      console.error(`Error updating unknown handles: ${dbError.message}`);
    }
  } catch (error) {
    console.error(`Error processing identity event: ${error.message}`);
  }
}

/**
 * Open a Jetstream WebSocket, dispatch incoming events, keep the connection
 * alive with pings, and reconnect once after the socket errors or closes.
 */
function connectToJetstream() {
  const cursor = loadCursor();

  // NOTE(review): the URL only filters by collection; the message handler
  // assumes identity events are still delivered — confirm against the
  // Jetstream documentation.
  const url = new URL(JETSTREAM_URL);
  url.searchParams.set('wantedCollections', WANTED_COLLECTION);

  if (cursor) {
    const savedCursor = Number.parseInt(cursor, 10);
    if (Number.isFinite(savedCursor)) {
      // Rewind a few seconds to ensure no gaps
      url.searchParams.set('cursor', String(Math.max(savedCursor - CURSOR_REWIND_US, 0)));
    } else {
      console.warn(`Ignoring invalid cursor value: ${cursor}`);
    }
  }

  console.log(`Connecting to Jetstream: ${url.href}`);

  const ws = new WebSocket(url.href);

  // Heartbeat to keep the connection alive
  const heartbeat = setInterval(() => {
    if (ws.readyState === WebSocket.OPEN) {
      ws.ping();
    }
  }, HEARTBEAT_INTERVAL_MS);

  // 'error' and 'close' can both fire for the same failure (ws typically
  // emits 'close' after a fatal 'error'); schedule only one reconnect per
  // socket so connections do not multiply.
  let reconnectScheduled = false;
  const scheduleReconnect = () => {
    if (reconnectScheduled) {
      return;
    }
    reconnectScheduled = true;
    clearInterval(heartbeat);
    setTimeout(connectToJetstream, RECONNECT_DELAY_MS);
  };

  ws.on('open', () => {
    console.log('Connected to Jetstream');
  });

  ws.on('message', async (data) => {
    try {
      const event = JSON.parse(data.toString());

      // Process identity events to keep our DID-to-handle mapping up to date
      if (event.kind === 'identity') {
        await processIdentityEvent(event);
      }

      // Every event goes through processEvent so the cursor keeps advancing
      await processEvent(event);
    } catch (error) {
      // Don't log message data to reduce noise
      console.error('Error parsing message:', error);
    }
  });

  ws.on('error', (error) => {
    console.error('WebSocket error:', error);
    scheduleReconnect();
  });

  ws.on('close', () => {
    console.log('Connection closed. Attempting to reconnect...');
    scheduleReconnect();
  });
}

/**
 * Start the consumer and install a SIGINT handler that exits the process.
 */
function start() {
  console.log('Starting Jetstream consumer...');
  connectToJetstream();

  // Handle process termination
  process.on('SIGINT', () => {
    console.log('Process terminated. Exiting...');
    process.exit(0);
  });
}

start();