this repo has no description
at main 816 lines 28 kB view raw
//
//  BlobDownloader.swift
//  shortcut
//
//  Created by Bailey Townsend on 7/14/25.
//

import Foundation
import UniformTypeIdentifiers
import ZIPFoundation

// MARK: - Supporting types

/// Progress events describing the lifecycle of a blob download run.
public enum DownloadProgress {
    case started(totalCIDs: Int)
    case progressUpdate(currentCount: Int, totalCount: Int)
    case completed(totalDownloaded: Int, zipURL: URL)
    case failed(error: Error)
}

/// Outcome of one finished `URLSessionDownloadTask`.
struct DownloadResult {
    /// Location the delegate moved the downloaded payload to.
    let tempURL: URL
    /// Response recorded for the task, if any.
    let response: URLResponse?
}

/// Counting semaphore for async code, used to cap concurrent downloads.
///
/// Waiters are resumed in FIFO order. `wait()` is not cancellation-aware: a
/// suspended waiter is only released by a matching `signal()`.
actor AsyncSemaphore {
    private var value: Int
    private var waiters: [CheckedContinuation<Void, Never>] = []

    /// - Parameter value: Number of permits initially available. With a
    ///   value of zero or less every `wait()` suspends until `signal()`.
    init(value: Int) {
        self.value = value
    }

    /// Takes a permit, suspending until one is available.
    func wait() async {
        if value > 0 {
            value -= 1
            return
        }

        await withCheckedContinuation { continuation in
            waiters.append(continuation)
        }
    }

    /// Returns a permit, handing it straight to the oldest waiter if any.
    func signal() {
        if waiters.isEmpty {
            value += 1
        } else {
            waiters.removeFirst().resume()
        }
    }
}

// MARK: - URLSession delegate

/// Bridges `URLSessionDownloadDelegate` callbacks to checked continuations.
///
/// All bookkeeping dictionaries are only touched inside `completionQueue`.
class BlobDownloadDelegate: NSObject, URLSessionDownloadDelegate {
    private var downloadCompletions:
        [URLSessionDownloadTask: CheckedContinuation<DownloadResult, Error>] = [:]
    // Nothing in this file populates `taskResponses`; lookups fall back to
    // `downloadTask.response`.
    private var taskResponses: [URLSessionDownloadTask: URLResponse] = [:]
    private var taskAccountDids: [URLSessionDownloadTask: String] = [:]
    private let completionQueue = DispatchQueue(label: "com.app.atproto.download-completions")

    /// Registers the continuation to resume when `task` finishes.
    ///
    /// - Parameters:
    ///   - continuation: Resumed exactly once, with a `DownloadResult` or an error.
    ///   - task: The download task the continuation belongs to.
    ///   - accountDid: When set, `BackgroundDownloadTracker` progress is
    ///     incremented for this DID after a successful download.
    func addCompletion(
        _ continuation: CheckedContinuation<DownloadResult, Error>,
        for task: URLSessionDownloadTask, accountDid: String? = nil
    ) {
        completionQueue.sync {
            downloadCompletions[task] = continuation
            if let accountDid {
                taskAccountDids[task] = accountDid
            }
        }
    }

    /// Drops every bookkeeping entry for `task`. Call only on `completionQueue`.
    private func forget(_ task: URLSessionDownloadTask) {
        downloadCompletions.removeValue(forKey: task)
        taskResponses.removeValue(forKey: task)
        taskAccountDids.removeValue(forKey: task)
    }

    func urlSession(
        _ session: URLSession, downloadTask: URLSessionDownloadTask,
        didFinishDownloadingTo location: URL
    ) {
        completionQueue.sync {
            guard let continuation = downloadCompletions[downloadTask] else { return }
            defer { forget(downloadTask) }

            // Move file to temporary location to prevent deletion
            let tempURL = FileManager.default.temporaryDirectory
                .appendingPathComponent(UUID().uuidString)
            do {
                try FileManager.default.moveItem(at: location, to: tempURL)
                let response = taskResponses[downloadTask] ?? downloadTask.response
                continuation.resume(
                    returning: DownloadResult(tempURL: tempURL, response: response))

                // Update background tracker if this is a blob download
                if let accountDid = taskAccountDids[downloadTask] {
                    Task {
                        await BackgroundDownloadTracker.shared.incrementProgress(
                            for: accountDid)
                    }
                }
            } catch {
                continuation.resume(throwing: error)
            }
        }
    }

    func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?)
    {
        completionQueue.sync {
            // If no error, success is handled in didFinishDownloadingTo
            guard let error,
                let downloadTask = task as? URLSessionDownloadTask,
                let continuation = downloadCompletions[downloadTask]
            else { return }

            continuation.resume(throwing: error)
            forget(downloadTask)
        }
    }

    func urlSessionDidFinishEvents(forBackgroundURLSession session: URLSession) {
        // Handle background session completion
        // You might want to post a notification here if needed
    }
}

// MARK: - BlobDownloader

/// Downloads AT Protocol blobs and repo CAR files through a background `URLSession`.
public actor BlobDownloader {
    private var maxConcurrentDownloads: Int = 5
    private var activeTasks = 0
    // Every caller suspended in `waitForAvailableSlot()`, oldest first. A
    // single stored continuation would be overwritten (and never resumed)
    // as soon as a second caller had to wait.
    private var slotWaiters: [CheckedContinuation<Void, Never>] = []
    // private var atProtocolManger: AtProtocolManager

    // Add these for task management
    private var activeDownloadTasks: Set<Task<URL?, Error>> = []
    private var currentDownloadTask: Task<[URL], Error>?

    // Delegate instance
    private let sessionDelegate = BlobDownloadDelegate()

    // Stable session identifier for background downloads
    private static let backgroundSessionIdentifier = "com.app.atproto.backup.downloader"

    private lazy var backgroundSession: URLSession = {
        let config = URLSessionConfiguration.background(
            withIdentifier: Self.backgroundSessionIdentifier
        )
        config.isDiscretionary = false
        config.sessionSendsLaunchEvents = true
        config.allowsCellularAccess = true  // For critical data
        return URLSession(configuration: config, delegate: sessionDelegate, delegateQueue: nil)
    }()

    // Initialize and restore any pending background downloads
    public init() {
        // Note: backgroundSession will be lazily created on first use
        // which will automatically reconnect with any existing downloads
    }

    /// Handles background session events (call from AppDelegate).
    ///
    /// NOTE(review): the completion handler is invoked immediately rather
    /// than from `urlSessionDidFinishEvents(forBackgroundURLSession:)`;
    /// confirm this is intended.
    public static func handleEventsForBackgroundURLSession(
        identifier: String,
        completionHandler: @escaping () -> Void
    ) {
        if identifier == backgroundSessionIdentifier {
            // Store the completion handler to call when all tasks complete
            // This would typically be handled in the delegate's urlSessionDidFinishEvents method
            completionHandler()
        }
    }
    // Progress tracking
    // private var progressContinuation: AsyncStream<DownloadProgress>.Continuation?

    // Actor for thread-safe archive writes

    // init(maxConcurrentDownloads: Int = 2) {
    //     self.maxConcurrentDownloads = maxConcurrentDownloads
    // }

    /// Suspends until fewer than `maxConcurrentDownloads` slots are taken, then takes one.
    private func waitForAvailableSlot() async {
        while activeTasks >= maxConcurrentDownloads {
            await withCheckedContinuation { cont in
                slotWaiters.append(cont)
            }
        }
        activeTasks += 1
    }

    /// Gives a slot back and wakes the oldest waiter, if any.
    private func releaseSlot() {
        // `cancelAllActiveTasks()` resets the counter to zero, so a late
        // release must not push it negative.
        activeTasks = max(0, activeTasks - 1)
        if !slotWaiters.isEmpty {
            slotWaiters.removeFirst().resume()
        }
    }

    /// Resolves the directory downloads are written to.
    ///
    /// - Parameters:
    ///   - bookmark: Bookmark data for a chosen folder; when `nil` a folder
    ///     named after `repo` inside the temporary directory is used (and created).
    ///   - repo: Name of the fallback folder.
    /// - Throws: `GenericIntentError.message` when the bookmark cannot be
    ///   resolved, or the error from creating the fallback directory.
    private func resolveSaveLocation(bookmark: Data?, repo: String) throws -> URL {
        if let bookmark {
            var isStale = false
            guard
                let bookmarkedURL = try? URL(
                    resolvingBookmarkData: bookmark,
                    options: .withoutUI,
                    relativeTo: nil,
                    bookmarkDataIsStale: &isStale
                )
            else {
                throw GenericIntentError.message("Failed to resolve bookmark data")
            }
            return bookmarkedURL
        }

        let saveLocation = FileManager.default.temporaryDirectory.appendingPathComponent(repo)
        do {
            try FileManager.default.createDirectory(
                at: saveLocation, withIntermediateDirectories: true, attributes: nil)
        } catch CocoaError.fileWriteFileExists {
            print("Folder already exists at: \(saveLocation.path)")
        }
        return saveLocation
    }

    /// Background-optimized download that enqueues all tasks immediately.
    ///
    /// Individual blob failures are logged and skipped; they do not fail the call.
    ///
    /// - Parameters:
    ///   - repo: Account DID whose blobs are fetched.
    ///   - pdsURL: Base URL of the PDS to download from.
    ///   - cids: CIDs of the blobs to download.
    ///   - saveLocationBookmark: Optional bookmark for the destination folder.
    ///   - maxConcurrent: Maximum simultaneous downloads; values below 1 are treated as 1.
    ///   - progressHandler: Called with `(finishedCount, cids.count)` after each
    ///     blob that is available on disk.
    /// - Returns: File URLs in the order of `cids` (failed blobs omitted) and
    ///   the number of blobs that were actually downloaded rather than found on disk.
    func downloadBlobsBackground(
        repo: String,
        pdsURL: String,
        cids: [String],
        saveLocationBookmark: Data? = nil,
        maxConcurrent: Int = 5,
        progressHandler: ((Int, Int) -> Void)? = nil
    ) async throws -> (urls: [URL], newDownloads: Int) {
        let saveLocation = try resolveSaveLocation(bookmark: saveLocationBookmark, repo: repo)

        // A semaphore with no permits would park every child task forever.
        let concurrency = max(1, maxConcurrent)

        print(
            "Starting background download of \(cids.count) blobs with max \(concurrency) concurrent"
        )

        // Create all download tasks immediately so they continue in background
        var results: [String: URL] = [:]
        var newDownloadCount = 0

        // Semaphore to limit concurrent downloads
        let semaphore = AsyncSemaphore(value: concurrency)

        // Use a TaskGroup to wait for all downloads
        try await withThrowingTaskGroup(of: (String, URL, Bool)?.self) { group in
            for cid in cids {
                group.addTask { [weak self] in
                    guard let self = self else { return nil }

                    // Wait for semaphore before starting download
                    await semaphore.wait()

                    do {
                        let (url, wasNewDownload) = try await self.getBlob(
                            from: repo,
                            cid: cid,
                            fileManager: FileManager.default,
                            saveLocation: saveLocation,
                            pdsURL: pdsURL
                        )
                        await semaphore.signal()
                        return (cid, url, wasNewDownload)
                    } catch {
                        print("Failed to download blob \(cid): \(error)")
                        await semaphore.signal()
                        return nil
                    }
                }
            }

            var downloadedCount = 0
            for try await result in group {
                guard let (cid, url, wasNewDownload) = result else { continue }
                results[cid] = url
                downloadedCount += 1
                if wasNewDownload {
                    newDownloadCount += 1
                }

                progressHandler?(downloadedCount, cids.count)
            }
        }

        // Return URLs in original CID order and count of new downloads
        return (cids.compactMap { results[$0] }, newDownloadCount)
    }

    /// Downloads the given blobs; forwards to `downloadBlobsBackground`.
    func downloadBlobs(
        repo: String,
        pdsURL: String,
        cids: [String],
        saveLocationBookmark: Data? = nil,
        maxConcurrentDownloads: Int = 1,
        progressHandler: ((Int, Int) -> Void)? = nil
    ) async throws -> (urls: [URL], newDownloads: Int) {
        // Use background-optimized method for better suspension handling
        return try await downloadBlobsBackground(
            repo: repo,
            pdsURL: pdsURL,
            cids: cids,
            saveLocationBookmark: saveLocationBookmark,
            maxConcurrent: maxConcurrentDownloads,
            progressHandler: progressHandler
        )
    }

    /// Legacy chunked download method (kept for reference).
    ///
    /// Processes `cids` in chunks of `maxConcurrentDownloads` (values below 1
    /// are treated as 1), reporting progress after each chunk. Any error,
    /// including cancellation, cancels all tracked tasks before it is rethrown.
    func downloadBlobsChunked(
        repo: String,
        pdsURL: String,
        cids: [String],
        saveLocationBookmark: Data? = nil,
        maxConcurrentDownloads: Int = 1,
        progressHandler: ((Int, Int) -> Void)? = nil
    ) async throws -> [URL] {
        // A chunk size of 0 would trap inside `stride(from:to:by:)`.
        let chunkSize = max(1, maxConcurrentDownloads)
        self.maxConcurrentDownloads = chunkSize
        // Cancel any existing download task
        cancelAllActiveTasks()

        // Check for cancellation at the start
        try Task.checkCancellation()

        do {
            let saveLocation = try resolveSaveLocation(
                bookmark: saveLocationBookmark, repo: repo)

            // Check for cancellation before processing
            try Task.checkCancellation()

            let totalCIDs = cids.count
            var totalProcessed = 0
            var urls = [URL]()

            for chunk in cids.chunked(into: chunkSize) {
                // Check for cancellation before each chunk
                try Task.checkCancellation()

                let newUrls = try await downloadBlobsConcurrently(
                    repo: repo,
                    pdsURL: pdsURL,
                    cids: chunk,
                    fileManager: FileManager.default,
                    saveToDirectory: saveLocation
                )
                urls.append(contentsOf: newUrls)
                totalProcessed += chunk.count

                if Task.isCancelled {
                    print("Download cancelled early")
                }
                print("Downloaded \(totalProcessed) of \(totalCIDs) blobs")
                progressHandler?(totalProcessed, totalCIDs)
            }

            return urls

        } catch {
            // Clean up on any error (including cancellation)
            self.cancelAllActiveTasks()
            throw error
        }

        // Store the task reference
        // currentDownloadTask = downloadTask
        //
        // defer {
        //     // Clean up task reference when done
        //     Task { [weak self] in
        //         await self?.clearCurrentTask()
        //     }
        // }

        // return try await downloadTask.value
    }

    /// Cancels every download task this actor is tracking.
    public func CancelAll() {
        self.cancelAllActiveTasks()
    }

    // Method to cancel all active tasks
    private func cancelAllActiveTasks() {
        print("Cancelling all active download tasks...")

        // Cancel all individual download tasks
        for task in activeDownloadTasks {
            task.cancel()
        }
        activeDownloadTasks.removeAll()

        // Cancel current download task
        currentDownloadTask?.cancel()
        currentDownloadTask = nil

        // Reset active task counter
        activeTasks = 0

        // Resume any waiting continuations
        let waiters = slotWaiters
        slotWaiters.removeAll()
        for waiter in waiters {
            waiter.resume()
        }
    }

    private func clearCurrentTask() {
        currentDownloadTask = nil
    }

    /// Downloads one chunk of blobs in parallel, tracking each task so it can
    /// be cancelled through `cancelAllActiveTasks()`.
    ///
    /// Blobs already present in `saveToDirectory` are returned without a
    /// download. `BlobDownloadError` and cancellation abort the chunk; other
    /// per-blob errors are logged and that blob is skipped.
    private func downloadBlobsConcurrently(
        repo: String,
        pdsURL: String,
        cids: [String],
        fileManager: FileManager,
        saveToDirectory: URL
    ) async throws -> [URL] {
        try await withThrowingTaskGroup(of: URL?.self) { group in
            for cid in cids {
                // Check for cancellation before adding each task
                try Task.checkCancellation()

                group.addTask { [weak self] in
                    // Create a cancellable task and track it
                    let downloadTask = Task<URL?, Error> {
                        do {
                            // Check for cancellation at start of each download
                            try Task.checkCancellation()

                            if let existing = findFile(
                                withBaseName: cid, inDirectory: saveToDirectory)
                            {
                                return existing
                            }

                            return try await self?.downloadBlobWithRetry(
                                repo: repo,
                                pdsURL: pdsURL,
                                cid: cid,
                                maxRetries: 3,
                                fileManger: fileManager,
                                saveToDirectory: saveToDirectory
                            )
                        } catch let downloadError as BlobDownloadError {
                            throw downloadError
                        } catch {
                            if error is CancellationError {
                                print("Download cancelled for blob \(cid)")
                                throw error
                            }
                            print("Failed to download blob \(cid): \(error)")
                            return nil
                        }
                    }

                    // Track the task
                    await self?.addActiveTask(downloadTask)

                    do {
                        // The inner task is unstructured, so cancelling the
                        // group does not reach it unless forwarded here.
                        let result = try await withTaskCancellationHandler {
                            try await downloadTask.value
                        } onCancel: {
                            downloadTask.cancel()
                        }
                        // Remove from tracking when done
                        await self?.removeActiveTask(downloadTask)
                        return result
                    } catch {
                        // Also untrack on failure so finished tasks do not
                        // accumulate in `activeDownloadTasks`.
                        await self?.removeActiveTask(downloadTask)
                        throw error
                    }
                }
            }

            // Wait for all tasks to complete, but check for cancellation
            var urls: [URL] = []
            for try await downloaded in group {
                try Task.checkCancellation()
                if let downloaded {
                    urls.append(downloaded)
                }
            }
            return urls
        }
    }

    // Helper methods for task tracking
    private func addActiveTask(_ task: Task<URL?, Error>) {
        activeDownloadTasks.insert(task)
    }

    private func removeActiveTask(_ task: Task<URL?, Error>) {
        activeDownloadTasks.remove(task)
    }

    // Public method to cancel downloads from outside
    // public func cancelDownloads() async {
    //     cancelAllActiveTasks()
    // }

    /// Moves a finished download into `saveLocation`, naming it `fileName`
    /// plus the extension derived from `mimeType` (none if it is unknown).
    ///
    /// The temporary file is removed when the move fails so it is not left behind.
    private func installDownload(
        at tempURL: URL,
        in saveLocation: URL,
        fileName: String,
        mimeType: String,
        fileManager: FileManager
    ) throws -> URL {
        let ending = fileExtension(fromMimeType: mimeType).map { ".\($0)" } ?? ""
        let newLocation = saveLocation.appendingPathComponent("\(fileName)\(ending)")
        do {
            try fileManager.moveItem(at: tempURL, to: newLocation)
        } catch {
            try? FileManager.default.removeItem(at: tempURL)
            throw error
        }
        return newLocation
    }

    /// Downloads `resourceURL` through the background session and stores it in `saveLocation`.
    ///
    /// - Parameters:
    ///   - resourceURL: URL to GET.
    ///   - fileManager: Used to move the payload to its final place.
    ///   - saveLocation: Destination directory.
    ///   - fileName: Base name of the stored file; an extension is appended
    ///     based on the response's `Content-Type`.
    ///   - accountDid: Forwarded to the delegate for progress tracking.
    /// - Returns: URL of the stored file.
    /// - Throws: `BlobDownloadError.httpError` for non-2xx responses,
    ///   `CancellationError` if already cancelled, or transport/file errors.
    public func streamBlobToDisk(
        resourceURL: URL,
        fileManager: FileManager,
        saveLocation: URL,
        fileName: String,
        accountDid: String? = nil
    ) async throws -> URL {
        try Task.checkCancellation()

        var request = URLRequest(url: resourceURL)
        request.httpMethod = "GET"
        request.setValue("*/*", forHTTPHeaderField: "Accept")
        request.timeoutInterval = 30

        // Create download task and use continuation
        let result = try await withCheckedThrowingContinuation {
            (continuation: CheckedContinuation<DownloadResult, Error>) in
            let downloadTask = self.backgroundSession.downloadTask(with: request)
            self.sessionDelegate.addCompletion(
                continuation, for: downloadTask, accountDid: accountDid)
            downloadTask.resume()
        }

        let tempURL = result.tempURL

        guard let httpResponse = result.response as? HTTPURLResponse else {
            // If no response info, assume success and process based on content
            return try installDownload(
                at: tempURL, in: saveLocation, fileName: fileName,
                mimeType: "application/octet-stream", fileManager: fileManager)
        }

        guard (200...299).contains(httpResponse.statusCode) else {
            // Background downloads don't give us the response body, so the
            // status code is all that can be reported.
            try? FileManager.default.removeItem(at: tempURL)
            throw BlobDownloadError.httpError(statusCode: httpResponse.statusCode)
        }

        let mimeType = httpResponse.value(forHTTPHeaderField: "Content-Type") ?? ""
        return try installDownload(
            at: tempURL, in: saveLocation, fileName: fileName,
            mimeType: mimeType, fileManager: fileManager)
    }

    /// Returns the blob `cid` of `accountDID`, downloading it only if no file
    /// with that base name already exists in `saveLocation`.
    ///
    /// - Parameter pdsURL: Base URL to fetch from; defaults to `https://bsky.network`.
    /// - Returns: The file URL and whether a download actually took place.
    /// - Throws: `BlobDownloadError.invalidURL` or any error from `streamBlobToDisk`.
    public func getBlob(
        from accountDID: String,
        cid: String,
        fileManager: FileManager,
        saveLocation: URL,
        pdsURL: String? = nil
    ) async throws -> (url: URL, wasNewDownload: Bool) {
        // Check if blob already exists with any extension
        let fileEnumerator = fileManager.enumerator(
            at: saveLocation,
            includingPropertiesForKeys: [.isRegularFileKey],
            options: [.skipsHiddenFiles, .skipsSubdirectoryDescendants]
        )

        if let fileEnumerator {
            // Convert to array to avoid async iteration issues
            let files = fileEnumerator.allObjects.compactMap { $0 as? URL }
            let existing = files.first { $0.deletingPathExtension().lastPathComponent == cid }
            if let existing {
                print("Blob \(cid) already exists at \(existing.path), skipping download")

                Task {
                    await BackgroundDownloadTracker.shared.incrementProgress(for: accountDID)
                }

                return (existing, false)
            }
        }

        let baseUrl = pdsURL ?? "https://bsky.network"

        // Construct URL with query parameters
        guard var urlComponents = URLComponents(string: "\(baseUrl)/xrpc/com.atproto.sync.getBlob")
        else {
            throw BlobDownloadError.invalidURL
        }

        urlComponents.queryItems = [
            URLQueryItem(name: "did", value: accountDID),
            URLQueryItem(name: "cid", value: cid),
        ]

        guard let url = urlComponents.url else {
            throw BlobDownloadError.invalidURL
        }

        let downloadedURL = try await streamBlobToDisk(
            resourceURL: url, fileManager: fileManager, saveLocation: saveLocation, fileName: cid,
            accountDid: accountDID)

        return (downloadedURL, true)
    }

    /// Downloads the repository CAR for `accountDID` via `com.atproto.sync.getRepo`.
    ///
    /// - Parameters:
    ///   - since: Optional value sent as the `since` query item.
    ///   - fileName: Base name for the stored file.
    ///   - pdsURL: Base URL to fetch from; defaults to `https://bsky.network`.
    /// - Throws: `BlobDownloadError.invalidURL` or any error from `streamBlobToDisk`.
    public func getCar(
        from accountDID: String,
        since: String?,
        fileManager: FileManager,
        saveLocation: URL,
        fileName: String = "repo.car",
        pdsURL: String? = nil
    ) async throws -> URL {

        let baseUrl = pdsURL ?? "https://bsky.network"
        guard var urlComponents = URLComponents(string: "\(baseUrl)/xrpc/com.atproto.sync.getRepo")
        else {
            throw BlobDownloadError.invalidURL
        }

        var queryItems = [URLQueryItem(name: "did", value: accountDID)]
        if let since {
            queryItems.append(URLQueryItem(name: "since", value: since))
        }
        urlComponents.queryItems = queryItems

        guard let url = urlComponents.url else {
            throw BlobDownloadError.invalidURL
        }

        return try await streamBlobToDisk(
            resourceURL: url, fileManager: fileManager, saveLocation: saveLocation,
            fileName: fileName)
    }

    /// Downloads a single blob with retry logic.
    ///
    /// Attempts after the first wait `2^attempt` seconds beforehand.
    /// `BlobDownloadError.apiError` and cancellation are rethrown at once;
    /// every other error is retried until the attempts are used up.
    ///
    /// - Parameter maxRetries: Total number of attempts. With zero or a
    ///   negative value no attempt is made and a generic error is thrown.
    /// - Throws: The last error seen, or `GenericIntentError.message` if no
    ///   attempt recorded one.
    public func downloadBlobWithRetry(
        repo: String,
        pdsURL: String,
        cid: String,
        maxRetries: Int,
        fileManger: FileManager,
        saveToDirectory: URL
    ) async throws -> URL {
        var lastError: Error?

        // `0..<n` traps for negative n, so clamp the bound.
        for attempt in 0..<max(0, maxRetries) {
            // Check for cancellation before each retry
            try Task.checkCancellation()

            do {
                if attempt > 0 {
                    // Exponential backoff
                    let delay = UInt64(pow(2.0, Double(attempt)) * 1_000_000_000)
                    try await Task.sleep(nanoseconds: delay)
                }
                let (url, _) = try await getBlob(
                    from: repo, cid: cid, fileManager: fileManger, saveLocation: saveToDirectory,
                    pdsURL: pdsURL)
                return url
            } catch let downloadError as BlobDownloadError {
                if case .apiError = downloadError {
                    throw downloadError
                }
                lastError = downloadError
            } catch {
                // If it's a cancellation error, don't retry
                if error is CancellationError {
                    throw error
                }
                lastError = error
            }
        }

        throw lastError
            ?? GenericIntentError.message("Failed to download blob after \(maxRetries) attempts")
    }

    /// In-memory representation of a downloaded blob.
    public struct BlobDownloadOutput {
        public var data: Data
        public var cid: String
        public var mimeType: String
    }

    // MARK: - Optimized Concurrent Blob Downloader

    /// Holds a foreground `URLSession` configured for up to 10 connections per host.
    public class ConcurrentBlobDownloader {
        private let urlSession: URLSession

        public init() {
            let config = URLSessionConfiguration.default
            config.httpMaximumConnectionsPerHost = 10
            config.timeoutIntervalForRequest = 30
            config.timeoutIntervalForResource = 300
            self.urlSession = URLSession(configuration: config)
        }

    }

}

/// Returns the preferred filename extension for a MIME type, or `nil` if none is known.
///
/// Accepts raw `Content-Type` header values: anything after the first `;`
/// (for example `; charset=utf-8`) is ignored, because `UTType(mimeType:)`
/// is given only the bare media type.
public func fileExtension(fromMimeType mimeType: String) -> String? {
    let mediaType =
        mimeType
        .split(separator: ";", maxSplits: 1, omittingEmptySubsequences: false)
        .first
        .map { $0.trimmingCharacters(in: .whitespacesAndNewlines) } ?? ""
    guard !mediaType.isEmpty, let utType = UTType(mimeType: mediaType) else { return nil }
    return utType.preferredFilenameExtension
}

// MARK: - Helper Extensions

extension Array {
    /// Splits array into chunks of specified size.
    ///
    /// Sizes below 1 are treated as 1; a stride of 0 would trap and a
    /// negative stride would silently produce no chunks.
    func chunked(into size: Int) -> [[Element]] {
        let step = Swift.max(1, size)
        return stride(from: 0, to: count, by: step).map {
            Array(self[$0..<Swift.min($0 + step, count)])
        }
    }
}

/// An error type related to issues surrounding HTTP responses.
public struct ATHTTPResponseError: Decodable {

    /// The name of the error.
    public let error: String

    /// The message for the error.
    public let message: String
}

/// Returns the first entry of `directory` whose name without extension equals
/// `baseName`, or `nil` if there is none or the directory cannot be read.
func findFile(withBaseName baseName: String, inDirectory directory: URL) -> URL? {
    let fileManager = FileManager.default

    do {
        let contents = try fileManager.contentsOfDirectory(
            at: directory, includingPropertiesForKeys: nil)

        // Find the first file that matches the base name
        return contents.first { fileURL in
            fileURL.deletingPathExtension().lastPathComponent == baseName
        }
    } catch {
        print("Error reading directory: \(error)")
        return nil
    }
}

/// Errors raised while downloading blobs or repo archives.
public enum BlobDownloadError: Error {
    case invalidURL
    case networkError(Error)
    case noData
    case httpError(statusCode: Int)
    case unknownError
    case apiError(error: ATHTTPResponseError)

}