this repo has no description
//
// BlobDownloader.swift
// shortcut
//
// Created by Bailey Townsend on 7/14/25.
//
7
8import Foundation
9import UniformTypeIdentifiers
10import ZIPFoundation
11
// MARK: - Progress events

/// Lifecycle events describing a batch blob download.
public enum DownloadProgress {
    /// The batch began with `totalCIDs` blobs to process.
    case started(totalCIDs: Int)

    /// Intermediate progress: `currentCount` out of `totalCount` (presumably blobs
    /// handled so far — confirm against the emitter).
    case progressUpdate(currentCount: Int, totalCount: Int)

    /// The batch finished; `zipURL` locates the produced archive.
    case completed(totalDownloaded: Int, zipURL: URL)

    /// The batch stopped because of `error`.
    case failed(error: Error)
}
19
// MARK: - Download result

/// Outcome of one background download: the local file the downloaded bytes were
/// moved to, together with whatever response metadata accompanied the transfer.
struct DownloadResult {
    /// Local file holding the downloaded bytes.
    let tempURL: URL

    /// Response reported for the transfer, or `nil` when none was available.
    let response: URLResponse?
}
25
// MARK: - Async semaphore

/// Counting semaphore for Swift concurrency: `wait()` suspends the calling task
/// instead of blocking a thread, and `signal()` wakes suspended callers in FIFO order.
actor AsyncSemaphore {
    /// Permits that can be taken without suspending.
    private var value: Int
    /// Suspended callers, oldest first.
    private var waiters: [CheckedContinuation<Void, Never>] = []

    init(value: Int) {
        self.value = value
    }

    /// Takes a permit, suspending until `signal()` hands one over if none is free.
    func wait() async {
        guard value <= 0 else {
            value -= 1
            return
        }
        await withCheckedContinuation { (waiter: CheckedContinuation<Void, Never>) in
            waiters.append(waiter)
        }
    }

    /// Releases a permit: passes it straight to the oldest waiter, or banks it
    /// when nobody is waiting.
    func signal() {
        guard !waiters.isEmpty else {
            value += 1
            return
        }
        waiters.removeFirst().resume()
    }
}
55
// MARK: - Background download delegate

/// `URLSessionDownloadDelegate` that bridges background download callbacks to the
/// `CheckedContinuation`s registered through `addCompletion(_:for:accountDid:)`.
///
/// Every bookkeeping dictionary is read and written only inside
/// `completionQueue.sync`, so registration and URLSession callbacks never race.
class BlobDownloadDelegate: NSObject, URLSessionDownloadDelegate {
    /// Continuation awaiting each in-flight task; resumed exactly once, then removed.
    private var downloadCompletions:
        [URLSessionDownloadTask: CheckedContinuation<DownloadResult, Error>] = [:]
    /// NOTE(review): nothing in this class writes this map; lookups fall back to
    /// `downloadTask.response`.
    private var taskResponses: [URLSessionDownloadTask: URLResponse] = [:]
    /// Account DID whose background progress advances when the task succeeds.
    private var taskAccountDids: [URLSessionDownloadTask: String] = [:]
    private let completionQueue = DispatchQueue(label: "com.app.atproto.download-completions")

    /// Registers `continuation` to be resumed when `task` finishes, optionally
    /// tagging the task with the account whose progress it should advance.
    func addCompletion(
        _ continuation: CheckedContinuation<DownloadResult, Error>,
        for task: URLSessionDownloadTask, accountDid: String? = nil
    ) {
        completionQueue.sync {
            downloadCompletions[task] = continuation
            if let accountDid = accountDid {
                taskAccountDids[task] = accountDid
            }
        }
    }

    /// Moves the finished file somewhere stable and resumes the waiting caller.
    func urlSession(
        _ session: URLSession, downloadTask: URLSessionDownloadTask,
        didFinishDownloadingTo location: URL
    ) {
        completionQueue.sync {
            guard let continuation = downloadCompletions[downloadTask] else { return }
            let response = taskResponses[downloadTask] ?? downloadTask.response
            let accountDid = taskAccountDids[downloadTask]
            forget(downloadTask)

            // URLSession deletes `location` once this method returns, so move it now.
            let tempURL = FileManager.default.temporaryDirectory
                .appendingPathComponent(UUID().uuidString)
            do {
                try FileManager.default.moveItem(at: location, to: tempURL)
            } catch {
                continuation.resume(throwing: error)
                return
            }
            continuation.resume(returning: DownloadResult(tempURL: tempURL, response: response))

            // Only successful transfers advance the tracker; an HTTP error body
            // is still "downloaded" but is not a blob.
            if let accountDid = accountDid, BlobDownloadDelegate.isSuccess(response) {
                Task {
                    await BackgroundDownloadTracker.shared.incrementProgress(for: accountDid)
                }
            }
        }
    }

    /// Fails the waiting caller when the task ends without delivering a file.
    func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?)
    {
        completionQueue.sync {
            guard let downloadTask = task as? URLSessionDownloadTask,
                let continuation = downloadCompletions[downloadTask]
            else { return }
            // A successful download already resumed and removed its continuation in
            // didFinishDownloadingTo. Still finding one here means no file arrived,
            // so fail instead of leaving the awaiting caller suspended forever.
            forget(downloadTask)
            continuation.resume(throwing: error ?? URLError(.unknown))
        }
    }

    func urlSessionDidFinishEvents(forBackgroundURLSession session: URLSession) {
        // NOTE(review): Apple's guidance is to call the completion handler passed to
        // application(_:handleEventsForBackgroundURLSession:completionHandler:) from
        // here (on the main queue); nothing is stored for that yet.
    }

    /// Drops all bookkeeping for `task`. Must be called on `completionQueue`.
    private func forget(_ task: URLSessionDownloadTask) {
        downloadCompletions.removeValue(forKey: task)
        taskResponses.removeValue(forKey: task)
        taskAccountDids.removeValue(forKey: task)
    }

    /// `true` for 2xx HTTP responses and for non-HTTP or missing responses.
    private static func isSuccess(_ response: URLResponse?) -> Bool {
        guard let http = response as? HTTPURLResponse else { return true }
        return (200...299).contains(http.statusCode)
    }
}
130
/// Downloads AT Protocol repository blobs and CAR files through one shared
/// background `URLSession`, writing each result into a caller-chosen directory.
public actor BlobDownloader {
    /// Chunk size for `downloadBlobsChunked` and slot limit for `waitForAvailableSlot()`.
    private var maxConcurrentDownloads: Int = 5
    /// Slots currently held through `waitForAvailableSlot()`.
    private var activeTasks = 0
    /// Callers suspended in `waitForAvailableSlot()`, resumed oldest first. A single
    /// stored continuation would be overwritten (and its caller never resumed)
    /// as soon as a second caller had to wait.
    private var slotWaiters: [CheckedContinuation<Void, Never>] = []

    /// Unstructured per-blob tasks started by `downloadBlobsConcurrently`, tracked
    /// so `cancelAllActiveTasks()` can cancel them.
    private var activeDownloadTasks: Set<Task<URL?, Error>> = []
    private var currentDownloadTask: Task<[URL], Error>?

    /// Stable session identifier for background downloads.
    private static let backgroundSessionIdentifier = "com.app.atproto.backup.downloader"

    /// Delegate shared by every instance; see `sharedBackgroundSession`.
    private static let sharedSessionDelegate = BlobDownloadDelegate()

    /// The single session for `backgroundSessionIdentifier`.
    ///
    /// URLSession requires exactly one session per background identifier. A
    /// per-instance lazy session meant a second `BlobDownloader` created a duplicate
    /// session with undefined behavior. Static stored properties are still created
    /// lazily, so the session (and its reconnection to pending system downloads)
    /// only appears on first use.
    private static let sharedBackgroundSession: URLSession = {
        let config = URLSessionConfiguration.background(
            withIdentifier: BlobDownloader.backgroundSessionIdentifier
        )
        config.isDiscretionary = false
        config.sessionSendsLaunchEvents = true
        config.allowsCellularAccess = true  // For critical data
        return URLSession(
            configuration: config,
            delegate: BlobDownloader.sharedSessionDelegate,
            delegateQueue: nil
        )
    }()

    /// Creates a downloader. The shared background session is created on first use.
    public init() {}

    /// Forwards the app delegate's background-session event callback.
    ///
    /// NOTE(review): the handler is invoked immediately. Apple's guidance is to
    /// store it and call it once `urlSessionDidFinishEvents(forBackgroundURLSession:)`
    /// fires.
    public static func handleEventsForBackgroundURLSession(
        identifier: String,
        completionHandler: @escaping () -> Void
    ) {
        guard identifier == backgroundSessionIdentifier else { return }
        completionHandler()
    }

    // MARK: - Slot limiting

    /// Suspends until fewer than `maxConcurrentDownloads` slots are held, then takes one.
    private func waitForAvailableSlot() async {
        while activeTasks >= maxConcurrentDownloads {
            await withCheckedContinuation { (waiter: CheckedContinuation<Void, Never>) in
                slotWaiters.append(waiter)
            }
        }
        activeTasks += 1
    }

    /// Returns a slot and wakes the longest-waiting caller, if any. Clamped at zero
    /// because `cancelAllActiveTasks()` resets the counter while slots may be held.
    private func releaseSlot() {
        activeTasks = max(0, activeTasks - 1)
        if !slotWaiters.isEmpty {
            slotWaiters.removeFirst().resume()
        }
    }

    // MARK: - Batch downloads

    /// Resolves where files go: the bookmarked folder when a bookmark is supplied,
    /// otherwise `<temporaryDirectory>/<repo>`, which is created if missing.
    private func resolveSaveLocation(repo: String, bookmark: Data?) throws -> URL {
        if let bookmark {
            var isStale = false
            guard
                let saveUrl = try? URL(
                    resolvingBookmarkData: bookmark,
                    options: .withoutUI,
                    relativeTo: nil,
                    bookmarkDataIsStale: &isStale
                )
            else {
                throw GenericIntentError.message("Failed to resolve bookmark data")
            }
            return saveUrl
        }

        let directory = FileManager.default.temporaryDirectory.appendingPathComponent(repo)
        do {
            try FileManager.default.createDirectory(
                at: directory, withIntermediateDirectories: true, attributes: nil)
        } catch CocoaError.fileWriteFileExists {
            print("Folder already exists at: \(directory.path)")
        }
        return directory
    }

    /// Downloads every CID, at most `maxConcurrent` at a time, enqueueing all tasks
    /// up front so they keep running while the app is in the background.
    ///
    /// Individual failures are logged and skipped. `progressHandler` receives
    /// (successes so far, total CIDs).
    /// - Returns: Saved URLs in the original CID order, plus how many were newly
    ///   downloaded rather than already on disk.
    /// - Throws: When the save location cannot be resolved or created.
    func downloadBlobsBackground(
        repo: String,
        pdsURL: String,
        cids: [String],
        saveLocationBookmark: Data? = nil,
        maxConcurrent: Int = 5,
        progressHandler: ((Int, Int) -> Void)? = nil
    ) async throws -> (urls: [URL], newDownloads: Int) {
        let saveLocation = try resolveSaveLocation(repo: repo, bookmark: saveLocationBookmark)
        // A limit below 1 would leave every child waiting on the semaphore forever.
        let concurrencyLimit = max(1, maxConcurrent)

        print(
            "Starting background download of \(cids.count) blobs with max \(concurrencyLimit) concurrent"
        )

        var results: [String: URL] = [:]
        var newDownloadCount = 0
        let semaphore = AsyncSemaphore(value: concurrencyLimit)

        try await withThrowingTaskGroup(of: (String, URL, Bool)?.self) { group in
            for cid in cids {
                group.addTask {
                    await semaphore.wait()
                    // Children inherit the caller's cancellation: once cancelled,
                    // don't start downloads still queued behind the semaphore.
                    if Task.isCancelled {
                        await semaphore.signal()
                        return nil
                    }
                    do {
                        let (url, wasNewDownload) = try await self.getBlob(
                            from: repo,
                            cid: cid,
                            fileManager: FileManager.default,
                            saveLocation: saveLocation,
                            pdsURL: pdsURL
                        )
                        await semaphore.signal()
                        return (cid, url, wasNewDownload)
                    } catch {
                        print("Failed to download blob \(cid): \(error)")
                        await semaphore.signal()
                        return nil
                    }
                }
            }

            var downloadedCount = 0
            for try await outcome in group {
                guard case let (cid, url, wasNewDownload)? = outcome else { continue }
                results[cid] = url
                downloadedCount += 1
                if wasNewDownload {
                    newDownloadCount += 1
                }
                progressHandler?(downloadedCount, cids.count)
            }
        }

        return (cids.compactMap { results[$0] }, newDownloadCount)
    }

    /// Entry point for batch downloads; delegates to `downloadBlobsBackground`.
    func downloadBlobs(
        repo: String,
        pdsURL: String,
        cids: [String],
        saveLocationBookmark: Data? = nil,
        maxConcurrentDownloads: Int = 1,
        progressHandler: ((Int, Int) -> Void)? = nil
    ) async throws -> (urls: [URL], newDownloads: Int) {
        return try await downloadBlobsBackground(
            repo: repo,
            pdsURL: pdsURL,
            cids: cids,
            saveLocationBookmark: saveLocationBookmark,
            maxConcurrent: maxConcurrentDownloads,
            progressHandler: progressHandler
        )
    }

    /// Legacy download path kept for reference. Processes CIDs in chunks of
    /// `maxConcurrentDownloads` (clamped to at least 1) and cancels tracked tasks
    /// on any error, including cancellation.
    func downloadBlobsChunked(
        repo: String,
        pdsURL: String,
        cids: [String],
        saveLocationBookmark: Data? = nil,
        maxConcurrentDownloads: Int = 1,
        progressHandler: ((Int, Int) -> Void)? = nil
    ) async throws -> [URL] {
        // `chunked(into:)` requires a positive chunk size.
        let chunkSize = max(1, maxConcurrentDownloads)
        self.maxConcurrentDownloads = chunkSize
        // Cancel any existing download tasks before starting a new run.
        cancelAllActiveTasks()
        try Task.checkCancellation()

        do {
            let saveLocation = try resolveSaveLocation(repo: repo, bookmark: saveLocationBookmark)
            try Task.checkCancellation()

            let totalCIDs = cids.count
            var totalProcessed = 0
            var urls: [URL] = []

            for chunk in cids.chunked(into: chunkSize) {
                try Task.checkCancellation()

                let newUrls = try await downloadBlobsConcurrently(
                    repo: repo,
                    pdsURL: pdsURL,
                    cids: chunk,
                    fileManager: FileManager.default,
                    saveToDirectory: saveLocation
                )
                urls.append(contentsOf: newUrls)
                totalProcessed += chunk.count

                if Task.isCancelled {
                    print("Download cancelled early")
                }
                print("Downloaded \(totalProcessed) of \(totalCIDs) blobs")
                progressHandler?(totalProcessed, totalCIDs)
            }
            return urls
        } catch {
            // Clean up on any error (including cancellation).
            cancelAllActiveTasks()
            throw error
        }
    }

    // MARK: - Cancellation & task tracking

    /// Cancels every tracked download task.
    public func CancelAll() {
        cancelAllActiveTasks()
    }

    /// Cancels tracked tasks, resets slot accounting, and wakes every slot waiter.
    private func cancelAllActiveTasks() {
        print("Cancelling all active download tasks...")

        for task in activeDownloadTasks {
            task.cancel()
        }
        activeDownloadTasks.removeAll()

        currentDownloadTask?.cancel()
        currentDownloadTask = nil

        activeTasks = 0

        let waiting = slotWaiters
        slotWaiters.removeAll()
        waiting.forEach { $0.resume() }
    }

    private func clearCurrentTask() {
        currentDownloadTask = nil
    }

    /// Downloads one chunk concurrently, skipping CIDs already saved in
    /// `saveToDirectory`.
    ///
    /// A `BlobDownloadError` or cancellation fails the whole chunk. Other per-blob
    /// errors are logged and skipped.
    private func downloadBlobsConcurrently(
        repo: String,
        pdsURL: String,
        cids: [String],
        fileManager: FileManager,
        saveToDirectory: URL
    ) async throws -> [URL] {
        try await withThrowingTaskGroup(of: URL?.self, returning: [URL].self) { group in
            for cid in cids {
                try Task.checkCancellation()

                group.addTask {
                    // Unstructured so CancelAll() can cancel it independently of the group.
                    let downloadTask = Task<URL?, Error> {
                        do {
                            try Task.checkCancellation()
                            if let existing = findFile(withBaseName: cid, inDirectory: saveToDirectory) {
                                return existing
                            }
                            return try await self.downloadBlobWithRetry(
                                repo: repo,
                                pdsURL: pdsURL,
                                cid: cid,
                                maxRetries: 3,
                                fileManger: fileManager,
                                saveToDirectory: saveToDirectory
                            )
                        } catch let downloadError as BlobDownloadError {
                            throw downloadError
                        } catch let cancellation as CancellationError {
                            print("Download cancelled for blob \(cid)")
                            throw cancellation
                        } catch {
                            print("Failed to download blob \(cid): \(error)")
                            return nil
                        }
                    }

                    await self.addActiveTask(downloadTask)
                    do {
                        // Forward group cancellation to the unstructured task, which
                        // would otherwise keep running.
                        let result = try await withTaskCancellationHandler(
                            operation: { try await downloadTask.value },
                            onCancel: { downloadTask.cancel() }
                        )
                        await self.removeActiveTask(downloadTask)
                        return result
                    } catch {
                        // Untrack on failure too, so the set doesn't leak tasks.
                        await self.removeActiveTask(downloadTask)
                        throw error
                    }
                }
            }

            var urls: [URL] = []
            for try await downloaded in group {
                try Task.checkCancellation()
                if let downloaded {
                    urls.append(downloaded)
                }
            }
            return urls
        }
    }

    private func addActiveTask(_ task: Task<URL?, Error>) {
        activeDownloadTasks.insert(task)
    }

    private func removeActiveTask(_ task: Task<URL?, Error>) {
        activeDownloadTasks.remove(task)
    }

    // MARK: - Single downloads

    /// Downloads `resourceURL` through the background session and saves it as
    /// `saveLocation/<fileName><.ext>`. The extension is derived from the
    /// response's `Content-Type`.
    ///
    /// An existing file at the destination is replaced. Cancelling the calling task
    /// does not cancel the URLSession transfer, because the continuation has no
    /// cancellation handler.
    /// - Throws: `BlobDownloadError.httpError` for non-2xx responses, plus
    ///   transport and file-system errors.
    public func streamBlobToDisk(
        resourceURL: URL,
        fileManager: FileManager,
        saveLocation: URL,
        fileName: String,
        accountDid: String? = nil
    ) async throws -> URL {
        try Task.checkCancellation()

        var request = URLRequest(url: resourceURL)
        request.httpMethod = "GET"
        request.setValue("*/*", forHTTPHeaderField: "Accept")
        request.timeoutInterval = 30

        let result = try await withCheckedThrowingContinuation {
            (continuation: CheckedContinuation<DownloadResult, Error>) in
            let downloadTask = Self.sharedBackgroundSession.downloadTask(with: request)
            Self.sharedSessionDelegate.addCompletion(
                continuation, for: downloadTask, accountDid: accountDid)
            downloadTask.resume()
        }

        let mimeType: String
        if let httpResponse = result.response as? HTTPURLResponse {
            guard (200...299).contains(httpResponse.statusCode) else {
                // Background downloads don't expose the response body, so the XRPC
                // error payload (for 400s included) is unavailable; report the status.
                try? fileManager.removeItem(at: result.tempURL)
                throw BlobDownloadError.httpError(statusCode: httpResponse.statusCode)
            }
            mimeType = httpResponse.value(forHTTPHeaderField: "Content-Type") ?? ""
        } else {
            // If no response info, assume success and treat the content as opaque bytes.
            mimeType = "application/octet-stream"
        }

        let ending = fileExtension(fromMimeType: mimeType).map { ".\($0)" } ?? ""
        let destination = saveLocation.appendingPathComponent("\(fileName)\(ending)")
        return try moveDownloadedFile(from: result.tempURL, to: destination, fileManager: fileManager)
    }

    /// Moves a finished download to `destination`, replacing any file already
    /// there (e.g. `repo.car` from an earlier backup). The temporary file is
    /// deleted if the move fails.
    private func moveDownloadedFile(
        from source: URL, to destination: URL, fileManager: FileManager
    ) throws -> URL {
        do {
            if fileManager.fileExists(atPath: destination.path) {
                try fileManager.removeItem(at: destination)
            }
            try fileManager.moveItem(at: source, to: destination)
            return destination
        } catch {
            try? fileManager.removeItem(at: source)
            throw error
        }
    }

    /// Returns the saved blob for `cid`, downloading it via
    /// `com.atproto.sync.getBlob` only when no file with that base name exists
    /// in `saveLocation`.
    /// - Returns: The file URL and whether it was newly downloaded.
    public func getBlob(
        from accountDID: String,
        cid: String,
        fileManager: FileManager,
        saveLocation: URL,
        pdsURL: String? = nil
    ) async throws -> (url: URL, wasNewDownload: Bool) {
        if let existing = existingFile(named: cid, in: saveLocation, fileManager: fileManager) {
            print("Blob \(cid) already exists at \(existing.path), skipping download")
            Task {
                await BackgroundDownloadTracker.shared.incrementProgress(for: accountDID)
            }
            return (existing, false)
        }

        let url = try makeSyncURL(
            baseURL: pdsURL,
            method: "com.atproto.sync.getBlob",
            queryItems: [
                URLQueryItem(name: "did", value: accountDID),
                URLQueryItem(name: "cid", value: cid),
            ]
        )
        let downloadedURL = try await streamBlobToDisk(
            resourceURL: url, fileManager: fileManager, saveLocation: saveLocation,
            fileName: cid, accountDid: accountDID)
        return (downloadedURL, true)
    }

    /// First non-hidden, top-level entry in `directory` whose name without its
    /// extension equals `baseName`.
    private func existingFile(named baseName: String, in directory: URL, fileManager: FileManager)
        -> URL?
    {
        guard
            let enumerator = fileManager.enumerator(
                at: directory,
                includingPropertiesForKeys: [.isRegularFileKey],
                options: [.skipsHiddenFiles, .skipsSubdirectoryDescendants]
            )
        else { return nil }
        // Materialize first to avoid iterating an NSEnumerator across suspension points.
        return enumerator.allObjects
            .compactMap { $0 as? URL }
            .first { $0.deletingPathExtension().lastPathComponent == baseName }
    }

    /// Downloads the account's repository as a CAR file via `com.atproto.sync.getRepo`,
    /// optionally only changes `since` a revision.
    public func getCar(
        from accountDID: String,
        since: String?,
        fileManager: FileManager,
        saveLocation: URL,
        fileName: String = "repo.car",
        pdsURL: String? = nil
    ) async throws -> URL {
        var queryItems = [URLQueryItem(name: "did", value: accountDID)]
        if let since {
            queryItems.append(URLQueryItem(name: "since", value: since))
        }
        let url = try makeSyncURL(
            baseURL: pdsURL, method: "com.atproto.sync.getRepo", queryItems: queryItems)
        return try await streamBlobToDisk(
            resourceURL: url, fileManager: fileManager, saveLocation: saveLocation,
            fileName: fileName)
    }

    /// Builds `<base>/xrpc/<method>?<queryItems>`. `base` defaults to
    /// `https://bsky.network`, and trailing slashes are dropped so
    /// `https://pds.example/` doesn't yield `//xrpc`.
    private func makeSyncURL(baseURL: String?, method: String, queryItems: [URLQueryItem]) throws
        -> URL
    {
        var base = baseURL ?? "https://bsky.network"
        while base.hasSuffix("/") {
            base.removeLast()
        }
        guard var components = URLComponents(string: "\(base)/xrpc/\(method)") else {
            throw BlobDownloadError.invalidURL
        }
        components.queryItems = queryItems
        guard let url = components.url else {
            throw BlobDownloadError.invalidURL
        }
        return url
    }

    /// Downloads a single blob, retrying transient failures with exponential
    /// backoff (2s, 4s, … before attempts 2, 3, …).
    ///
    /// Cancellation, API errors, and errors that a retry can't fix are thrown
    /// immediately. A non-positive `maxRetries` makes no attempts.
    public func downloadBlobWithRetry(
        repo: String,
        pdsURL: String,
        cid: String,
        maxRetries: Int,
        fileManger: FileManager,
        saveToDirectory: URL
    ) async throws -> URL {
        var lastError: Error?

        // max(0, _) avoids trapping on an invalid range for negative input.
        for attempt in 0..<max(0, maxRetries) {
            try Task.checkCancellation()
            if attempt > 0 {
                let delay = UInt64(pow(2.0, Double(attempt)) * 1_000_000_000)
                try await Task.sleep(nanoseconds: delay)
            }
            do {
                let (url, _) = try await getBlob(
                    from: repo, cid: cid, fileManager: fileManger, saveLocation: saveToDirectory,
                    pdsURL: pdsURL)
                return url
            } catch let cancellation as CancellationError {
                throw cancellation
            } catch let downloadError as BlobDownloadError where !Self.isRetryable(downloadError) {
                throw downloadError
            } catch {
                lastError = error
            }
        }

        throw lastError
            ?? GenericIntentError.message("Failed to download blob after \(maxRetries) attempts")
    }

    /// Whether retrying could plausibly succeed. API errors, malformed URLs, and
    /// HTTP 4xx responses (except 408 Request Timeout and 429 Too Many Requests)
    /// won't change on retry.
    private static func isRetryable(_ error: BlobDownloadError) -> Bool {
        switch error {
        case .apiError, .invalidURL:
            return false
        case .httpError(let statusCode):
            return !(400..<500).contains(statusCode) || statusCode == 408 || statusCode == 429
        case .networkError, .noData, .unknownError:
            return true
        }
    }

    /// In-memory blob payload with its CID and MIME type.
    public struct BlobDownloadOutput {
        public var data: Data
        public var cid: String
        public var mimeType: String
    }

    // MARK: - Optimized Concurrent Blob Downloader

    /// NOTE(review): `urlSession` is configured here but not used anywhere in this file.
    public class ConcurrentBlobDownloader {
        private let urlSession: URLSession

        public init() {
            let config = URLSessionConfiguration.default
            config.httpMaximumConnectionsPerHost = 10
            config.timeoutIntervalForRequest = 30
            config.timeoutIntervalForResource = 300
            self.urlSession = URLSession(configuration: config)
        }
    }
}
763
/// Maps a MIME type, such as an HTTP `Content-Type` value, to its preferred file
/// extension without the leading dot.
///
/// Parameters (`; charset=…`) and surrounding whitespace are ignored, so
/// `"image/jpeg; charset=binary"` resolves like `"image/jpeg"`.
/// - Returns: `nil` for an empty or unrecognized type, or for a type with no
///   preferred extension.
public func fileExtension(fromMimeType mimeType: String) -> String? {
    let essence =
        mimeType
        .split(separator: ";", maxSplits: 1, omittingEmptySubsequences: false)
        .first
        .map { $0.trimmingCharacters(in: .whitespaces) } ?? ""
    guard !essence.isEmpty, let utType = UTType(mimeType: essence) else { return nil }
    return utType.preferredFilenameExtension
}
768
// MARK: - Helper Extensions

extension Array {
    /// Splits the array into consecutive chunks of at most `size` elements,
    /// preserving order; only the last chunk may be shorter.
    ///
    /// - Precondition: `size > 0`. A non-positive size is rejected with a clear
    ///   message rather than trapping inside `stride` (size 0) or silently
    ///   producing no chunks and dropping every element (negative size).
    func chunked(into size: Int) -> [[Element]] {
        precondition(size > 0, "chunked(into:) requires a positive chunk size, got \(size)")
        return stride(from: 0, to: count, by: size).map { start in
            Array(self[start..<Swift.min(start + size, count)])
        }
    }
}
779
/// An error type related to issues surrounding HTTP responses: the JSON body an
/// XRPC endpoint returns alongside a failing status.
public struct ATHTTPResponseError: Decodable {

    /// The name of the error.
    public let error: String

    /// The message for the error; empty when the server omitted it.
    public let message: String
}

// Declared in an extension so the struct keeps its memberwise initializer.
extension ATHTTPResponseError {
    private enum CodingKeys: String, CodingKey {
        case error
        case message
    }

    /// Decodes an XRPC error body. `message` is optional in XRPC error responses,
    /// so a missing value decodes as `""` instead of failing the whole decode.
    public init(from decoder: Decoder) throws {
        let container = try decoder.container(keyedBy: CodingKeys.self)
        self.error = try container.decode(String.self, forKey: .error)
        self.message = try container.decodeIfPresent(String.self, forKey: .message) ?? ""
    }
}
789
/// Returns the first entry in `directory` whose name without its extension equals
/// `baseName` (so `"abc"` matches `abc.jpg`).
///
/// Returns `nil` when nothing matches or when the directory cannot be listed; the
/// listing error is printed rather than thrown.
func findFile(withBaseName baseName: String, inDirectory directory: URL) -> URL? {
    let entries: [URL]
    do {
        entries = try FileManager.default.contentsOfDirectory(
            at: directory, includingPropertiesForKeys: nil)
    } catch {
        print("Error reading directory: \(error)")
        return nil
    }

    for entry in entries where entry.deletingPathExtension().lastPathComponent == baseName {
        return entry
    }
    return nil
}
807
/// Failures raised while building request URLs, downloading, or saving blobs and repos.
public enum BlobDownloadError: Error {
    /// A request URL could not be constructed.
    case invalidURL
    /// A transport-level failure, wrapping the underlying error.
    case networkError(Error)
    /// The server returned no data.
    case noData
    /// The server answered with a non-success HTTP status.
    case httpError(statusCode: Int)
    /// A failure with no more specific classification.
    case unknownError
    /// The server returned a structured XRPC error payload.
    case apiError(error: ATHTTPResponseError)
}

// Without LocalizedError, `localizedDescription` yields only a generic
// "operation couldn't be completed (… error N)" string.
extension BlobDownloadError: LocalizedError {
    public var errorDescription: String? {
        switch self {
        case .invalidURL:
            return "The download URL could not be constructed."
        case .networkError(let underlying):
            return "Network error: \(underlying.localizedDescription)"
        case .noData:
            return "The server returned no data."
        case .httpError(let statusCode):
            return "The server responded with HTTP status \(statusCode)."
        case .unknownError:
            return "An unknown download error occurred."
        case .apiError(let payload):
            return payload.message.isEmpty ? payload.error : "\(payload.error): \(payload.message)"
        }
    }
}