A simple tool that lets you scrape Twitter accounts and crosspost them to Bluesky accounts! Comes with a CLI and a web app for managing profiles! Works with images/videos/link embeds/threads.

feat: live progress UI, backfill cancellation, and video upload resilience

jack 94e24c8f c2c1ac14

+206 -55
+67
public/index.html
··· 221 221 } 222 222 }; 223 223 224 + const cancelBackfill = async (id) => { 225 + try { 226 + await axios.delete(`/api/backfill/${id}`, { 227 + headers: { Authorization: `Bearer ${token}` } 228 + }); 229 + fetchStatus(); 230 + } catch (err) { 231 + alert('Failed to cancel backfill'); 232 + } 233 + }; 234 + 235 + const clearAllBackfills = async () => { 236 + if (!confirm('Stop all pending and running backfills?')) return; 237 + try { 238 + await axios.post('/api/backfill/clear-all', {}, { 239 + headers: { Authorization: `Bearer ${token}` } 240 + }); 241 + fetchStatus(); 242 + } catch (err) { 243 + alert('Failed to clear backfills'); 244 + } 245 + }; 246 + 224 247 const clearCache = async (id, twitterUsername) => { 225 248 if (!confirm(`Clear processed tweets cache for @${twitterUsername}? This will make the next run re-process recent tweets (potentially causing duplicates if they were already posted).`)) return; 226 249 try { ··· 319 342 <button className="btn btn-outline-secondary btn-sm" onClick={runNow}> 320 343 Run Now 321 344 </button> 345 + {isAdmin && status.pendingBackfills?.length > 0 && ( 346 + <button className="btn btn-outline-danger btn-sm" onClick={clearAllBackfills}> 347 + Stop All Backfills ({status.pendingBackfills.length}) 348 + </button> 349 + )} 322 350 <button className="btn btn-outline-secondary btn-sm" onClick={() => setDarkMode(!darkMode)}> 323 351 {darkMode ? 'Light' : 'Dark'} 324 352 </button> ··· 328 356 </nav> 329 357 330 358 <div className="container"> 359 + {status.currentStatus && status.currentStatus.state !== 'idle' && ( 360 + <div className="row mb-4"> 361 + <div className="col-12"> 362 + <div className={`card p-3 border-start border-4 ${status.currentStatus.state === 'backfilling' ? 
'border-warning' : 'border-primary'}`}> 363 + <div className="d-flex justify-content-between align-items-center"> 364 + <div> 365 + <h6 className="mb-1 d-flex align-items-center text-capitalize"> 366 + <div className="spinner-border spinner-border-sm text-primary me-2" role="status"></div> 367 + {status.currentStatus.state} 368 + {status.currentStatus.currentAccount && ` - @${status.currentStatus.currentAccount}`} 369 + </h6> 370 + <div className="text-muted small"> 371 + {status.currentStatus.message} 372 + </div> 373 + </div> 374 + {status.currentStatus.totalCount > 0 && ( 375 + <div className="text-end"> 376 + <div className="h4 mb-0">{Math.round((status.currentStatus.processedCount / status.currentStatus.totalCount) * 100)}%</div> 377 + <div className="text-muted small">{status.currentStatus.processedCount} / {status.currentStatus.totalCount}</div> 378 + </div> 379 + )} 380 + </div> 381 + {status.currentStatus.totalCount > 0 && ( 382 + <div className="progress mt-2" style={{ height: '4px' }}> 383 + <div 384 + className="progress-bar progress-bar-striped progress-bar-animated" 385 + style={{ width: `${(status.currentStatus.processedCount / status.currentStatus.totalCount) * 100}%` }} 386 + ></div> 387 + </div> 388 + )} 389 + </div> 390 + </div> 391 + </div> 392 + )} 331 393 <div className="row"> 332 394 {isAdmin && ( 333 395 <div className="col-md-4 mb-4"> ··· 403 465 <td> 404 466 <span className={`status-dot ${isBackfillQueued(m.id) ? 'status-queued' : 'status-active'}`}></span> 405 467 {isBackfillQueued(m.id) ? 'Backfill' : 'Active'} 468 + {isBackfillQueued(m.id) && ( 469 + <button onClick={() => cancelBackfill(m.id)} className="btn btn-link btn-sm text-danger p-0 ms-1" title="Stop Backfill"> 470 + <span className="material-icons" style={{fontSize: '14px'}}>cancel</span> 471 + </button> 472 + )} 406 473 </td> 407 474 <td className="text-end"> 408 475 {isAdmin && (
+111 -55
src/index.ts
··· 342 342 return null; 343 343 } 344 344 345 + async function pollForVideoProcessing(agent: BskyAgent, jobId: string): Promise<BlobRef> { 346 + console.log(`[VIDEO] ⏳ Polling for processing completion (this can take a minute)...`); 347 + let attempts = 0; 348 + let blob: BlobRef | undefined; 349 + 350 + while (!blob) { 351 + attempts++; 352 + const statusUrl = new URL("https://video.bsky.app/xrpc/app.bsky.video.getJobStatus"); 353 + statusUrl.searchParams.append("jobId", jobId); 354 + 355 + const statusResponse = await fetch(statusUrl); 356 + if (!statusResponse.ok) { 357 + console.warn(`[VIDEO] ⚠️ Job status fetch failed (${statusResponse.status}), retrying...`); 358 + await new Promise((resolve) => setTimeout(resolve, 5000)); 359 + continue; 360 + } 361 + 362 + const statusData = (await statusResponse.json()) as any; 363 + const state = statusData.jobStatus.state; 364 + const progress = statusData.jobStatus.progress || 0; 365 + 366 + console.log(`[VIDEO] 🔄 Job ${jobId}: ${state} (${progress}%)`); 367 + 368 + if (statusData.jobStatus.blob) { 369 + blob = statusData.jobStatus.blob; 370 + console.log(`[VIDEO] 🎉 Video processing complete! 
Blob ref obtained.`); 371 + } else if (state === "JOB_STATE_FAILED") { 372 + throw new Error(`Video processing failed: ${statusData.jobStatus.error || "Unknown error"}`); 373 + } else { 374 + // Wait before next poll 375 + await new Promise((resolve) => setTimeout(resolve, 5000)); 376 + } 377 + 378 + if (attempts > 60) { 379 + // ~5 minute timeout 380 + throw new Error("Video processing timed out after 5 minutes."); 381 + } 382 + } 383 + return blob!; 384 + } 385 + 345 386 async function uploadVideoToBluesky(agent: BskyAgent, buffer: Buffer, filename: string): Promise<BlobRef> { 346 - const sanitizedFilename = filename.split('?')[0] || 'video.mp4'; 347 - console.log(`[VIDEO] 🟢 Starting upload process for ${sanitizedFilename} (${(buffer.length / 1024 / 1024).toFixed(2)} MB)`); 348 - 387 + const sanitizedFilename = filename.split("?")[0] || "video.mp4"; 388 + console.log( 389 + `[VIDEO] 🟢 Starting upload process for ${sanitizedFilename} (${(buffer.length / 1024 / 1024).toFixed(2)} MB)`, 390 + ); 391 + 349 392 try { 350 393 // 1. Get Service Auth 351 394 // We need to resolve the actual PDS host for this DID 352 395 console.log(`[VIDEO] 🔍 Resolving PDS host for DID: ${agent.session!.did}...`); 353 396 const { data: repoDesc } = await agent.com.atproto.repo.describeRepo({ repo: agent.session!.did! }); 354 - 397 + 355 398 // didDoc might be present in repoDesc 356 - const pdsService = (repoDesc as any).didDoc?.service?.find((s: any) => s.id === '#atproto_pds' || s.type === 'AtProtoPds'); 399 + const pdsService = (repoDesc as any).didDoc?.service?.find( 400 + (s: any) => s.id === "#atproto_pds" || s.type === "AtProtoPds", 401 + ); 357 402 const pdsUrl = pdsService?.serviceEndpoint; 358 - const pdsHost = pdsUrl ? new URL(pdsUrl).host : 'bsky.social'; 359 - 403 + const pdsHost = pdsUrl ? 
new URL(pdsUrl).host : "bsky.social"; 404 + 360 405 console.log(`[VIDEO] 🌐 PDS Host detected: ${pdsHost}`); 361 406 console.log(`[VIDEO] 🔑 Requesting service auth token for audience: did:web:${pdsHost}...`); 362 - 407 + 363 408 const { data: serviceAuth } = await agent.com.atproto.server.getServiceAuth({ 364 - aud: `did:web:${pdsHost}`, 365 - lxm: "com.atproto.repo.uploadBlob", 366 - exp: Math.floor(Date.now() / 1000) + 60 * 30, 409 + aud: `did:web:${pdsHost}`, 410 + lxm: "com.atproto.repo.uploadBlob", 411 + exp: Math.floor(Date.now() / 1000) + 60 * 30, 367 412 }); 368 413 console.log(`[VIDEO] ✅ Service auth token obtained.`); 369 414 ··· 385 430 }); 386 431 387 432 if (!uploadResponse.ok) { 388 - const errText = await uploadResponse.text(); 389 - console.error(`[VIDEO] ❌ Server responded with ${uploadResponse.status}: ${errText}`); 390 - throw new Error(`Video upload failed: ${uploadResponse.status} ${errText}`); 433 + const errorText = await uploadResponse.text(); 434 + console.error(`[VIDEO] ❌ Server responded with ${uploadResponse.status}: ${errorText}`); 435 + 436 + try { 437 + const errorJson = JSON.parse(errorText); 438 + if (errorJson.error === "already_exists" && errorJson.jobId) { 439 + console.log(`[VIDEO] ♻️ Video already exists. Resuming with Job ID: ${errorJson.jobId}`); 440 + return await pollForVideoProcessing(agent, errorJson.jobId); 441 + } 442 + } catch (e) { 443 + // Not JSON or missing fields, proceed with throwing 444 + } 445 + 446 + throw new Error(`Video upload failed: ${uploadResponse.status} ${errorText}`); 391 447 } 392 448 393 449 const jobStatus = (await uploadResponse.json()) as any; 394 450 console.log(`[VIDEO] 📦 Upload accepted. Job ID: ${jobStatus.jobId}, State: ${jobStatus.state}`); 395 - 396 - let blob: BlobRef | undefined = jobStatus.blob; 397 451 398 - // 3. 
Poll for processing status 399 - if (!blob) { 400 - console.log(`[VIDEO] ⏳ Polling for processing completion (this can take a minute)...`); 401 - let attempts = 0; 402 - while (!blob) { 403 - attempts++; 404 - const statusUrl = new URL("https://video.bsky.app/xrpc/app.bsky.video.getJobStatus"); 405 - statusUrl.searchParams.append("jobId", jobStatus.jobId); 406 - 407 - const statusResponse = await fetch(statusUrl); 408 - if (!statusResponse.ok) { 409 - console.warn(`[VIDEO] ⚠️ Job status fetch failed (${statusResponse.status}), retrying...`); 410 - await new Promise((resolve) => setTimeout(resolve, 5000)); 411 - continue; 412 - } 413 - 414 - const statusData = await statusResponse.json() as any; 415 - const state = statusData.jobStatus.state; 416 - const progress = statusData.jobStatus.progress || 0; 417 - 418 - console.log(`[VIDEO] 🔄 Job ${jobStatus.jobId}: ${state} (${progress}%)`); 419 - 420 - if (statusData.jobStatus.blob) { 421 - blob = statusData.jobStatus.blob; 422 - console.log(`[VIDEO] 🎉 Video processing complete! Blob ref obtained.`); 423 - } else if (state === 'JOB_STATE_FAILED') { 424 - throw new Error(`Video processing failed: ${statusData.jobStatus.error || 'Unknown error'}`); 425 - } else { 426 - // Wait before next poll 427 - await new Promise((resolve) => setTimeout(resolve, 5000)); 428 - } 429 - 430 - if (attempts > 60) { // ~5 minute timeout 431 - throw new Error("Video processing timed out after 5 minutes."); 432 - } 433 - } 452 + if (jobStatus.blob) { 453 + return jobStatus.blob; 434 454 } 435 455 436 - return blob!; 456 + // 3. 
Poll for processing status 457 + return await pollForVideoProcessing(agent, jobStatus.jobId); 437 458 } catch (err) { 438 459 console.error(`[VIDEO] ❌ Error in uploadVideoToBluesky:`, (err as Error).message); 439 460 throw err; ··· 540 561 const processedTweets = loadProcessedTweets(twitterUsername); 541 562 tweets.reverse(); 542 563 564 + let count = 0; 543 565 for (const tweet of tweets) { 566 + count++; 544 567 const tweetId = tweet.id_str || tweet.id; 545 568 if (!tweetId) continue; 546 569 547 570 if (processedTweets[tweetId]) continue; 548 571 549 572 console.log(`\n[${twitterUsername}] 🕒 Processing tweet: ${tweetId}`); 573 + updateAppStatus({ 574 + state: 'processing', 575 + currentAccount: twitterUsername, 576 + processedCount: count, 577 + totalCount: tweets.length, 578 + message: `Processing tweet ${tweetId}`, 579 + }); 550 580 551 581 const replyStatusId = tweet.in_reply_to_status_id_str || tweet.in_reply_to_status_id; 552 582 const replyUserId = tweet.in_reply_to_user_id_str || tweet.in_reply_to_user_id; ··· 624 654 if (!url) continue; 625 655 try { 626 656 console.log(`[${twitterUsername}] 📥 Downloading image: ${url}`); 657 + updateAppStatus({ message: `Downloading image: ${path.basename(url)}` }); 627 658 const { buffer, mimeType } = await downloadMedia(url); 628 659 console.log(`[${twitterUsername}] 📤 Uploading image to Bluesky...`); 660 + updateAppStatus({ message: `Uploading image to Bluesky...` }); 629 661 const blob = await uploadToBluesky(agent, buffer, mimeType); 630 662 images.push({ alt: media.ext_alt_text || 'Image from Twitter', image: blob, aspectRatio }); 631 663 console.log(`[${twitterUsername}] ✅ Image uploaded.`); ··· 644 676 const videoUrl = firstVariant.url; 645 677 try { 646 678 console.log(`[${twitterUsername}] 📥 Downloading video: ${videoUrl}`); 679 + updateAppStatus({ message: `Downloading video: ${path.basename(videoUrl)}` }); 647 680 const { buffer, mimeType } = await downloadMedia(videoUrl); 648 681 649 682 if (buffer.length 
<= 100 * 1024 * 1024) { 650 683 const filename = videoUrl.split('/').pop() || 'video.mp4'; 684 + updateAppStatus({ message: `Uploading video to Bluesky...` }); 651 685 videoBlob = await uploadVideoToBluesky(agent, buffer, filename); 652 686 videoAspectRatio = aspectRatio; 653 687 console.log(`[${twitterUsername}] ✅ Video upload process complete.`); ··· 726 760 for (let i = 0; i < chunks.length; i++) { 727 761 const chunk = chunks[i] as string; 728 762 console.log(`[${twitterUsername}] 📤 Posting chunk ${i + 1}/${chunks.length}...`); 763 + updateAppStatus({ message: `Posting chunk ${i + 1}/${chunks.length}...` }); 729 764 730 765 const rt = new RichText({ text: chunk }); 731 766 await rt.detectFacets(agent); ··· 793 828 794 829 const wait = getRandomDelay(2000, 5000); 795 830 console.log(`[${twitterUsername}] 😴 Pacing: Waiting ${wait}ms before next tweet.`); 831 + updateAppStatus({ state: 'pacing', message: `Pacing: Waiting ${Math.round(wait/1000)}s...` }); 796 832 await new Promise((r) => setTimeout(r, wait)); 797 833 } 798 834 } ··· 824 860 const config = getConfig(); 825 861 if (config.mappings.length === 0) return; 826 862 863 + updateAppStatus({ state: 'checking', message: 'Starting account check...' 
}); 864 + 827 865 const pendingBackfills = getPendingBackfills(); 828 866 829 867 console.log(`[${new Date().toISOString()}] Checking all accounts...`); ··· 834 872 const agent = await getAgent(mapping); 835 873 if (!agent) continue; 836 874 837 - const backfillReq = pendingBackfills.find(b => b.id === mapping.id); 875 + const backfillReq = getPendingBackfills().find(b => b.id === mapping.id); 838 876 if (forceBackfill || backfillReq) { 839 877 const limit = backfillReq?.limit || 100; 840 878 console.log(`[${mapping.twitterUsername}] Running backfill (limit ${limit})...`); 879 + updateAppStatus({ state: 'backfilling', currentAccount: mapping.twitterUsername, message: `Starting backfill (limit ${limit})...` }); 841 880 await importHistory(mapping.twitterUsername, limit, dryRun); 842 881 clearBackfill(mapping.id); 843 882 console.log(`[${mapping.twitterUsername}] Backfill complete.`); 844 883 } else { 884 + updateAppStatus({ state: 'checking', currentAccount: mapping.twitterUsername, message: 'Fetching latest tweets...' }); 845 885 const result = await safeSearch(`from:${mapping.twitterUsername}`, 30); 846 886 if (!result.success || !result.tweets) continue; 847 887 await processTweets(agent, mapping.twitterUsername, result.tweets, dryRun); ··· 851 891 } 852 892 } 853 893 894 + updateAppStatus({ state: 'idle', currentAccount: undefined, message: 'Check complete.' }); 854 895 if (!dryRun) { 855 896 updateLastCheckTime(); 856 897 } ··· 876 917 const processedTweets = loadProcessedTweets(twitterUsername); 877 918 878 919 while (true) { 920 + // Check if this backfill request was cancelled 921 + const stillPending = getPendingBackfills().some(b => b.id === mapping.id); 922 + if (!stillPending) { 923 + console.log(`[${twitterUsername}] 🛑 Backfill cancelled by user.`); 924 + return; 925 + } 926 + 879 927 let query = `from:${twitterUsername}`; 880 928 if (maxId) query += ` max_id:${maxId}`; 881 929 882 930 console.log(`Fetching batch... 
(Collected: ${allFoundTweets.length})`); 931 + updateAppStatus({ message: `Fetching batch... (Collected: ${allFoundTweets.length})` }); 883 932 const result = await safeSearch(query, batchSize); 884 933 885 934 if (!result.success || !result.tweets || result.tweets.length === 0) break; ··· 911 960 } 912 961 } 913 962 914 - import { startServer, updateLastCheckTime, getPendingBackfills, clearBackfill, getNextCheckTime } from './server.js'; 963 + import { 964 + startServer, 965 + updateLastCheckTime, 966 + getPendingBackfills, 967 + clearBackfill, 968 + getNextCheckTime, 969 + updateAppStatus, 970 + } from './server.js'; 915 971 916 972 async function main(): Promise<void> { 917 973 const program = new Command();
+28
src/server.ts
··· 23 23 } 24 24 let pendingBackfills: PendingBackfill[] = []; 25 25 26 + interface AppStatus { 27 + state: 'idle' | 'checking' | 'backfilling' | 'pacing' | 'processing'; 28 + currentAccount?: string; 29 + processedCount?: number; 30 + totalCount?: number; 31 + message?: string; 32 + lastUpdate: number; 33 + } 34 + 35 + let currentAppStatus: AppStatus = { 36 + state: 'idle', 37 + lastUpdate: Date.now() 38 + }; 39 + 26 40 app.use(cors()); 27 41 app.use(express.json()); 28 42 ··· 169 183 nextCheckMinutes: Math.ceil(nextRunMs / 60000), 170 184 checkIntervalMinutes: config.checkIntervalMinutes, 171 185 pendingBackfills, 186 + currentStatus: currentAppStatus, 172 187 }); 173 188 }); 174 189 ··· 204 219 res.json({ success: true }); 205 220 }); 206 221 222 + app.post('/api/backfill/clear-all', authenticateToken, requireAdmin, (_req, res) => { 223 + pendingBackfills = []; 224 + res.json({ success: true, message: 'All backfills cleared' }); 225 + }); 226 + 207 227 // Export for use by index.ts 208 228 export function updateLastCheckTime() { 209 229 const config = getConfig(); 210 230 lastCheckTime = Date.now(); 211 231 nextCheckTime = lastCheckTime + (config.checkIntervalMinutes || 5) * 60 * 1000; 232 + } 233 + 234 + export function updateAppStatus(status: Partial<AppStatus>) { 235 + currentAppStatus = { 236 + ...currentAppStatus, 237 + ...status, 238 + lastUpdate: Date.now() 239 + }; 212 240 } 213 241 214 242 export function getPendingBackfills(): PendingBackfill[] {