Like malachite (atproto-lastfm-importer), but in Go and bluer.
go spotify tealfm lastfm atproto
at main 1047 lines 28 kB view raw
1package main 2 3import ( 4 "archive/zip" 5 "cmp" 6 "context" 7 "encoding/json" 8 "fmt" 9 "io" 10 "io/fs" 11 "log/slog" 12 "os" 13 "slices" 14 "strings" 15 "time" 16 17 "tangled.org/karitham.dev/lazuli/cache" 18 "tangled.org/karitham.dev/lazuli/kway" 19 "tangled.org/karitham.dev/lazuli/sources/lastfm" 20 "tangled.org/karitham.dev/lazuli/sources/spotify" 21 "tangled.org/karitham.dev/lazuli/sync" 22 23 "github.com/failsafe-go/failsafe-go" 24 "github.com/failsafe-go/failsafe-go/retrypolicy" 25 "github.com/urfave/cli/v3" 26) 27 28var Version = "dev" 29 30const ( 31 DefaultBatchSize = 20 32) 33 34const ( 35 EnvHandle = "LAZULI_HANDLE" 36 EnvPassword = "LAZULI_PASSWORD" 37 EnvVerbose = "LAZULI_VERBOSE" 38 EnvQuiet = "LAZULI_QUIET" 39 EnvReverse = "LAZULI_REVERSE" 40 EnvDryRun = "LAZULI_DRY_RUN" 41 EnvFresh = "LAZULI_FRESH" 42 EnvClearCache = "LAZULI_CLEAR_CACHE" 43 EnvYes = "LAZULI_YES" 44) 45 46var ( 47 verboseCount int 48 quietCount int 49) 50 51type App struct { 52 log *slog.Logger 53 outputFormat string 54} 55 56func main() { 57 var code int 58 if err := run(); err != nil { 59 code = 1 60 fmt.Fprintf(os.Stderr, "Error: %v\n", err) 61 } 62 os.Exit(code) 63} 64 65func run() error { 66 app := &App{} 67 cmd := &cli.Command{ 68 Name: "lazuli", 69 Usage: "Import Last.fm and Spotify listening history to Bluesky", 70 Commands: []*cli.Command{ 71 app.exportCommand(), 72 app.importCommand(), 73 app.syncCommand(), 74 app.statsCommand(), 75 app.failedCommand(), 76 app.retryCommand(), 77 app.dedupeCommand(), 78 app.debugCommand(), 79 app.versionCommand(), 80 }, 81 } 82 83 return cmd.Run(context.Background(), os.Args) 84} 85 86func (a *App) exportCommand() *cli.Command { 87 flags := make([]cli.Flag, 0, len(exportFlags)+len(commonFlags)) 88 flags = append(flags, exportFlags...) 89 flags = append(flags, commonFlags...) 
90 return &cli.Command{ 91 Name: "export", 92 Usage: "Parse and merge Last.fm/Spotify exports, output JSON", 93 UsageText: " lazuli export --lastfm=/path/to/lastfm.csv --spotify=/path/to/spotify.json -o merged.json", 94 Flags: flags, 95 Action: a.runExport, 96 Before: a.initLoggerBefore, 97 } 98} 99 100func (a *App) importCommand() *cli.Command { 101 flags := make([]cli.Flag, 0, len(importFlags)+len(commonFlags)) 102 flags = append(flags, importFlags...) 103 flags = append(flags, commonFlags...) 104 return &cli.Command{ 105 Name: "import", 106 Usage: "Import listening history to Bluesky", 107 UsageText: " lazuli import --handle=user.bsky.social --password=app-password --lastfm=plays.csv", 108 Flags: flags, 109 Action: a.runImport, 110 Before: a.initLoggerBefore, 111 } 112} 113 114func (a *App) syncCommand() *cli.Command { 115 flags := make([]cli.Flag, 0, len(syncFlags)+len(commonFlags)) 116 flags = append(flags, syncFlags...) 117 flags = append(flags, commonFlags...) 118 return &cli.Command{ 119 Name: "sync", 120 Usage: "Fetch existing records, show stats, filter new records", 121 UsageText: " lazuli sync --handle=user.bsky.social --password=app-password", 122 Flags: flags, 123 Action: a.runSync, 124 Before: a.initLoggerBefore, 125 } 126} 127 128func (a *App) dedupeCommand() *cli.Command { 129 flags := make([]cli.Flag, 0, len(dedupeFlags)+len(commonFlags)) 130 flags = append(flags, dedupeFlags...) 131 flags = append(flags, commonFlags...) 132 return &cli.Command{ 133 Name: "dedupe", 134 Usage: "Find and remove duplicate records", 135 UsageText: " lazuli dedupe --handle=user.bsky.social --password=app-password\n lazuli dedupe --handle=user.bsky.social --password=app-password --dry-run", 136 Flags: flags, 137 Action: a.runDedupe, 138 Before: a.initLoggerBefore, 139 } 140} 141 142func (a *App) debugCommand() *cli.Command { 143 flags := make([]cli.Flag, 0, len(commonFlags)) 144 flags = append(flags, commonFlags...) 
145 return &cli.Command{ 146 Name: "debug", 147 Usage: "Fetch and dump raw records for debugging", 148 UsageText: " lazuli debug --handle=user.bsky.social --password=app-password", 149 Flags: flags, 150 Action: a.runDebugFetch, 151 Before: a.initLoggerBefore, 152 } 153} 154 155func (a *App) statsCommand() *cli.Command { 156 flags := make([]cli.Flag, 0, len(commonFlags)) 157 flags = append(flags, commonFlags...) 158 return &cli.Command{ 159 Name: "stats", 160 Usage: "Display statistics about the local database and rate limits", 161 UsageText: " lazuli stats", 162 Flags: flags, 163 Action: a.runStats, 164 Before: a.initLoggerBefore, 165 } 166} 167 168func (a *App) failedCommand() *cli.Command { 169 flags := make([]cli.Flag, 0, len(commonFlags)) 170 flags = append(flags, commonFlags...) 171 return &cli.Command{ 172 Name: "failed", 173 Usage: "List records that failed to publish", 174 UsageText: " lazuli failed --handle=user.bsky.social", 175 Flags: flags, 176 Action: a.runFailed, 177 Before: a.initLoggerBefore, 178 } 179} 180 181func (a *App) retryCommand() *cli.Command { 182 flags := make([]cli.Flag, 0, len(commonFlags)+1) 183 flags = append(flags, commonFlags...) 
184 flags = append(flags, &cli.BoolFlag{ 185 Name: "dry-run", 186 Usage: "Preview what will be retried", 187 Sources: cli.EnvVars(EnvDryRun), 188 }) 189 return &cli.Command{ 190 Name: "retry", 191 Usage: "Retry failed records one by one", 192 UsageText: " lazuli retry --handle=user.bsky.social", 193 Flags: flags, 194 Action: a.runRetry, 195 Before: a.initLoggerBefore, 196 } 197} 198 199func (a *App) versionCommand() *cli.Command { 200 return &cli.Command{ 201 Name: "version", 202 Usage: "Print the version number", 203 Action: func(ctx context.Context, cmd *cli.Command) error { 204 fmt.Println(Version) 205 return nil 206 }, 207 } 208} 209 210func (a *App) runStats(ctx context.Context, cmd *cli.Command) error { 211 // Use read-only storage for stats to allow viewing while main process has it open 212 statsStorage, err := cache.NewBoltStorage(true) 213 if err != nil { 214 return fmt.Errorf("open read-only cache: %w", err) 215 } 216 defer statsStorage.Close() 217 218 stats, err := statsStorage.Stats() 219 if err != nil { 220 return fmt.Errorf("failed to get database stats: %w", err) 221 } 222 223 limiter := sync.NewRateLimiter(statsStorage, 1) 224 writes, global, err := limiter.Stats() 225 if err != nil { 226 return fmt.Errorf("failed to get rate limit stats: %w", err) 227 } 228 229 if a.outputFormat == "json" { 230 out := map[string]any{ 231 "db": stats, 232 "rateLimits": map[string]any{ 233 "writesConsumed": writes, 234 "globalConsumed": global, 235 "writesLimit": sync.WriteLimitDay, 236 "globalLimit": sync.GlobalLimitDay, 237 "writesRemaining": sync.WriteLimitDay - writes, 238 "globalRemaining": sync.GlobalLimitDay - global, 239 }, 240 } 241 data, _ := json.MarshalIndent(out, "", " ") 242 fmt.Println(string(data)) 243 return nil 244 } 245 246 fmt.Println("Database Statistics:") 247 fmt.Printf(" Total Records: %d\n", stats.TotalRecords) 248 fmt.Printf(" Marked Published: %d\n", stats.MarkedPublished) 249 fmt.Printf(" Failed Count: %d\n", stats.FailedCount) 250 
fmt.Printf(" Unpublished Count: %d\n", stats.UnpublishedCount) 251 252 if len(stats.UserStats) > 0 { 253 fmt.Println("\nUser Statistics:") 254 for did, s := range stats.UserStats { 255 m := s.(map[string]int) 256 fmt.Printf(" %s:\n", did) 257 fmt.Printf(" Total: %d\n", m["total"]) 258 fmt.Printf(" Published: %d\n", m["published"]) 259 fmt.Printf(" Failed: %d\n", m["failed"]) 260 fmt.Printf(" Pending: %d\n", m["total"]-m["published"]-m["failed"]) 261 } 262 } 263 264 fmt.Println("\nRate Limit Consumption (Today):") 265 fmt.Printf(" Writes: %d / %d (Remaining: %d)\n", writes, sync.WriteLimitDay, sync.WriteLimitDay-writes) 266 fmt.Printf(" Global: %d / %d (Remaining: %d)\n", global, sync.GlobalLimitDay, sync.GlobalLimitDay-global) 267 268 return nil 269} 270 271func (a *App) runRetry(ctx context.Context, cmd *cli.Command) error { 272 storage, err := cache.NewBoltStorage(false) 273 if err != nil { 274 return fmt.Errorf("open cache: %w", err) 275 } 276 defer storage.Close() 277 278 authClient, err := a.prepareAuth(ctx, cmd) 279 if err != nil { 280 return err 281 } 282 did := authClient.DID() 283 dryRun := cmd.Bool("dry-run") 284 285 limiter := sync.NewRateLimiter(storage, 0.9) 286 repoClient := sync.NewRateClient(authClient.APIClient(), did, limiter) 287 288 var failedRecords []struct { 289 key string 290 rec sync.PlayRecord 291 } 292 293 iterateFailed := storage.IterateFailed(did) 294 iterateFailed(func(key string, rec []byte, errMsg string) bool { 295 var playRec sync.PlayRecord 296 if err := json.Unmarshal(rec, &playRec); err != nil { 297 return true // continue 298 } 299 failedRecords = append(failedRecords, struct { 300 key string 301 rec sync.PlayRecord 302 }{key, playRec}) 303 return true // continue 304 }) 305 306 if len(failedRecords) == 0 { 307 fmt.Println("No failed records to retry.") 308 return nil 309 } 310 311 fmt.Printf("Retrying %d failed records for %s...\n", len(failedRecords), did) 312 313 successCount := 0 314 errorCount := 0 315 316 for _, fr := 
range failedRecords { 317 if dryRun { 318 fmt.Printf("[DRY-RUN] Would retry: %s - %s\n", fr.rec.ArtistName(), fr.rec.TrackName) 319 successCount++ 320 continue 321 } 322 323 res := sync.PublishBatch(ctx, repoClient, did, []*sync.PlayRecord{&fr.rec}, storage, sync.DefaultClientAgent) 324 325 if res == nil { 326 fmt.Printf("Successfully retried: %s - %s\n", fr.rec.ArtistName(), fr.rec.TrackName) 327 if err := storage.MarkPublished(did, fr.key); err != nil { 328 a.log.Error("Failed to mark record as published", sync.ErrorAttr(err), slog.String("key", fr.key)) 329 } 330 if err := storage.RemoveFailed(did, fr.key); err != nil { 331 a.log.Error("Failed to remove record from failed list", sync.ErrorAttr(err), slog.String("key", fr.key)) 332 } 333 successCount++ 334 } else { 335 fmt.Printf("Failed again: %s - %s: %v\n", fr.rec.ArtistName(), fr.rec.TrackName, res) 336 errorCount++ 337 } 338 } 339 340 fmt.Printf("\nRetry complete: %d succeeded, %d failed.\n", successCount, errorCount) 341 return nil 342} 343 344func (a *App) runFailed(ctx context.Context, cmd *cli.Command) error { 345 storage, err := cache.NewBoltStorage(true) 346 if err != nil { 347 return fmt.Errorf("open read-only cache: %w", err) 348 } 349 defer storage.Close() 350 351 authClient, err := a.prepareAuth(ctx, cmd) 352 if err != nil { 353 return err 354 } 355 did := authClient.DID() 356 357 type FailedRecord struct { 358 Key string `json:"key"` 359 Error string `json:"error"` 360 Record sync.PlayRecord `json:"record"` 361 } 362 363 var failed []FailedRecord 364 iterateFailed := storage.IterateFailed(did) 365 iterateFailed(func(key string, rec []byte, errMsg string) bool { 366 var playRec sync.PlayRecord 367 _ = json.Unmarshal(rec, &playRec) 368 failed = append(failed, FailedRecord{ 369 Key: key, 370 Error: errMsg, 371 Record: playRec, 372 }) 373 return true // continue 374 }) 375 376 if a.outputFormat == "json" { 377 data, _ := json.MarshalIndent(failed, "", " ") 378 fmt.Println(string(data)) 379 return nil 
380 } 381 382 if len(failed) == 0 { 383 fmt.Println("No failed records found.") 384 return nil 385 } 386 387 fmt.Printf("Failed Records for %s (%d):\n", did, len(failed)) 388 for _, f := range failed { 389 fmt.Printf(" [%s] %s - %s: %s\n", 390 f.Record.PlayedTime.Format(time.RFC3339), 391 f.Record.ArtistName(), 392 f.Record.TrackName, 393 f.Error) 394 } 395 396 return nil 397} 398 399func (a *App) runDebugFetch(ctx context.Context, cmd *cli.Command) error { 400 authClient, err := a.prepareAuth(ctx, cmd) 401 if err != nil { 402 return fmt.Errorf("authentication failed: %w", err) 403 } 404 405 repoClient := sync.NewRateClient(authClient.APIClient(), authClient.DID(), nil) 406 407 records, _, err := repoClient.ListRecords(ctx, sync.RecordType, 10, "") 408 if err != nil { 409 return fmt.Errorf("failed to fetch records from Bluesky: %w", err) 410 } 411 412 enc := json.NewEncoder(os.Stdout) 413 enc.SetIndent("", " ") 414 for _, r := range records { 415 if err := enc.Encode(r); err != nil { 416 return fmt.Errorf("failed to encode record: %w", err) 417 } 418 } 419 420 return nil 421} 422 423func (a *App) initLoggerBefore(ctx context.Context, cmd *cli.Command) (context.Context, error) { 424 var level slog.Level 425 426 switch verbosity := verboseCount - quietCount; { 427 case verbosity >= 2: 428 level = slog.LevelDebug 429 case verbosity == 1: 430 level = slog.LevelInfo 431 case verbosity <= -1: 432 level = slog.LevelError 433 default: 434 level = slog.LevelInfo 435 } 436 437 a.outputFormat = cmd.String("output-format") 438 439 var handler slog.Handler = slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: level}) 440 if a.outputFormat == "json" { 441 handler = slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: level}) 442 } 443 444 a.log = slog.New(handler) 445 slog.SetDefault(a.log) 446 return ctx, nil 447} 448 449func (a *App) getCredentials(cmd *cli.Command) (string, string, error) { 450 handle := cmd.String("handle") 451 password := cmd.String("password") 
452 453 if handle == "" { 454 return "", "", fmt.Errorf("bluesky handle is required (set --handle or set the LAZULI_HANDLE environment variable)") 455 } 456 if password == "" { 457 return "", "", fmt.Errorf("app password is required (set --password or set the LAZULI_PASSWORD environment variable)") 458 } 459 460 return handle, password, nil 461} 462 463func (a *App) prepareAuth(ctx context.Context, cmd *cli.Command) (*sync.Client, error) { 464 handle, password, err := a.getCredentials(cmd) 465 if err != nil { 466 return nil, err 467 } 468 469 authClient, err := sync.NewClient(ctx, handle, password) 470 if err != nil { 471 return nil, fmt.Errorf("create auth client: %w", err) 472 } 473 474 return authClient, nil 475} 476 477func (a *App) runExport(ctx context.Context, cmd *cli.Command) error { 478 a.log.Info("Starting export operation") 479 480 lastfmPath := cmd.String("lastfm") 481 spotifyPath := cmd.String("spotify") 482 outputPath := cmd.String("output") 483 reverse := cmd.Bool("reverse") 484 tolerance := cmd.Duration("tolerance") 485 486 records, _, err := loadRecordsMerge(ctx, lastfmPath, spotifyPath, tolerance) 487 if err != nil { 488 return fmt.Errorf("failed to deduplicate records: %w", err) 489 } 490 491 if reverse { 492 slices.Reverse(records) 493 } 494 495 return a.outputRecords(records, outputPath) 496} 497 498func (a *App) runImport(ctx context.Context, cmd *cli.Command) error { 499 storage, err := cache.NewBoltStorage(false) 500 if err != nil { 501 return fmt.Errorf("open cache: %w", err) 502 } 503 defer storage.Close() 504 505 handle, password, err := a.getCredentials(cmd) 506 if err != nil { 507 return err 508 } 509 510 a.log.Info("Starting import operation", sync.DIDAttr(handle)) 511 512 lastfmPath := cmd.String("lastfm") 513 spotifyPath := cmd.String("spotify") 514 dryRun := cmd.Bool("dry-run") 515 reverse := cmd.Bool("reverse") 516 fresh := cmd.Bool("fresh") 517 clearCache := cmd.Bool("clear-cache") 518 batchSize := cmd.Int("batch-size") 519 
tolerance := cmd.Duration("tolerance") 520 521 if clearCache { 522 if err := storage.ClearAll(); err != nil { 523 a.log.Error("Failed to clear cache", sync.ErrorAttr(err)) 524 } else { 525 a.log.Info("Cache cleared") 526 } 527 } 528 529 records, totalCount, err := loadRecordsMerge(ctx, lastfmPath, spotifyPath, tolerance) 530 if err != nil { 531 return fmt.Errorf("load records: %w", err) 532 } 533 a.log.Info("Loaded records for import", slog.Int("total", totalCount), slog.Int("filtered", len(records))) 534 535 if len(records) == 0 { 536 a.log.Info("No new records to import") 537 return nil 538 } 539 540 authClient, err := sync.NewClient(ctx, handle, password) 541 if err != nil { 542 return fmt.Errorf("create auth client: %w", err) 543 } 544 a.log.Info("Authenticated", sync.DIDAttr(authClient.DID()), slog.String("pds", authClient.PDS())) 545 546 limiter := sync.NewRateLimiter(storage, 0.9) 547 repoClient := sync.NewRateClient(authClient.APIClient(), authClient.DID(), limiter) 548 549 existingRecords, err := sync.FetchExisting(ctx, repoClient, authClient.DID(), storage, fresh) 550 if err != nil { 551 return fmt.Errorf("fetch existing records: %w", err) 552 } 553 a.log.Info("Fetched existing records", slog.Int("count", len(existingRecords))) 554 555 published, _ := storage.GetPublished(authClient.DID()) 556 newRecords := sync.FilterNew(records, existingRecords, published, tolerance) 557 skippedCount := len(records) - len(newRecords) 558 a.log.Info("Filtered to new records", 559 slog.Int("count", len(newRecords)), 560 slog.Int("skipped", skippedCount)) 561 562 if len(newRecords) == 0 { 563 a.log.Info("All records already exist, nothing to import") 564 return nil 565 } 566 567 if reverse { 568 slices.Reverse(newRecords) 569 } 570 571 if len(newRecords) > 0 { 572 newEntries := make(map[string][]byte) 573 keys := sync.CreateRecordKeys(newRecords) 574 for i, rec := range newRecords { 575 key := keys[i] 576 value, _ := json.Marshal(rec) 577 newEntries[key] = value 578 } 579 
if err := storage.SaveRecords(authClient.DID(), newEntries); err != nil { 580 return fmt.Errorf("save new records to storage: %w", err) 581 } 582 } 583 584 progressLog := a.createProgressLogger() 585 586 publishOpts := sync.PublishOptions{ 587 BatchSize: batchSize, 588 DryRun: dryRun, 589 Reverse: reverse, 590 ATProtoClient: repoClient, 591 ProgressLog: progressLog, 592 ClientAgent: fmt.Sprintf("lazuli/%s", Version), 593 Storage: storage, 594 Limiter: limiter, 595 } 596 597 result := sync.Publish(ctx, authClient, publishOpts) 598 599 a.log.Info("Import completed", 600 slog.Int("success_count", result.SuccessCount), 601 slog.Int("error_count", result.ErrorCount), 602 slog.Bool("cancelled", result.Cancelled), 603 slog.Duration("duration", result.Duration), 604 slog.Float64("records_per_minute", result.RecordsPerMinute)) 605 606 if a.outputFormat == "json" { 607 summary := map[string]any{ 608 "successCount": result.SuccessCount, 609 "errorCount": result.ErrorCount, 610 "cancelled": result.Cancelled, 611 "durationSeconds": result.Duration.Seconds(), 612 "recordsPerMinute": result.RecordsPerMinute, 613 "totalRecords": result.TotalRecords, 614 } 615 if data, err := json.MarshalIndent(summary, "", " "); err == nil { 616 fmt.Fprintln(os.Stderr, string(data)) 617 } 618 } 619 620 if result.ErrorCount > 0 { 621 return fmt.Errorf("import completed with %d errors", result.ErrorCount) 622 } 623 624 return nil 625} 626 627func (a *App) createProgressLogger() func(sync.ProgressReport) { 628 return func(pr sync.ProgressReport) { 629 if a.outputFormat == "json" { 630 if data, err := json.MarshalIndent(pr, "", " "); err == nil { 631 fmt.Fprintln(os.Stderr, string(data)) 632 } 633 } else { 634 a.log.Info("sync progress", 635 slog.Int("completed", pr.Completed), 636 slog.Int("total", pr.Total), 637 slog.Float64("percent", pr.Percent), 638 slog.String("elapsed", pr.Elapsed), 639 slog.String("eta", pr.ETA), 640 slog.String("rate", pr.Rate), 641 slog.Int("errors", pr.Errors), 642 
slog.Int("writes", pr.WritesConsumed), 643 slog.Int("global", pr.GlobalConsumed), 644 slog.String("limited_rate", pr.ConstrainedRate), 645 slog.String("reset_in", pr.TimeUntilReset)) 646 } 647 } 648} 649 650func (a *App) runSync(ctx context.Context, cmd *cli.Command) error { 651 storage, err := cache.NewBoltStorage(false) 652 if err != nil { 653 return fmt.Errorf("open cache: %w", err) 654 } 655 defer storage.Close() 656 657 authClient, err := a.prepareAuth(ctx, cmd) 658 if err != nil { 659 return err 660 } 661 662 fresh := cmd.Bool("fresh") 663 useCAR := cmd.Bool("car") 664 a.log.Info("Starting sync operation", sync.DIDAttr(authClient.DID()), slog.Bool("fresh", fresh), slog.Bool("use_car", useCAR)) 665 666 limiter := sync.NewRateLimiter(storage, 0.85) 667 repoClient := sync.NewRateClient(authClient.APIClient(), authClient.DID(), limiter) 668 669 if fresh { 670 if err := storage.Clear(authClient.DID()); err != nil { 671 a.log.Error("Failed to clear cache", sync.ErrorAttr(err)) 672 } else { 673 a.log.Info("Cache cleared") 674 } 675 } 676 677 var existingRecords []sync.ExistingRecord 678 if useCAR { 679 existingRecords, err = sync.FetchExistingViaCAR(ctx, authClient, authClient.DID(), sync.RecordType, storage) 680 } else { 681 existingRecords, err = sync.FetchExisting(ctx, repoClient, authClient.DID(), storage, fresh) 682 } 683 if err != nil { 684 return fmt.Errorf("fetch existing records: %w", err) 685 } 686 687 a.log.Info("Sync stats", slog.Int("total_records", len(existingRecords))) 688 689 return nil 690} 691 692func (a *App) runDedupe(ctx context.Context, cmd *cli.Command) error { 693 storage, err := cache.NewBoltStorage(false) 694 if err != nil { 695 return fmt.Errorf("open cache: %w", err) 696 } 697 defer storage.Close() 698 699 authClient, err := a.prepareAuth(ctx, cmd) 700 if err != nil { 701 return fmt.Errorf("authentication failed: %w", err) 702 } 703 704 dryRun := cmd.Bool("dry-run") 705 fresh := cmd.Bool("fresh") 706 yes := cmd.Bool("yes") 707 useCAR := 
cmd.Bool("car") 708 a.log.Info("Starting dedupe operation", 709 sync.DIDAttr(authClient.DID()), 710 slog.Bool("dry_run", dryRun), 711 slog.Bool("fresh", fresh), 712 slog.Bool("use_car", useCAR)) 713 714 limiter := sync.NewRateLimiter(storage, 0.9) 715 repoClient := sync.NewRateClient(authClient.APIClient(), authClient.DID(), limiter) 716 717 if fresh { 718 if err := storage.Clear(authClient.DID()); err != nil { 719 a.log.Error("Failed to clear cache", sync.ErrorAttr(err)) 720 } else { 721 a.log.Info("Cache cleared") 722 } 723 } 724 725 var existingRecords []sync.ExistingRecord 726 if useCAR { 727 existingRecords, err = sync.FetchExistingViaCAR(ctx, authClient, authClient.DID(), sync.RecordType, storage) 728 } else { 729 existingRecords, err = sync.FetchExisting(ctx, repoClient, authClient.DID(), storage, fresh) 730 } 731 if err != nil { 732 return fmt.Errorf("failed to fetch existing records: %w", err) 733 } 734 735 duplicates := sync.FindDuplicates(existingRecords) 736 totalDuplicates := 0 737 for _, group := range duplicates { 738 totalDuplicates += len(group) - 1 739 } 740 741 a.log.Info("Dedupe analysis", 742 slog.Int("total_records", len(existingRecords)), 743 slog.Int("duplicate_groups", len(duplicates)), 744 slog.Int("total_duplicates", totalDuplicates)) 745 746 if totalDuplicates == 0 { 747 a.log.Info("No duplicates found") 748 return nil 749 } 750 751 if dryRun { 752 a.log.InfoContext(ctx, "Dry run - would remove the following duplicates") 753 for _, group := range duplicates { 754 keep := group[0] 755 for _, rec := range group[1:] { 756 a.log.InfoContext(ctx, "Would remove", 757 slog.String("uri", rec.URI), 758 slog.String("track", keep.Value.TrackName), 759 slog.String("artist", keep.Value.ArtistName()), 760 slog.String("time", keep.Value.PlayedTime.Format(time.RFC3339))) 761 } 762 } 763 return nil 764 } 765 766 if !yes { 767 fmt.Fprintf(os.Stderr, "\nThis will permanently delete %d duplicate record(s). Continue? 
[y/N]: ", totalDuplicates) 768 var response string 769 fmt.Scanln(&response) 770 if response != "y" && response != "Y" { 771 a.log.Info("Dedupe cancelled by user") 772 return nil 773 } 774 } 775 776 for _, group := range duplicates { 777 for i := 1; i < len(group); i++ { 778 rec := group[i] 779 uri := rec.URI 780 parts := strings.Split(uri, "/") 781 rkey := parts[len(parts)-1] 782 783 retryPolicy := retrypolicy.NewBuilder[any](). 784 WithMaxRetries(10). 785 WithBackoff(sync.BaseRetryDelay, 5*time.Minute). 786 HandleIf(func(_ any, err error) bool { 787 return sync.IsTransientError(err) 788 }). 789 OnRetryScheduled(func(e failsafe.ExecutionScheduledEvent[any]) { 790 a.log.Warn("Delete failed with transient error, retrying", 791 slog.Duration("retryDelay", e.Delay), 792 sync.ErrorAttr(e.LastError()), 793 slog.Int("attempt", e.Attempts()), 794 slog.String("uri", uri)) 795 }). 796 Build() 797 798 err := failsafe.With(retryPolicy).WithContext(ctx).Run(func() error { 799 return repoClient.DeleteRecord(ctx, sync.RecordType, rkey) 800 }) 801 802 if err != nil { 803 a.log.Error("Failed to delete record", sync.ErrorAttr(err), slog.String("uri", uri)) 804 } else { 805 a.log.Info("Deleted duplicate", slog.String("uri", uri), slog.String("track", rec.Value.TrackName)) 806 } 807 } 808 } 809 810 if err := storage.Clear(authClient.DID()); err != nil { 811 a.log.Error("Failed to clear cache", "err", err) 812 } 813 814 return nil 815} 816 817func (a *App) outputRecords(records []*sync.PlayRecord, outputPath string) error { 818 var output io.Writer = os.Stdout 819 if outputPath != "" { 820 file, err := os.Create(outputPath) 821 if err != nil { 822 return fmt.Errorf("create output file: %w", err) 823 } 824 defer file.Close() 825 output = file 826 } 827 828 enc := json.NewEncoder(output) 829 for _, r := range records { 830 if err := enc.Encode(r); err != nil { 831 return fmt.Errorf("encode record: %w", err) 832 } 833 } 834 835 return nil 836} 837 838var commonFlags = []cli.Flag{ 839 
&cli.StringFlag{ 840 Name: "handle", 841 Usage: "Bluesky handle", 842 Sources: cli.EnvVars(EnvHandle), 843 }, 844 &cli.StringFlag{ 845 Name: "password", 846 Usage: "App password", 847 Sources: cli.EnvVars(EnvPassword), 848 }, 849 &cli.BoolFlag{ 850 Name: "verbose", 851 Usage: "Enable verbose logging (-v for debug, -vv for trace)", 852 Aliases: []string{"v"}, 853 Sources: cli.EnvVars(EnvVerbose), 854 Config: cli.BoolConfig{Count: &verboseCount}, 855 }, 856 &cli.BoolFlag{ 857 Name: "quiet", 858 Usage: "Suppress non-essential output (-q for warn, -qq for errors, -qqq for silent)", 859 Aliases: []string{"q"}, 860 Sources: cli.EnvVars(EnvQuiet), 861 Config: cli.BoolConfig{Count: &quietCount}, 862 }, 863 &cli.StringFlag{ 864 Name: "output-format", 865 Usage: "Output format: text or json", 866 Value: "text", 867 Sources: cli.EnvVars("LAZULI_OUTPUT_FORMAT"), 868 }, 869} 870 871var lastfmFlag = &cli.StringFlag{ 872 Name: "lastfm", 873 Usage: "Path to Last.fm CSV file or directory", 874 Sources: cli.EnvVars("LAZULI_LASTFM"), 875} 876 877var spotifyFlag = &cli.StringFlag{ 878 Name: "spotify", 879 Usage: "Path to Spotify JSON/directory/zip", 880 Sources: cli.EnvVars("LAZULI_SPOTIFY"), 881} 882 883var exportFlags = []cli.Flag{ 884 lastfmFlag, 885 spotifyFlag, 886 &cli.StringFlag{ 887 Name: "output", 888 Usage: "Output file (stdout if not set)", 889 Sources: cli.EnvVars("LAZULI_OUTPUT"), 890 }, 891 &cli.BoolFlag{ 892 Name: "reverse", 893 Usage: "Sort records reverse chronologically", 894 Sources: cli.EnvVars(EnvReverse), 895 }, 896 &cli.DurationFlag{ 897 Name: "tolerance", 898 Usage: "Time tolerance for cross-source deduplication (e.g., 5m, 10m)", 899 Value: sync.DefaultCrossSourceTolerance, 900 Sources: cli.EnvVars("LAZULI_TOLERANCE"), 901 }, 902} 903 904var importFlags = []cli.Flag{ 905 lastfmFlag, 906 spotifyFlag, 907 &cli.BoolFlag{ 908 Name: "dry-run", 909 Usage: "Preview without publishing", 910 Sources: cli.EnvVars(EnvDryRun), 911 }, 912 &cli.BoolFlag{ 913 Name: "reverse", 
914 Usage: "Import in reverse order", 915 Sources: cli.EnvVars(EnvReverse), 916 }, 917 &cli.BoolFlag{ 918 Name: "fresh", 919 Usage: "Don't use cached Bluesky records", 920 Sources: cli.EnvVars(EnvFresh), 921 }, 922 &cli.BoolFlag{ 923 Name: "clear-cache", 924 Usage: "Clear cache before running", 925 Sources: cli.EnvVars(EnvClearCache), 926 }, 927 &cli.IntFlag{ 928 Name: "batch-size", 929 Usage: "Records per batch (default: 20)", 930 Value: DefaultBatchSize, 931 Sources: cli.EnvVars("LAZULI_BATCH_SIZE"), 932 }, 933 &cli.DurationFlag{ 934 Name: "tolerance", 935 Usage: "Time tolerance for cross-source deduplication (e.g., 5m, 10m)", 936 Value: sync.DefaultCrossSourceTolerance, 937 Sources: cli.EnvVars("LAZULI_TOLERANCE"), 938 }, 939} 940 941var syncFlags = []cli.Flag{ 942 &cli.BoolFlag{ 943 Name: "fresh", 944 Usage: "Force refresh cache", 945 Sources: cli.EnvVars(EnvFresh), 946 }, 947 &cli.BoolFlag{ 948 Name: "car", 949 Usage: "Use CAR export (faster for large repos)", 950 Sources: cli.EnvVars("LAZULI_USE_CAR"), 951 }, 952} 953 954var dedupeFlags = []cli.Flag{ 955 &cli.BoolFlag{ 956 Name: "dry-run", 957 Usage: "Preview without deleting", 958 Sources: cli.EnvVars(EnvDryRun), 959 }, 960 &cli.BoolFlag{ 961 Name: "fresh", 962 Usage: "Force refresh cache", 963 Sources: cli.EnvVars(EnvFresh), 964 }, 965 &cli.BoolFlag{ 966 Name: "yes", 967 Usage: "Skip confirmation prompt", 968 Aliases: []string{"y"}, 969 Sources: cli.EnvVars(EnvYes), 970 }, 971 &cli.BoolFlag{ 972 Name: "car", 973 Usage: "Use CAR export (faster for large repos)", 974 Sources: cli.EnvVars("LAZULI_USE_CAR"), 975 }, 976} 977 978type Parser interface { 979 ParseFile(ctx context.Context, r io.Reader) ([]*sync.PlayRecord, error) 980 ParseFS(ctx context.Context, fsys fs.FS) ([]*sync.PlayRecord, error) 981} 982 983func parseInput(ctx context.Context, path string, parser Parser) ([]*sync.PlayRecord, error) { 984 info, err := os.Stat(path) 985 if err != nil { 986 return nil, fmt.Errorf("stat path: %w", err) 987 } 988 
989 if info.IsDir() { 990 return parser.ParseFS(ctx, os.DirFS(path)) 991 } 992 993 if strings.HasSuffix(path, ".zip") { 994 zf, err := zip.OpenReader(path) 995 if err != nil { 996 return nil, fmt.Errorf("open zip: %w", err) 997 } 998 999 defer zf.Close() 1000 1001 return parser.ParseFS(ctx, zf) 1002 } 1003 1004 file, err := os.Open(path) 1005 if err != nil { 1006 return nil, fmt.Errorf("open file: %w", err) 1007 } 1008 1009 defer file.Close() 1010 1011 return parser.ParseFile(ctx, file) 1012} 1013 1014func loadRecordsMerge(ctx context.Context, lastFMPath, spotifyPath string, tolerance time.Duration) ([]*sync.PlayRecord, int, error) { 1015 var lastfmRecords, spotifyRecords []*sync.PlayRecord 1016 var err error 1017 1018 if lastFMPath != "" { 1019 lastfmRecords, err = parseInput(ctx, lastFMPath, lastfm.Parser{}) 1020 if err != nil { 1021 return nil, 0, fmt.Errorf("parse lastfm: %w", err) 1022 } 1023 } 1024 1025 if spotifyPath != "" { 1026 spotifyRecords, err = parseInput(ctx, spotifyPath, spotify.Parser{}) 1027 if err != nil { 1028 return nil, 0, fmt.Errorf("parse spotify: %w", err) 1029 } 1030 } 1031 1032 f := func(a, b *sync.PlayRecord) int { 1033 if v := a.Time().Compare(b.Time()); v != 0 { 1034 return v 1035 } 1036 1037 return cmp.Compare(a.ArtistName(), b.ArtistName()) 1038 } 1039 slices.SortFunc(spotifyRecords, f) 1040 slices.SortFunc(lastfmRecords, f) 1041 1042 totalInput := len(lastfmRecords) + len(spotifyRecords) 1043 1044 mergedRecords := kway.Merge([][]*sync.PlayRecord{lastfmRecords, spotifyRecords}, tolerance) 1045 1046 return mergedRecords, totalInput, nil 1047}