A simple tool that lets you scrape Twitter accounts and crosspost them to Bluesky accounts. Comes with a CLI and a web app for managing profiles. Works with images, videos, link embeds, and threads.
at f349fd21fe2c4c1bbfb7f4506d0b48d10e984a28 2328 lines 85 kB view raw
import 'dotenv/config';
import fs from 'node:fs';
import path from 'node:path';
import { fileURLToPath } from 'node:url';
import { type BskyAgent, RichText } from '@atproto/api';
import type { BlobRef } from '@atproto/api';
import { Scraper } from '@the-convocation/twitter-scraper';
import type { Tweet as ScraperTweet } from '@the-convocation/twitter-scraper';
import axios from 'axios';
import * as cheerio from 'cheerio';
import { Command } from 'commander';
import * as francModule from 'franc-min';
import iso6391 from 'iso-639-1';
import puppeteer from 'puppeteer-core';
import sharp from 'sharp';
import { generateAltText } from './ai-manager.js';

import { getConfig } from './config-manager.js';

// ESM __dirname equivalent
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

// ============================================================================
// Type Definitions
// ============================================================================

/** One processed-tweet record: the Bluesky post references plus status flags. */
interface ProcessedTweetEntry {
  uri?: string;
  cid?: string;
  root?: { uri: string; cid: string };
  tail?: { uri: string; cid: string };
  migrated?: boolean;
  skipped?: boolean;
  text?: string;
}

/** Processed-tweet records keyed by Twitter status id. */
interface ProcessedTweetsMap {
  [twitterId: string]: ProcessedTweetEntry;
}

/** A URL entity: the short `url` and its `expanded_url`. */
interface UrlEntity {
  url?: string;
  expanded_url?: string;
}

/** Image payload carried by a card binding. */
interface CardImageValue {
  url?: string;
  width?: number;
  height?: number;
  alt?: string;
}

/** A single card binding value (string or image). */
interface CardBindingValue {
  type?: string;
  string_value?: string;
  image_value?: CardImageValue;
}

/** Array-form card binding: an explicit key paired with its value. */
interface CardBindingEntry {
  key?: string;
  value?: CardBindingValue;
}

/** Card bindings arrive either as a keyed record or as an array of key/value entries. */
type CardBindingValues = Record<string, CardBindingValue> | CardBindingEntry[];

/** Card attached to a tweet. */
interface TweetCard {
  name?: string;
  binding_values?: CardBindingValues;
  url?: string;
}

/** Pixel dimensions of one media rendition. */
interface MediaSize {
  w: number;
  h: number;
}

78interface MediaSizes { 79 large?: MediaSize; 80} 81 82interface OriginalInfo { 83 width: number; 84 height: number; 85} 86 87interface VideoVariant { 88 content_type: string; 89 url: string; 90 bitrate?: number; 91} 92 93interface VideoInfo { 94 variants?: VideoVariant[]; 95 duration_millis?: number; 96} 97 98interface MediaEntity { 99 url?: string; 100 expanded_url?: string; 101 media_url_https?: string; 102 type?: 'photo' | 'video' | 'animated_gif'; 103 ext_alt_text?: string; 104 sizes?: MediaSizes; 105 original_info?: OriginalInfo; 106 video_info?: VideoInfo; 107 source?: 'tweet' | 'card'; 108} 109 110interface TweetEntities { 111 urls?: UrlEntity[]; 112 media?: MediaEntity[]; 113} 114 115interface Tweet { 116 id?: string; 117 id_str?: string; 118 text?: string; 119 full_text?: string; 120 created_at?: string; 121 entities?: TweetEntities; 122 extended_entities?: TweetEntities; 123 quoted_status_id_str?: string; 124 retweeted_status_id_str?: string; 125 is_quote_status?: boolean; 126 in_reply_to_status_id_str?: string; 127 in_reply_to_status_id?: string; 128 in_reply_to_user_id_str?: string; 129 in_reply_to_user_id?: string; 130 isRetweet?: boolean; 131 user?: { 132 screen_name?: string; 133 id_str?: string; 134 }; 135 card?: TweetCard | null; 136 permanentUrl?: string; 137} 138 139interface AspectRatio { 140 width: number; 141 height: number; 142} 143 144interface ImageEmbed { 145 alt: string; 146 image: BlobRef; 147 aspectRatio?: AspectRatio; 148} 149 150import { dbService } from './db.js'; 151 152// ============================================================================ 153// State Management 154// ============================================================================ 155 156const PROCESSED_DIR = path.join(__dirname, '..', 'processed'); 157 158async function migrateJsonToSqlite() { 159 if (!fs.existsSync(PROCESSED_DIR)) return; 160 161 const files = fs.readdirSync(PROCESSED_DIR).filter((f) => f.endsWith('.json')); 162 if (files.length === 0) 
return; 163 164 console.log(`📦 Found ${files.length} legacy cache files. Migrating to SQLite...`); 165 const config = getConfig(); 166 167 for (const file of files) { 168 const username = file.replace('.json', '').toLowerCase(); 169 // Try to find a matching bskyIdentifier from config 170 const mapping = config.mappings.find((m) => m.twitterUsernames.map((u) => u.toLowerCase()).includes(username)); 171 const bskyIdentifier = mapping?.bskyIdentifier || 'unknown'; 172 173 try { 174 const filePath = path.join(PROCESSED_DIR, file); 175 const data = JSON.parse(fs.readFileSync(filePath, 'utf8')) as ProcessedTweetsMap; 176 177 for (const [twitterId, entry] of Object.entries(data)) { 178 dbService.saveTweet({ 179 twitter_id: twitterId, 180 twitter_username: username, 181 bsky_identifier: bskyIdentifier, 182 bsky_uri: entry.uri, 183 bsky_cid: entry.cid, 184 bsky_root_uri: entry.root?.uri, 185 bsky_root_cid: entry.root?.cid, 186 status: entry.migrated ? 'migrated' : entry.skipped ? 'skipped' : 'failed', 187 }); 188 } 189 // Move file to backup 190 const backupDir = path.join(PROCESSED_DIR, 'backup'); 191 if (!fs.existsSync(backupDir)) fs.mkdirSync(backupDir); 192 fs.renameSync(filePath, path.join(backupDir, file)); 193 } catch (err) { 194 console.error(`❌ Failed to migrate ${file}:`, err); 195 } 196 } 197 198 // REPAIR STEP: Fix any 'unknown' records in SQLite that came from the broken schema migration 199 for (const mapping of config.mappings) { 200 for (const username of mapping.twitterUsernames) { 201 dbService.repairUnknownIdentifiers(username, mapping.bskyIdentifier); 202 } 203 } 204 205 console.log('✅ Migration complete.'); 206} 207 208function loadProcessedTweets(bskyIdentifier: string): ProcessedTweetsMap { 209 return dbService.getTweetsByBskyIdentifier(bskyIdentifier); 210} 211 212function saveProcessedTweet( 213 twitterUsername: string, 214 bskyIdentifier: string, 215 twitterId: string, 216 entry: ProcessedTweetEntry, 217): void { 218 dbService.saveTweet({ 219 
twitter_id: twitterId, 220 twitter_username: twitterUsername.toLowerCase(), 221 bsky_identifier: bskyIdentifier.toLowerCase(), 222 tweet_text: entry.text, 223 bsky_uri: entry.uri, 224 bsky_cid: entry.cid, 225 bsky_root_uri: entry.root?.uri, 226 bsky_root_cid: entry.root?.cid, 227 bsky_tail_uri: entry.tail?.uri, 228 bsky_tail_cid: entry.tail?.cid, 229 status: entry.migrated || (entry.uri && entry.cid) ? 'migrated' : entry.skipped ? 'skipped' : 'failed', 230 }); 231} 232 233// ============================================================================ 234// Custom Twitter Client 235// ============================================================================ 236 237let scraper: Scraper | null = null; 238let currentTwitterCookies = { authToken: '', ct0: '' }; 239let useBackupCredentials = false; 240const lastCreatedAtByBsky = new Map<string, number>(); 241 242function getUniqueCreatedAtIso(bskyIdentifier: string, desiredMs: number): string { 243 const key = bskyIdentifier.toLowerCase(); 244 const lastMs = lastCreatedAtByBsky.get(key) ?? Number.MIN_SAFE_INTEGER; 245 const nextMs = Math.max(desiredMs, lastMs + 1); 246 lastCreatedAtByBsky.set(key, nextMs); 247 return new Date(nextMs).toISOString(); 248} 249 250async function getTwitterScraper(forceReset = false): Promise<Scraper | null> { 251 const config = getConfig(); 252 let authToken = config.twitter.authToken; 253 let ct0 = config.twitter.ct0; 254 255 // Use backup if toggled 256 if (useBackupCredentials && config.twitter.backupAuthToken && config.twitter.backupCt0) { 257 authToken = config.twitter.backupAuthToken; 258 ct0 = config.twitter.backupCt0; 259 } 260 261 if (!authToken || !ct0) return null; 262 263 // Re-initialize if config changed, not yet initialized, or forced reset 264 if (!scraper || forceReset || currentTwitterCookies.authToken !== authToken || currentTwitterCookies.ct0 !== ct0) { 265 console.log(`🔄 Initializing Twitter scraper with ${useBackupCredentials ? 
'BACKUP' : 'PRIMARY'} credentials...`); 266 scraper = new Scraper(); 267 await scraper.setCookies([`auth_token=${authToken}`, `ct0=${ct0}`]); 268 269 currentTwitterCookies = { 270 authToken: authToken, 271 ct0: ct0, 272 }; 273 } 274 return scraper; 275} 276 277async function switchCredentials() { 278 const config = getConfig(); 279 if (config.twitter.backupAuthToken && config.twitter.backupCt0) { 280 useBackupCredentials = !useBackupCredentials; 281 console.log(`⚠️ Switching to ${useBackupCredentials ? 'BACKUP' : 'PRIMARY'} Twitter credentials...`); 282 await getTwitterScraper(true); 283 return true; 284 } 285 console.log('⚠️ No backup credentials available to switch to.'); 286 return false; 287} 288 289function mapScraperTweetToLocalTweet(scraperTweet: ScraperTweet): Tweet { 290 const raw = scraperTweet.__raw_UNSTABLE; 291 if (!raw) { 292 // Fallback if raw data is missing (shouldn't happen for timeline tweets usually) 293 return { 294 id: scraperTweet.id, 295 id_str: scraperTweet.id, 296 text: scraperTweet.text, 297 full_text: scraperTweet.text, 298 isRetweet: scraperTweet.isRetweet, 299 // Construct minimal entities from parsed data 300 entities: { 301 urls: scraperTweet.urls.map((url: string) => ({ url, expanded_url: url })), 302 media: scraperTweet.photos.map((p: any) => ({ 303 url: p.url, 304 expanded_url: p.url, 305 media_url_https: p.url, 306 type: 'photo', 307 ext_alt_text: p.alt_text, 308 })), 309 }, 310 created_at: scraperTweet.timeParsed?.toUTCString(), 311 permanentUrl: scraperTweet.permanentUrl, 312 }; 313 } 314 315 return { 316 id: raw.id_str, 317 id_str: raw.id_str, 318 text: raw.full_text, 319 full_text: raw.full_text, 320 created_at: raw.created_at, 321 isRetweet: scraperTweet.isRetweet, 322 // biome-ignore lint/suspicious/noExplicitAny: raw types match compatible structure 323 entities: raw.entities as any, 324 // biome-ignore lint/suspicious/noExplicitAny: raw types match compatible structure 325 extended_entities: raw.extended_entities as any, 
326 quoted_status_id_str: raw.quoted_status_id_str, 327 retweeted_status_id_str: raw.retweeted_status_id_str, 328 is_quote_status: !!raw.quoted_status_id_str, 329 in_reply_to_status_id_str: raw.in_reply_to_status_id_str, 330 // biome-ignore lint/suspicious/noExplicitAny: missing in LegacyTweetRaw type 331 in_reply_to_user_id_str: (raw as any).in_reply_to_user_id_str, 332 // biome-ignore lint/suspicious/noExplicitAny: card comes from raw tweet 333 card: (raw as any).card, 334 permanentUrl: scraperTweet.permanentUrl, 335 user: { 336 screen_name: scraperTweet.username, 337 id_str: scraperTweet.userId, 338 }, 339 }; 340} 341 342// ============================================================================ 343// Helper Functions 344// ============================================================================ 345 346function normalizeCardBindings(bindingValues?: CardBindingValues): Record<string, CardBindingValue> { 347 if (!bindingValues) return {}; 348 if (Array.isArray(bindingValues)) { 349 return bindingValues.reduce( 350 (acc, entry) => { 351 if (entry?.key && entry.value) acc[entry.key] = entry.value; 352 return acc; 353 }, 354 {} as Record<string, CardBindingValue>, 355 ); 356 } 357 return bindingValues as Record<string, CardBindingValue>; 358} 359 360function isLikelyUrl(value?: string): value is string { 361 if (!value) return false; 362 return /^https?:\/\//i.test(value); 363} 364 365function extractCardImageUrl(bindingValues: CardBindingValues, preferredKeys: string[]): string | undefined { 366 const normalized = normalizeCardBindings(bindingValues); 367 for (const key of preferredKeys) { 368 const value = normalized[key]; 369 const imageUrl = value?.image_value?.url; 370 if (imageUrl) return imageUrl; 371 } 372 const fallbackValue = Object.values(normalized).find((value) => value?.image_value?.url); 373 return fallbackValue?.image_value?.url; 374} 375 376function extractCardLink(bindingValues: CardBindingValues, preferredKeys: string[]): string | undefined 
{ 377 const normalized = normalizeCardBindings(bindingValues); 378 for (const key of preferredKeys) { 379 const value = normalized[key]; 380 const link = value?.string_value; 381 if (isLikelyUrl(link)) return link; 382 } 383 const fallbackValue = Object.values(normalized).find((value) => isLikelyUrl(value?.string_value)); 384 return fallbackValue?.string_value; 385} 386 387function extractCardTitle(bindingValues: CardBindingValues, preferredKeys: string[]): string | undefined { 388 const normalized = normalizeCardBindings(bindingValues); 389 for (const key of preferredKeys) { 390 const value = normalized[key]; 391 const title = value?.string_value; 392 if (title && !isLikelyUrl(title)) return title; 393 } 394 const fallbackValue = Object.values(normalized).find( 395 (value) => value?.string_value && !isLikelyUrl(value?.string_value), 396 ); 397 return fallbackValue?.string_value; 398} 399 400function extractCardAlt(bindingValues: CardBindingValues): string | undefined { 401 const normalized = normalizeCardBindings(bindingValues); 402 const altValue = Object.values(normalized).find((value) => value?.image_value?.alt); 403 return altValue?.image_value?.alt; 404} 405 406function appendCallToAction(text: string, link?: string, label = 'Sponsored') { 407 if (!link) return text; 408 if (text.includes(link)) return text; 409 return `${text}\n\n${label}: ${link}`.trim(); 410} 411 412function detectCardMedia(tweet: Tweet): { imageUrls: string[]; link?: string; title?: string; alt?: string } { 413 if (!tweet.card?.binding_values) return { imageUrls: [] }; 414 const bindings = tweet.card.binding_values; 415 416 const imageUrls: string[] = []; 417 const preferredImageKeys = [ 418 'photo_image_full_size', 419 'photo_image_full_size_original', 420 'thumbnail_image', 421 'image', 422 'thumbnail', 423 'summary_photo_image', 424 'player_image', 425 ]; 426 const preferredLinkKeys = ['site', 'destination', 'landing_url', 'cta_link', 'card_url', 'url']; 427 const preferredTitleKeys = 
['title', 'summary', 'card_title']; 428 429 const primaryImage = extractCardImageUrl(bindings, preferredImageKeys); 430 if (primaryImage) imageUrls.push(primaryImage); 431 432 const imageKeys = normalizeCardBindings(bindings); 433 Object.values(imageKeys).forEach((value) => { 434 const url = value?.image_value?.url; 435 if (url && !imageUrls.includes(url)) imageUrls.push(url); 436 }); 437 438 const link = extractCardLink(bindings, preferredLinkKeys); 439 const title = extractCardTitle(bindings, preferredTitleKeys); 440 const alt = extractCardAlt(bindings); 441 442 return { imageUrls, link, title, alt }; 443} 444 445function buildCardMediaEntities(tweet: Tweet): { media: MediaEntity[]; link?: string } { 446 const cardData = detectCardMedia(tweet); 447 if (cardData.imageUrls.length === 0) return { media: [] }; 448 449 const media = cardData.imageUrls.slice(0, 4).map((url) => ({ 450 media_url_https: url, 451 type: 'photo' as const, 452 ext_alt_text: cardData.alt || cardData.title || 'Sponsored image', 453 source: 'card' as const, 454 })); 455 456 return { media, link: cardData.link }; 457} 458 459function ensureUrlEntity(entities: TweetEntities | undefined, link?: string) { 460 if (!link) return; 461 if (!entities) return; 462 const urls = entities.urls || []; 463 if (!urls.some((url) => url.expanded_url === link || url.url === link)) { 464 urls.push({ url: link, expanded_url: link }); 465 entities.urls = urls; 466 } 467} 468 469function detectSponsoredCard(tweet: Tweet): boolean { 470 if (!tweet.card?.binding_values) return false; 471 const cardName = tweet.card.name?.toLowerCase() || ''; 472 const cardMedia = detectCardMedia(tweet); 473 const hasMultipleImages = cardMedia.imageUrls.length > 1; 474 const promoKeywords = ['promo', 'unified', 'carousel', 'collection', 'amplify']; 475 const hasPromoName = promoKeywords.some((keyword) => cardName.includes(keyword)); 476 return hasMultipleImages || hasPromoName; 477} 478 479function mergeMediaEntities(primary: 
MediaEntity[], secondary: MediaEntity[], limit = 4): MediaEntity[] { 480 const merged: MediaEntity[] = []; 481 const seen = new Set<string>(); 482 const ordered = [ 483 ...primary.filter((media) => media?.source !== 'card'), 484 ...primary.filter((media) => media?.source === 'card'), 485 ...secondary.filter((media) => media?.source !== 'card'), 486 ...secondary.filter((media) => media?.source === 'card'), 487 ]; 488 489 for (const media of ordered) { 490 if (!media?.media_url_https) continue; 491 if (seen.has(media.media_url_https)) continue; 492 merged.push(media); 493 seen.add(media.media_url_https); 494 if (merged.length >= limit) break; 495 } 496 497 return merged; 498} 499 500function detectCarouselLinks(tweet: Tweet): string[] { 501 if (!tweet.card?.binding_values) return []; 502 const bindings = normalizeCardBindings(tweet.card.binding_values); 503 const links = Object.values(bindings) 504 .map((value) => value?.string_value) 505 .filter((value): value is string => isLikelyUrl(value)); 506 return [...new Set(links)]; 507} 508 509function mergeUrlEntities(entities: TweetEntities | undefined, links: string[]) { 510 if (!entities || links.length === 0) return; 511 const urls = entities.urls || []; 512 links.forEach((link) => { 513 if (!urls.some((url) => url.expanded_url === link || url.url === link)) { 514 urls.push({ url: link, expanded_url: link }); 515 } 516 }); 517 entities.urls = urls; 518} 519 520function injectCardMedia(tweet: Tweet) { 521 if (!tweet.card?.binding_values) return; 522 const cardMedia = buildCardMediaEntities(tweet); 523 if (cardMedia.media.length === 0) return; 524 525 const existingMedia = tweet.extended_entities?.media || tweet.entities?.media || []; 526 const mergedMedia = mergeMediaEntities(existingMedia, cardMedia.media); 527 528 if (!tweet.extended_entities) tweet.extended_entities = {}; 529 tweet.extended_entities.media = mergedMedia; 530 if (!tweet.entities) tweet.entities = {}; 531 if (!tweet.entities.media) tweet.entities.media 
= mergedMedia; 532 533 if (cardMedia.link) { 534 ensureUrlEntity(tweet.entities, cardMedia.link); 535 } 536 537 const carouselLinks = detectCarouselLinks(tweet); 538 mergeUrlEntities(tweet.entities, carouselLinks); 539} 540 541function ensureSponsoredLinks(text: string, tweet: Tweet): string { 542 if (!tweet.card?.binding_values) return text; 543 const carouselLinks = detectCarouselLinks(tweet); 544 const cardLink = detectCardMedia(tweet).link; 545 const links = [...new Set([cardLink, ...carouselLinks].filter(Boolean))] as string[]; 546 if (links.length === 0) return text; 547 548 const appendedLinks = links.slice(0, 2).map((link) => `Link: ${link}`); 549 const updatedText = `${text}\n\n${appendedLinks.join('\n')}`.trim(); 550 return updatedText; 551} 552 553function addTextFallbacks(text: string): string { 554 return text.replace(/\s+$/g, '').trim(); 555} 556 557function getTweetText(tweet: Tweet): string { 558 return tweet.full_text || tweet.text || ''; 559} 560 561function normalizeContextText(text: string): string { 562 return text.replace(/\s+/g, ' ').trim(); 563} 564 565function addTweetsToMap(tweetMap: Map<string, Tweet>, tweets: Tweet[]): void { 566 for (const tweet of tweets) { 567 const tweetId = tweet.id_str || tweet.id; 568 if (!tweetId) continue; 569 tweetMap.set(String(tweetId), tweet); 570 } 571} 572 573function buildThreadContext(tweet: Tweet, tweetMap: Map<string, Tweet>, maxHops = 8): string { 574 const parts: string[] = []; 575 const visited = new Set<string>(); 576 let current: Tweet | undefined = tweet; 577 578 for (let hops = 0; hops < maxHops; hops++) { 579 const parentId = current?.in_reply_to_status_id_str || current?.in_reply_to_status_id; 580 if (!parentId) break; 581 const parentKey = String(parentId); 582 if (visited.has(parentKey)) break; 583 visited.add(parentKey); 584 585 const parentTweet = tweetMap.get(parentKey); 586 if (!parentTweet) break; 587 588 const parentText = normalizeContextText(getTweetText(parentTweet)); 589 if 
(parentText) parts.push(parentText); 590 591 current = parentTweet; 592 } 593 594 if (parts.length === 0) return ''; 595 return parts.reverse().join(' | '); 596} 597 598function buildAltTextContext(tweet: Tweet, tweetText: string, tweetMap: Map<string, Tweet>): string { 599 const threadContext = buildThreadContext(tweet, tweetMap); 600 const currentText = normalizeContextText(tweetText); 601 602 if (threadContext && currentText) { 603 return `Thread above: ${threadContext}. Current tweet: ${currentText}`; 604 } 605 606 if (threadContext) return `Thread above: ${threadContext}.`; 607 return currentText; 608} 609 610async function fetchSyndicationMedia(tweetUrl: string): Promise<{ images: string[] }> { 611 try { 612 const normalized = tweetUrl.replace('twitter.com', 'x.com'); 613 const res = await axios.get('https://publish.twitter.com/oembed', { 614 params: { url: normalized }, 615 headers: { 'User-Agent': 'Mozilla/5.0' }, 616 }); 617 const html = res.data?.html as string | undefined; 618 if (!html) return { images: [] }; 619 620 const match = html.match(/status\/(\d+)/); 621 const tweetId = match?.[1]; 622 if (!tweetId) return { images: [] }; 623 624 const syndicationUrl = `https://cdn.syndication.twimg.com/tweet-result?id=${tweetId}`; 625 const syndication = await axios.get(syndicationUrl, { 626 headers: { 'User-Agent': 'Mozilla/5.0', Accept: 'application/json' }, 627 }); 628 const data = syndication.data as Record<string, unknown>; 629 const images = (data?.photos as { url?: string }[] | undefined) 630 ?.map((photo) => photo.url) 631 .filter(Boolean) as string[]; 632 return { images: images || [] }; 633 } catch (err) { 634 return { images: [] }; 635 } 636} 637 638function injectSyndicationMedia(tweet: Tweet, syndication: { images: string[] }) { 639 if (syndication.images.length === 0) return; 640 const media = syndication.images.slice(0, 4).map((url) => ({ 641 media_url_https: url, 642 type: 'photo' as const, 643 ext_alt_text: 'Image from Twitter', 644 source: 
'card' as const, 645 })); 646 647 const existingMedia = tweet.extended_entities?.media || tweet.entities?.media || []; 648 const mergedMedia = mergeMediaEntities(existingMedia, media); 649 650 if (!tweet.extended_entities) tweet.extended_entities = {}; 651 tweet.extended_entities.media = mergedMedia; 652 if (!tweet.entities) tweet.entities = {}; 653 if (!tweet.entities.media) tweet.entities.media = mergedMedia; 654} 655 656function detectLanguage(text: string): string[] { 657 if (!text || text.trim().length === 0) return ['en']; 658 try { 659 const code3 = (francModule as unknown as (text: string) => string)(text); 660 if (code3 === 'und') return ['en']; 661 const code2 = iso6391.getCode(code3); 662 return code2 ? [code2] : ['en']; 663 } catch { 664 return ['en']; 665 } 666} 667 668async function expandUrl(shortUrl: string): Promise<string> { 669 try { 670 const response = await axios.head(shortUrl, { 671 maxRedirects: 5, 672 validateStatus: (status) => status >= 200 && status < 400, 673 }); 674 // biome-ignore lint/suspicious/noExplicitAny: axios internal types 675 return (response.request as any)?.res?.responseUrl || shortUrl; 676 } catch { 677 try { 678 const response = await axios.get(shortUrl, { 679 responseType: 'stream', 680 maxRedirects: 5, 681 }); 682 response.data.destroy(); 683 // biome-ignore lint/suspicious/noExplicitAny: axios internal types 684 return (response.request as any)?.res?.responseUrl || shortUrl; 685 } catch (e: any) { 686 if (e.code === 'ERR_FR_TOO_MANY_REDIRECTS' || e.response?.status === 403 || e.response?.status === 401) { 687 // Silent fallback for common expansion issues (redirect loops, login walls) 688 return shortUrl; 689 } 690 return shortUrl; 691 } 692 } 693} 694 695interface DownloadedMedia { 696 buffer: Buffer; 697 mimeType: string; 698} 699 700async function downloadMedia(url: string): Promise<DownloadedMedia> { 701 const response = await axios({ 702 url, 703 method: 'GET', 704 responseType: 'arraybuffer', 705 timeout: 30000, 
706 }); 707 return { 708 buffer: Buffer.from(response.data as ArrayBuffer), 709 mimeType: (response.headers['content-type'] as string) || 'application/octet-stream', 710 }; 711} 712 713async function uploadToBluesky(agent: BskyAgent, buffer: Buffer, mimeType: string): Promise<BlobRef> { 714 let finalBuffer = buffer; 715 let finalMimeType = mimeType; 716 const MAX_SIZE = 950 * 1024; 717 718 const isPng = mimeType === 'image/png'; 719 const isJpeg = mimeType === 'image/jpeg' || mimeType === 'image/jpg'; 720 const isWebp = mimeType === 'image/webp'; 721 const isGif = mimeType === 'image/gif'; 722 const isAnimation = isGif || isWebp; 723 724 if ( 725 (buffer.length > MAX_SIZE && (mimeType.startsWith('image/') || mimeType === 'application/octet-stream')) || 726 (isPng && buffer.length > MAX_SIZE) 727 ) { 728 console.log(`[UPLOAD] ⚖️ Image too large (${(buffer.length / 1024).toFixed(2)} KB). Optimizing...`); 729 try { 730 let image = sharp(buffer); 731 const metadata = await image.metadata(); 732 let currentBuffer = buffer; 733 let width = metadata.width || 2000; 734 let quality = 90; 735 736 // Iterative compression loop 737 let attempts = 0; 738 while (currentBuffer.length > MAX_SIZE && attempts < 5) { 739 attempts++; 740 console.log(`[UPLOAD] 📉 Compression attempt ${attempts}: Width ${width}, Quality ${quality}...`); 741 742 if (isAnimation) { 743 // For animations (GIF/WebP), we can only do so much without losing frames 744 // Try to convert to WebP if it's a GIF, or optimize WebP 745 image = sharp(buffer, { animated: true }); 746 if (isGif) { 747 // Convert GIF to WebP for better compression 748 image = image.webp({ quality: Math.max(quality, 50), effort: 6 }); 749 finalMimeType = 'image/webp'; 750 } else { 751 image = image.webp({ quality: Math.max(quality, 50), effort: 6 }); 752 } 753 // Resize if really big 754 if (metadata.width && metadata.width > 800) { 755 image = image.resize({ width: 800, withoutEnlargement: true }); 756 } 757 } else { 758 // Static images 
759 if (width > 1600) width = 1600; 760 else if (attempts > 1) width = Math.floor(width * 0.8); 761 762 quality = Math.max(50, quality - 10); 763 764 image = sharp(buffer).resize({ width, withoutEnlargement: true }).jpeg({ quality, mozjpeg: true }); 765 766 finalMimeType = 'image/jpeg'; 767 } 768 769 currentBuffer = await image.toBuffer(); 770 if (currentBuffer.length <= MAX_SIZE) { 771 finalBuffer = currentBuffer; 772 console.log(`[UPLOAD] ✅ Optimized to ${(finalBuffer.length / 1024).toFixed(2)} KB`); 773 break; 774 } 775 } 776 777 if (finalBuffer.length > MAX_SIZE) { 778 console.warn( 779 `[UPLOAD] ⚠️ Could not compress below limit. Current: ${(finalBuffer.length / 1024).toFixed(2)} KB. Upload might fail.`, 780 ); 781 } 782 } catch (err) { 783 console.warn(`[UPLOAD] ⚠️ Optimization failed, attempting original upload:`, (err as Error).message); 784 finalBuffer = buffer; 785 finalMimeType = mimeType; 786 } 787 } 788 789 const { data } = await agent.uploadBlob(finalBuffer, { encoding: finalMimeType }); 790 return data.blob; 791} 792 793interface ScreenshotResult { 794 buffer: Buffer; 795 width: number; 796 height: number; 797} 798 799async function captureTweetScreenshot(tweetUrl: string): Promise<ScreenshotResult | null> { 800 const browserPaths = [ 801 '/usr/bin/google-chrome', 802 '/usr/bin/chromium-browser', 803 '/usr/bin/chromium', 804 '/usr/bin/google-chrome-stable', 805 'C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe', 806 'C:\\Program Files (x86)\\Google\\Chrome\\Application\\chrome.exe', 807 ]; 808 809 const executablePath = browserPaths.find((p) => fs.existsSync(p)); 810 811 if (!executablePath) { 812 console.warn(`[SCREENSHOT] ⏩ Skipping screenshot (no Chrome/Chromium found at common paths).`); 813 return null; 814 } 815 816 console.log(`[SCREENSHOT] 📸 Capturing screenshot for: ${tweetUrl} using ${executablePath}`); 817 let browser; 818 try { 819 browser = await puppeteer.launch({ 820 executablePath, 821 args: ['--no-sandbox', 
'--disable-setuid-sandbox', '--disable-dev-shm-usage'], 822 }); 823 const page = await browser.newPage(); 824 await page.setViewport({ width: 800, height: 1200, deviceScaleFactor: 2 }); 825 826 const html = ` 827 <!DOCTYPE html> 828 <html> 829 <head> 830 <style> 831 body { 832 margin: 0; 833 padding: 20px; 834 background: #ffffff; 835 display: flex; 836 justify-content: center; 837 font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif; 838 } 839 #container { width: 550px; } 840 </style> 841 </head> 842 <body> 843 <div id="container"> 844 <blockquote class="twitter-tweet" data-dnt="true"> 845 <a href="${tweetUrl}"></a> 846 </blockquote> 847 <script async src="https://platform.twitter.com/widgets.js" charset="utf-8"></script> 848 </div> 849 </body> 850 </html> 851 `; 852 853 await page.setContent(html, { waitUntil: 'networkidle0' }); 854 855 // Wait for the twitter iframe to load and render 856 try { 857 await page.waitForSelector('iframe', { timeout: 10000 }); 858 // Small extra wait for images inside iframe 859 await new Promise((r) => setTimeout(r, 2000)); 860 } catch (e) { 861 console.warn(`[SCREENSHOT] ⚠️ Timeout waiting for tweet iframe, taking screenshot anyway.`); 862 } 863 864 const element = await page.$('#container'); 865 if (element) { 866 const box = await element.boundingBox(); 867 const buffer = await element.screenshot({ type: 'png', omitBackground: true }); 868 if (box) { 869 console.log( 870 `[SCREENSHOT] ✅ Captured successfully (${(buffer.length / 1024).toFixed(2)} KB) - ${Math.round(box.width)}x${Math.round(box.height)}`, 871 ); 872 return { buffer: buffer as Buffer, width: Math.round(box.width), height: Math.round(box.height) }; 873 } 874 } 875 } catch (err) { 876 console.error(`[SCREENSHOT] ❌ Error capturing tweet:`, (err as Error).message); 877 } finally { 878 if (browser) await browser.close(); 879 } 880 return null; 881} 882 883async function pollForVideoProcessing(agent: BskyAgent, jobId: string): 
Promise<BlobRef> { 884 console.log(`[VIDEO] ⏳ Polling for processing completion (this can take a minute)...`); 885 let attempts = 0; 886 let blob: BlobRef | undefined; 887 888 while (!blob) { 889 attempts++; 890 const statusUrl = new URL('https://video.bsky.app/xrpc/app.bsky.video.getJobStatus'); 891 statusUrl.searchParams.append('jobId', jobId); 892 893 const statusResponse = await fetch(statusUrl); 894 if (!statusResponse.ok) { 895 console.warn(`[VIDEO] ⚠️ Job status fetch failed (${statusResponse.status}), retrying...`); 896 await new Promise((resolve) => setTimeout(resolve, 5000)); 897 continue; 898 } 899 900 const statusData = (await statusResponse.json()) as any; 901 const state = statusData.jobStatus.state; 902 const progress = statusData.jobStatus.progress || 0; 903 904 console.log(`[VIDEO] 🔄 Job ${jobId}: ${state} (${progress}%)`); 905 906 if (statusData.jobStatus.blob) { 907 blob = statusData.jobStatus.blob; 908 console.log(`[VIDEO] 🎉 Video processing complete! Blob ref obtained.`); 909 } else if (state === 'JOB_STATE_FAILED') { 910 throw new Error(`Video processing failed: ${statusData.jobStatus.error || 'Unknown error'}`); 911 } else { 912 // Wait before next poll 913 await new Promise((resolve) => setTimeout(resolve, 5000)); 914 } 915 916 if (attempts > 60) { 917 // ~5 minute timeout 918 throw new Error('Video processing timed out after 5 minutes.'); 919 } 920 } 921 return blob!; 922} 923 924async function fetchEmbedUrlCard(agent: BskyAgent, url: string): Promise<any> { 925 try { 926 const response = await axios.get(url, { 927 headers: { 928 'User-Agent': 929 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 930 Accept: 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8', 931 'Accept-Language': 'en-US,en;q=0.9', 932 }, 933 timeout: 10000, 934 maxRedirects: 5, 935 }); 936 937 const $ = cheerio.load(response.data); 938 const title = 
/**
 * Builds an `app.bsky.embed.external` link card for `url` from the page's title,
 * description and `og:image` metadata.
 *
 * The thumbnail is best-effort: if it cannot be resolved, downloaded or uploaded, the
 * card is still returned without a `thumb`.
 *
 * @param agent - Agent passed to `uploadToBluesky` for the thumbnail blob.
 * @param url - Page to fetch and describe.
 * @returns The embed object, or `null` when the page cannot be fetched, is not markup,
 *   or exposes neither a title nor a description.
 */
async function fetchEmbedUrlCard(agent: BskyAgent, url: string): Promise<any> {
  try {
    const response = await axios.get(url, {
      headers: {
        'User-Agent':
          'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        Accept: 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.9',
      },
      timeout: 10000,
      maxRedirects: 5,
    });

    // axios hands JSON bodies back as parsed objects; only string markup carries meta tags.
    if (typeof response.data !== 'string') return null;

    const $ = cheerio.load(response.data);
    const title = $('meta[property="og:title"]').attr('content') || $('title').text() || '';
    const description =
      $('meta[property="og:description"]').attr('content') || $('meta[name="description"]').attr('content') || '';
    let thumbBlob: BlobRef | undefined;

    const rawImageUrl = $('meta[property="og:image"]').attr('content')?.trim();
    if (rawImageUrl) {
      try {
        // Resolve against the page URL (not just its origin) so document-relative paths
        // such as "images/cover.png" land in the right directory; absolute URLs pass
        // through unchanged. A malformed value throws here and only costs the thumbnail.
        const imageUrl = new URL(rawImageUrl, url).toString();
        const { buffer, mimeType } = await downloadMedia(imageUrl);
        thumbBlob = await uploadToBluesky(agent, buffer, mimeType);
      } catch (e) {
        // Thumbnail is optional: report and carry on with a text-only card.
        console.warn(`Failed to attach thumbnail for ${url}:`, (e as Error)?.message || e);
      }
    }

    if (!title && !description) return null;

    const external: any = {
      uri: url,
      title: title || url,
      description: description,
    };

    if (thumbBlob) {
      external.thumb = thumbBlob;
    }

    return {
      $type: 'app.bsky.embed.external',
      external,
    };
  } catch (err: any) {
    if (err.code === 'ERR_FR_TOO_MANY_REDIRECTS') {
      // Ignore redirect loops
      return null;
    }
    console.warn(`Failed to fetch embed card for ${url}:`, err.message || err);
    return null;
  }
}
/**
 * Uploads a video buffer to the Bluesky video service and returns the processed blob.
 *
 * Steps: resolve the PDS host from the repo description, request a service auth token
 * for it, POST the bytes to `app.bsky.video.uploadVideo`, then poll the job if the
 * response does not already carry a blob.
 *
 * @param agent - Agent with an active session; its DID identifies the uploading account.
 * @param buffer - Raw video bytes (sent with content type `video/mp4`).
 * @param filename - File name; anything from the first `?` on is dropped.
 * @returns The blob reference for the processed video.
 * @throws Error with message `VIDEO_FALLBACK_503` when the service is overloaded (callers
 *   use this to fall back to a link), `Bluesky Email Unconfirmed - Video Upload Rejected`
 *   when the account email is unverified, or a generic upload/processing error otherwise.
 */
async function uploadVideoToBluesky(agent: BskyAgent, buffer: Buffer, filename: string): Promise<BlobRef> {
  const sanitizedFilename = filename.split('?')[0] || 'video.mp4';
  console.log(
    `[VIDEO] 🟢 Starting upload process for ${sanitizedFilename} (${(buffer.length / 1024 / 1024).toFixed(2)} MB)`,
  );

  try {
    const did = agent.session?.did;
    if (!did) {
      throw new Error('Cannot upload video: Bluesky agent has no active session.');
    }

    // 1. Get Service Auth
    // We need to resolve the actual PDS host for this DID
    console.log(`[VIDEO] 🔍 Resolving PDS host for DID: ${did}...`);
    const { data: repoDesc } = await agent.com.atproto.repo.describeRepo({ repo: did });

    // didDoc might be present in repoDesc
    const pdsService = (repoDesc as any).didDoc?.service?.find(
      (s: any) => s.id === '#atproto_pds' || s.type === 'AtProtoPds',
    );
    const pdsUrl = pdsService?.serviceEndpoint;
    const pdsHost = pdsUrl ? new URL(pdsUrl).host : 'bsky.social';

    console.log(`[VIDEO] 🌐 PDS Host detected: ${pdsHost}`);
    console.log(`[VIDEO] 🔑 Requesting service auth token for audience: did:web:${pdsHost}...`);

    const { data: serviceAuth } = await agent.com.atproto.server.getServiceAuth({
      aud: `did:web:${pdsHost}`,
      lxm: 'com.atproto.repo.uploadBlob',
      exp: Math.floor(Date.now() / 1000) + 60 * 30,
    });
    console.log(`[VIDEO] ✅ Service auth token obtained.`);

    const token = serviceAuth.token;

    // 2. Upload to Video Service
    const uploadUrl = new URL('https://video.bsky.app/xrpc/app.bsky.video.uploadVideo');
    uploadUrl.searchParams.append('did', did);
    uploadUrl.searchParams.append('name', sanitizedFilename);

    console.log(`[VIDEO] 📤 Uploading to ${uploadUrl.href}...`);
    const uploadResponse = await fetch(uploadUrl, {
      method: 'POST',
      headers: {
        Authorization: `Bearer ${token}`,
        'Content-Type': 'video/mp4',
      },
      body: new Blob([new Uint8Array(buffer)]),
    });

    if (!uploadResponse.ok) {
      const errorText = await uploadResponse.text();

      // Parse the body on its own so that only a JSON syntax error is ignored; the
      // specific errors raised below must reach the caller untouched.
      // biome-ignore lint/suspicious/noExplicitAny: untyped service payload
      let errorJson: any;
      try {
        errorJson = JSON.parse(errorText);
      } catch {
        errorJson = undefined;
      }

      // Handle server overload gracefully (also when the 503 body is not JSON)
      if (
        uploadResponse.status === 503 ||
        errorJson?.error === 'Server does not have enough capacity to handle uploads'
      ) {
        console.warn(`[VIDEO] ⚠️ Server overloaded (503). Skipping video upload and falling back to link.`);
        throw new Error('VIDEO_FALLBACK_503');
      }

      if (errorJson?.error === 'already_exists' && errorJson.jobId) {
        console.log(`[VIDEO] ♻️ Video already exists. Resuming with Job ID: ${errorJson.jobId}`);
        return await pollForVideoProcessing(agent, errorJson.jobId);
      }

      if (errorJson?.error === 'unconfirmed_email' || errorJson?.jobStatus?.error === 'unconfirmed_email') {
        console.error(
          `[VIDEO] 🛑 BLUESKY ERROR: Your email is unconfirmed. You MUST verify your email on Bluesky to upload videos.`,
        );
        throw new Error('Bluesky Email Unconfirmed - Video Upload Rejected');
      }

      console.error(`[VIDEO] ❌ Server responded with ${uploadResponse.status}: ${errorText}`);
      throw new Error(`Video upload failed: ${uploadResponse.status} ${errorText}`);
    }

    const jobStatus = (await uploadResponse.json()) as any;
    console.log(`[VIDEO] 📦 Upload accepted. Job ID: ${jobStatus.jobId}, State: ${jobStatus.state}`);

    if (jobStatus.blob) {
      return jobStatus.blob;
    }

    // 3. Poll for processing status
    return await pollForVideoProcessing(agent, jobStatus.jobId);
  } catch (err) {
    console.error(`[VIDEO] ❌ Error in uploadVideoToBluesky:`, (err as Error).message);
    throw err;
  }
}
/**
 * Splits `text` into chunks suitable for a numbered thread.
 *
 * Text that already fits in `limit` is returned as a single, unmodified chunk. Otherwise
 * every chunk - including the last one - is kept within `limit - 8` UTF-16 code units so
 * that a caller-appended counter such as " (1/3)" still fits inside `limit` (the
 * reservation covers counters up to " (99/99)").
 *
 * Break points are chosen in this order: paragraph break ("\n\n"), sentence end
 * (".", "!" or "?" followed by whitespace), space, and finally a hard cut that never
 * separates a surrogate pair.
 *
 * @param text - Text to split.
 * @param limit - Maximum length of a posted chunk, counter included. Defaults to 300.
 * @returns Non-empty, trimmed chunks in order (or `[text]` when no split is needed).
 */
function splitText(text: string, limit = 300): string[] {
  if (text.length <= limit) return [text];

  // Trimming up front guarantees no break point can sit at index 0 (which would
  // otherwise yield an empty chunk).
  let remaining = text.trim();
  if (remaining.length <= limit) return [remaining];

  // Reserve space for numbering like " (1/3)"; clamp so tiny limits still make progress.
  const effectiveLimit = Math.max(1, limit - 8);
  const chunks: string[] = [];

  while (remaining.length > 0) {
    // The last chunk receives a counter too, so it must respect the reduced limit.
    if (remaining.length <= effectiveLimit) {
      chunks.push(remaining);
      break;
    }

    // 1. Paragraph break
    let splitIndex = remaining.lastIndexOf('\n\n', effectiveLimit);

    // 2. Sentence end: punctuation followed by whitespace (punctuation stays in the chunk)
    if (splitIndex <= 0) {
      const sentenceEnds = Array.from(remaining.substring(0, effectiveLimit).matchAll(/[.!?]\s/g));
      const lastEnd = sentenceEnds[sentenceEnds.length - 1];
      splitIndex = lastEnd && lastEnd.index !== undefined ? lastEnd.index + 1 : -1;
    }

    // 3. Space
    if (splitIndex <= 0) {
      splitIndex = remaining.lastIndexOf(' ', effectiveLimit);
    }

    // 4. Hard cut, nudged so a surrogate pair is never split in half
    if (splitIndex <= 0) {
      splitIndex = effectiveLimit;
      const before = remaining.charCodeAt(splitIndex - 1);
      if (before >= 0xd800 && before <= 0xdbff) {
        splitIndex = splitIndex > 1 ? splitIndex - 1 : splitIndex + 1;
      }
    }

    const head = remaining.substring(0, splitIndex).trim();
    if (head.length > 0) chunks.push(head);
    remaining = remaining.substring(splitIndex).trim();
  }

  return chunks;
}
/** Converts a UTF-16 code-unit offset in `text` to the byte offset of the same position in UTF-8. */
function utf16IndexToUtf8Index(text: string, index: number): number {
  const prefix = text.slice(0, index);
  return Buffer.byteLength(prefix, 'utf8');
}

/** Reports whether the half-open ranges [startA, endA) and [startB, endB) share any position. */
function rangesOverlap(startA: number, endA: number, startB: number, endB: number): boolean {
  const bStartsBeforeAEnds = startB < endA;
  const aStartsBeforeBEnds = startA < endB;
  return aStartsBeforeBEnds && bStartsBeforeAEnds;
}

/**
 * Adds link facets pointing at twitter.com for every standalone `@handle` in `text`.
 *
 * A mention is ignored when it is glued to a preceding or following word character
 * (for example the "@" inside an e-mail address, or a handle longer than 15
 * characters), when it is immediately followed by a dotted suffix such as
 * `@name.bsky.social`, or when its byte range overlaps a facet already in `facets`.
 *
 * @param text - Post text the facets refer to.
 * @param facets - Facets already detected for `text`, if any.
 * @returns `facets` itself when nothing was added; otherwise a new array holding the
 *   existing and new facets ordered by `byteStart`.
 */
function addTwitterHandleLinkFacets(text: string, facets?: any[]): any[] | undefined {
  const known = facets ?? [];
  const added: any[] = [];
  const wordChar = /[A-Za-z0-9_]/;
  const mention = /@([A-Za-z0-9_]{1,15})/g;

  for (let hit = mention.exec(text); hit !== null; hit = mention.exec(text)) {
    const handle = hit[1];
    if (!handle) continue;

    const start = hit.index;
    const end = start + handle.length + 1;

    // charAt yields '' outside the string, which never matches a word character.
    const gluedBefore = start > 0 && wordChar.test(text.charAt(start - 1));
    const gluedAfter = wordChar.test(text.charAt(end));
    const dottedSuffix = /^\.[A-Za-z0-9-]+/.test(text.slice(end));
    if (gluedBefore || dottedSuffix || gluedAfter) continue;

    const byteStart = utf16IndexToUtf8Index(text, start);
    const byteEnd = utf16IndexToUtf8Index(text, end);

    const clashes = known.some((facet) =>
      rangesOverlap(byteStart, byteEnd, facet.index.byteStart, facet.index.byteEnd),
    );
    if (clashes) continue;

    added.push({
      index: { byteStart, byteEnd },
      features: [
        {
          $type: 'app.bsky.richtext.facet#link',
          uri: `https://twitter.com/${handle}`,
        },
      ],
    });
  }

  if (added.length === 0) return facets;

  const merged = [...known, ...added];
  merged.sort((a, b) => a.index.byteStart - b.index.byteStart);
  return merged;
}
// Simple p-limit implementation for concurrency control
/**
 * Creates a limiter that runs at most `concurrency` of the submitted async functions at
 * the same time; further submissions wait in FIFO order until a slot frees up.
 *
 * @param concurrency - Maximum number of simultaneously running tasks. Values below 1
 *   (or NaN) are treated as 1, because such a limit would otherwise leave every task
 *   queued forever.
 * @returns A function that schedules `fn` and resolves or rejects with its outcome.
 */
const pLimit = (concurrency: number) => {
  const maxActive = concurrency >= 1 ? concurrency : 1;
  const queue: (() => Promise<void>)[] = [];
  let activeCount = 0;

  // Frees the finished task's slot and hands it to the oldest waiting task, if any.
  const next = () => {
    activeCount--;
    const waiting = queue.shift();
    if (waiting) {
      void waiting();
    }
  };

  return <T>(fn: () => Promise<T>): Promise<T> => {
    return new Promise<T>((resolve, reject) => {
      const run = async () => {
        activeCount++;
        try {
          resolve(await fn());
        } catch (e) {
          reject(e);
        } finally {
          next();
        }
      };

      if (activeCount < maxActive) {
        void run();
      } else {
        queue.push(run);
      }
    });
  };
};
/**
 * Fetches up to `limit` tweets from a user's timeline via the scraper's `getTweets`
 * generator, retrying on transient failures.
 *
 * Early stopping: when `processedIds` is given and three consecutive tweets are already
 * in it, iteration stops (the third is not included). This assumes the timeline is
 * mostly in order.
 *
 * Retry policy (at most 3 attempts): a 400 response whose body mentions
 * `InternalServerError`, or an error message containing `ServiceUnavailable`, `Timeout`,
 * `429` or `401`, is retried after 5 seconds; for the message-based cases a credential
 * switch is tried first. Any other error ends the fetch.
 *
 * @param username - Twitter handle whose timeline is read.
 * @param limit - Maximum number of tweets to return.
 * @param processedIds - IDs of tweets that were already handled, used for early stopping.
 * @returns The fetched tweets, or an empty array when no scraper is available or all
 *   attempts failed.
 */
async function fetchUserTweets(username: string, limit: number, processedIds?: Set<string>): Promise<Tweet[]> {
  let retries = 3;
  while (retries > 0) {
    // Acquire the scraper per attempt so a retry after switchCredentials() does not keep
    // using the client obtained before the switch.
    // NOTE(review): presumably getTwitterScraper() returns a cached instance, making this
    // cheap - confirm against its implementation.
    const client = await getTwitterScraper();
    if (!client) return [];

    try {
      const tweets: Tweet[] = [];
      const generator = client.getTweets(username, limit);
      let consecutiveProcessedCount = 0;

      for await (const t of generator) {
        const tweet = mapScraperTweetToLocalTweet(t);
        const tweetId = tweet.id_str || tweet.id;

        // Early stopping logic: if we see 3 consecutive tweets we've already processed, stop.
        // This assumes timeline order (mostly true).
        if (processedIds && tweetId && processedIds.has(tweetId)) {
          consecutiveProcessedCount++;
          if (consecutiveProcessedCount >= 3) {
            console.log(`[${username}] 🛑 Found 3 consecutive processed tweets. Stopping fetch early.`);
            break;
          }
        } else {
          consecutiveProcessedCount = 0;
        }

        tweets.push(tweet);
        if (tweets.length >= limit) break;
      }
      return tweets;
    } catch (e: any) {
      retries--;
      const isRetryable =
        e.message?.includes('ServiceUnavailable') ||
        e.message?.includes('Timeout') ||
        e.message?.includes('429') ||
        e.message?.includes('401');

      // Check for Twitter Internal Server Error (often returns 400 with specific body)
      if (e?.response?.status === 400 && JSON.stringify(e?.response?.data || {}).includes('InternalServerError')) {
        console.warn(`⚠️ Twitter Internal Server Error (Transient) for ${username}.`);
        // Treat as retryable
        if (retries > 0) {
          await new Promise((r) => setTimeout(r, 5000));
          continue;
        }
      }

      if (isRetryable) {
        console.warn(`⚠️ Error fetching tweets for ${username} (${e.message}).`);

        // Attempt credential switch if we have backups
        if (await switchCredentials()) {
          console.log(`🔄 Retrying with new credentials...`);
          continue; // Retry loop with new credentials
        }

        if (retries > 0) {
          console.log(`Waiting 5s before retry...`);
          await new Promise((r) => setTimeout(r, 5000));
          continue;
        }
      }

      console.warn(`Error fetching tweets for ${username}:`, e.message || e);
      return [];
    }
  }

  // Only reached when the retry budget ran out via one of the `continue` paths above.
  console.log(`[${username}] ⚠️ Scraper returned 0 tweets (or failed silently) after retries.`);
  return [];
}
1286 if (retries > 0) { 1287 console.log(`Waiting 5s before retry...`); 1288 await new Promise((r) => setTimeout(r, 5000)); 1289 continue; 1290 } 1291 } 1292 1293 console.warn(`Error fetching tweets for ${username}:`, e.message || e); 1294 return []; 1295 } 1296 } 1297 1298 console.log(`[${username}] ⚠️ Scraper returned 0 tweets (or failed silently) after retries.`); 1299 return []; 1300} 1301 1302// ============================================================================ 1303// Main Processing Logic 1304// ============================================================================ 1305 1306// ============================================================================ 1307// Main Processing Logic 1308// ============================================================================ 1309 1310async function processTweets( 1311 agent: BskyAgent, 1312 twitterUsername: string, 1313 bskyIdentifier: string, 1314 tweets: Tweet[], 1315 dryRun = false, 1316 sharedProcessedMap?: ProcessedTweetsMap, 1317 sharedTweetMap?: Map<string, Tweet>, 1318): Promise<void> { 1319 // Filter tweets to ensure they're actually from this user 1320 const filteredTweets = tweets.filter((t) => { 1321 const authorScreenName = t.user?.screen_name?.toLowerCase(); 1322 if (authorScreenName && authorScreenName !== twitterUsername.toLowerCase()) { 1323 console.log( 1324 `[${twitterUsername}] ⏩ Skipping tweet ${t.id_str || t.id} - author is @${t.user?.screen_name}, not @${twitterUsername}`, 1325 ); 1326 return false; 1327 } 1328 return true; 1329 }); 1330 1331 const tweetMap = sharedTweetMap ?? new Map<string, Tweet>(); 1332 addTweetsToMap(tweetMap, filteredTweets); 1333 1334 // Maintain a local map that updates in real-time for intra-batch replies 1335 const localProcessedMap: ProcessedTweetsMap = 1336 sharedProcessedMap ?? 
{ ...loadProcessedTweets(bskyIdentifier) }; 1337 1338 const toProcess = filteredTweets.filter((t) => !localProcessedMap[t.id_str || t.id || '']); 1339 1340 if (toProcess.length === 0) { 1341 console.log(`[${twitterUsername}] ✅ No new tweets to process for ${bskyIdentifier}.`); 1342 return; 1343 } 1344 1345 console.log(`[${twitterUsername}] 🚀 Processing ${toProcess.length} new tweets for ${bskyIdentifier}...`); 1346 1347 filteredTweets.reverse(); 1348 let count = 0; 1349 for (const tweet of filteredTweets) { 1350 count++; 1351 const tweetId = tweet.id_str || tweet.id; 1352 if (!tweetId) continue; 1353 1354 if (localProcessedMap[tweetId]) continue; 1355 1356 // Fallback to DB in case a nested backfill already saved this tweet. 1357 const dbRecord = dbService.getTweet(tweetId, bskyIdentifier); 1358 if (dbRecord) { 1359 localProcessedMap[tweetId] = { 1360 uri: dbRecord.bsky_uri, 1361 cid: dbRecord.bsky_cid, 1362 root: 1363 dbRecord.bsky_root_uri && dbRecord.bsky_root_cid 1364 ? { uri: dbRecord.bsky_root_uri, cid: dbRecord.bsky_root_cid } 1365 : undefined, 1366 tail: 1367 dbRecord.bsky_tail_uri && dbRecord.bsky_tail_cid 1368 ? 
{ uri: dbRecord.bsky_tail_uri, cid: dbRecord.bsky_tail_cid } 1369 : undefined, 1370 migrated: dbRecord.status === 'migrated', 1371 skipped: dbRecord.status === 'skipped', 1372 }; 1373 continue; 1374 } 1375 1376 const isRetweet = tweet.isRetweet || tweet.retweeted_status_id_str || tweet.text?.startsWith('RT @'); 1377 1378 if (isRetweet) { 1379 console.log(`[${twitterUsername}] ⏩ Skipping retweet ${tweetId}.`); 1380 if (!dryRun) { 1381 // Save as skipped so we don't check it again 1382 saveProcessedTweet(twitterUsername, bskyIdentifier, tweetId, { skipped: true, text: tweet.text }); 1383 localProcessedMap[tweetId] = { skipped: true, text: tweet.text }; 1384 } 1385 continue; 1386 } 1387 1388 console.log(`\n[${twitterUsername}] 🔍 Inspecting tweet: ${tweetId}`); 1389 updateAppStatus({ 1390 state: 'processing', 1391 currentAccount: twitterUsername, 1392 processedCount: count, 1393 totalCount: filteredTweets.length, 1394 message: `Inspecting tweet ${tweetId}`, 1395 }); 1396 1397 const replyStatusId = tweet.in_reply_to_status_id_str || tweet.in_reply_to_status_id; 1398 const replyUserId = tweet.in_reply_to_user_id_str || tweet.in_reply_to_user_id; 1399 const tweetText = tweet.full_text || tweet.text || ''; 1400 const isReply = !!replyStatusId || !!replyUserId || tweetText.trim().startsWith('@'); 1401 1402 let replyParentInfo: ProcessedTweetEntry | null = null; 1403 1404 if (isReply) { 1405 if (replyStatusId && localProcessedMap[replyStatusId]) { 1406 console.log(`[${twitterUsername}] 🧵 Threading reply to post in ${bskyIdentifier}: ${replyStatusId}`); 1407 replyParentInfo = localProcessedMap[replyStatusId] ?? null; 1408 } else if (replyStatusId) { 1409 // Parent missing from local batch/DB. Attempt to fetch it if it's a self-thread. 1410 // We assume it's a self-thread if we don't have it, but we'll verify author after fetch. 1411 console.log(`[${twitterUsername}] 🕵️ Parent ${replyStatusId} missing. 
Checking if backfillable...`); 1412 1413 let parentBackfilled = false; 1414 try { 1415 const scraper = await getTwitterScraper(); 1416 if (scraper) { 1417 const parentRaw = await scraper.getTweet(replyStatusId); 1418 if (parentRaw) { 1419 const parentTweet = mapScraperTweetToLocalTweet(parentRaw); 1420 const parentAuthor = parentTweet.user?.screen_name; 1421 1422 if (parentAuthor?.toLowerCase() === twitterUsername.toLowerCase()) { 1423 console.log(`[${twitterUsername}] 🔄 Parent is ours (@${parentAuthor}). Backfilling parent first...`); 1424 addTweetsToMap(tweetMap, [parentTweet]); 1425 // Recursively process the parent 1426 await processTweets( 1427 agent, 1428 twitterUsername, 1429 bskyIdentifier, 1430 [parentTweet], 1431 dryRun, 1432 localProcessedMap, 1433 tweetMap, 1434 ); 1435 1436 // Check if it was saved 1437 const savedParent = dbService.getTweet(replyStatusId, bskyIdentifier); 1438 if (savedParent && savedParent.status === 'migrated') { 1439 // Update local map 1440 localProcessedMap[replyStatusId] = { 1441 uri: savedParent.bsky_uri, 1442 cid: savedParent.bsky_cid, 1443 root: 1444 savedParent.bsky_root_uri && savedParent.bsky_root_cid 1445 ? { uri: savedParent.bsky_root_uri, cid: savedParent.bsky_root_cid } 1446 : undefined, 1447 tail: 1448 savedParent.bsky_tail_uri && savedParent.bsky_tail_cid 1449 ? { uri: savedParent.bsky_tail_uri, cid: savedParent.bsky_tail_cid } 1450 : undefined, 1451 migrated: true, 1452 }; 1453 replyParentInfo = localProcessedMap[replyStatusId] ?? null; 1454 parentBackfilled = true; 1455 console.log(`[${twitterUsername}] ✅ Parent backfilled. Resuming thread.`); 1456 } 1457 } else { 1458 console.log(`[${twitterUsername}] ⏩ Parent is by @${parentAuthor}. 
Skipping external reply.`); 1459 } 1460 } 1461 } 1462 } catch (e) { 1463 console.warn(`[${twitterUsername}] ⚠️ Failed to fetch/backfill parent ${replyStatusId}:`, e); 1464 } 1465 1466 if (!parentBackfilled) { 1467 console.log(`[${twitterUsername}] ⏩ Skipping external/unknown reply (Parent not found or external).`); 1468 if (!dryRun) { 1469 saveProcessedTweet(twitterUsername, bskyIdentifier, tweetId, { skipped: true, text: tweetText }); 1470 localProcessedMap[tweetId] = { skipped: true, text: tweetText }; 1471 } 1472 continue; 1473 } 1474 } else { 1475 console.log(`[${twitterUsername}] ⏩ Skipping external/unknown reply.`); 1476 if (!dryRun) { 1477 saveProcessedTweet(twitterUsername, bskyIdentifier, tweetId, { skipped: true, text: tweetText }); 1478 localProcessedMap[tweetId] = { skipped: true, text: tweetText }; 1479 } 1480 continue; 1481 } 1482 } 1483 1484 // Removed early dryRun continue to allow verifying logic 1485 1486 let text = tweetText 1487 .replace(/&amp;/g, '&') 1488 .replace(/&lt;/g, '<') 1489 .replace(/&gt;/g, '>') 1490 .replace(/&quot;/g, '"') 1491 .replace(/&#39;/g, "'"); 1492 1493 // 1. 
Link Expansion 1494 console.log(`[${twitterUsername}] 🔗 Expanding links...`); 1495 const urls = tweet.entities?.urls || []; 1496 for (const urlEntity of urls) { 1497 const tco = urlEntity.url; 1498 const expanded = urlEntity.expanded_url; 1499 if (tco && expanded) text = text.replace(tco, expanded); 1500 } 1501 1502 // Fallback: Regex for t.co links (if entities failed or missed one) 1503 const tcoRegex = /https:\/\/t\.co\/[a-zA-Z0-9]+/g; 1504 const matches = text.match(tcoRegex) || []; 1505 for (const tco of matches) { 1506 // Avoid re-resolving if we already handled it via entities 1507 if (urls.some((u) => u.url === tco)) continue; 1508 1509 console.log(`[${twitterUsername}] 🔍 Resolving fallback link: ${tco}`); 1510 const resolved = await expandUrl(tco); 1511 if (resolved !== tco) { 1512 text = text.replace(tco, resolved); 1513 // Add to urls array so it can be used for card embedding later 1514 urls.push({ url: tco, expanded_url: resolved }); 1515 } 1516 } 1517 1518 const isSponsoredCard = detectSponsoredCard(tweet); 1519 if (isSponsoredCard) { 1520 console.log(`[${twitterUsername}] 🧩 Sponsored/card payload detected. Extracting carousel media...`); 1521 injectCardMedia(tweet); 1522 } else if (tweet.permanentUrl) { 1523 const syndication = await fetchSyndicationMedia(tweet.permanentUrl); 1524 if (syndication.images.length > 0) { 1525 console.log(`[${twitterUsername}] 🧩 Syndication carousel detected. Extracting media...`); 1526 injectSyndicationMedia(tweet, syndication); 1527 } 1528 } 1529 1530 // 2. 
Media Handling 1531 const images: ImageEmbed[] = []; 1532 let videoBlob: BlobRef | null = null; 1533 let videoAspectRatio: AspectRatio | undefined; 1534 const mediaEntities = tweet.extended_entities?.media || tweet.entities?.media || []; 1535 const mediaLinksToRemove: string[] = []; 1536 1537 console.log(`[${twitterUsername}] 🖼️ Found ${mediaEntities.length} media entities.`); 1538 1539 for (const media of mediaEntities) { 1540 if (media.url) { 1541 mediaLinksToRemove.push(media.url); 1542 if (media.expanded_url) mediaLinksToRemove.push(media.expanded_url); 1543 } 1544 if (media.source === 'card' && media.media_url_https) { 1545 mediaLinksToRemove.push(media.media_url_https); 1546 } 1547 1548 let aspectRatio: AspectRatio | undefined; 1549 if (media.sizes?.large) { 1550 aspectRatio = { width: media.sizes.large.w, height: media.sizes.large.h }; 1551 } else if (media.original_info) { 1552 aspectRatio = { width: media.original_info.width, height: media.original_info.height }; 1553 } 1554 1555 if (media.type === 'photo') { 1556 const url = media.media_url_https; 1557 if (!url) continue; 1558 try { 1559 const highQualityUrl = url.includes('?') ? 
url.replace('?', ':orig?') : url + ':orig'; 1560 console.log(`[${twitterUsername}] 📥 Downloading image (high quality): ${path.basename(highQualityUrl)}`); 1561 updateAppStatus({ message: `Downloading high quality image...` }); 1562 const { buffer, mimeType } = await downloadMedia(highQualityUrl); 1563 1564 let blob: BlobRef; 1565 if (dryRun) { 1566 console.log( 1567 `[${twitterUsername}] 🧪 [DRY RUN] Would upload image (${(buffer.length / 1024).toFixed(2)} KB)`, 1568 ); 1569 blob = { ref: { toString: () => 'mock-blob' }, mimeType, size: buffer.length } as any; 1570 } else { 1571 console.log(`[${twitterUsername}] 📤 Uploading image to Bluesky...`); 1572 updateAppStatus({ message: `Uploading image to Bluesky...` }); 1573 blob = await uploadToBluesky(agent, buffer, mimeType); 1574 } 1575 1576 let altText = media.ext_alt_text; 1577 if (!altText) { 1578 console.log(`[${twitterUsername}] 🤖 Generating alt text via Gemini...`); 1579 // Use original tweet text for context, not the modified/cleaned one 1580 const altTextContext = buildAltTextContext(tweet, tweetText, tweetMap); 1581 altText = await generateAltText(buffer, mimeType, altTextContext); 1582 if (altText) console.log(`[${twitterUsername}] ✅ Alt text generated: ${altText.substring(0, 50)}...`); 1583 } 1584 1585 images.push({ alt: altText || 'Image from Twitter', image: blob, aspectRatio }); 1586 console.log(`[${twitterUsername}] ✅ Image uploaded.`); 1587 } catch (err) { 1588 console.error(`[${twitterUsername}] ❌ High quality upload failed:`, (err as Error).message); 1589 try { 1590 console.log(`[${twitterUsername}] 🔄 Retrying with standard quality...`); 1591 updateAppStatus({ message: `Retrying with standard quality...` }); 1592 const { buffer, mimeType } = await downloadMedia(url); 1593 const blob = await uploadToBluesky(agent, buffer, mimeType); 1594 images.push({ alt: media.ext_alt_text || 'Image from Twitter', image: blob, aspectRatio }); 1595 console.log(`[${twitterUsername}] ✅ Image uploaded on retry.`); 1596 } 
catch (retryErr) { 1597 console.error(`[${twitterUsername}] ❌ Retry also failed:`, (retryErr as Error).message); 1598 } 1599 } 1600 } else if (media.type === 'video' || media.type === 'animated_gif') { 1601 const variants = media.video_info?.variants || []; 1602 const duration = media.video_info?.duration_millis || 0; 1603 1604 if (duration > 180000) { 1605 // 3 minutes 1606 console.warn(`[${twitterUsername}] ⚠️ Video too long (${(duration / 1000).toFixed(1)}s). Fallback to link.`); 1607 const tweetUrl = `https://twitter.com/${twitterUsername}/status/${tweetId}`; 1608 if (!text.includes(tweetUrl)) text += `\n\nVideo: ${tweetUrl}`; 1609 continue; 1610 } 1611 1612 const mp4s = variants 1613 .filter((v) => v.content_type === 'video/mp4') 1614 .sort((a, b) => (b.bitrate || 0) - (a.bitrate || 0)); 1615 1616 if (mp4s.length > 0) { 1617 const firstVariant = mp4s[0]; 1618 if (firstVariant) { 1619 const videoUrl = firstVariant.url; 1620 try { 1621 console.log(`[${twitterUsername}] 📥 Downloading video: ${videoUrl}`); 1622 updateAppStatus({ message: `Downloading video: ${path.basename(videoUrl)}` }); 1623 const { buffer, mimeType } = await downloadMedia(videoUrl); 1624 1625 if (buffer.length <= 90 * 1024 * 1024) { 1626 const filename = videoUrl.split('/').pop() || 'video.mp4'; 1627 if (dryRun) { 1628 console.log( 1629 `[${twitterUsername}] 🧪 [DRY RUN] Would upload video: ${filename} (${(buffer.length / 1024 / 1024).toFixed(2)} MB)`, 1630 ); 1631 videoBlob = { 1632 ref: { toString: () => 'mock-video-blob' }, 1633 mimeType: 'video/mp4', 1634 size: buffer.length, 1635 } as any; 1636 } else { 1637 updateAppStatus({ message: `Uploading video to Bluesky...` }); 1638 videoBlob = await uploadVideoToBluesky(agent, buffer, filename); 1639 } 1640 videoAspectRatio = aspectRatio; 1641 console.log(`[${twitterUsername}] ✅ Video upload process complete.`); 1642 break; // Prioritize first video 1643 } 1644 1645 console.warn( 1646 `[${twitterUsername}] ⚠️ Video too large (${(buffer.length / 
1024 / 1024).toFixed(2)}MB). Fallback to link.`, 1647 ); 1648 const tweetUrl = `https://twitter.com/${twitterUsername}/status/${tweetId}`; 1649 if (!text.includes(tweetUrl)) text += `\n\nVideo: ${tweetUrl}`; 1650 } catch (err) { 1651 const errMsg = (err as Error).message; 1652 if (errMsg !== 'VIDEO_FALLBACK_503') { 1653 console.error(`[${twitterUsername}] ❌ Failed video upload flow:`, errMsg); 1654 } 1655 const tweetUrl = `https://twitter.com/${twitterUsername}/status/${tweetId}`; 1656 if (!text.includes(tweetUrl)) text += `\n\nVideo: ${tweetUrl}`; 1657 } 1658 } 1659 } 1660 } 1661 } 1662 1663 // Cleanup text 1664 for (const link of mediaLinksToRemove) text = text.split(link).join('').trim(); 1665 if (isSponsoredCard) { 1666 const cardLinks = detectCarouselLinks(tweet); 1667 const cardPrimaryLink = detectCardMedia(tweet).link; 1668 const requestedLinks = [cardPrimaryLink, ...cardLinks].filter(Boolean) as string[]; 1669 requestedLinks.forEach((link) => { 1670 if (!urls.some((u) => u.expanded_url === link || u.url === link)) { 1671 urls.push({ url: link, expanded_url: link }); 1672 } 1673 }); 1674 } 1675 text = text.replace(/\n\s*\n/g, '\n\n').trim(); 1676 text = addTextFallbacks(text); 1677 1678 // 3. Quoting Logic 1679 let quoteEmbed: { $type: string; record: { uri: string; cid: string } } | null = null; 1680 let externalQuoteUrl: string | null = null; 1681 let linkCard: any = null; 1682 1683 if (tweet.is_quote_status && tweet.quoted_status_id_str) { 1684 const quoteId = tweet.quoted_status_id_str; 1685 const quoteRef = localProcessedMap[quoteId]; 1686 if (quoteRef && !quoteRef.migrated && quoteRef.uri && quoteRef.cid) { 1687 console.log(`[${twitterUsername}] 🔄 Found quoted tweet in local history. 
Natively embedding.`); 1688 quoteEmbed = { $type: 'app.bsky.embed.record', record: { uri: quoteRef.uri, cid: quoteRef.cid } }; 1689 } else { 1690 const quoteUrlEntity = urls.find((u) => u.expanded_url?.includes(quoteId)); 1691 const qUrl = quoteUrlEntity?.expanded_url || `https://twitter.com/i/status/${quoteId}`; 1692 1693 // Check if it's a self-quote (same user) 1694 const isSelfQuote = 1695 qUrl.toLowerCase().includes(`twitter.com/${twitterUsername.toLowerCase()}/`) || 1696 qUrl.toLowerCase().includes(`x.com/${twitterUsername.toLowerCase()}/`); 1697 1698 if (!isSelfQuote) { 1699 externalQuoteUrl = qUrl; 1700 console.log(`[${twitterUsername}] 🔗 Quoted tweet is external: ${externalQuoteUrl}`); 1701 1702 // Try to capture screenshot for external QTs if we have space for images 1703 if (images.length < 4 && !videoBlob) { 1704 const ssResult = await captureTweetScreenshot(externalQuoteUrl); 1705 if (ssResult) { 1706 try { 1707 let blob: BlobRef; 1708 if (dryRun) { 1709 console.log( 1710 `[${twitterUsername}] 🧪 [DRY RUN] Would upload screenshot for quote (${(ssResult.buffer.length / 1024).toFixed(2)} KB)`, 1711 ); 1712 blob = { 1713 ref: { toString: () => 'mock-ss-blob' }, 1714 mimeType: 'image/png', 1715 size: ssResult.buffer.length, 1716 } as any; 1717 } else { 1718 blob = await uploadToBluesky(agent, ssResult.buffer, 'image/png'); 1719 } 1720 images.push({ 1721 alt: `Quote Tweet: ${externalQuoteUrl}`, 1722 image: blob, 1723 aspectRatio: { width: ssResult.width, height: ssResult.height }, 1724 }); 1725 } catch (e) { 1726 console.warn(`[${twitterUsername}] ⚠️ Failed to upload screenshot blob.`); 1727 } 1728 } 1729 } 1730 } else { 1731 console.log(`[${twitterUsername}] 🔁 Quoted tweet is a self-quote, skipping link.`); 1732 } 1733 } 1734 } else if ((images.length === 0 && !videoBlob) || isSponsoredCard) { 1735 // If no media and no quote, check for external links to embed 1736 // We prioritize the LAST link found as it's often the main content 1737 const potentialLinks 
= urls 1738 .map((u) => u.expanded_url) 1739 .filter((u) => u && !u.includes('twitter.com') && !u.includes('x.com')) as string[]; 1740 1741 if (potentialLinks.length > 0) { 1742 const linkToEmbed = potentialLinks[potentialLinks.length - 1]; 1743 if (linkToEmbed) { 1744 // Optimization: If text is too long, but removing the link makes it fit, do it! 1745 // The link will be present in the embed card anyway. 1746 if (text.length > 300 && text.includes(linkToEmbed)) { 1747 const lengthWithoutLink = text.length - linkToEmbed.length; 1748 // Allow some buffer (e.g. whitespace cleanup might save 1-2 chars) 1749 if (lengthWithoutLink <= 300) { 1750 console.log( 1751 `[${twitterUsername}] 📏 Optimizing: Removing link ${linkToEmbed} from text to avoid threading (Card will embed it).`, 1752 ); 1753 text = text.replace(linkToEmbed, '').trim(); 1754 // Clean up potential double punctuation/spaces left behind 1755 text = text.replace(/\s\.$/, '.').replace(/\s\s+/g, ' '); 1756 } 1757 } 1758 1759 console.log(`[${twitterUsername}] 🃏 Fetching link card for: ${linkToEmbed}`); 1760 linkCard = await fetchEmbedUrlCard(agent, linkToEmbed); 1761 } 1762 } 1763 } 1764 1765 // Only append link for external quotes IF we couldn't natively embed it OR screenshot it 1766 const hasScreenshot = images.some((img) => img.alt.startsWith('Quote Tweet:')); 1767 if (externalQuoteUrl && !quoteEmbed && !hasScreenshot && !text.includes(externalQuoteUrl)) { 1768 text += `\n\nQT: ${externalQuoteUrl}`; 1769 } 1770 1771 if (isSponsoredCard) { 1772 const hasCardImages = mediaEntities.some((media) => media.source === 'card'); 1773 if (hasCardImages) { 1774 text = ensureSponsoredLinks(text, tweet); 1775 } 1776 } 1777 1778 // 4. 
Threading and Posting 1779 const chunks = splitText(text); 1780 console.log(`[${twitterUsername}] 📝 Splitting text into ${chunks.length} chunks.`); 1781 1782 let lastPostInfo: ProcessedTweetEntry | null = replyParentInfo; 1783 1784 // We will save the first chunk as the "Root" of this tweet, and the last chunk as the "Tail". 1785 let firstChunkInfo: { uri: string; cid: string; root?: { uri: string; cid: string } } | null = null; 1786 let lastChunkInfo: { uri: string; cid: string; root?: { uri: string; cid: string } } | null = null; 1787 1788 for (let i = 0; i < chunks.length; i++) { 1789 let chunk = chunks[i] as string; 1790 1791 // Add (i/n) if split 1792 if (chunks.length > 1) { 1793 chunk += ` (${i + 1}/${chunks.length})`; 1794 } 1795 1796 console.log(`[${twitterUsername}] 📤 Posting chunk ${i + 1}/${chunks.length}...`); 1797 updateAppStatus({ message: `Posting chunk ${i + 1}/${chunks.length}...` }); 1798 1799 const rt = new RichText({ text: chunk }); 1800 await rt.detectFacets(agent); 1801 rt.facets = addTwitterHandleLinkFacets(rt.text, rt.facets); 1802 const detectedLangs = detectLanguage(chunk); 1803 1804 // Preserve original timing when available, but enforce monotonic per-account 1805 // timestamps to avoid equal-createdAt collisions in fast self-thread replies. 1806 const parsedCreatedAt = tweet.created_at ? Date.parse(tweet.created_at) : NaN; 1807 const baseCreatedAtMs = Number.isFinite(parsedCreatedAt) ? parsedCreatedAt : Date.now(); 1808 const chunkCreatedAtMs = baseCreatedAtMs + i * 1000; 1809 1810 // biome-ignore lint/suspicious/noExplicitAny: dynamic record construction 1811 const postRecord: Record<string, any> = { 1812 text: rt.text, 1813 facets: rt.facets, 1814 langs: detectedLangs, 1815 // CID is generated by the PDS from record content; unique createdAt keeps 1816 // near-simultaneous self-thread posts from colliding on identical payloads. 
1817 createdAt: getUniqueCreatedAtIso(bskyIdentifier, chunkCreatedAtMs), 1818 }; 1819 1820 if (i === 0) { 1821 if (videoBlob) { 1822 const videoEmbed: any = { 1823 $type: 'app.bsky.embed.video', 1824 video: videoBlob, 1825 }; 1826 if (videoAspectRatio) videoEmbed.aspectRatio = videoAspectRatio; 1827 postRecord.embed = videoEmbed; 1828 } else if (images.length > 0) { 1829 const imagesEmbed = { $type: 'app.bsky.embed.images', images }; 1830 if (quoteEmbed) { 1831 postRecord.embed = { $type: 'app.bsky.embed.recordWithMedia', media: imagesEmbed, record: quoteEmbed }; 1832 } else { 1833 postRecord.embed = imagesEmbed; 1834 } 1835 } else if (quoteEmbed) { 1836 postRecord.embed = quoteEmbed; 1837 } else if (linkCard) { 1838 postRecord.embed = linkCard; 1839 } 1840 } 1841 1842 // Threading logic 1843 // Determine actual parent URI/CID to reply to 1844 let parentRef: { uri: string; cid: string } | null = null; 1845 let rootRef: { uri: string; cid: string } | null = null; 1846 1847 if (lastPostInfo?.uri && lastPostInfo?.cid) { 1848 // If this is the start of a new tweet (i=0), check if parent has a tail 1849 if (i === 0 && lastPostInfo.tail) { 1850 parentRef = lastPostInfo.tail; 1851 } else { 1852 // Otherwise (intra-tweet or parent has no tail), use the main uri/cid (which is the previous post/chunk) 1853 parentRef = { uri: lastPostInfo.uri, cid: lastPostInfo.cid }; 1854 } 1855 1856 rootRef = lastPostInfo.root || { uri: lastPostInfo.uri, cid: lastPostInfo.cid }; 1857 } 1858 1859 if (parentRef && rootRef) { 1860 postRecord.reply = { 1861 root: rootRef, 1862 parent: parentRef, 1863 }; 1864 } 1865 1866 try { 1867 // Retry logic for network/socket errors 1868 let response: any; 1869 let retries = 3; 1870 1871 if (dryRun) { 1872 console.log(`[${twitterUsername}] 🧪 [DRY RUN] Would post chunk ${i + 1}/${chunks.length}`); 1873 if (postRecord.embed) console.log(` - With embed: ${postRecord.embed.$type}`); 1874 if (postRecord.reply) console.log(` - As reply to: 
${postRecord.reply.parent.uri}`); 1875 response = { uri: 'at://did:plc:mock/app.bsky.feed.post/mock', cid: 'mock-cid' }; 1876 } else { 1877 while (retries > 0) { 1878 try { 1879 response = await agent.post(postRecord); 1880 break; 1881 } catch (err: any) { 1882 retries--; 1883 if (retries === 0) throw err; 1884 console.warn( 1885 `[${twitterUsername}] ⚠️ Post failed (Socket/Network), retrying in 5s... (${retries} retries left)`, 1886 ); 1887 await new Promise((r) => setTimeout(r, 5000)); 1888 } 1889 } 1890 } 1891 1892 const currentPostInfo = { 1893 uri: response.uri, 1894 cid: response.cid, 1895 root: postRecord.reply ? postRecord.reply.root : { uri: response.uri, cid: response.cid }, 1896 // Text is just the current chunk text 1897 text: chunk, 1898 }; 1899 1900 if (i === 0) firstChunkInfo = currentPostInfo; 1901 lastChunkInfo = currentPostInfo; 1902 lastPostInfo = currentPostInfo; // Update for next iteration 1903 1904 console.log(`[${twitterUsername}] ✅ Chunk ${i + 1} posted successfully.`); 1905 1906 if (chunks.length > 1) { 1907 await new Promise((r) => setTimeout(r, 3000)); 1908 } 1909 } catch (err) { 1910 console.error(`[${twitterUsername}] ❌ Failed to post ${tweetId} (chunk ${i + 1}):`, err); 1911 break; 1912 } 1913 } 1914 1915 // Save to DB and Map 1916 if (firstChunkInfo && lastChunkInfo) { 1917 const entry: ProcessedTweetEntry = { 1918 uri: firstChunkInfo.uri, 1919 cid: firstChunkInfo.cid, 1920 root: firstChunkInfo.root, 1921 tail: { uri: lastChunkInfo.uri, cid: lastChunkInfo.cid }, // Save tail! 
// (continuation of processTweets: remaining field of the `entry` literal, then persistence and pacing)
        text: tweetText,
      };

      if (!dryRun) {
        saveProcessedTweet(twitterUsername, bskyIdentifier, tweetId, entry);
        localProcessedMap[tweetId] = entry; // Update local map for subsequent replies in this batch
      }
    }

    // Add a random delay between 5s and 15s to be more human-like
    const wait = Math.floor(Math.random() * 10000) + 5000;
    console.log(`[${twitterUsername}] 😴 Pacing: Waiting ${wait / 1000}s before next tweet.`);
    updateAppStatus({ state: 'pacing', message: `Pacing: Waiting ${wait / 1000}s...` });
    await new Promise((r) => setTimeout(r, wait));
  }
}

import { getAgent } from './bsky.js';

/**
 * Fetches a user's timeline through the Twitter scraper and hands every tweet that is
 * not yet recorded as processed to `processTweets`.
 *
 * @param twitterUsername - Twitter handle to import; matched case-insensitively against the configured mappings.
 * @param bskyIdentifier - Bluesky identifier used to load the processed-tweet map and passed on to `processTweets`.
 * @param limit - Maximum number of tweets to request and collect. A falsy value (0, NaN) falls back to 100.
 * @param dryRun - Forwarded to `processTweets`. When the Bluesky login fails, a dry run continues with a mock agent.
 * @param ignoreCancellation - When false, the fetch loop stops as soon as no matching entry is left in the
 *   pending-backfill queue. Pass true for runs that were never queued (e.g. CLI invocations).
 * @param requestId - Optional backfill request id; when given, only a queue entry with this id counts as "still pending".
 */
async function importHistory(
  twitterUsername: string,
  bskyIdentifier: string,
  limit = 15,
  dryRun = false,
  ignoreCancellation = false,
  requestId?: string,
): Promise<void> {
  const config = getConfig();
  const mapping = config.mappings.find((m) =>
    m.twitterUsernames.map((u) => u.toLowerCase()).includes(twitterUsername.toLowerCase()),
  );
  if (!mapping) {
    console.error(`No mapping found for twitter username: ${twitterUsername}`);
    return;
  }

  let agent = await getAgent(mapping);
  if (!agent) {
    if (dryRun) {
      console.log('⚠️ Could not login to Bluesky, but proceeding with MOCK AGENT for Dry Run.');
      // biome-ignore lint/suspicious/noExplicitAny: mock agent
      agent = {
        post: async (record: any) => ({ uri: 'at://did:plc:mock/app.bsky.feed.post/mock', cid: 'mock-cid' }),
        uploadBlob: async (data: any) => ({ data: { blob: { ref: { toString: () => 'mock-blob' } } } }),
        // Add other necessary methods if they are called outside of the already mocked dryRun blocks
        // But since we mocked the calls inside processTweets for dryRun, we just need the object to exist.
        session: { did: 'did:plc:mock' },
        com: { atproto: { repo: { describeRepo: async () => ({ data: {} }) } } },
      } as any;
    } else {
      return;
    }
  }

  console.log(`Starting full history import for ${twitterUsername} -> ${mapping.bskyIdentifier}...`);

  const allFoundTweets: Tweet[] = [];
  const seenIds = new Set<string>();
  const processedTweets = loadProcessedTweets(bskyIdentifier);

  console.log(`Fetching tweets for ${twitterUsername}...`);
  updateAppStatus({ message: `Fetching tweets...` });

  const client = await getTwitterScraper();
  if (client) {
    try {
      // `limit` defaults to 15 in the signature; a falsy value (0 / NaN from CLI parsing)
      // falls back to 100. The generator fetches as much as we ask for.
      const fetchLimit = limit || 100;
      const generator = client.getTweets(twitterUsername, fetchLimit);

      for await (const scraperTweet of generator) {
        if (!ignoreCancellation) {
          // A queued backfill can be cancelled/replaced while we are fetching; stop early if so.
          const stillPending = getPendingBackfills().some(
            (b) => b.id === mapping.id && (!requestId || b.requestId === requestId),
          );
          if (!stillPending) {
            console.log(`[${twitterUsername}] 🛑 Backfill cancelled.`);
            break;
          }
        }

        const t = mapScraperTweetToLocalTweet(scraperTweet);
        const tid = t.id_str || t.id;
        if (!tid) continue;

        // Collect only tweets that are neither already processed nor seen earlier in this fetch.
        if (!processedTweets[tid] && !seenIds.has(tid)) {
          allFoundTweets.push(t);
          seenIds.add(tid);
        }

        if (allFoundTweets.length >= fetchLimit) break;
      }
    } catch (e) {
      // Best effort: whatever was collected before the failure is still processed below.
      console.warn('Error during history fetch:', e);
    }
  }

  console.log(`Fetch complete. Found ${allFoundTweets.length} new tweets to import.`);
  if (allFoundTweets.length > 0) {
    await processTweets(agent as BskyAgent, twitterUsername, bskyIdentifier, allFoundTweets, dryRun);
    console.log('History import complete.');
  }
}

// Task management
// Tracks the in-flight task per mapping id so the same mapping is never processed twice concurrently.
const activeTasks = new Map<string, Promise<void>>();

/**
 * Runs one unit of work for a mapping: either a backfill (when a request is passed in or one is
 * pending for this mapping) or a regular check for new tweets on every Twitter username of the mapping.
 *
 * Returns immediately (resolving to `undefined`) when a task for the same mapping is already running.
 *
 * @param mapping - Account mapping to process.
 * @param backfillRequest - Explicit backfill request; when omitted, the pending queue is consulted for this mapping.
 * @param dryRun - Forwarded to `importHistory` / `processTweets`.
 * @param ignoreCancellation - Forwarded to `importHistory`. Must be true for requests that were never
 *   placed in the pending-backfill queue, otherwise the import treats them as cancelled right away.
 * @returns The task promise, so callers can await completion.
 */
async function runAccountTask(
  mapping: AccountMapping,
  backfillRequest?: PendingBackfill,
  dryRun = false,
  ignoreCancellation = false,
) {
  if (activeTasks.has(mapping.id)) return; // Already running

  const task = (async () => {
    try {
      const agent = await getAgent(mapping);
      if (!agent) {
        if (backfillRequest) {
          // Without this, a request whose login fails stays queued forever; the scheduler gives
          // pending backfills priority, so it would retry every loop and starve all scheduled checks.
          console.error(
            `[${mapping.bskyIdentifier}] ❌ Bluesky login failed; dropping backfill request ${backfillRequest.requestId}.`,
          );
          clearBackfill(mapping.id, backfillRequest.requestId);
          updateAppStatus({
            state: 'idle',
            message: `Backfill skipped for ${mapping.bskyIdentifier}: Bluesky login failed`,
            backfillMappingId: undefined,
            backfillRequestId: undefined,
          });
        }
        return;
      }

      const backfillReq = backfillRequest ?? getPendingBackfills().find((b) => b.id === mapping.id);
      const explicitBackfill = Boolean(backfillRequest);

      if (backfillReq) {
        const limit = backfillReq.limit || 15;
        console.log(
          `[${mapping.bskyIdentifier}] Running backfill for ${mapping.twitterUsernames.length} accounts (limit ${limit})...`,
        );
        updateAppStatus({
          state: 'backfilling',
          currentAccount: mapping.twitterUsernames[0],
          message: `Starting backfill (limit ${limit})...`,
          backfillMappingId: mapping.id,
          backfillRequestId: backfillReq.requestId,
        });

        for (const twitterUsername of mapping.twitterUsernames) {
          // Explicitly passed requests are not re-validated between usernames; requests picked
          // up from the queue are, so a replaced request stops before the next account.
          const stillPending = explicitBackfill
            ? true
            : getPendingBackfills().some((b) => b.id === mapping.id && b.requestId === backfillReq.requestId);
          if (!stillPending) {
            console.log(`[${mapping.bskyIdentifier}] 🛑 Backfill request replaced; stopping.`);
            break;
          }

          try {
            updateAppStatus({
              state: 'backfilling',
              currentAccount: twitterUsername,
              message: `Starting backfill (limit ${limit})...`,
              backfillMappingId: mapping.id,
              backfillRequestId: backfillReq.requestId,
            });
            await importHistory(
              twitterUsername,
              mapping.bskyIdentifier,
              limit,
              dryRun,
              ignoreCancellation,
              backfillReq.requestId,
            );
          } catch (err) {
            // One failing username must not abort the backfill of the remaining ones.
            console.error(`❌ Error backfilling ${twitterUsername}:`, err);
          }
        }
        clearBackfill(mapping.id, backfillReq.requestId);
        updateAppStatus({
          state: 'idle',
          message: `Backfill complete for ${mapping.bskyIdentifier}`,
          backfillMappingId: undefined,
          backfillRequestId: undefined,
        });
        console.log(`[${mapping.bskyIdentifier}] Backfill complete.`);
      } else {
        updateAppStatus({ backfillMappingId: undefined, backfillRequestId: undefined });

        // Pre-load processed IDs for optimization
        const processedMap = loadProcessedTweets(mapping.bskyIdentifier);
        const processedIds = new Set(Object.keys(processedMap));

        for (const twitterUsername of mapping.twitterUsernames) {
          try {
            console.log(`[${twitterUsername}] 🏁 Starting check for new tweets...`);
            updateAppStatus({
              state: 'checking',
              currentAccount: twitterUsername,
              message: 'Fetching latest tweets...',
              backfillMappingId: undefined,
              backfillRequestId: undefined,
            });

            // Use fetchUserTweets with early stopping optimization
            // Increase limit slightly since we have early stopping now
            const tweets = await fetchUserTweets(twitterUsername, 50, processedIds);

            if (!tweets || tweets.length === 0) {
              console.log(`[${twitterUsername}] ℹ️ No tweets found (or fetch failed).`);
              continue;
            }

            console.log(`[${twitterUsername}] 📥 Fetched ${tweets.length} tweets.`);
            await processTweets(agent, twitterUsername, mapping.bskyIdentifier, tweets, dryRun);
          } catch (err) {
            console.error(`❌ Error checking ${twitterUsername}:`, err);
          }
        }
      }
    } catch (err) {
      console.error(`Error processing mapping ${mapping.bskyIdentifier}:`, err);
    } finally {
      // Always release the slot so the mapping can be scheduled again.
      activeTasks.delete(mapping.id);
    }
  })();

  activeTasks.set(mapping.id, task);
  return task; // Return task promise for await in main loop
}

import type { AccountMapping } from './config-manager.js';
import {
  clearBackfill,
  getNextCheckTime,
  getPendingBackfills,
  startServer,
  updateAppStatus,
  updateLastCheckTime,
} from './server.js';
import type { PendingBackfill } from './server.js';

/**
 * CLI entry point.
 *
 * Parses the command-line options, runs the JSON-to-SQLite migration, optionally starts the web
 * server, and then does one of:
 *  - `--import-history`: imports history for `--username` and exits;
 *  - `--run-once` / `--backfill-mapping` / `--dry-run`: runs a single cycle and exits;
 *  - otherwise: runs the scheduler loop forever, polling every 5 seconds for pending backfills
 *    (handled first, one per iteration) and for the next scheduled check.
 */
async function main(): Promise<void> {
  const program = new Command();
  program
    .name('tweets-2-bsky')
    .description('Crosspost tweets to Bluesky')
    .option('--dry-run', 'Fetch tweets but do not post to Bluesky', false)
    .option('--no-web', 'Disable the web interface')
    .option('--run-once', 'Run one check cycle immediately and exit', false)
    .option('--backfill-mapping <mapping>', 'Run backfill now for a mapping id/handle/twitter username')
    .option('--backfill-limit <number>', 'Limit for --backfill-mapping', (val) => Number.parseInt(val, 10))
    .option('--import-history', 'Run in history import mode')
    .option('--username <username>', 'Twitter username for history import')
    .option('--limit <number>', 'Limit the number of tweets to import', (val) => Number.parseInt(val, 10))
    .parse(process.argv);

  const options = program.opts();

  const config = getConfig();

  await migrateJsonToSqlite();

  if (!options.web) {
    console.log('🌐 Web interface is disabled.');
  } else {
    startServer();
    if (config.users.length === 0) {
      console.log('ℹ️ No users found. Please register on the web interface to get started.');
    }
  }

  if (options.importHistory) {
    if (!options.username) {
      console.error('Please specify a username with --username <username>');
      process.exit(1);
    }
    const client = await getTwitterScraper();
    if (!client) {
      console.error('Twitter credentials not set. Cannot import history.');
      process.exit(1);
    }
    const mapping = config.mappings.find((m) =>
      m.twitterUsernames.map((u) => u.toLowerCase()).includes(options.username.toLowerCase()),
    );
    if (!mapping) {
      console.error(`No mapping found for ${options.username}`);
      process.exit(1);
    }
    // CLI imports are not part of the backfill queue, so cancellation checks are disabled.
    await importHistory(options.username, mapping.bskyIdentifier, options.limit, options.dryRun, true);
    process.exit(0);
  }

  const findMappingById = (mappings: AccountMapping[], id: string) => mappings.find((mapping) => mapping.id === id);
  const normalizeHandle = (value: string) => value.trim().replace(/^@/, '').toLowerCase();
  // Resolves a mapping from its id, its Bluesky identifier, or any of its Twitter usernames.
  const findMappingByRef = (mappings: AccountMapping[], ref: string) => {
    const needle = normalizeHandle(ref);
    return mappings.find(
      (mapping) =>
        mapping.id === ref ||
        normalizeHandle(mapping.bskyIdentifier) === needle ||
        mapping.twitterUsernames.some((username) => normalizeHandle(username) === needle),
    );
  };

  // Runs exactly one cycle: either the single CLI-requested backfill, or one check of every enabled mapping.
  const runSingleCycle = async (cycleConfig: ReturnType<typeof getConfig>) => {
    const runLimit = pLimit(3);
    const tasks: Promise<void>[] = [];

    if (options.backfillMapping) {
      const mapping = findMappingByRef(cycleConfig.mappings, options.backfillMapping);
      if (!mapping) {
        console.error(`No mapping found for '${options.backfillMapping}'.`);
        process.exit(1);
      }
      if (!mapping.enabled) {
        console.error(`Mapping '${mapping.bskyIdentifier}' is disabled.`);
        process.exit(1);
      }

      const requestId = `cli-${Date.now()}`;
      const backfillRequest: PendingBackfill = {
        id: mapping.id,
        limit: options.backfillLimit || options.limit || 15,
        queuedAt: Date.now(),
        sequence: 0,
        requestId,
      };

      console.log(`[CLI] 🚧 Running backfill for ${mapping.bskyIdentifier}...`);
      // This request is built locally and never added to the pending queue, so the import
      // must not consult the queue for cancellation (it would look cancelled immediately).
      await runAccountTask(mapping, backfillRequest, options.dryRun, true);
      updateAppStatus({ state: 'idle', message: `Backfill complete for ${mapping.bskyIdentifier}` });
      return;
    }

    for (const mapping of cycleConfig.mappings) {
      if (!mapping.enabled) continue;
      tasks.push(
        runLimit(async () => {
          await runAccountTask(mapping, undefined, options.dryRun);
        }),
      );
    }

    if (tasks.length === 0) {
      console.log('[CLI] No enabled mappings found.');
      updateAppStatus({ state: 'idle', message: 'No enabled mappings found' });
      return;
    }

    await Promise.all(tasks);
    updateAppStatus({ state: 'idle', message: options.dryRun ? 'Dry run cycle complete' : 'Run-once cycle complete' });
  };

  if (options.runOnce || options.backfillMapping || options.dryRun) {
    await runSingleCycle(getConfig());
    console.log(options.dryRun ? 'Dry run cycle complete. Exiting.' : 'Run-once cycle complete. Exiting.');
    process.exit(0);
  }

  console.log(`Scheduler started. Base interval: ${config.checkIntervalMinutes} minutes.`);
  updateLastCheckTime(); // Initialize next time

  // Concurrency limit for processing accounts
  const runLimit = pLimit(3);

  // Main loop
  while (true) {
    const now = Date.now();
    const currentConfig = getConfig(); // Reload config to get new mappings/settings
    const nextTime = getNextCheckTime();

    // Check if it's time for a scheduled run OR if we have pending backfills
    const isScheduledRun = now >= nextTime;
    const pendingBackfills = getPendingBackfills();

    if (isScheduledRun) {
      console.log(`[${new Date().toISOString()}] ⏰ Scheduled check triggered.`);
      updateLastCheckTime();
    }

    const tasks: Promise<void>[] = [];

    if (pendingBackfills.length > 0) {
      // Backfills take priority over scheduled checks; handle one per loop iteration.
      const [nextBackfill, ...rest] = pendingBackfills;
      if (nextBackfill) {
        const mapping = findMappingById(currentConfig.mappings, nextBackfill.id);
        if (mapping && mapping.enabled) {
          console.log(`[Scheduler] 🚧 Backfill priority: ${mapping.bskyIdentifier}`);
          await runAccountTask(mapping, nextBackfill, options.dryRun);
        } else {
          // Unknown or disabled mapping: drop the request so it does not block the queue.
          clearBackfill(nextBackfill.id, nextBackfill.requestId);
        }
      }
      // Report an empty queue once the request just handled was the last one in the snapshot
      // and nothing new was queued meanwhile. (Previously this tested `pendingBackfills.length === 0`,
      // which can never hold inside this branch.)
      if (rest.length === 0 && getPendingBackfills().length === 0) {
        updateAppStatus({
          state: 'idle',
          message: 'Backfill queue empty',
          backfillMappingId: undefined,
          backfillRequestId: undefined,
        });
      }
      updateLastCheckTime();
    } else if (isScheduledRun) {
      for (const mapping of currentConfig.mappings) {
        if (!mapping.enabled) continue;

        tasks.push(
          runLimit(async () => {
            await runAccountTask(mapping, undefined, options.dryRun);
          }),
        );
      }

      if (tasks.length > 0) {
        await Promise.all(tasks);
        console.log(`[Scheduler] ✅ All tasks for this cycle complete.`);
      }

      updateAppStatus({ state: 'idle', message: 'Scheduled checks complete' });
    }

    // Sleep for 5 seconds
    await new Promise((resolve) => setTimeout(resolve, 5000));
  }
}

// Surface startup/runtime failures instead of leaving an unhandled promise rejection.
main().catch((err: unknown) => {
  console.error('Fatal error:', err);
  process.exit(1);
});