A simple tool which lets you scrape twitter accounts and crosspost them to bluesky accounts! Comes with a CLI and a webapp for managing profiles! Works with images/videos/link embeds/threads.

Perf: Implement parallel account processing and early fetch stopping

jack d285c939 c2ae4f73

+75 -11
+75 -11
src/index.ts
··· 732 732 return chunks; 733 733 } 734 734 735 + // Simple p-limit implementation for concurrency control 736 + const pLimit = (concurrency: number) => { 737 + const queue: (() => Promise<void>)[] = []; 738 + let activeCount = 0; 739 + 740 + const next = () => { 741 + activeCount--; 742 + if (queue.length > 0) { 743 + queue.shift()!(); 744 + } 745 + }; 746 + 747 + return <T>(fn: () => Promise<T>): Promise<T> => { 748 + return new Promise<T>((resolve, reject) => { 749 + const run = async () => { 750 + activeCount++; 751 + try { 752 + resolve(await fn()); 753 + } catch (e) { 754 + reject(e); 755 + } finally { 756 + next(); 757 + } 758 + }; 759 + 760 + if (activeCount < concurrency) { 761 + run(); 762 + } else { 763 + queue.push(run); 764 + } 765 + }); 766 + }; 767 + }; 768 + 735 769 // Replaced safeSearch with fetchUserTweets to use UserTweets endpoint instead of Search 736 - async function fetchUserTweets(username: string, limit: number): Promise<Tweet[]> { 770 + // Added processedIds for early stopping optimization 771 + async function fetchUserTweets(username: string, limit: number, processedIds?: Set<string>): Promise<Tweet[]> { 737 772 const client = await getTwitterScraper(); 738 773 if (!client) return []; 739 774 ··· 742 777 try { 743 778 const tweets: Tweet[] = []; 744 779 const generator = client.getTweets(username, limit); 780 + let consecutiveProcessedCount = 0; 781 + 745 782 for await (const t of generator) { 746 - tweets.push(mapScraperTweetToLocalTweet(t)); 783 + const tweet = mapScraperTweetToLocalTweet(t); 784 + const tweetId = tweet.id_str || tweet.id; 785 + 786 + // Early stopping logic: if we see 3 consecutive tweets we've already processed, stop. 787 + // This assumes timeline order (mostly true). 788 + if (processedIds && tweetId && processedIds.has(tweetId)) { 789 + consecutiveProcessedCount++; 790 + if (consecutiveProcessedCount >= 3) { 791 + console.log(`[${username}] 🛑 Found 3 consecutive processed tweets. Stopping fetch early.`); 792 + break; 793 + } 794 + } else { 795 + consecutiveProcessedCount = 0; 796 + } 797 + 798 + tweets.push(tweet); 747 799 if (tweets.length >= limit) break; 748 800 } 749 801 return tweets; ··· 1319 1371 clearBackfill(mapping.id); 1320 1372 console.log(`[${mapping.bskyIdentifier}] Backfill complete.`); 1321 1373 } else { 1374 + // Pre-load processed IDs for optimization 1375 + const processedMap = loadProcessedTweets(mapping.bskyIdentifier); 1376 + const processedIds = new Set(Object.keys(processedMap)); 1377 + 1322 1378 for (const twitterUsername of mapping.twitterUsernames) { 1323 1379 try { 1324 1380 updateAppStatus({ state: 'checking', currentAccount: twitterUsername, message: 'Fetching latest tweets...' }); 1325 1381 1326 - // Use fetchUserTweets instead of safeSearch 1327 - const tweets = await fetchUserTweets(twitterUsername, 30); 1382 + // Use fetchUserTweets with early stopping optimization 1383 + // Increase limit slightly since we have early stopping now 1384 + const tweets = await fetchUserTweets(twitterUsername, 50, processedIds); 1328 1385 1329 1386 if (!tweets || tweets.length === 0) continue; 1330 1387 await processTweets(agent, twitterUsername, mapping.bskyIdentifier, tweets, dryRun); ··· 1410 1467 console.log(`Scheduler started. Base interval: ${config.checkIntervalMinutes} minutes.`); 1411 1468 updateLastCheckTime(); // Initialize next time 1412 1469 1470 + // Concurrency limit for processing accounts 1471 + const runLimit = pLimit(3); 1472 + 1413 1473 // Main loop 1414 1474 while (true) { 1415 1475 const now = Date.now(); ··· 1425 1485 updateLastCheckTime(); 1426 1486 } 1427 1487 1488 + const tasks: Promise<void>[] = []; 1489 + 1428 1490 for (const mapping of config.mappings) { 1429 1491 if (!mapping.enabled) continue; 1430 1492 ··· 1432 1494 1433 1495 // Run if scheduled OR backfill requested 1434 1496 if (isScheduledRun || hasPendingBackfill) { 1435 - // Await the task to ensure we don't bombard twitter 1436 - await runAccountTask(mapping, hasPendingBackfill, options.dryRun); 1437 - 1438 - // Random delay between 2-5 seconds between accounts 1439 - const accountDelay = Math.floor(Math.random() * 3000) + 2000; 1440 - console.log(`[Scheduler] ⏳ Waiting ${accountDelay}ms before next account...`); 1441 - await new Promise(r => setTimeout(r, accountDelay)); 1497 + // Queue task with concurrency limit 1498 + tasks.push(runLimit(async () => { 1499 + await runAccountTask(mapping, hasPendingBackfill, options.dryRun); 1500 + })); 1442 1501 } 1502 + } 1503 + 1504 + if (tasks.length > 0) { 1505 + await Promise.all(tasks); 1506 + console.log(`[Scheduler] ✅ All tasks for this cycle complete.`); 1443 1507 } 1444 1508 1445 1509 // Sleep for 5 seconds