Forked from
j4ck.xyz/tweets2bsky
A simple tool that scrapes Twitter accounts and cross-posts their tweets to Bluesky accounts. It comes with a CLI and a web app for managing profiles, and it supports images, videos, link embeds, and threads.
1import 'dotenv/config';
2import fs from 'node:fs';
3import path from 'node:path';
4import { fileURLToPath } from 'node:url';
5import { type BskyAgent, RichText } from '@atproto/api';
6import type { BlobRef } from '@atproto/api';
7import { Scraper } from '@the-convocation/twitter-scraper';
8import type { Tweet as ScraperTweet } from '@the-convocation/twitter-scraper';
9import axios from 'axios';
10import * as cheerio from 'cheerio';
11import { Command } from 'commander';
12import * as francModule from 'franc-min';
13import iso6391 from 'iso-639-1';
14import puppeteer from 'puppeteer-core';
15import sharp from 'sharp';
16import { generateAltText } from './ai-manager.js';
17
18import { getConfig } from './config-manager.js';
19
// Node's ES-module scope has no CommonJS `__filename` / `__dirname`; rebuild both from
// this module's file URL so the path joins below keep working.
const __filename: string = fileURLToPath(import.meta.url);
const __dirname: string = path.dirname(__filename);
23
24// ============================================================================
25// Type Definitions
26// ============================================================================
27
/**
 * What the crossposter recorded for one source tweet.
 * Persisted through `dbService.saveTweet` (see saveProcessedTweet / migrateJsonToSqlite).
 */
interface ProcessedTweetEntry {
  /** AT-URI of the Bluesky post (stored as `bsky_uri`). */
  uri?: string;
  /** CID of the Bluesky post (stored as `bsky_cid`). */
  cid?: string;
  /** Strong ref stored as `bsky_root_uri` / `bsky_root_cid`. */
  root?: { uri: string; cid: string };
  /** Strong ref stored as `bsky_tail_uri` / `bsky_tail_cid`. */
  tail?: { uri: string; cid: string };
  /** When set, the record is stored with status `'migrated'`. */
  migrated?: boolean;
  /** When set (and not migrated), the record is stored with status `'skipped'`. */
  skipped?: boolean;
  /** Tweet text saved alongside the record (stored as `tweet_text`). */
  text?: string;
}

/** Processed-tweet records keyed by Twitter status ID. */
type ProcessedTweetsMap = Record<string, ProcessedTweetEntry>;
41
/** One URL entity of a tweet: the `t.co` form and the expanded destination. */
interface UrlEntity {
  url?: string;
  expanded_url?: string;
}

/** Image payload of a card binding. */
interface CardImageValue {
  url?: string;
  width?: number;
  height?: number;
  alt?: string;
}

/** A single card binding value; text bindings use `string_value`, image bindings `image_value`. */
interface CardBindingValue {
  type?: string;
  string_value?: string;
  image_value?: CardImageValue;
}

/** Array-form binding entry (`{ key, value }`). */
interface CardBindingEntry {
  key?: string;
  value?: CardBindingValue;
}

/** Card bindings arrive either keyed by binding name or as an array of `{ key, value }` entries. */
type CardBindingValues = Record<string, CardBindingValue> | CardBindingEntry[];

/** Card attached to a tweet (as read from the raw tweet payload). */
interface TweetCard {
  name?: string;
  binding_values?: CardBindingValues;
  url?: string;
}
72
/** Pixel dimensions of one rendition (`w` × `h`). */
interface MediaSize {
  w: number;
  h: number;
}

/** Available renditions; only `large` is modelled here. */
interface MediaSizes {
  large?: MediaSize;
}

/** Dimensions of the originally uploaded media. */
interface OriginalInfo {
  width: number;
  height: number;
}

/** One encoded video stream. */
interface VideoVariant {
  content_type: string;
  url: string;
  bitrate?: number;
}

/** Video metadata: encoded variants and duration in milliseconds. */
interface VideoInfo {
  variants?: VideoVariant[];
  duration_millis?: number;
}

/**
 * A media attachment. `source: 'card'` marks entries synthesized by this module from card
 * bindings or syndication data rather than taken from the tweet's own media.
 */
interface MediaEntity {
  url?: string;
  expanded_url?: string;
  media_url_https?: string;
  type?: 'photo' | 'video' | 'animated_gif';
  ext_alt_text?: string;
  sizes?: MediaSizes;
  original_info?: OriginalInfo;
  video_info?: VideoInfo;
  source?: 'tweet' | 'card';
}

/** URL and media entities attached to a tweet. */
interface TweetEntities {
  urls?: UrlEntity[];
  media?: MediaEntity[];
}
114
/**
 * Tweet shape used throughout this module, modelled on Twitter's legacy REST fields.
 * Built by mapScraperTweetToLocalTweet from scraper output.
 */
interface Tweet {
  id?: string;
  id_str?: string;
  text?: string;
  full_text?: string;
  created_at?: string;
  entities?: TweetEntities;
  extended_entities?: TweetEntities;
  quoted_status_id_str?: string;
  retweeted_status_id_str?: string;
  is_quote_status?: boolean;
  in_reply_to_status_id_str?: string;
  in_reply_to_status_id?: string;
  in_reply_to_user_id_str?: string;
  in_reply_to_user_id?: string;
  /** Retweet flag as reported by the scraper. */
  isRetweet?: boolean;
  /** Author handle and numeric ID. */
  user?: { screen_name?: string; id_str?: string };
  /** Card payload copied from the raw tweet, if any. */
  card?: TweetCard | null;
  /** Canonical tweet URL as reported by the scraper. */
  permanentUrl?: string;
}

/** Width/height pair describing an image's aspect ratio. */
interface AspectRatio {
  width: number;
  height: number;
}

/** One image of a Bluesky image embed: alt text, uploaded blob and optional aspect ratio. */
interface ImageEmbed {
  alt: string;
  image: BlobRef;
  aspectRatio?: AspectRatio;
}
149
150import { dbService } from './db.js';
151
152// ============================================================================
153// State Management
154// ============================================================================
155
const PROCESSED_DIR = path.join(__dirname, '..', 'processed');

/**
 * One-time import of the legacy per-account JSON caches in `PROCESSED_DIR` into SQLite.
 *
 * Each `<twitterUsername>.json` file is parsed. Every entry is written through
 * `dbService.saveTweet` using the same status rules as saveProcessedTweet. The file is then
 * moved into `PROCESSED_DIR/backup` so it is not imported again. A file that fails to
 * import is logged and left in place.
 *
 * After the import, `dbService.repairUnknownIdentifiers` runs for every configured
 * username / Bluesky identifier pair.
 *
 * Returns early, doing nothing (including no repair step), when the directory is missing
 * or holds no `.json` files.
 */
async function migrateJsonToSqlite() {
  if (!fs.existsSync(PROCESSED_DIR)) return;

  const files = fs.readdirSync(PROCESSED_DIR).filter((f) => f.endsWith('.json'));
  if (files.length === 0) return;

  console.log(`📦 Found ${files.length} legacy cache files. Migrating to SQLite...`);
  const config = getConfig();
  const backupDir = path.join(PROCESSED_DIR, 'backup');

  for (const file of files) {
    // basename strips only the trailing extension; replace('.json', '') hit the first match.
    const username = path.basename(file, '.json').toLowerCase();
    // Try to find a matching bskyIdentifier from config.
    const mapping = config.mappings.find((m) => m.twitterUsernames.some((u) => u.toLowerCase() === username));
    const bskyIdentifier = mapping?.bskyIdentifier || 'unknown';

    try {
      const filePath = path.join(PROCESSED_DIR, file);
      const data = JSON.parse(fs.readFileSync(filePath, 'utf8')) as ProcessedTweetsMap;

      for (const [twitterId, entry] of Object.entries(data)) {
        if (!entry || typeof entry !== 'object') continue;
        dbService.saveTweet({
          twitter_id: twitterId,
          twitter_username: username,
          bsky_identifier: bskyIdentifier,
          tweet_text: entry.text,
          bsky_uri: entry.uri,
          bsky_cid: entry.cid,
          bsky_root_uri: entry.root?.uri,
          bsky_root_cid: entry.root?.cid,
          bsky_tail_uri: entry.tail?.uri,
          bsky_tail_cid: entry.tail?.cid,
          // Same classification as saveProcessedTweet. Legacy entries carrying a Bluesky
          // uri/cid were posted successfully and must not be recorded as failures.
          status: entry.migrated || (entry.uri && entry.cid) ? 'migrated' : entry.skipped ? 'skipped' : 'failed',
        });
      }

      // Move the file to the backup folder so it is not imported again.
      fs.mkdirSync(backupDir, { recursive: true });
      fs.renameSync(filePath, path.join(backupDir, file));
    } catch (err) {
      console.error(`❌ Failed to migrate ${file}:`, err);
    }
  }

  // REPAIR STEP: Fix any 'unknown' records in SQLite that came from the broken schema migration
  for (const mapping of config.mappings) {
    for (const username of mapping.twitterUsernames) {
      dbService.repairUnknownIdentifiers(username, mapping.bskyIdentifier);
    }
  }

  console.log('✅ Migration complete.');
}
207
/**
 * Returns every processed-tweet record stored for one Bluesky account, keyed by tweet ID.
 * The identifier is handed to the database layer unchanged (no case normalization here).
 */
function loadProcessedTweets(bskyIdentifier: string): ProcessedTweetsMap {
  const processed = dbService.getTweetsByBskyIdentifier(bskyIdentifier);
  return processed;
}
211
/**
 * Persists the outcome of handling one tweet for a Twitter → Bluesky account pair.
 *
 * The username and identifier are lower-cased before storage. The stored status is:
 * - `'migrated'` when the entry is flagged migrated or carries both a Bluesky `uri` and `cid`;
 * - otherwise `'skipped'` when flagged skipped;
 * - otherwise `'failed'`.
 */
function saveProcessedTweet(
  twitterUsername: string,
  bskyIdentifier: string,
  twitterId: string,
  entry: ProcessedTweetEntry,
): void {
  const { uri, cid, root, tail, text, migrated, skipped } = entry;

  let status: 'migrated' | 'skipped' | 'failed';
  if (migrated || (uri && cid)) {
    status = 'migrated';
  } else if (skipped) {
    status = 'skipped';
  } else {
    status = 'failed';
  }

  dbService.saveTweet({
    twitter_id: twitterId,
    twitter_username: twitterUsername.toLowerCase(),
    bsky_identifier: bskyIdentifier.toLowerCase(),
    tweet_text: text,
    bsky_uri: uri,
    bsky_cid: cid,
    bsky_root_uri: root?.uri,
    bsky_root_cid: root?.cid,
    bsky_tail_uri: tail?.uri,
    bsky_tail_cid: tail?.cid,
    status,
  });
}
232
233// ============================================================================
234// Custom Twitter Client
235// ============================================================================
236
// Cached scraper instance; getTwitterScraper rebuilds it when the active cookies change.
let scraper: Scraper | null = null;
// Cookie pair the cached `scraper` was initialised with, compared against config on each call.
let currentTwitterCookies: { authToken: string; ct0: string } = { authToken: '', ct0: '' };
// Toggled by switchCredentials; when true, the backup cookie pair is preferred.
let useBackupCredentials = false;
// Last `createdAt` value (epoch ms) handed out per lower-cased Bluesky identifier.
const lastCreatedAtByBsky = new Map<string, number>();

// Largest magnitude (ms) a JavaScript Date can represent; beyond it toISOString() throws.
const MAX_DATE_MS = 8.64e15;

/**
 * Returns an ISO-8601 timestamp that is strictly later than any previously returned for
 * the same account. Posts created in one run therefore keep a stable order even when
 * their desired timestamps collide.
 *
 * @param bskyIdentifier Bluesky account; compared case-insensitively.
 * @param desiredMs Preferred timestamp in epoch milliseconds. A non-finite or
 *   unrepresentable value (e.g. `NaN` from an unparsable date) falls back to the current
 *   time instead of throwing.
 * @returns `max(desiredMs, previous + 1)` formatted with `toISOString()`.
 */
function getUniqueCreatedAtIso(bskyIdentifier: string, desiredMs: number): string {
  const key = bskyIdentifier.toLowerCase();
  // NaN would make toISOString() throw and, once stored, poison every later call for this key.
  const baseMs = Number.isFinite(desiredMs) && Math.abs(desiredMs) <= MAX_DATE_MS ? desiredMs : Date.now();
  const lastMs = lastCreatedAtByBsky.get(key) ?? Number.MIN_SAFE_INTEGER;
  const nextMs = Math.max(baseMs, lastMs + 1);
  lastCreatedAtByBsky.set(key, nextMs);
  return new Date(nextMs).toISOString();
}
249
/**
 * Returns a scraper authenticated with the active Twitter cookie pair, or `null` when that
 * pair is incomplete.
 *
 * The backup pair is used only while `useBackupCredentials` is set AND both backup values
 * are configured; otherwise the primary pair is used. A new `Scraper` is created when none
 * is cached, when `forceReset` is true, or when the chosen cookies differ from those the
 * cached instance was built with.
 */
async function getTwitterScraper(forceReset = false): Promise<Scraper | null> {
  const { twitter } = getConfig();
  const backupReady = Boolean(twitter.backupAuthToken && twitter.backupCt0);
  const preferBackup = useBackupCredentials && backupReady;
  const authToken = preferBackup ? twitter.backupAuthToken : twitter.authToken;
  const ct0 = preferBackup ? twitter.backupCt0 : twitter.ct0;

  if (!authToken || !ct0) return null;

  const cookiesChanged = currentTwitterCookies.authToken !== authToken || currentTwitterCookies.ct0 !== ct0;
  const needsInit = !scraper || forceReset || cookiesChanged;
  if (!needsInit) return scraper;

  console.log(`🔄 Initializing Twitter scraper with ${useBackupCredentials ? 'BACKUP' : 'PRIMARY'} credentials...`);
  scraper = new Scraper();
  await scraper.setCookies([`auth_token=${authToken}`, `ct0=${ct0}`]);
  currentTwitterCookies = { authToken, ct0 };
  return scraper;
}
276
/**
 * Flips between the primary and backup Twitter cookie pairs and forces a scraper rebuild.
 *
 * @returns `true` when a backup pair is configured and the switch happened. Returns `false`
 *   (after logging) when no backup pair exists, leaving the current selection unchanged.
 */
async function switchCredentials() {
  const { twitter } = getConfig();
  const hasBackup = twitter.backupAuthToken && twitter.backupCt0;
  if (!hasBackup) {
    console.log('⚠️ No backup credentials available to switch to.');
    return false;
  }

  useBackupCredentials = !useBackupCredentials;
  console.log(`⚠️ Switching to ${useBackupCredentials ? 'BACKUP' : 'PRIMARY'} Twitter credentials...`);
  await getTwitterScraper(true);
  return true;
}
288
/**
 * Converts a tweet from `@the-convocation/twitter-scraper` into this module's `Tweet` shape,
 * which mirrors Twitter's legacy REST fields (`id_str`, `full_text`, `entities`, ...).
 *
 * When the scraper exposes the raw legacy payload (`__raw_UNSTABLE`), that payload is
 * mapped field by field. Otherwise a best-effort tweet is rebuilt from the parsed fields.
 * The rebuild includes the reply/quote/retweet IDs and the author; previously the fallback
 * dropped these, so reply detection (e.g. buildThreadContext) could not see them.
 */
function mapScraperTweetToLocalTweet(scraperTweet: ScraperTweet): Tweet {
  const raw = scraperTweet.__raw_UNSTABLE;
  if (!raw) {
    // Fallback if raw data is missing (shouldn't happen for timeline tweets usually).
    return {
      id: scraperTweet.id,
      id_str: scraperTweet.id,
      text: scraperTweet.text,
      full_text: scraperTweet.text,
      isRetweet: scraperTweet.isRetweet,
      in_reply_to_status_id_str: scraperTweet.inReplyToStatusId,
      quoted_status_id_str: scraperTweet.quotedStatusId,
      retweeted_status_id_str: scraperTweet.retweetedStatusId,
      is_quote_status: !!scraperTweet.quotedStatusId,
      // Construct minimal entities from parsed data.
      entities: {
        urls: (scraperTweet.urls ?? []).map((url: string) => ({ url, expanded_url: url })),
        // biome-ignore lint/suspicious/noExplicitAny: photo shape varies across scraper versions
        media: (scraperTweet.photos ?? []).map((p: any) => ({
          url: p.url,
          expanded_url: p.url,
          media_url_https: p.url,
          type: 'photo',
          ext_alt_text: p.alt_text,
        })),
      },
      created_at: scraperTweet.timeParsed?.toUTCString(),
      permanentUrl: scraperTweet.permanentUrl,
      user: {
        screen_name: scraperTweet.username,
        id_str: scraperTweet.userId,
      },
    };
  }

  return {
    id: raw.id_str ?? scraperTweet.id,
    id_str: raw.id_str ?? scraperTweet.id,
    text: raw.full_text,
    full_text: raw.full_text,
    created_at: raw.created_at,
    isRetweet: scraperTweet.isRetweet,
    // biome-ignore lint/suspicious/noExplicitAny: raw types match compatible structure
    entities: raw.entities as any,
    // biome-ignore lint/suspicious/noExplicitAny: raw types match compatible structure
    extended_entities: raw.extended_entities as any,
    quoted_status_id_str: raw.quoted_status_id_str,
    retweeted_status_id_str: raw.retweeted_status_id_str,
    is_quote_status: !!raw.quoted_status_id_str,
    in_reply_to_status_id_str: raw.in_reply_to_status_id_str,
    // biome-ignore lint/suspicious/noExplicitAny: missing in LegacyTweetRaw type
    in_reply_to_user_id_str: (raw as any).in_reply_to_user_id_str,
    // biome-ignore lint/suspicious/noExplicitAny: card comes from raw tweet
    card: (raw as any).card,
    permanentUrl: scraperTweet.permanentUrl,
    user: {
      screen_name: scraperTweet.username,
      id_str: scraperTweet.userId,
    },
  };
}
341
342// ============================================================================
343// Helper Functions
344// ============================================================================
345
/**
 * Normalizes card `binding_values` into a plain `key → value` record.
 *
 * Bindings may arrive as an object keyed by binding name or as an array of `{ key, value }`
 * entries. Array entries lacking a key or value are dropped, and a repeated key keeps the
 * later entry. Object input is returned as-is (not copied); missing input yields `{}`.
 */
function normalizeCardBindings(bindingValues?: CardBindingValues): Record<string, CardBindingValue> {
  if (!bindingValues) return {};
  if (!Array.isArray(bindingValues)) return bindingValues as Record<string, CardBindingValue>;

  const byKey: Record<string, CardBindingValue> = {};
  for (const entry of bindingValues) {
    if (entry?.key && entry.value) byKey[entry.key] = entry.value;
  }
  return byKey;
}
359
/** Type guard: true for a non-empty value that starts with `http://` or `https://` (any case). */
function isLikelyUrl(value?: string): value is string {
  return !!value && /^https?:\/\//i.test(value);
}
364
/**
 * Picks an image URL from card bindings. Returns the `image_value.url` of the first key in
 * `preferredKeys` that has one; otherwise the first such URL among all bindings, in object
 * order. Returns `undefined` when no binding carries an image URL.
 */
function extractCardImageUrl(bindingValues: CardBindingValues, preferredKeys: string[]): string | undefined {
  const bindings = normalizeCardBindings(bindingValues);
  const imageUrlOf = (value?: CardBindingValue) => value?.image_value?.url;

  const preferredHit = preferredKeys.map((key) => imageUrlOf(bindings[key])).find(Boolean);
  if (preferredHit) return preferredHit;

  return Object.values(bindings).map((value) => imageUrlOf(value)).find(Boolean);
}
375
/**
 * Picks an http(s) link from card bindings' `string_value`s. Preferred keys are tried in
 * order first; otherwise the first URL-looking string across all bindings is returned.
 */
function extractCardLink(bindingValues: CardBindingValues, preferredKeys: string[]): string | undefined {
  const bindings = normalizeCardBindings(bindingValues);
  const preferred = preferredKeys.map((key) => bindings[key]?.string_value).find((candidate) => isLikelyUrl(candidate));
  if (preferred !== undefined) return preferred;
  return Object.values(bindings)
    .map((value) => value?.string_value)
    .find((candidate) => isLikelyUrl(candidate));
}
386
/**
 * Picks a human-readable title from card bindings: the first non-empty `string_value`
 * that is not a URL. Preferred keys are tried first, then all bindings in object order.
 */
function extractCardTitle(bindingValues: CardBindingValues, preferredKeys: string[]): string | undefined {
  const bindings = normalizeCardBindings(bindingValues);
  const isPlainText = (candidate?: string): candidate is string => Boolean(candidate) && !isLikelyUrl(candidate);

  const fromPreferred = preferredKeys.map((key) => bindings[key]?.string_value).find(isPlainText);
  if (fromPreferred !== undefined) return fromPreferred;

  return Object.values(bindings)
    .map((value) => value?.string_value)
    .find(isPlainText);
}
399
/** Returns the first non-empty `image_value.alt` found across all card bindings, if any. */
function extractCardAlt(bindingValues: CardBindingValues): string | undefined {
  for (const value of Object.values(normalizeCardBindings(bindingValues))) {
    const alt = value?.image_value?.alt;
    if (alt) return alt;
  }
  return undefined;
}
405
/**
 * Appends `"<label>: <link>"` as a separate paragraph. When `link` is missing or already
 * appears in `text`, `text` is returned untouched; otherwise the combined string is trimmed.
 */
function appendCallToAction(text: string, link?: string, label = 'Sponsored') {
  const alreadyPresent = !link || text.includes(link);
  return alreadyPresent ? text : `${text}\n\n${label}: ${link}`.trim();
}
411
/**
 * Collects card-derived media hints for a tweet:
 * - every distinct image URL in the card bindings, with the preferred-key image first;
 * - a link, a title and alt text picked from the same bindings.
 *
 * Returns `{ imageUrls: [] }` when the tweet has no card bindings.
 */
function detectCardMedia(tweet: Tweet): { imageUrls: string[]; link?: string; title?: string; alt?: string } {
  const bindings = tweet.card?.binding_values;
  if (!bindings) return { imageUrls: [] };

  const imageKeyPriority = [
    'photo_image_full_size',
    'photo_image_full_size_original',
    'thumbnail_image',
    'image',
    'thumbnail',
    'summary_photo_image',
    'player_image',
  ];
  const linkKeyPriority = ['site', 'destination', 'landing_url', 'cta_link', 'card_url', 'url'];
  const titleKeyPriority = ['title', 'summary', 'card_title'];

  const imageUrls: string[] = [];
  const addImage = (candidate?: string) => {
    if (candidate && !imageUrls.includes(candidate)) imageUrls.push(candidate);
  };

  addImage(extractCardImageUrl(bindings, imageKeyPriority));
  for (const value of Object.values(normalizeCardBindings(bindings))) {
    addImage(value?.image_value?.url);
  }

  return {
    imageUrls,
    link: extractCardLink(bindings, linkKeyPriority),
    title: extractCardTitle(bindings, titleKeyPriority),
    alt: extractCardAlt(bindings),
  };
}
444
/**
 * Converts a tweet's card images into at most four photo `MediaEntity` records tagged
 * `source: 'card'`, and returns them with the card link. Alt text falls back from the
 * card alt to its title to `'Sponsored image'`. Without images: `{ media: [] }` (no link).
 */
function buildCardMediaEntities(tweet: Tweet): { media: MediaEntity[]; link?: string } {
  const { imageUrls, link, alt, title } = detectCardMedia(tweet);
  if (!imageUrls.length) return { media: [] };

  const altText = alt || title || 'Sponsored image';
  const media: MediaEntity[] = [];
  for (const url of imageUrls.slice(0, 4)) {
    media.push({ media_url_https: url, type: 'photo' as const, ext_alt_text: altText, source: 'card' as const });
  }
  return { media, link };
}
458
/**
 * Adds `link` to `entities.urls` (as both `url` and `expanded_url`) unless an entry already
 * matches it. Mutates `entities` in place; no-op when either argument is missing.
 */
function ensureUrlEntity(entities: TweetEntities | undefined, link?: string) {
  if (!entities || !link) return;
  const urls = entities.urls || [];
  const known = urls.some((entry) => entry.url === link || entry.expanded_url === link);
  if (known) return;
  urls.push({ url: link, expanded_url: link });
  entities.urls = urls;
}
468
/**
 * Heuristic: a card is treated as promotional when it carries more than one image, or
 * when its name contains one of the promo keywords. Tweets without card bindings → false.
 */
function detectSponsoredCard(tweet: Tweet): boolean {
  if (!tweet.card?.binding_values) return false;
  const name = (tweet.card.name || '').toLowerCase();
  if (detectCardMedia(tweet).imageUrls.length > 1) return true;
  return ['promo', 'unified', 'carousel', 'collection', 'amplify'].some((keyword) => name.includes(keyword));
}
478
/**
 * Merges two media lists, dropping entries without `media_url_https` and URL duplicates.
 *
 * Priority: primary non-card media, primary card media, secondary non-card media,
 * secondary card media. Relative order is kept within each group. Collection stops once
 * `limit` entries have been picked (the check runs after each addition).
 */
function mergeMediaEntities(primary: MediaEntity[], secondary: MediaEntity[], limit = 4): MediaEntity[] {
  const isCard = (media: MediaEntity | undefined) => media?.source === 'card';
  const candidates: MediaEntity[] = [];
  for (const list of [primary, secondary]) {
    candidates.push(...list.filter((media) => !isCard(media)), ...list.filter((media) => isCard(media)));
  }

  const picked: MediaEntity[] = [];
  const takenUrls = new Set<string>();
  for (const media of candidates) {
    const url = media?.media_url_https;
    if (!url || takenUrls.has(url)) continue;
    takenUrls.add(url);
    picked.push(media);
    if (picked.length >= limit) break;
  }
  return picked;
}
499
/** Returns the distinct http(s) `string_value`s in a tweet's card bindings, in first-seen order. */
function detectCarouselLinks(tweet: Tweet): string[] {
  const rawBindings = tweet.card?.binding_values;
  if (!rawBindings) return [];

  const unique = new Set<string>();
  for (const value of Object.values(normalizeCardBindings(rawBindings))) {
    const candidate = value?.string_value;
    if (isLikelyUrl(candidate)) unique.add(candidate);
  }
  return Array.from(unique);
}
508
/**
 * Appends each link not already present (by `url` or `expanded_url`) to `entities.urls`.
 * No-op when `entities` is missing or `links` is empty. Otherwise `entities.urls` is
 * (re)assigned even when nothing new was added.
 */
function mergeUrlEntities(entities: TweetEntities | undefined, links: string[]) {
  if (!entities || !links.length) return;
  const urls = entities.urls || [];
  for (const link of links) {
    const seen = urls.some((entry) => entry.url === link || entry.expanded_url === link);
    if (!seen) urls.push({ url: link, expanded_url: link });
  }
  entities.urls = urls;
}
519
/**
 * Folds card images into the tweet's media lists and card links into its URL entities.
 *
 * Mutates `tweet`:
 * - `extended_entities.media` becomes the merged list (existing media first, capped by
 *   mergeMediaEntities' default limit);
 * - `entities.media` is set only when absent.
 *
 * Does nothing when the card has no bindings or yields no images.
 */
function injectCardMedia(tweet: Tweet) {
  if (!tweet.card?.binding_values) return;
  const { media: cardImages, link: cardLink } = buildCardMediaEntities(tweet);
  if (!cardImages.length) return;

  const current = tweet.extended_entities?.media || tweet.entities?.media || [];
  const merged = mergeMediaEntities(current, cardImages);

  if (!tweet.extended_entities) {
    tweet.extended_entities = {};
  }
  tweet.extended_entities.media = merged;

  if (!tweet.entities) {
    tweet.entities = {};
  }
  if (!tweet.entities.media) {
    tweet.entities.media = merged;
  }

  if (cardLink) ensureUrlEntity(tweet.entities, cardLink);
  mergeUrlEntities(tweet.entities, detectCarouselLinks(tweet));
}
540
/**
 * Appends up to two card links to the post text, one `Link: <url>` line each.
 *
 * Candidates are the card's primary link followed by every http(s) link in its bindings,
 * de-duplicated. Links that already appear in `text` are skipped (as appendCallToAction
 * does), so a URL is never repeated in the post. Returns `text` unchanged when the tweet
 * has no card bindings or no new link remains.
 */
function ensureSponsoredLinks(text: string, tweet: Tweet): string {
  if (!tweet.card?.binding_values) return text;

  const candidates = [detectCardMedia(tweet).link, ...detectCarouselLinks(tweet)];
  const links = [...new Set(candidates.filter((link): link is string => Boolean(link)))].filter(
    (link) => !text.includes(link),
  );
  if (links.length === 0) return text;

  const appendedLinks = links.slice(0, 2).map((link) => `Link: ${link}`);
  return `${text}\n\n${appendedLinks.join('\n')}`.trim();
}
552
/** Returns `text` with leading and trailing whitespace removed. */
function addTextFallbacks(text: string): string {
  return text.trim();
}
556
/** Returns `full_text`, else `text`, else `''`. Empty strings fall through to the next option. */
function getTweetText(tweet: Tweet): string {
  if (tweet.full_text) return tweet.full_text;
  if (tweet.text) return tweet.text;
  return '';
}
560
/** Collapses every run of whitespace into a single space and trims both ends. */
function normalizeContextText(text: string): string {
  const words = text.trim().split(/\s+/);
  return words.join(' ');
}
564
/**
 * Indexes tweets into `tweetMap` by ID (`id_str`, else `id`, stringified). Tweets without
 * an ID are ignored; a later tweet with the same ID replaces an earlier one.
 */
function addTweetsToMap(tweetMap: Map<string, Tweet>, tweets: Tweet[]): void {
  tweets.forEach((tweet) => {
    const key = tweet.id_str || tweet.id;
    if (key) tweetMap.set(String(key), tweet);
  });
}
572
/**
 * Summarizes the ancestors of `tweet` that are present in `tweetMap`, oldest first,
 * joined with `" | "`.
 *
 * Follows `in_reply_to_status_id_str` (or `in_reply_to_status_id`) for at most `maxHops`
 * parents. The walk stops at the first ancestor missing from the map or at a repeated ID
 * (cycle guard). Ancestors whose normalized text is empty are skipped. Returns `''` when
 * nothing was collected.
 */
function buildThreadContext(tweet: Tweet, tweetMap: Map<string, Tweet>, maxHops = 8): string {
  const ancestors: string[] = [];
  const seenIds = new Set<string>();
  let cursor: Tweet | undefined = tweet;
  let hopsLeft = maxHops;

  while (hopsLeft > 0) {
    hopsLeft--;
    const parentRef = cursor?.in_reply_to_status_id_str || cursor?.in_reply_to_status_id;
    if (!parentRef) break;

    const parentId = String(parentRef);
    if (seenIds.has(parentId)) break;
    seenIds.add(parentId);

    const parent = tweetMap.get(parentId);
    if (!parent) break;

    const summary = normalizeContextText(getTweetText(parent));
    if (summary) ancestors.unshift(summary);
    cursor = parent;
  }

  return ancestors.join(' | ');
}
597
/**
 * Builds the context string handed to alt-text generation: the thread above the tweet
 * (when any) followed by the tweet's own normalized text (when non-empty).
 */
function buildAltTextContext(tweet: Tweet, tweetText: string, tweetMap: Map<string, Tweet>): string {
  const threadContext = buildThreadContext(tweet, tweetMap);
  const currentText = normalizeContextText(tweetText);

  if (!threadContext) return currentText;
  return currentText
    ? `Thread above: ${threadContext}. Current tweet: ${currentText}`
    : `Thread above: ${threadContext}.`;
}
609
/** Extracts the numeric status ID from a tweet URL or embed HTML (`.../status/<id>`). */
function extractStatusIdFromText(text: string): string | undefined {
  return /status(?:es)?\/(\d+)/.exec(text)?.[1];
}

/** Token the syndication CDN expects alongside `id` (same derivation as Vercel's react-tweet). */
function getSyndicationToken(tweetId: string): string {
  return ((Number(tweetId) / 1e15) * Math.PI).toString(36).replace(/(0+|\.)/g, '');
}

/**
 * Best-effort lookup of a tweet's photo URLs through Twitter's public syndication endpoint.
 *
 * The status ID is read straight from `tweetUrl`. Only when that fails is the oEmbed
 * endpoint queried to recover the ID from the embed HTML. Every request has a timeout so
 * a slow endpoint cannot stall the caller.
 *
 * Never throws: any network or parsing failure yields `{ images: [] }`.
 */
async function fetchSyndicationMedia(tweetUrl: string): Promise<{ images: string[] }> {
  const requestTimeoutMs = 10000;
  try {
    let tweetId = extractStatusIdFromText(tweetUrl);
    if (!tweetId) {
      const normalized = tweetUrl.replace('twitter.com', 'x.com');
      const res = await axios.get('https://publish.twitter.com/oembed', {
        params: { url: normalized },
        headers: { 'User-Agent': 'Mozilla/5.0' },
        timeout: requestTimeoutMs,
      });
      const html = res.data?.html as string | undefined;
      if (!html) return { images: [] };
      tweetId = extractStatusIdFromText(html);
    }
    if (!tweetId) return { images: [] };

    const syndication = await axios.get('https://cdn.syndication.twimg.com/tweet-result', {
      params: { id: tweetId, token: getSyndicationToken(tweetId) },
      headers: { 'User-Agent': 'Mozilla/5.0', Accept: 'application/json' },
      timeout: requestTimeoutMs,
    });
    const photos = (syndication.data as { photos?: { url?: string }[] } | undefined)?.photos;
    const images = Array.isArray(photos)
      ? photos.map((photo) => photo?.url).filter((url): url is string => Boolean(url))
      : [];
    return { images };
  } catch {
    return { images: [] };
  }
}
637
/**
 * Merges up to four syndication photo URLs into the tweet's media. Each becomes a
 * card-sourced photo with the alt text `'Image from Twitter'`. Mutates `tweet`; no-op for
 * an empty list. `entities.media` is only set when absent.
 */
function injectSyndicationMedia(tweet: Tweet, syndication: { images: string[] }) {
  const { images } = syndication;
  if (!images.length) return;

  const syndicated: MediaEntity[] = images.slice(0, 4).map((url) => ({
    media_url_https: url,
    type: 'photo' as const,
    ext_alt_text: 'Image from Twitter',
    source: 'card' as const,
  }));
  const existing = tweet.extended_entities?.media || tweet.entities?.media || [];
  const merged = mergeMediaEntities(existing, syndicated);

  if (!tweet.extended_entities) {
    tweet.extended_entities = {};
  }
  tweet.extended_entities.media = merged;

  if (!tweet.entities) {
    tweet.entities = {};
  }
  if (!tweet.entities.media) {
    tweet.entities.media = merged;
  }
}
655
/** ISO 639-3 codes (as returned by franc) mapped to the ISO 639-1 codes Bluesky `langs` uses. */
const ISO_639_3_TO_1 = new Map<string, string>([
  ['eng', 'en'], ['spa', 'es'], ['fra', 'fr'], ['deu', 'de'], ['por', 'pt'], ['ita', 'it'],
  ['nld', 'nl'], ['rus', 'ru'], ['ukr', 'uk'], ['pol', 'pl'], ['ces', 'cs'], ['slk', 'sk'],
  ['ron', 'ro'], ['hun', 'hu'], ['bul', 'bg'], ['ell', 'el'], ['swe', 'sv'], ['dan', 'da'],
  ['nob', 'nb'], ['fin', 'fi'], ['tur', 'tr'], ['arb', 'ar'], ['heb', 'he'], ['pes', 'fa'],
  ['urd', 'ur'], ['hin', 'hi'], ['ben', 'bn'], ['mar', 'mr'], ['tam', 'ta'], ['tel', 'te'],
  ['kan', 'kn'], ['mal', 'ml'], ['guj', 'gu'], ['pan', 'pa'], ['npi', 'ne'], ['sin', 'si'],
  ['cmn', 'zh'], ['jpn', 'ja'], ['kor', 'ko'], ['vie', 'vi'], ['tha', 'th'], ['ind', 'id'],
  ['zlm', 'ms'], ['tgl', 'tl'], ['jav', 'jv'], ['sun', 'su'], ['mya', 'my'], ['swh', 'sw'],
  ['amh', 'am'], ['hau', 'ha'], ['yor', 'yo'], ['ibo', 'ig'], ['zul', 'zu'], ['som', 'so'],
  ['kin', 'rw'], ['hrv', 'hr'], ['srp', 'sr'], ['bos', 'bs'], ['bel', 'be'], ['kaz', 'kk'],
  ['uzn', 'uz'], ['azj', 'az'], ['cat', 'ca'], ['glg', 'gl'], ['eus', 'eu'],
]);

/**
 * Resolves franc's detector function. `franc-min` exposes it as the named export `franc`;
 * older CommonJS builds exposed it as the default export. A module namespace object itself
 * is never callable, so the previous direct call always threw.
 */
function resolveFranc(): ((value: string) => string) | undefined {
  const mod = francModule as unknown as Record<string, unknown>;
  const candidate = mod.franc ?? mod.default;
  return typeof candidate === 'function' ? (candidate as (value: string) => string) : undefined;
}

/**
 * Guesses the language of `text` for Bluesky's `langs` field.
 *
 * franc returns ISO 639-3 codes. These are mapped to ISO 639-1 through a lookup table,
 * because `iso6391.getCode` expects a language *name*, not a code.
 *
 * @returns A single-element array with the detected code. Falls back to `['en']` for
 *   empty text, undetermined (`'und'`) or unmapped results, or any detector error.
 */
function detectLanguage(text: string): string[] {
  if (!text || text.trim().length === 0) return ['en'];
  try {
    const franc = resolveFranc();
    if (!franc) return ['en'];
    const code3 = franc(text);
    if (code3 === 'und') return ['en'];
    const code2 = ISO_639_3_TO_1.get(code3);
    return code2 && iso6391.validate(code2) ? [code2] : ['en'];
  } catch {
    return ['en'];
  }
}
667
/**
 * Resolves a (short) URL to its final destination, following up to five redirects.
 *
 * A HEAD request is tried first. If it fails, a streamed GET is made instead (the body is
 * discarded), for hosts that reject HEAD. Both requests are bounded by a timeout so a slow
 * host cannot stall the sync.
 *
 * Never throws: on any failure (redirect loop, 401/403 login wall, timeout, network error)
 * the input URL is returned unchanged.
 */
async function expandUrl(shortUrl: string): Promise<string> {
  const timeout = 10000;
  // The final URL after redirects lives on axios's Node request internals.
  const finalUrlOf = (response: { request?: unknown }): string =>
    // biome-ignore lint/suspicious/noExplicitAny: axios internal types
    (response.request as any)?.res?.responseUrl || shortUrl;

  try {
    const response = await axios.head(shortUrl, {
      maxRedirects: 5,
      timeout,
      validateStatus: (status) => status >= 200 && status < 400,
    });
    return finalUrlOf(response);
  } catch {
    // Fall through to GET: some hosts answer HEAD with an error status or drop it.
  }

  try {
    const response = await axios.get(shortUrl, {
      responseType: 'stream',
      maxRedirects: 5,
      timeout,
    });
    response.data.destroy();
    return finalUrlOf(response);
  } catch {
    // Silent fallback for common expansion issues (redirect loops, login walls, timeouts).
    return shortUrl;
  }
}
694
/** Bytes of a downloaded file plus the MIME type the server reported for it. */
interface DownloadedMedia {
  buffer: Buffer;
  mimeType: string;
}

/**
 * Downloads `url` fully into memory with a 30 s timeout.
 *
 * The MIME type comes from the response's `content-type` header, defaulting to
 * `application/octet-stream` when absent. Errors raised by axios (network failures,
 * non-2xx responses under axios's default status validation) propagate to the caller.
 */
async function downloadMedia(url: string): Promise<DownloadedMedia> {
  const { data, headers } = await axios.get(url, { responseType: 'arraybuffer', timeout: 30000 });
  const contentType = headers['content-type'] as string | undefined;
  return {
    buffer: Buffer.from(data as ArrayBuffer),
    mimeType: contentType || 'application/octet-stream',
  };
}
712
/**
 * Uploads media bytes to the Bluesky PDS as a blob, shrinking oversized images first.
 *
 * Inputs over ~950 KB whose MIME type starts with `image/` or is `application/octet-stream`
 * are re-encoded with sharp, for up to five attempts. Each attempt lowers quality by 10
 * (floor 50):
 *  - GIF / WebP: re-encoded as (possibly animated) WebP, capped at 800 px wide. GIFs become
 *    `image/webp`.
 *  - Everything else: JPEG (mozjpeg), width capped at 1600 px and shrunk by 20% on each
 *    attempt after the first.
 *
 * The last re-encoded output is uploaded even if still over the limit, so the declared
 * encoding always matches the bytes. If sharp fails, the original bytes and MIME type are
 * uploaded unchanged.
 *
 * @returns The blob reference returned by `agent.uploadBlob`.
 */
async function uploadToBluesky(agent: BskyAgent, buffer: Buffer, mimeType: string): Promise<BlobRef> {
  let finalBuffer = buffer;
  let finalMimeType = mimeType;
  const MAX_SIZE = 950 * 1024;

  const isWebp = mimeType === 'image/webp';
  const isGif = mimeType === 'image/gif';
  const isAnimation = isGif || isWebp;
  const isCompressible = mimeType.startsWith('image/') || mimeType === 'application/octet-stream';

  if (buffer.length > MAX_SIZE && isCompressible) {
    console.log(`[UPLOAD] ⚖️ Image too large (${(buffer.length / 1024).toFixed(2)} KB). Optimizing...`);
    try {
      const metadata = await sharp(buffer).metadata();
      let currentBuffer = buffer;
      let currentMimeType = mimeType;
      let width = metadata.width || 2000;
      let quality = 90;

      // Iterative compression loop
      let attempts = 0;
      while (currentBuffer.length > MAX_SIZE && attempts < 5) {
        attempts++;
        // Lowered for both branches: animated attempts used to repeat the same settings.
        quality = Math.max(50, quality - 10);

        if (isAnimation) {
          console.log(`[UPLOAD] 📉 Compression attempt ${attempts}: Width ${Math.min(width, 800)}, Quality ${quality}...`);
          // Re-encode all frames as WebP (GIF → WebP usually compresses much better).
          let animated = sharp(buffer, { animated: true }).webp({ quality, effort: 6 });
          if (metadata.width && metadata.width > 800) {
            animated = animated.resize({ width: 800, withoutEnlargement: true });
          }
          currentBuffer = await animated.toBuffer();
          if (isGif) currentMimeType = 'image/webp';
        } else {
          if (width > 1600) width = 1600;
          else if (attempts > 1) width = Math.floor(width * 0.8);
          console.log(`[UPLOAD] 📉 Compression attempt ${attempts}: Width ${width}, Quality ${quality}...`);
          currentBuffer = await sharp(buffer)
            .resize({ width, withoutEnlargement: true })
            .jpeg({ quality, mozjpeg: true })
            .toBuffer();
          currentMimeType = 'image/jpeg';
        }
      }

      // Always upload the last re-encoded output. Keeping the original bytes after the MIME
      // type had already been switched produced a mismatched upload.
      finalBuffer = currentBuffer;
      finalMimeType = currentMimeType;

      if (finalBuffer.length <= MAX_SIZE) {
        console.log(`[UPLOAD] ✅ Optimized to ${(finalBuffer.length / 1024).toFixed(2)} KB`);
      } else {
        console.warn(
          `[UPLOAD] ⚠️ Could not compress below limit. Current: ${(finalBuffer.length / 1024).toFixed(2)} KB. Upload might fail.`,
        );
      }
    } catch (err) {
      // finalBuffer / finalMimeType are only reassigned after a successful loop, so they
      // still hold the original bytes and type here.
      console.warn(`[UPLOAD] ⚠️ Optimization failed, attempting original upload:`, (err as Error).message);
    }
  }

  const { data } = await agent.uploadBlob(finalBuffer, { encoding: finalMimeType });
  return data.blob;
}
792
/** PNG screenshot of a rendered tweet embed plus its layout size from `boundingBox()`. */
interface ScreenshotResult {
  buffer: Buffer;
  width: number;
  height: number;
}

/** Escapes a string for safe interpolation into a double-quoted HTML attribute. */
function escapeHtmlAttribute(value: string): string {
  return value
    .replace(/&/g, '&amp;')
    .replace(/"/g, '&quot;')
    .replace(/'/g, '&#39;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;');
}

/**
 * Renders the official Twitter embed for `tweetUrl` in a headless Chrome/Chromium and
 * screenshots it.
 *
 * The viewport uses `deviceScaleFactor: 2`. The returned `width`/`height` are the
 * container's bounding box, which may differ from the PNG's pixel size.
 *
 * @returns `null` when no browser is found at the known paths, when the container or its
 *   bounding box is unavailable, or on any error (errors are logged, not thrown).
 */
async function captureTweetScreenshot(tweetUrl: string): Promise<ScreenshotResult | null> {
  const browserPaths = [
    '/usr/bin/google-chrome',
    '/usr/bin/chromium-browser',
    '/usr/bin/chromium',
    '/usr/bin/google-chrome-stable',
    'C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe',
    'C:\\Program Files (x86)\\Google\\Chrome\\Application\\chrome.exe',
  ];

  const executablePath = browserPaths.find((p) => fs.existsSync(p));

  if (!executablePath) {
    console.warn(`[SCREENSHOT] ⏩ Skipping screenshot (no Chrome/Chromium found at common paths).`);
    return null;
  }

  console.log(`[SCREENSHOT] 📸 Capturing screenshot for: ${tweetUrl} using ${executablePath}`);
  let browser;
  try {
    browser = await puppeteer.launch({
      executablePath,
      args: ['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage'],
    });
    const page = await browser.newPage();
    await page.setViewport({ width: 800, height: 1200, deviceScaleFactor: 2 });

    // The URL is escaped so a stray quote or '<' cannot break out of the href attribute.
    const html = `
      <!DOCTYPE html>
      <html>
      <head>
        <style>
          body {
            margin: 0;
            padding: 20px;
            background: #ffffff;
            display: flex;
            justify-content: center;
            font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif;
          }
          #container { width: 550px; }
        </style>
      </head>
      <body>
        <div id="container">
          <blockquote class="twitter-tweet" data-dnt="true">
            <a href="${escapeHtmlAttribute(tweetUrl)}"></a>
          </blockquote>
          <script async src="https://platform.twitter.com/widgets.js" charset="utf-8"></script>
        </div>
      </body>
      </html>
    `;

    await page.setContent(html, { waitUntil: 'networkidle0' });

    // Wait for the twitter iframe to load and render.
    try {
      await page.waitForSelector('iframe', { timeout: 10000 });
      // Small extra wait for images inside the iframe.
      await new Promise((r) => setTimeout(r, 2000));
    } catch {
      console.warn(`[SCREENSHOT] ⚠️ Timeout waiting for tweet iframe, taking screenshot anyway.`);
    }

    const element = await page.$('#container');
    if (element) {
      const box = await element.boundingBox();
      const shot = await element.screenshot({ type: 'png', omitBackground: true });
      if (box) {
        // Newer puppeteer versions return a Uint8Array rather than a Buffer; copy into a real Buffer.
        const buffer = Buffer.from(shot as Uint8Array);
        console.log(
          `[SCREENSHOT] ✅ Captured successfully (${(buffer.length / 1024).toFixed(2)} KB) - ${Math.round(box.width)}x${Math.round(box.height)}`,
        );
        return { buffer, width: Math.round(box.width), height: Math.round(box.height) };
      }
    }
  } catch (err) {
    console.error(`[SCREENSHOT] ❌ Error capturing tweet:`, (err as Error).message);
  } finally {
    if (browser) {
      try {
        await browser.close();
      } catch (closeErr) {
        // A failed close must not mask the screenshot result (or the original error).
        console.warn(`[SCREENSHOT] ⚠️ Failed to close browser:`, (closeErr as Error).message);
      }
    }
  }
  return null;
}
882
/**
 * Polls Bluesky's video service until the upload job `jobId` yields a blob.
 *
 * Polls every 5 s, at most 60 times (~5 minutes). A non-OK HTTP status from the status
 * endpoint counts as a poll and is retried. Previously it skipped the timeout check, so a
 * persistently failing endpoint looped forever.
 *
 * @param agent Not used by this function; kept for call-site compatibility.
 * @throws Error when the job reports `JOB_STATE_FAILED` or the poll budget is exhausted.
 */
async function pollForVideoProcessing(agent: BskyAgent, jobId: string): Promise<BlobRef> {
  const POLL_INTERVAL_MS = 5000;
  const MAX_ATTEMPTS = 60;
  const sleep = () => new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));

  console.log(`[VIDEO] ⏳ Polling for processing completion (this can take a minute)...`);

  const statusUrl = new URL('https://video.bsky.app/xrpc/app.bsky.video.getJobStatus');
  statusUrl.searchParams.append('jobId', jobId);

  for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
    const isLastAttempt = attempt === MAX_ATTEMPTS;
    const statusResponse = await fetch(statusUrl);
    if (!statusResponse.ok) {
      console.warn(`[VIDEO] ⚠️ Job status fetch failed (${statusResponse.status}), retrying...`);
      if (!isLastAttempt) await sleep();
      continue;
    }

    // biome-ignore lint/suspicious/noExplicitAny: response shape is not typed here
    const statusData = (await statusResponse.json()) as any;
    // Guard against a body without `jobStatus` instead of crashing on property access.
    const jobStatus = statusData?.jobStatus ?? {};
    const state = jobStatus.state;
    const progress = jobStatus.progress || 0;

    console.log(`[VIDEO] 🔄 Job ${jobId}: ${state} (${progress}%)`);

    if (jobStatus.blob) {
      console.log(`[VIDEO] 🎉 Video processing complete! Blob ref obtained.`);
      return jobStatus.blob as BlobRef;
    }
    if (state === 'JOB_STATE_FAILED') {
      throw new Error(`Video processing failed: ${jobStatus.error || 'Unknown error'}`);
    }
    if (!isLastAttempt) await sleep();
  }

  throw new Error('Video processing timed out after 5 minutes.');
}
923
/**
 * Builds an `app.bsky.embed.external` link-card embed for `url` from its HTML metadata.
 *
 * - Title: `og:title`, else `<title>`.
 * - Description: `og:description`, else `meta[name=description]`.
 * - Both are trimmed.
 *
 * Returns `null` when the page yields neither a title nor a description, on a redirect
 * loop, or on any other fetch/parse error (logged). Only when the card will actually be
 * returned is a present `og:image` resolved against the page URL, downloaded and uploaded
 * as the thumbnail. Thumbnail failures are ignored.
 */
async function fetchEmbedUrlCard(agent: BskyAgent, url: string): Promise<any> {
  try {
    const response = await axios.get(url, {
      headers: {
        'User-Agent':
          'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        Accept: 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.9',
      },
      timeout: 10000,
      maxRedirects: 5,
    });

    const $ = cheerio.load(response.data);
    const title = ($('meta[property="og:title"]').attr('content') || $('title').text() || '').trim();
    const description = (
      $('meta[property="og:description"]').attr('content') ||
      $('meta[name="description"]').attr('content') ||
      ''
    ).trim();

    // Bail out before spending a download + blob upload on a card that would be discarded.
    if (!title && !description) return null;

    let thumbBlob: BlobRef | undefined;
    const rawImageUrl = $('meta[property="og:image"]').attr('content')?.trim();
    if (rawImageUrl) {
      try {
        // Resolve against the full page URL, not just its origin, so relative paths such as
        // "img/card.png" resolve correctly; absolute URLs are unaffected.
        const imageUrl = new URL(rawImageUrl, url).toString();
        const { buffer, mimeType } = await downloadMedia(imageUrl);
        thumbBlob = await uploadToBluesky(agent, buffer, mimeType);
      } catch {
        // The thumbnail is optional: post the card without it.
      }
    }

    const external: { uri: string; title: string; description: string; thumb?: BlobRef } = {
      uri: url,
      title: title || url,
      description,
    };
    if (thumbBlob) {
      external.thumb = thumbBlob;
    }

    return {
      $type: 'app.bsky.embed.external',
      external,
    };
  } catch (err: any) {
    if (err.code === 'ERR_FR_TOO_MANY_REDIRECTS') {
      // Ignore redirect loops
      return null;
    }
    console.warn(`Failed to fetch embed card for ${url}:`, err.message || err);
    return null;
  }
}
982
/**
 * Uploads an MP4 to Bluesky's video service and waits until it has been processed.
 *
 * Flow:
 * 1. Resolve the account's PDS host from the DID document returned by `describeRepo`,
 *    falling back to `bsky.social`.
 * 2. Mint a 30-minute service-auth token for `did:web:<pdsHost>`, scoped to
 *    `com.atproto.repo.uploadBlob`.
 * 3. POST the bytes to `video.bsky.app`.
 * 4. Poll the returned job, or the existing job on `already_exists`.
 *
 * @param agent - Logged-in agent; its session DID owns the upload.
 * @param buffer - Raw video bytes, sent as `video/mp4`.
 * @param filename - Source file name; any query string is stripped.
 * @returns The processed video's blob reference.
 * @throws Error('VIDEO_FALLBACK_503') when the service reports it is out of capacity, so
 *         callers can fall back to posting a link.
 * @throws Error('Bluesky Email Unconfirmed - Video Upload Rejected') when the account's email
 *         is unverified.
 * @throws Error(`Video upload failed: <status> <body>`) for any other non-OK upload response.
 */
async function uploadVideoToBluesky(agent: BskyAgent, buffer: Buffer, filename: string): Promise<BlobRef> {
  const sanitizedFilename = filename.split('?')[0] || 'video.mp4';
  console.log(
    `[VIDEO] 🟢 Starting upload process for ${sanitizedFilename} (${(buffer.length / 1024 / 1024).toFixed(2)} MB)`,
  );

  try {
    const did = agent.session?.did;
    if (!did) {
      throw new Error('Cannot upload video: Bluesky agent has no active session.');
    }

    // 1. Get Service Auth
    // We need to resolve the actual PDS host for this DID
    console.log(`[VIDEO] 🔍 Resolving PDS host for DID: ${did}...`);
    const { data: repoDesc } = await agent.com.atproto.repo.describeRepo({ repo: did });

    // didDoc might be present in repoDesc
    const pdsService = (repoDesc as any).didDoc?.service?.find(
      (s: any) => s.id === '#atproto_pds' || s.type === 'AtProtoPds',
    );
    const pdsUrl = pdsService?.serviceEndpoint;
    const pdsHost = pdsUrl ? new URL(pdsUrl).host : 'bsky.social';

    console.log(`[VIDEO] 🌐 PDS Host detected: ${pdsHost}`);
    console.log(`[VIDEO] 🔑 Requesting service auth token for audience: did:web:${pdsHost}...`);

    const { data: serviceAuth } = await agent.com.atproto.server.getServiceAuth({
      aud: `did:web:${pdsHost}`,
      lxm: 'com.atproto.repo.uploadBlob',
      exp: Math.floor(Date.now() / 1000) + 60 * 30,
    });
    console.log(`[VIDEO] ✅ Service auth token obtained.`);

    const token = serviceAuth.token;

    // 2. Upload to Video Service
    const uploadUrl = new URL('https://video.bsky.app/xrpc/app.bsky.video.uploadVideo');
    uploadUrl.searchParams.append('did', did);
    uploadUrl.searchParams.append('name', sanitizedFilename);

    console.log(`[VIDEO] 📤 Uploading to ${uploadUrl.href}...`);
    const uploadResponse = await fetch(uploadUrl, {
      method: 'POST',
      headers: {
        Authorization: `Bearer ${token}`,
        'Content-Type': 'video/mp4',
      },
      body: new Blob([new Uint8Array(buffer)]),
    });

    if (!uploadResponse.ok) {
      const errorText = await uploadResponse.text();

      // Parse in isolation. The checks below throw on purpose, and must not be swallowed by
      // a catch that only exists to tolerate non-JSON bodies such as HTML error pages.
      let errorJson: any = null;
      try {
        errorJson = JSON.parse(errorText);
      } catch {
        // Not JSON; only the HTTP status is available for classification.
      }

      // Handle server overload gracefully (works even when the 503 body is not JSON)
      if (
        uploadResponse.status === 503 ||
        errorJson?.error === 'Server does not have enough capacity to handle uploads'
      ) {
        console.warn(`[VIDEO] ⚠️ Server overloaded (503). Skipping video upload and falling back to link.`);
        throw new Error('VIDEO_FALLBACK_503');
      }

      if (errorJson?.error === 'already_exists' && errorJson.jobId) {
        console.log(`[VIDEO] ♻️ Video already exists. Resuming with Job ID: ${errorJson.jobId}`);
        return await pollForVideoProcessing(agent, errorJson.jobId);
      }

      if (errorJson?.error === 'unconfirmed_email' || errorJson?.jobStatus?.error === 'unconfirmed_email') {
        console.error(
          `[VIDEO] 🛑 BLUESKY ERROR: Your email is unconfirmed. You MUST verify your email on Bluesky to upload videos.`,
        );
        throw new Error('Bluesky Email Unconfirmed - Video Upload Rejected');
      }

      console.error(`[VIDEO] ❌ Server responded with ${uploadResponse.status}: ${errorText}`);
      throw new Error(`Video upload failed: ${uploadResponse.status} ${errorText}`);
    }

    const jobStatus = (await uploadResponse.json()) as any;
    console.log(`[VIDEO] 📦 Upload accepted. Job ID: ${jobStatus.jobId}, State: ${jobStatus.state}`);

    if (jobStatus.blob) {
      return jobStatus.blob;
    }

    // 3. Poll for processing status
    return await pollForVideoProcessing(agent, jobStatus.jobId);
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    // The 503 fallback is an expected, already-logged condition; don't report it as a failure.
    if (message !== 'VIDEO_FALLBACK_503') {
      console.error(`[VIDEO] ❌ Error in uploadVideoToBluesky:`, message);
    }
    throw err;
  }
}
1081
/**
 * Splits post text into chunks that fit Bluesky's length limit once numbered.
 *
 * Text that already fits is returned unchanged as a single chunk. Otherwise every chunk,
 * including the last, is kept to `limit - 8` characters. That leaves room for the
 * " (i/n)" suffix the caller appends; 8 characters covers up to " (99/99)".
 *
 * Break-point priority within each window:
 * 1. Last paragraph break (`\n\n`).
 * 2. Last sentence end (`.`, `!` or `?` followed by whitespace).
 * 3. Last space.
 * 4. Hard split, backed off by one if it would cut a UTF-16 surrogate pair.
 *
 * Break points at index 0 are ignored so no empty chunk is produced. Chunks are trimmed.
 *
 * @param text - Full post text.
 * @param limit - Maximum characters per post (UTF-16 code units), default 300.
 * @returns Non-empty chunks in order.
 */
function splitText(text: string, limit = 300): string[] {
  if (text.length <= limit) return [text];

  const chunks: string[] = [];
  let remaining = text;

  // Reserve space for numbering like " (1/3)"; 8 chars fits up to " (99/99)".
  const effectiveLimit = Math.max(1, limit - 8);

  while (remaining.length > 0) {
    // Once we are splitting, every chunk gets a suffix, so the last one must fit too.
    if (remaining.length <= effectiveLimit) {
      chunks.push(remaining);
      break;
    }

    // 1. Paragraph break
    let splitIndex = remaining.lastIndexOf('\n\n', effectiveLimit);

    // 2. Sentence end: punctuation followed by whitespace (keep the punctuation)
    if (splitIndex <= 0) {
      splitIndex = -1;
      for (const match of remaining.substring(0, effectiveLimit).matchAll(/[.!?]\s/g)) {
        if (match.index !== undefined) splitIndex = match.index + 1;
      }
    }

    // 3. Space
    if (splitIndex <= 0) {
      splitIndex = remaining.lastIndexOf(' ', effectiveLimit);
    }

    // 4. Hard split, never between the halves of a surrogate pair (e.g. an emoji)
    if (splitIndex <= 0) {
      splitIndex = effectiveLimit;
      const before = remaining.charCodeAt(splitIndex - 1);
      if (splitIndex > 1 && before >= 0xd800 && before <= 0xdbff) splitIndex--;
    }

    const chunk = remaining.substring(0, splitIndex).trim();
    if (chunk) chunks.push(chunk);
    remaining = remaining.substring(splitIndex).trim();
  }

  return chunks;
}
1139
/**
 * Converts a JavaScript string index (UTF-16 code units) into a UTF-8 byte offset,
 * which is how Bluesky facet ranges are expressed.
 *
 * @param text - The full string.
 * @param index - UTF-16 offset into `text`.
 * @returns Number of UTF-8 bytes occupied by `text.slice(0, index)`.
 */
function utf16IndexToUtf8Index(text: string, index: number): number {
  const prefix = text.slice(0, index);
  const byteCount = Buffer.byteLength(prefix, 'utf8');
  return byteCount;
}
1143
/**
 * Reports whether two half-open ranges `[startA, endA)` and `[startB, endB)` intersect.
 * Ranges that merely touch (one ends where the other starts) do not overlap.
 */
function rangesOverlap(startA: number, endA: number, startB: number, endB: number): boolean {
  const bStartsBeforeAEnds = startB < endA;
  const aStartsBeforeBEnds = startA < endB;
  return bStartsBeforeAEnds && aStartsBeforeBEnds;
}
1147
/**
 * Adds link facets pointing at twitter.com profiles for bare `@handle` mentions
 * (1–15 characters of `[A-Za-z0-9_]`) in `text`.
 *
 * A candidate is skipped when:
 * - it is glued to a preceding or following word character (e.g. emails, over-long handles);
 * - it is followed by `.<label>`, so it looks like a domain-style handle;
 * - its UTF-8 byte range overlaps an existing facet.
 *
 * @param text - Text the facets index into.
 * @param facets - Existing facets, with UTF-8 byte ranges in `index.byteStart/byteEnd`.
 * @returns `facets` itself when nothing was added; otherwise a new array with the added facets,
 *          sorted by `byteStart`.
 */
function addTwitterHandleLinkFacets(text: string, facets?: any[]): any[] | undefined {
  const current = facets ?? [];
  const wordChar = /[A-Za-z0-9_]/;
  const added: any[] = [];

  for (const found of text.matchAll(/@([A-Za-z0-9_]{1,15})/g)) {
    const handle = found[1];
    if (!handle) continue;

    const start = found.index ?? 0;
    const end = start + handle.length + 1; // +1 for the '@'

    // charAt returns '' outside the string, which never matches wordChar.
    if (wordChar.test(text.charAt(start - 1))) continue;

    const after = text.slice(end);
    if (after.startsWith('.') && /^\.[A-Za-z0-9-]+/.test(after)) continue;

    if (wordChar.test(text.charAt(end))) continue;

    const byteStart = utf16IndexToUtf8Index(text, start);
    const byteEnd = utf16IndexToUtf8Index(text, end);
    const clashes = current.some((f) => rangesOverlap(byteStart, byteEnd, f.index.byteStart, f.index.byteEnd));
    if (clashes) continue;

    added.push({
      index: { byteStart, byteEnd },
      features: [
        {
          $type: 'app.bsky.richtext.facet#link',
          uri: `https://twitter.com/${handle}`,
        },
      ],
    });
  }

  return added.length === 0
    ? facets
    : [...current, ...added].sort((left, right) => left.index.byteStart - right.index.byteStart);
}
1191
/**
 * Minimal p-limit style concurrency limiter.
 *
 * `pLimit(n)` returns a scheduler. Each call `limit(fn)` starts `fn` right away if fewer than
 * `n` tasks are running; otherwise it queues the task (FIFO) until a running task settles.
 * The returned promise resolves or rejects with `fn`'s outcome.
 *
 * NOTE(review): with `concurrency <= 0` (or NaN) no task ever starts, so callers must pass a
 * positive number.
 */
const pLimit = (concurrency: number) => {
  const waiting: Array<() => Promise<void>> = [];
  let inFlight = 0;

  // Frees a slot and hands it to the oldest queued task, if any.
  const onSettled = (): void => {
    inFlight -= 1;
    const upNext = waiting.shift();
    if (upNext !== undefined) {
      void upNext();
    }
  };

  function schedule<T>(fn: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const execute = async (): Promise<void> => {
        inFlight += 1;
        try {
          const result = await fn();
          resolve(result);
        } catch (failure) {
          reject(failure);
        } finally {
          onSettled();
        }
      };

      if (inFlight < concurrency) {
        void execute();
      } else {
        waiting.push(execute);
      }
    });
  }

  return schedule;
};
1225
/**
 * Fetches up to `limit` tweets from `username`'s timeline via the scraper's UserTweets endpoint
 * (replacing the older search-based fetch).
 *
 * When `processedIds` is given, fetching stops early after three consecutive tweets that are
 * already in it. This assumes roughly newest-first timeline order. The first two of those
 * already-processed tweets are still included in the result.
 *
 * Up to 3 attempts are made:
 * - A Twitter 400 whose body mentions `InternalServerError` is retried after 5s.
 * - Errors whose message mentions ServiceUnavailable, Timeout, 429 or 401 first try
 *   `switchCredentials()`. If that reports success, the fetch retries immediately;
 *   otherwise it waits 5s.
 *
 * @param username - Twitter screen name.
 * @param limit - Maximum number of tweets to return.
 * @param processedIds - Optional set of tweet IDs already handled, used for early stopping.
 * @returns The fetched tweets; `[]` when no scraper is available, on a non-retryable error,
 *          or once retries are exhausted.
 */
async function fetchUserTweets(username: string, limit: number, processedIds?: Set<string>): Promise<Tweet[]> {
  let retries = 3;
  while (retries > 0) {
    // Resolve the scraper on every attempt rather than once before the loop, so a retry that
    // follows switchCredentials() doesn't reuse a client captured before the switch.
    // NOTE(review): harmless if getTwitterScraper() returns a cached/mutated singleton.
    const client = await getTwitterScraper();
    if (!client) return [];

    try {
      const tweets: Tweet[] = [];
      const generator = client.getTweets(username, limit);
      let consecutiveProcessedCount = 0;

      for await (const t of generator) {
        const tweet = mapScraperTweetToLocalTweet(t);
        const tweetId = tweet.id_str || tweet.id;

        // Early stopping logic: if we see 3 consecutive tweets we've already processed, stop.
        // This assumes timeline order (mostly true).
        if (processedIds && tweetId && processedIds.has(tweetId)) {
          consecutiveProcessedCount++;
          if (consecutiveProcessedCount >= 3) {
            console.log(`[${username}] 🛑 Found 3 consecutive processed tweets. Stopping fetch early.`);
            break;
          }
        } else {
          consecutiveProcessedCount = 0;
        }

        tweets.push(tweet);
        if (tweets.length >= limit) break;
      }
      return tweets;
    } catch (e: any) {
      retries--;
      const isRetryable =
        e.message?.includes('ServiceUnavailable') ||
        e.message?.includes('Timeout') ||
        e.message?.includes('429') ||
        e.message?.includes('401');

      // Check for Twitter Internal Server Error (often returns 400 with specific body)
      if (e?.response?.status === 400 && JSON.stringify(e?.response?.data || {}).includes('InternalServerError')) {
        console.warn(`⚠️ Twitter Internal Server Error (Transient) for ${username}.`);
        // Treat as retryable
        if (retries > 0) {
          await new Promise((r) => setTimeout(r, 5000));
          continue;
        }
      }

      if (isRetryable) {
        console.warn(`⚠️ Error fetching tweets for ${username} (${e.message}).`);

        // Attempt credential switch if we have backups; the next iteration picks up the new client.
        if (await switchCredentials()) {
          console.log(`🔄 Retrying with new credentials...`);
          continue;
        }

        if (retries > 0) {
          console.log(`Waiting 5s before retry...`);
          await new Promise((r) => setTimeout(r, 5000));
          continue;
        }
      }

      console.warn(`Error fetching tweets for ${username}:`, e.message || e);
      return [];
    }
  }

  console.log(`[${username}] ⚠️ Scraper returned 0 tweets (or failed silently) after retries.`);
  return [];
}
1301
1302// ============================================================================
1303// Main Processing Logic
1304// ============================================================================
1305
1306// ============================================================================
1307// Main Processing Logic
1308// ============================================================================
1309
1310async function processTweets(
1311 agent: BskyAgent,
1312 twitterUsername: string,
1313 bskyIdentifier: string,
1314 tweets: Tweet[],
1315 dryRun = false,
1316 sharedProcessedMap?: ProcessedTweetsMap,
1317 sharedTweetMap?: Map<string, Tweet>,
1318): Promise<void> {
1319 // Filter tweets to ensure they're actually from this user
1320 const filteredTweets = tweets.filter((t) => {
1321 const authorScreenName = t.user?.screen_name?.toLowerCase();
1322 if (authorScreenName && authorScreenName !== twitterUsername.toLowerCase()) {
1323 console.log(
1324 `[${twitterUsername}] ⏩ Skipping tweet ${t.id_str || t.id} - author is @${t.user?.screen_name}, not @${twitterUsername}`,
1325 );
1326 return false;
1327 }
1328 return true;
1329 });
1330
1331 const tweetMap = sharedTweetMap ?? new Map<string, Tweet>();
1332 addTweetsToMap(tweetMap, filteredTweets);
1333
1334 // Maintain a local map that updates in real-time for intra-batch replies
1335 const localProcessedMap: ProcessedTweetsMap =
1336 sharedProcessedMap ?? { ...loadProcessedTweets(bskyIdentifier) };
1337
1338 const toProcess = filteredTweets.filter((t) => !localProcessedMap[t.id_str || t.id || '']);
1339
1340 if (toProcess.length === 0) {
1341 console.log(`[${twitterUsername}] ✅ No new tweets to process for ${bskyIdentifier}.`);
1342 return;
1343 }
1344
1345 console.log(`[${twitterUsername}] 🚀 Processing ${toProcess.length} new tweets for ${bskyIdentifier}...`);
1346
1347 filteredTweets.reverse();
1348 let count = 0;
1349 for (const tweet of filteredTweets) {
1350 count++;
1351 const tweetId = tweet.id_str || tweet.id;
1352 if (!tweetId) continue;
1353
1354 if (localProcessedMap[tweetId]) continue;
1355
1356 // Fallback to DB in case a nested backfill already saved this tweet.
1357 const dbRecord = dbService.getTweet(tweetId, bskyIdentifier);
1358 if (dbRecord) {
1359 localProcessedMap[tweetId] = {
1360 uri: dbRecord.bsky_uri,
1361 cid: dbRecord.bsky_cid,
1362 root:
1363 dbRecord.bsky_root_uri && dbRecord.bsky_root_cid
1364 ? { uri: dbRecord.bsky_root_uri, cid: dbRecord.bsky_root_cid }
1365 : undefined,
1366 tail:
1367 dbRecord.bsky_tail_uri && dbRecord.bsky_tail_cid
1368 ? { uri: dbRecord.bsky_tail_uri, cid: dbRecord.bsky_tail_cid }
1369 : undefined,
1370 migrated: dbRecord.status === 'migrated',
1371 skipped: dbRecord.status === 'skipped',
1372 };
1373 continue;
1374 }
1375
1376 const isRetweet = tweet.isRetweet || tweet.retweeted_status_id_str || tweet.text?.startsWith('RT @');
1377
1378 if (isRetweet) {
1379 console.log(`[${twitterUsername}] ⏩ Skipping retweet ${tweetId}.`);
1380 if (!dryRun) {
1381 // Save as skipped so we don't check it again
1382 saveProcessedTweet(twitterUsername, bskyIdentifier, tweetId, { skipped: true, text: tweet.text });
1383 localProcessedMap[tweetId] = { skipped: true, text: tweet.text };
1384 }
1385 continue;
1386 }
1387
1388 console.log(`\n[${twitterUsername}] 🔍 Inspecting tweet: ${tweetId}`);
1389 updateAppStatus({
1390 state: 'processing',
1391 currentAccount: twitterUsername,
1392 processedCount: count,
1393 totalCount: filteredTweets.length,
1394 message: `Inspecting tweet ${tweetId}`,
1395 });
1396
1397 const replyStatusId = tweet.in_reply_to_status_id_str || tweet.in_reply_to_status_id;
1398 const replyUserId = tweet.in_reply_to_user_id_str || tweet.in_reply_to_user_id;
1399 const tweetText = tweet.full_text || tweet.text || '';
1400 const isReply = !!replyStatusId || !!replyUserId || tweetText.trim().startsWith('@');
1401
1402 let replyParentInfo: ProcessedTweetEntry | null = null;
1403
1404 if (isReply) {
1405 if (replyStatusId && localProcessedMap[replyStatusId]) {
1406 console.log(`[${twitterUsername}] 🧵 Threading reply to post in ${bskyIdentifier}: ${replyStatusId}`);
1407 replyParentInfo = localProcessedMap[replyStatusId] ?? null;
1408 } else if (replyStatusId) {
1409 // Parent missing from local batch/DB. Attempt to fetch it if it's a self-thread.
1410 // We assume it's a self-thread if we don't have it, but we'll verify author after fetch.
1411 console.log(`[${twitterUsername}] 🕵️ Parent ${replyStatusId} missing. Checking if backfillable...`);
1412
1413 let parentBackfilled = false;
1414 try {
1415 const scraper = await getTwitterScraper();
1416 if (scraper) {
1417 const parentRaw = await scraper.getTweet(replyStatusId);
1418 if (parentRaw) {
1419 const parentTweet = mapScraperTweetToLocalTweet(parentRaw);
1420 const parentAuthor = parentTweet.user?.screen_name;
1421
1422 if (parentAuthor?.toLowerCase() === twitterUsername.toLowerCase()) {
1423 console.log(`[${twitterUsername}] 🔄 Parent is ours (@${parentAuthor}). Backfilling parent first...`);
1424 addTweetsToMap(tweetMap, [parentTweet]);
1425 // Recursively process the parent
1426 await processTweets(
1427 agent,
1428 twitterUsername,
1429 bskyIdentifier,
1430 [parentTweet],
1431 dryRun,
1432 localProcessedMap,
1433 tweetMap,
1434 );
1435
1436 // Check if it was saved
1437 const savedParent = dbService.getTweet(replyStatusId, bskyIdentifier);
1438 if (savedParent && savedParent.status === 'migrated') {
1439 // Update local map
1440 localProcessedMap[replyStatusId] = {
1441 uri: savedParent.bsky_uri,
1442 cid: savedParent.bsky_cid,
1443 root:
1444 savedParent.bsky_root_uri && savedParent.bsky_root_cid
1445 ? { uri: savedParent.bsky_root_uri, cid: savedParent.bsky_root_cid }
1446 : undefined,
1447 tail:
1448 savedParent.bsky_tail_uri && savedParent.bsky_tail_cid
1449 ? { uri: savedParent.bsky_tail_uri, cid: savedParent.bsky_tail_cid }
1450 : undefined,
1451 migrated: true,
1452 };
1453 replyParentInfo = localProcessedMap[replyStatusId] ?? null;
1454 parentBackfilled = true;
1455 console.log(`[${twitterUsername}] ✅ Parent backfilled. Resuming thread.`);
1456 }
1457 } else {
1458 console.log(`[${twitterUsername}] ⏩ Parent is by @${parentAuthor}. Skipping external reply.`);
1459 }
1460 }
1461 }
1462 } catch (e) {
1463 console.warn(`[${twitterUsername}] ⚠️ Failed to fetch/backfill parent ${replyStatusId}:`, e);
1464 }
1465
1466 if (!parentBackfilled) {
1467 console.log(`[${twitterUsername}] ⏩ Skipping external/unknown reply (Parent not found or external).`);
1468 if (!dryRun) {
1469 saveProcessedTweet(twitterUsername, bskyIdentifier, tweetId, { skipped: true, text: tweetText });
1470 localProcessedMap[tweetId] = { skipped: true, text: tweetText };
1471 }
1472 continue;
1473 }
1474 } else {
1475 console.log(`[${twitterUsername}] ⏩ Skipping external/unknown reply.`);
1476 if (!dryRun) {
1477 saveProcessedTweet(twitterUsername, bskyIdentifier, tweetId, { skipped: true, text: tweetText });
1478 localProcessedMap[tweetId] = { skipped: true, text: tweetText };
1479 }
1480 continue;
1481 }
1482 }
1483
1484 // Removed early dryRun continue to allow verifying logic
1485
1486 let text = tweetText
1487 .replace(/&/g, '&')
1488 .replace(/</g, '<')
1489 .replace(/>/g, '>')
1490 .replace(/"/g, '"')
1491 .replace(/'/g, "'");
1492
1493 // 1. Link Expansion
1494 console.log(`[${twitterUsername}] 🔗 Expanding links...`);
1495 const urls = tweet.entities?.urls || [];
1496 for (const urlEntity of urls) {
1497 const tco = urlEntity.url;
1498 const expanded = urlEntity.expanded_url;
1499 if (tco && expanded) text = text.replace(tco, expanded);
1500 }
1501
1502 // Fallback: Regex for t.co links (if entities failed or missed one)
1503 const tcoRegex = /https:\/\/t\.co\/[a-zA-Z0-9]+/g;
1504 const matches = text.match(tcoRegex) || [];
1505 for (const tco of matches) {
1506 // Avoid re-resolving if we already handled it via entities
1507 if (urls.some((u) => u.url === tco)) continue;
1508
1509 console.log(`[${twitterUsername}] 🔍 Resolving fallback link: ${tco}`);
1510 const resolved = await expandUrl(tco);
1511 if (resolved !== tco) {
1512 text = text.replace(tco, resolved);
1513 // Add to urls array so it can be used for card embedding later
1514 urls.push({ url: tco, expanded_url: resolved });
1515 }
1516 }
1517
1518 const isSponsoredCard = detectSponsoredCard(tweet);
1519 if (isSponsoredCard) {
1520 console.log(`[${twitterUsername}] 🧩 Sponsored/card payload detected. Extracting carousel media...`);
1521 injectCardMedia(tweet);
1522 } else if (tweet.permanentUrl) {
1523 const syndication = await fetchSyndicationMedia(tweet.permanentUrl);
1524 if (syndication.images.length > 0) {
1525 console.log(`[${twitterUsername}] 🧩 Syndication carousel detected. Extracting media...`);
1526 injectSyndicationMedia(tweet, syndication);
1527 }
1528 }
1529
1530 // 2. Media Handling
1531 const images: ImageEmbed[] = [];
1532 let videoBlob: BlobRef | null = null;
1533 let videoAspectRatio: AspectRatio | undefined;
1534 const mediaEntities = tweet.extended_entities?.media || tweet.entities?.media || [];
1535 const mediaLinksToRemove: string[] = [];
1536
1537 console.log(`[${twitterUsername}] 🖼️ Found ${mediaEntities.length} media entities.`);
1538
1539 for (const media of mediaEntities) {
1540 if (media.url) {
1541 mediaLinksToRemove.push(media.url);
1542 if (media.expanded_url) mediaLinksToRemove.push(media.expanded_url);
1543 }
1544 if (media.source === 'card' && media.media_url_https) {
1545 mediaLinksToRemove.push(media.media_url_https);
1546 }
1547
1548 let aspectRatio: AspectRatio | undefined;
1549 if (media.sizes?.large) {
1550 aspectRatio = { width: media.sizes.large.w, height: media.sizes.large.h };
1551 } else if (media.original_info) {
1552 aspectRatio = { width: media.original_info.width, height: media.original_info.height };
1553 }
1554
1555 if (media.type === 'photo') {
1556 const url = media.media_url_https;
1557 if (!url) continue;
1558 try {
1559 const highQualityUrl = url.includes('?') ? url.replace('?', ':orig?') : url + ':orig';
1560 console.log(`[${twitterUsername}] 📥 Downloading image (high quality): ${path.basename(highQualityUrl)}`);
1561 updateAppStatus({ message: `Downloading high quality image...` });
1562 const { buffer, mimeType } = await downloadMedia(highQualityUrl);
1563
1564 let blob: BlobRef;
1565 if (dryRun) {
1566 console.log(
1567 `[${twitterUsername}] 🧪 [DRY RUN] Would upload image (${(buffer.length / 1024).toFixed(2)} KB)`,
1568 );
1569 blob = { ref: { toString: () => 'mock-blob' }, mimeType, size: buffer.length } as any;
1570 } else {
1571 console.log(`[${twitterUsername}] 📤 Uploading image to Bluesky...`);
1572 updateAppStatus({ message: `Uploading image to Bluesky...` });
1573 blob = await uploadToBluesky(agent, buffer, mimeType);
1574 }
1575
1576 let altText = media.ext_alt_text;
1577 if (!altText) {
1578 console.log(`[${twitterUsername}] 🤖 Generating alt text via Gemini...`);
1579 // Use original tweet text for context, not the modified/cleaned one
1580 const altTextContext = buildAltTextContext(tweet, tweetText, tweetMap);
1581 altText = await generateAltText(buffer, mimeType, altTextContext);
1582 if (altText) console.log(`[${twitterUsername}] ✅ Alt text generated: ${altText.substring(0, 50)}...`);
1583 }
1584
1585 images.push({ alt: altText || 'Image from Twitter', image: blob, aspectRatio });
1586 console.log(`[${twitterUsername}] ✅ Image uploaded.`);
1587 } catch (err) {
1588 console.error(`[${twitterUsername}] ❌ High quality upload failed:`, (err as Error).message);
1589 try {
1590 console.log(`[${twitterUsername}] 🔄 Retrying with standard quality...`);
1591 updateAppStatus({ message: `Retrying with standard quality...` });
1592 const { buffer, mimeType } = await downloadMedia(url);
1593 const blob = await uploadToBluesky(agent, buffer, mimeType);
1594 images.push({ alt: media.ext_alt_text || 'Image from Twitter', image: blob, aspectRatio });
1595 console.log(`[${twitterUsername}] ✅ Image uploaded on retry.`);
1596 } catch (retryErr) {
1597 console.error(`[${twitterUsername}] ❌ Retry also failed:`, (retryErr as Error).message);
1598 }
1599 }
1600 } else if (media.type === 'video' || media.type === 'animated_gif') {
1601 const variants = media.video_info?.variants || [];
1602 const duration = media.video_info?.duration_millis || 0;
1603
1604 if (duration > 180000) {
1605 // 3 minutes
1606 console.warn(`[${twitterUsername}] ⚠️ Video too long (${(duration / 1000).toFixed(1)}s). Fallback to link.`);
1607 const tweetUrl = `https://twitter.com/${twitterUsername}/status/${tweetId}`;
1608 if (!text.includes(tweetUrl)) text += `\n\nVideo: ${tweetUrl}`;
1609 continue;
1610 }
1611
1612 const mp4s = variants
1613 .filter((v) => v.content_type === 'video/mp4')
1614 .sort((a, b) => (b.bitrate || 0) - (a.bitrate || 0));
1615
1616 if (mp4s.length > 0) {
1617 const firstVariant = mp4s[0];
1618 if (firstVariant) {
1619 const videoUrl = firstVariant.url;
1620 try {
1621 console.log(`[${twitterUsername}] 📥 Downloading video: ${videoUrl}`);
1622 updateAppStatus({ message: `Downloading video: ${path.basename(videoUrl)}` });
1623 const { buffer, mimeType } = await downloadMedia(videoUrl);
1624
1625 if (buffer.length <= 90 * 1024 * 1024) {
1626 const filename = videoUrl.split('/').pop() || 'video.mp4';
1627 if (dryRun) {
1628 console.log(
1629 `[${twitterUsername}] 🧪 [DRY RUN] Would upload video: ${filename} (${(buffer.length / 1024 / 1024).toFixed(2)} MB)`,
1630 );
1631 videoBlob = {
1632 ref: { toString: () => 'mock-video-blob' },
1633 mimeType: 'video/mp4',
1634 size: buffer.length,
1635 } as any;
1636 } else {
1637 updateAppStatus({ message: `Uploading video to Bluesky...` });
1638 videoBlob = await uploadVideoToBluesky(agent, buffer, filename);
1639 }
1640 videoAspectRatio = aspectRatio;
1641 console.log(`[${twitterUsername}] ✅ Video upload process complete.`);
1642 break; // Prioritize first video
1643 }
1644
1645 console.warn(
1646 `[${twitterUsername}] ⚠️ Video too large (${(buffer.length / 1024 / 1024).toFixed(2)}MB). Fallback to link.`,
1647 );
1648 const tweetUrl = `https://twitter.com/${twitterUsername}/status/${tweetId}`;
1649 if (!text.includes(tweetUrl)) text += `\n\nVideo: ${tweetUrl}`;
1650 } catch (err) {
1651 const errMsg = (err as Error).message;
1652 if (errMsg !== 'VIDEO_FALLBACK_503') {
1653 console.error(`[${twitterUsername}] ❌ Failed video upload flow:`, errMsg);
1654 }
1655 const tweetUrl = `https://twitter.com/${twitterUsername}/status/${tweetId}`;
1656 if (!text.includes(tweetUrl)) text += `\n\nVideo: ${tweetUrl}`;
1657 }
1658 }
1659 }
1660 }
1661 }
1662
1663 // Cleanup text
1664 for (const link of mediaLinksToRemove) text = text.split(link).join('').trim();
1665 if (isSponsoredCard) {
1666 const cardLinks = detectCarouselLinks(tweet);
1667 const cardPrimaryLink = detectCardMedia(tweet).link;
1668 const requestedLinks = [cardPrimaryLink, ...cardLinks].filter(Boolean) as string[];
1669 requestedLinks.forEach((link) => {
1670 if (!urls.some((u) => u.expanded_url === link || u.url === link)) {
1671 urls.push({ url: link, expanded_url: link });
1672 }
1673 });
1674 }
1675 text = text.replace(/\n\s*\n/g, '\n\n').trim();
1676 text = addTextFallbacks(text);
1677
1678 // 3. Quoting Logic
1679 let quoteEmbed: { $type: string; record: { uri: string; cid: string } } | null = null;
1680 let externalQuoteUrl: string | null = null;
1681 let linkCard: any = null;
1682
1683 if (tweet.is_quote_status && tweet.quoted_status_id_str) {
1684 const quoteId = tweet.quoted_status_id_str;
1685 const quoteRef = localProcessedMap[quoteId];
1686 if (quoteRef && !quoteRef.migrated && quoteRef.uri && quoteRef.cid) {
1687 console.log(`[${twitterUsername}] 🔄 Found quoted tweet in local history. Natively embedding.`);
1688 quoteEmbed = { $type: 'app.bsky.embed.record', record: { uri: quoteRef.uri, cid: quoteRef.cid } };
1689 } else {
1690 const quoteUrlEntity = urls.find((u) => u.expanded_url?.includes(quoteId));
1691 const qUrl = quoteUrlEntity?.expanded_url || `https://twitter.com/i/status/${quoteId}`;
1692
1693 // Check if it's a self-quote (same user)
1694 const isSelfQuote =
1695 qUrl.toLowerCase().includes(`twitter.com/${twitterUsername.toLowerCase()}/`) ||
1696 qUrl.toLowerCase().includes(`x.com/${twitterUsername.toLowerCase()}/`);
1697
1698 if (!isSelfQuote) {
1699 externalQuoteUrl = qUrl;
1700 console.log(`[${twitterUsername}] 🔗 Quoted tweet is external: ${externalQuoteUrl}`);
1701
1702 // Try to capture screenshot for external QTs if we have space for images
1703 if (images.length < 4 && !videoBlob) {
1704 const ssResult = await captureTweetScreenshot(externalQuoteUrl);
1705 if (ssResult) {
1706 try {
1707 let blob: BlobRef;
1708 if (dryRun) {
1709 console.log(
1710 `[${twitterUsername}] 🧪 [DRY RUN] Would upload screenshot for quote (${(ssResult.buffer.length / 1024).toFixed(2)} KB)`,
1711 );
1712 blob = {
1713 ref: { toString: () => 'mock-ss-blob' },
1714 mimeType: 'image/png',
1715 size: ssResult.buffer.length,
1716 } as any;
1717 } else {
1718 blob = await uploadToBluesky(agent, ssResult.buffer, 'image/png');
1719 }
1720 images.push({
1721 alt: `Quote Tweet: ${externalQuoteUrl}`,
1722 image: blob,
1723 aspectRatio: { width: ssResult.width, height: ssResult.height },
1724 });
1725 } catch (e) {
1726 console.warn(`[${twitterUsername}] ⚠️ Failed to upload screenshot blob.`);
1727 }
1728 }
1729 }
1730 } else {
1731 console.log(`[${twitterUsername}] 🔁 Quoted tweet is a self-quote, skipping link.`);
1732 }
1733 }
1734 } else if ((images.length === 0 && !videoBlob) || isSponsoredCard) {
1735 // If no media and no quote, check for external links to embed
1736 // We prioritize the LAST link found as it's often the main content
1737 const potentialLinks = urls
1738 .map((u) => u.expanded_url)
1739 .filter((u) => u && !u.includes('twitter.com') && !u.includes('x.com')) as string[];
1740
1741 if (potentialLinks.length > 0) {
1742 const linkToEmbed = potentialLinks[potentialLinks.length - 1];
1743 if (linkToEmbed) {
1744 // Optimization: If text is too long, but removing the link makes it fit, do it!
1745 // The link will be present in the embed card anyway.
1746 if (text.length > 300 && text.includes(linkToEmbed)) {
1747 const lengthWithoutLink = text.length - linkToEmbed.length;
1748 // Allow some buffer (e.g. whitespace cleanup might save 1-2 chars)
1749 if (lengthWithoutLink <= 300) {
1750 console.log(
1751 `[${twitterUsername}] 📏 Optimizing: Removing link ${linkToEmbed} from text to avoid threading (Card will embed it).`,
1752 );
1753 text = text.replace(linkToEmbed, '').trim();
1754 // Clean up potential double punctuation/spaces left behind
1755 text = text.replace(/\s\.$/, '.').replace(/\s\s+/g, ' ');
1756 }
1757 }
1758
1759 console.log(`[${twitterUsername}] 🃏 Fetching link card for: ${linkToEmbed}`);
1760 linkCard = await fetchEmbedUrlCard(agent, linkToEmbed);
1761 }
1762 }
1763 }
1764
1765 // Only append link for external quotes IF we couldn't natively embed it OR screenshot it
1766 const hasScreenshot = images.some((img) => img.alt.startsWith('Quote Tweet:'));
1767 if (externalQuoteUrl && !quoteEmbed && !hasScreenshot && !text.includes(externalQuoteUrl)) {
1768 text += `\n\nQT: ${externalQuoteUrl}`;
1769 }
1770
1771 if (isSponsoredCard) {
1772 const hasCardImages = mediaEntities.some((media) => media.source === 'card');
1773 if (hasCardImages) {
1774 text = ensureSponsoredLinks(text, tweet);
1775 }
1776 }
1777
1778 // 4. Threading and Posting
1779 const chunks = splitText(text);
1780 console.log(`[${twitterUsername}] 📝 Splitting text into ${chunks.length} chunks.`);
1781
1782 let lastPostInfo: ProcessedTweetEntry | null = replyParentInfo;
1783
1784 // We will save the first chunk as the "Root" of this tweet, and the last chunk as the "Tail".
1785 let firstChunkInfo: { uri: string; cid: string; root?: { uri: string; cid: string } } | null = null;
1786 let lastChunkInfo: { uri: string; cid: string; root?: { uri: string; cid: string } } | null = null;
1787
1788 for (let i = 0; i < chunks.length; i++) {
1789 let chunk = chunks[i] as string;
1790
1791 // Add (i/n) if split
1792 if (chunks.length > 1) {
1793 chunk += ` (${i + 1}/${chunks.length})`;
1794 }
1795
1796 console.log(`[${twitterUsername}] 📤 Posting chunk ${i + 1}/${chunks.length}...`);
1797 updateAppStatus({ message: `Posting chunk ${i + 1}/${chunks.length}...` });
1798
1799 const rt = new RichText({ text: chunk });
1800 await rt.detectFacets(agent);
1801 rt.facets = addTwitterHandleLinkFacets(rt.text, rt.facets);
1802 const detectedLangs = detectLanguage(chunk);
1803
1804 // Preserve original timing when available, but enforce monotonic per-account
1805 // timestamps to avoid equal-createdAt collisions in fast self-thread replies.
1806 const parsedCreatedAt = tweet.created_at ? Date.parse(tweet.created_at) : NaN;
1807 const baseCreatedAtMs = Number.isFinite(parsedCreatedAt) ? parsedCreatedAt : Date.now();
1808 const chunkCreatedAtMs = baseCreatedAtMs + i * 1000;
1809
1810 // biome-ignore lint/suspicious/noExplicitAny: dynamic record construction
1811 const postRecord: Record<string, any> = {
1812 text: rt.text,
1813 facets: rt.facets,
1814 langs: detectedLangs,
1815 // CID is generated by the PDS from record content; unique createdAt keeps
1816 // near-simultaneous self-thread posts from colliding on identical payloads.
1817 createdAt: getUniqueCreatedAtIso(bskyIdentifier, chunkCreatedAtMs),
1818 };
1819
1820 if (i === 0) {
1821 if (videoBlob) {
1822 const videoEmbed: any = {
1823 $type: 'app.bsky.embed.video',
1824 video: videoBlob,
1825 };
1826 if (videoAspectRatio) videoEmbed.aspectRatio = videoAspectRatio;
1827 postRecord.embed = videoEmbed;
1828 } else if (images.length > 0) {
1829 const imagesEmbed = { $type: 'app.bsky.embed.images', images };
1830 if (quoteEmbed) {
1831 postRecord.embed = { $type: 'app.bsky.embed.recordWithMedia', media: imagesEmbed, record: quoteEmbed };
1832 } else {
1833 postRecord.embed = imagesEmbed;
1834 }
1835 } else if (quoteEmbed) {
1836 postRecord.embed = quoteEmbed;
1837 } else if (linkCard) {
1838 postRecord.embed = linkCard;
1839 }
1840 }
1841
1842 // Threading logic
1843 // Determine actual parent URI/CID to reply to
1844 let parentRef: { uri: string; cid: string } | null = null;
1845 let rootRef: { uri: string; cid: string } | null = null;
1846
1847 if (lastPostInfo?.uri && lastPostInfo?.cid) {
1848 // If this is the start of a new tweet (i=0), check if parent has a tail
1849 if (i === 0 && lastPostInfo.tail) {
1850 parentRef = lastPostInfo.tail;
1851 } else {
1852 // Otherwise (intra-tweet or parent has no tail), use the main uri/cid (which is the previous post/chunk)
1853 parentRef = { uri: lastPostInfo.uri, cid: lastPostInfo.cid };
1854 }
1855
1856 rootRef = lastPostInfo.root || { uri: lastPostInfo.uri, cid: lastPostInfo.cid };
1857 }
1858
1859 if (parentRef && rootRef) {
1860 postRecord.reply = {
1861 root: rootRef,
1862 parent: parentRef,
1863 };
1864 }
1865
1866 try {
1867 // Retry logic for network/socket errors
1868 let response: any;
1869 let retries = 3;
1870
1871 if (dryRun) {
1872 console.log(`[${twitterUsername}] 🧪 [DRY RUN] Would post chunk ${i + 1}/${chunks.length}`);
1873 if (postRecord.embed) console.log(` - With embed: ${postRecord.embed.$type}`);
1874 if (postRecord.reply) console.log(` - As reply to: ${postRecord.reply.parent.uri}`);
1875 response = { uri: 'at://did:plc:mock/app.bsky.feed.post/mock', cid: 'mock-cid' };
1876 } else {
1877 while (retries > 0) {
1878 try {
1879 response = await agent.post(postRecord);
1880 break;
1881 } catch (err: any) {
1882 retries--;
1883 if (retries === 0) throw err;
1884 console.warn(
1885 `[${twitterUsername}] ⚠️ Post failed (Socket/Network), retrying in 5s... (${retries} retries left)`,
1886 );
1887 await new Promise((r) => setTimeout(r, 5000));
1888 }
1889 }
1890 }
1891
1892 const currentPostInfo = {
1893 uri: response.uri,
1894 cid: response.cid,
1895 root: postRecord.reply ? postRecord.reply.root : { uri: response.uri, cid: response.cid },
1896 // Text is just the current chunk text
1897 text: chunk,
1898 };
1899
1900 if (i === 0) firstChunkInfo = currentPostInfo;
1901 lastChunkInfo = currentPostInfo;
1902 lastPostInfo = currentPostInfo; // Update for next iteration
1903
1904 console.log(`[${twitterUsername}] ✅ Chunk ${i + 1} posted successfully.`);
1905
1906 if (chunks.length > 1) {
1907 await new Promise((r) => setTimeout(r, 3000));
1908 }
1909 } catch (err) {
1910 console.error(`[${twitterUsername}] ❌ Failed to post ${tweetId} (chunk ${i + 1}):`, err);
1911 break;
1912 }
1913 }
1914
1915 // Save to DB and Map
1916 if (firstChunkInfo && lastChunkInfo) {
1917 const entry: ProcessedTweetEntry = {
1918 uri: firstChunkInfo.uri,
1919 cid: firstChunkInfo.cid,
1920 root: firstChunkInfo.root,
1921 tail: { uri: lastChunkInfo.uri, cid: lastChunkInfo.cid }, // Save tail!
1922 text: tweetText,
1923 };
1924
1925 if (!dryRun) {
1926 saveProcessedTweet(twitterUsername, bskyIdentifier, tweetId, entry);
1927 localProcessedMap[tweetId] = entry; // Update local map for subsequent replies in this batch
1928 }
1929 }
1930
1931 // Add a random delay between 5s and 15s to be more human-like
1932 const wait = Math.floor(Math.random() * 10000) + 5000;
1933 console.log(`[${twitterUsername}] 😴 Pacing: Waiting ${wait / 1000}s before next tweet.`);
1934 updateAppStatus({ state: 'pacing', message: `Pacing: Waiting ${wait / 1000}s...` });
1935 await new Promise((r) => setTimeout(r, wait));
1936 }
1937}
1938
1939import { getAgent } from './bsky.js';
1940
/**
 * Imports a Twitter account's timeline history into its mapped Bluesky account.
 *
 * Resolves the mapping for `twitterUsername`, preferring the one whose `bskyIdentifier` matches the
 * requested account (one Twitter account may be mirrored to several Bluesky accounts). It then logs in,
 * collects not-yet-processed tweets from the scraper, and hands them to `processTweets`.
 *
 * @param twitterUsername - Twitter handle to import (matched case-insensitively against mappings).
 * @param bskyIdentifier - Bluesky account whose processed-tweet store is consulted and posted to.
 * @param limit - Maximum number of new tweets to collect; a falsy value falls back to 100.
 * @param dryRun - When true, posting is simulated; if login fails a mock agent is used instead of aborting.
 * @param ignoreCancellation - When false, fetching stops as soon as the backfill request is no longer pending.
 * @param requestId - Backfill request to check for cancellation; when omitted, any pending backfill for the
 *   mapping keeps the import alive.
 */
async function importHistory(
  twitterUsername: string,
  bskyIdentifier: string,
  limit = 15,
  dryRun = false,
  ignoreCancellation = false,
  requestId?: string,
): Promise<void> {
  const config = getConfig();
  const wantedUsername = twitterUsername.toLowerCase();
  const wantedBskyIdentifier = bskyIdentifier.toLowerCase();
  const hasUsername = (m: AccountMapping) => m.twitterUsernames.some((u) => u.toLowerCase() === wantedUsername);
  // Prefer the mapping for the requested Bluesky account so the agent, the processed-tweet store and the
  // cancellation check all refer to the same mapping; fall back to a username-only match.
  const mapping =
    config.mappings.find((m) => hasUsername(m) && m.bskyIdentifier.toLowerCase() === wantedBskyIdentifier) ??
    config.mappings.find(hasUsername);
  if (!mapping) {
    console.error(`No mapping found for twitter username: ${twitterUsername}`);
    return;
  }

  let agent = await getAgent(mapping);
  if (!agent) {
    if (dryRun) {
      console.log('⚠️ Could not login to Bluesky, but proceeding with MOCK AGENT for Dry Run.');
      // biome-ignore lint/suspicious/noExplicitAny: mock agent
      agent = {
        post: async (record: any) => ({ uri: 'at://did:plc:mock/app.bsky.feed.post/mock', cid: 'mock-cid' }),
        uploadBlob: async (data: any) => ({ data: { blob: { ref: { toString: () => 'mock-blob' } } } }),
        // Only the members touched outside processTweets' own dry-run branches need to exist here.
        session: { did: 'did:plc:mock' },
        com: { atproto: { repo: { describeRepo: async () => ({ data: {} }) } } },
      } as any;
    } else {
      return;
    }
  }

  console.log(`Starting full history import for ${twitterUsername} -> ${mapping.bskyIdentifier}...`);

  const allFoundTweets: Tweet[] = [];
  const seenIds = new Set<string>();
  const processedTweets = loadProcessedTweets(bskyIdentifier);

  console.log(`Fetching tweets for ${twitterUsername}...`);
  updateAppStatus({ message: `Fetching tweets...` });

  const client = await getTwitterScraper();
  if (!client) {
    // Previously this case fell through silently and looked like "0 new tweets".
    console.warn(`[${twitterUsername}] ⚠️ Twitter scraper unavailable; no tweets fetched.`);
  } else {
    try {
      // getTweets walks the user timeline; the generator yields as many tweets as requested.
      const fetchLimit = limit || 100;
      const generator = client.getTweets(twitterUsername, fetchLimit);

      for await (const scraperTweet of generator) {
        if (!ignoreCancellation) {
          // Stop when the backfill request was cancelled or replaced while we were fetching.
          const stillPending = getPendingBackfills().some(
            (b) => b.id === mapping.id && (!requestId || b.requestId === requestId),
          );
          if (!stillPending) {
            console.log(`[${twitterUsername}] 🛑 Backfill cancelled.`);
            break;
          }
        }

        const t = mapScraperTweetToLocalTweet(scraperTweet);
        const tid = t.id_str || t.id;
        if (!tid) continue;

        // Skip tweets already crossposted to this Bluesky account and duplicates from the generator.
        if (!processedTweets[tid] && !seenIds.has(tid)) {
          allFoundTweets.push(t);
          seenIds.add(tid);
        }

        if (allFoundTweets.length >= fetchLimit) break;
      }
    } catch (e) {
      console.warn('Error during history fetch:', e);
    }
  }

  console.log(`Fetch complete. Found ${allFoundTweets.length} new tweets to import.`);
  if (allFoundTweets.length > 0) {
    await processTweets(agent as BskyAgent, twitterUsername, bskyIdentifier, allFoundTweets, dryRun);
    console.log('History import complete.');
  }
}
2027
// Task management: at most one in-flight task per mapping id.
const activeTasks = new Map<string, Promise<void>>();

/**
 * Runs one unit of work for a mapping. This is a backfill when one is supplied or queued for the
 * mapping; otherwise it checks each of the mapping's Twitter accounts for new tweets.
 *
 * Only one task per mapping runs at a time. A call made while a task is already in flight starts no new
 * work and returns the in-flight promise, so callers that `await` it wait for that task to finish.
 *
 * @param mapping - The account mapping to process.
 * @param backfillRequest - Backfill to run explicitly. When omitted, the first pending backfill for
 *   `mapping.id` (if any) is used.
 * @param dryRun - Forwarded to the import/processing steps; when true nothing is posted.
 * @returns A promise that settles when the task is done. Errors are logged, not rethrown.
 */
async function runAccountTask(mapping: AccountMapping, backfillRequest?: PendingBackfill, dryRun = false) {
  const inFlight = activeTasks.get(mapping.id);
  if (inFlight) return inFlight; // Already running: let callers await the existing task.

  // Evaluated synchronously, before any await, so a request the caller just read from the queue is still
  // present. An explicit request that is NOT in the queue (e.g. the CLI's --backfill-mapping) can never be
  // "pending", and importHistory would otherwise treat it as cancelled on the very first tweet.
  const explicitRequestId = backfillRequest?.requestId;
  const explicitRequestQueued = backfillRequest
    ? getPendingBackfills().some((b) => b.id === mapping.id && b.requestId === explicitRequestId)
    : false;

  const task = (async () => {
    try {
      const agent = await getAgent(mapping);
      if (!agent) return;

      const backfillReq = backfillRequest ?? getPendingBackfills().find((b) => b.id === mapping.id);
      const explicitBackfill = Boolean(backfillRequest);

      if (backfillReq) {
        const limit = backfillReq.limit || 15;
        // Queued requests keep honouring cancellation; explicit, unqueued ones run to completion.
        const ignoreCancellation = explicitBackfill && !explicitRequestQueued;
        console.log(
          `[${mapping.bskyIdentifier}] Running backfill for ${mapping.twitterUsernames.length} accounts (limit ${limit})...`,
        );
        updateAppStatus({
          state: 'backfilling',
          currentAccount: mapping.twitterUsernames[0],
          message: `Starting backfill (limit ${limit})...`,
          backfillMappingId: mapping.id,
          backfillRequestId: backfillReq.requestId,
        });

        for (const twitterUsername of mapping.twitterUsernames) {
          // An implicitly picked-up request stops as soon as it has been replaced or removed from the queue.
          const stillPending = explicitBackfill
            ? true
            : getPendingBackfills().some((b) => b.id === mapping.id && b.requestId === backfillReq.requestId);
          if (!stillPending) {
            console.log(`[${mapping.bskyIdentifier}] 🛑 Backfill request replaced; stopping.`);
            break;
          }

          try {
            updateAppStatus({
              state: 'backfilling',
              currentAccount: twitterUsername,
              message: `Starting backfill (limit ${limit})...`,
              backfillMappingId: mapping.id,
              backfillRequestId: backfillReq.requestId,
            });
            await importHistory(
              twitterUsername,
              mapping.bskyIdentifier,
              limit,
              dryRun,
              ignoreCancellation,
              backfillReq.requestId,
            );
          } catch (err) {
            console.error(`❌ Error backfilling ${twitterUsername}:`, err);
          }
        }
        clearBackfill(mapping.id, backfillReq.requestId);
        updateAppStatus({
          state: 'idle',
          message: `Backfill complete for ${mapping.bskyIdentifier}`,
          backfillMappingId: undefined,
          backfillRequestId: undefined,
        });
        console.log(`[${mapping.bskyIdentifier}] Backfill complete.`);
      } else {
        updateAppStatus({ backfillMappingId: undefined, backfillRequestId: undefined });

        // Pre-load processed IDs so fetchUserTweets can stop early at already-crossposted tweets.
        const processedMap = loadProcessedTweets(mapping.bskyIdentifier);
        const processedIds = new Set(Object.keys(processedMap));

        for (const twitterUsername of mapping.twitterUsernames) {
          try {
            console.log(`[${twitterUsername}] 🏁 Starting check for new tweets...`);
            updateAppStatus({
              state: 'checking',
              currentAccount: twitterUsername,
              message: 'Fetching latest tweets...',
              backfillMappingId: undefined,
              backfillRequestId: undefined,
            });

            // Limit is generous because fetching stops early at the first already-processed tweet.
            const tweets = await fetchUserTweets(twitterUsername, 50, processedIds);

            if (!tweets || tweets.length === 0) {
              console.log(`[${twitterUsername}] ℹ️ No tweets found (or fetch failed).`);
              continue;
            }

            console.log(`[${twitterUsername}] 📥 Fetched ${tweets.length} tweets.`);
            await processTweets(agent, twitterUsername, mapping.bskyIdentifier, tweets, dryRun);
          } catch (err) {
            console.error(`❌ Error checking ${twitterUsername}:`, err);
          }
        }
      }
    } catch (err) {
      console.error(`Error processing mapping ${mapping.bskyIdentifier}:`, err);
    } finally {
      activeTasks.delete(mapping.id);
    }
  })();

  activeTasks.set(mapping.id, task);
  return task; // Return task promise for await in main loop
}
2129
2130import type { AccountMapping } from './config-manager.js';
2131import {
2132 clearBackfill,
2133 getNextCheckTime,
2134 getPendingBackfills,
2135 startServer,
2136 updateAppStatus,
2137 updateLastCheckTime,
2138} from './server.js';
2139import type { PendingBackfill } from './server.js';
2140
/**
 * CLI entry point.
 *
 * Parses command-line options, migrates legacy JSON state to SQLite, and starts the web interface
 * unless `--no-web` is given. It then runs in one of three modes:
 * - `--import-history`: import one Twitter account's history and exit;
 * - `--run-once`, `--backfill-mapping` or `--dry-run`: run a single cycle and exit;
 * - otherwise: poll every 5s, running queued backfills first and scheduled checks when due.
 */
async function main(): Promise<void> {
  const program = new Command();
  program
    .name('tweets-2-bsky')
    .description('Crosspost tweets to Bluesky')
    .option('--dry-run', 'Fetch tweets but do not post to Bluesky', false)
    .option('--no-web', 'Disable the web interface')
    .option('--run-once', 'Run one check cycle immediately and exit', false)
    .option('--backfill-mapping <mapping>', 'Run backfill now for a mapping id/handle/twitter username')
    .option('--backfill-limit <number>', 'Limit for --backfill-mapping', (val) => Number.parseInt(val, 10))
    .option('--import-history', 'Run in history import mode')
    .option('--username <username>', 'Twitter username for history import')
    .option('--limit <number>', 'Limit the number of tweets to import', (val) => Number.parseInt(val, 10))
    .parse(process.argv);

  const options = program.opts();

  const config = getConfig();

  await migrateJsonToSqlite();

  if (!options.web) {
    console.log('🌐 Web interface is disabled.');
  } else {
    startServer();
    if (config.users.length === 0) {
      console.log('ℹ️ No users found. Please register on the web interface to get started.');
    }
  }

  // One-shot history import for a single Twitter account; always exits.
  if (options.importHistory) {
    if (!options.username) {
      console.error('Please specify a username with --username <username>');
      process.exit(1);
    }
    const client = await getTwitterScraper();
    if (!client) {
      console.error('Twitter credentials not set. Cannot import history.');
      process.exit(1);
    }
    const mapping = config.mappings.find((m) =>
      m.twitterUsernames.map((u) => u.toLowerCase()).includes(options.username.toLowerCase()),
    );
    if (!mapping) {
      console.error(`No mapping found for ${options.username}`);
      process.exit(1);
    }
    await importHistory(options.username, mapping.bskyIdentifier, options.limit, options.dryRun, true);
    process.exit(0);
  }

  const findMappingById = (mappings: AccountMapping[], id: string) => mappings.find((mapping) => mapping.id === id);
  const normalizeHandle = (value: string) => value.trim().replace(/^@/, '').toLowerCase();
  // Resolve a user-supplied reference: exact mapping id, Bluesky handle, or any mapped Twitter username.
  const findMappingByRef = (mappings: AccountMapping[], ref: string) => {
    const needle = normalizeHandle(ref);
    return mappings.find(
      (mapping) =>
        mapping.id === ref ||
        normalizeHandle(mapping.bskyIdentifier) === needle ||
        mapping.twitterUsernames.some((username) => normalizeHandle(username) === needle),
    );
  };

  // Runs either the requested CLI backfill or one check over every enabled mapping (max 3 concurrently).
  const runSingleCycle = async (cycleConfig: ReturnType<typeof getConfig>) => {
    const runLimit = pLimit(3);
    const tasks: Promise<void>[] = [];

    if (options.backfillMapping) {
      const mapping = findMappingByRef(cycleConfig.mappings, options.backfillMapping);
      if (!mapping) {
        console.error(`No mapping found for '${options.backfillMapping}'.`);
        process.exit(1);
      }
      if (!mapping.enabled) {
        console.error(`Mapping '${mapping.bskyIdentifier}' is disabled.`);
        process.exit(1);
      }

      const requestId = `cli-${Date.now()}`;
      const backfillRequest: PendingBackfill = {
        id: mapping.id,
        limit: options.backfillLimit || options.limit || 15,
        queuedAt: Date.now(),
        sequence: 0,
        requestId,
      };

      console.log(`[CLI] 🚧 Running backfill for ${mapping.bskyIdentifier}...`);
      await runAccountTask(mapping, backfillRequest, options.dryRun);
      updateAppStatus({ state: 'idle', message: `Backfill complete for ${mapping.bskyIdentifier}` });
      return;
    }

    for (const mapping of cycleConfig.mappings) {
      if (!mapping.enabled) continue;
      tasks.push(
        runLimit(async () => {
          await runAccountTask(mapping, undefined, options.dryRun);
        }),
      );
    }

    if (tasks.length === 0) {
      console.log('[CLI] No enabled mappings found.');
      updateAppStatus({ state: 'idle', message: 'No enabled mappings found' });
      return;
    }

    await Promise.all(tasks);
    updateAppStatus({ state: 'idle', message: options.dryRun ? 'Dry run cycle complete' : 'Run-once cycle complete' });
  };

  if (options.runOnce || options.backfillMapping || options.dryRun) {
    await runSingleCycle(getConfig());
    console.log(options.dryRun ? 'Dry run cycle complete. Exiting.' : 'Run-once cycle complete. Exiting.');
    process.exit(0);
  }

  console.log(`Scheduler started. Base interval: ${config.checkIntervalMinutes} minutes.`);
  updateLastCheckTime(); // Initialize next time

  // Concurrency limit for processing accounts
  const runLimit = pLimit(3);

  // Main loop: queued backfills take priority over scheduled checks.
  while (true) {
    const now = Date.now();
    const loopConfig = getConfig(); // Reload config to pick up new mappings/settings
    const nextTime = getNextCheckTime();

    const isScheduledRun = now >= nextTime;
    const pendingBackfills = getPendingBackfills();

    if (isScheduledRun) {
      console.log(`[${new Date().toISOString()}] ⏰ Scheduled check triggered.`);
      updateLastCheckTime();
    }

    if (pendingBackfills.length > 0) {
      // Process one backfill per iteration so newly queued requests are picked up promptly.
      const [nextBackfill] = pendingBackfills;
      if (nextBackfill) {
        const mapping = findMappingById(loopConfig.mappings, nextBackfill.id);
        if (mapping && mapping.enabled) {
          console.log(`[Scheduler] 🚧 Backfill priority: ${mapping.bskyIdentifier}`);
          await runAccountTask(mapping, nextBackfill, options.dryRun);
        } else {
          // Drop requests for mappings that were removed or disabled.
          clearBackfill(nextBackfill.id, nextBackfill.requestId);
        }
      }
      // Re-read the queue: the request just handled has been cleared, and others may have arrived meanwhile.
      if (getPendingBackfills().length === 0) {
        updateAppStatus({
          state: 'idle',
          message: 'Backfill queue empty',
          backfillMappingId: undefined,
          backfillRequestId: undefined,
        });
      }
      updateLastCheckTime();
    } else if (isScheduledRun) {
      const tasks: Promise<void>[] = [];
      for (const mapping of loopConfig.mappings) {
        if (!mapping.enabled) continue;

        tasks.push(
          runLimit(async () => {
            await runAccountTask(mapping, undefined, options.dryRun);
          }),
        );
      }

      if (tasks.length > 0) {
        await Promise.all(tasks);
        console.log(`[Scheduler] ✅ All tasks for this cycle complete.`);
      }

      updateAppStatus({ state: 'idle', message: 'Scheduled checks complete' });
    }

    // Sleep for 5 seconds
    await new Promise((resolve) => setTimeout(resolve, 5000));
  }
}
2327
// Start the CLI. Surface startup/loop failures explicitly instead of leaving a floating promise,
// and exit non-zero so supervisors (systemd, pm2, Docker) see the failure.
main().catch((err) => {
  console.error('Fatal error:', err);
  process.exit(1);
});