// Command lazuli imports Last.fm and Spotify listening history to Bluesky.
package main

import (
	"archive/zip"
	"cmp"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"io/fs"
	"log/slog"
	"os"
	"slices"
	"strings"
	"time"

	"tangled.org/karitham.dev/lazuli/cache"
	"tangled.org/karitham.dev/lazuli/kway"
	"tangled.org/karitham.dev/lazuli/sources/lastfm"
	"tangled.org/karitham.dev/lazuli/sources/spotify"
	"tangled.org/karitham.dev/lazuli/sync"

	"github.com/failsafe-go/failsafe-go"
	"github.com/failsafe-go/failsafe-go/retrypolicy"
	"github.com/urfave/cli/v3"
)

// Version is printed by the version command and embedded in the client agent
// sent while importing. It defaults to "dev"; presumably it is overridden at
// build time (not visible in this file).
var Version = "dev"

const (
	// DefaultBatchSize is the default value of the --batch-size flag.
	DefaultBatchSize = 20
)

// Environment variables that can supply flag values.
const (
	EnvHandle     = "LAZULI_HANDLE"
	EnvPassword   = "LAZULI_PASSWORD"
	EnvVerbose    = "LAZULI_VERBOSE"
	EnvQuiet      = "LAZULI_QUIET"
	EnvReverse    = "LAZULI_REVERSE"
	EnvDryRun     = "LAZULI_DRY_RUN"
	EnvFresh      = "LAZULI_FRESH"
	EnvClearCache = "LAZULI_CLEAR_CACHE"
	EnvYes        = "LAZULI_YES"
)

// verboseCount and quietCount are filled by the --verbose and --quiet flags
// (see commonFlags) and read by initLoggerBefore.
var (
	verboseCount int
	quietCount   int
)

// App carries the state shared by all subcommands. Both fields are set by
// initLoggerBefore before a command's action runs.
type App struct {
	log          *slog.Logger
	outputFormat string
}

func main() {
	if err := run(); err != nil {
		fmt.Fprintf(os.Stderr, "Error: %v\n", err)
		os.Exit(1)
	}
}

// run builds the root command with every subcommand and executes it against
// the process arguments.
func run() error {
	app := &App{}

	cmd := &cli.Command{
		Name:  "lazuli",
		Usage: "Import Last.fm and Spotify listening history to Bluesky",
		Commands: []*cli.Command{
			app.exportCommand(),
			app.importCommand(),
			app.syncCommand(),
			app.statsCommand(),
			app.failedCommand(),
			app.retryCommand(),
			app.dedupeCommand(),
			app.debugCommand(),
			app.versionCommand(),
		},
	}

	return cmd.Run(context.Background(), os.Args)
}

// joinFlags concatenates the given flag groups, in order, into a freshly
// allocated slice so the package-level flag slices are never appended to.
func joinFlags(groups ...[]cli.Flag) []cli.Flag {
	total := 0
	for _, g := range groups {
		total += len(g)
	}
	flags := make([]cli.Flag, 0, total)
	for _, g := range groups {
		flags = append(flags, g...)
	}
	return flags
}

// exportCommand describes "lazuli export".
func (a *App) exportCommand() *cli.Command {
	return &cli.Command{
		Name:      "export",
		Usage:     "Parse and merge Last.fm/Spotify exports, output JSON",
		UsageText: " lazuli export --lastfm=/path/to/lastfm.csv --spotify=/path/to/spotify.json -o merged.json",
		Flags:     joinFlags(exportFlags, commonFlags),
		Action:    a.runExport,
		Before:    a.initLoggerBefore,
	}
}

// importCommand describes "lazuli import".
func (a *App) importCommand() *cli.Command {
	return &cli.Command{
		Name:      "import",
		Usage:     "Import listening history to Bluesky",
		UsageText: " lazuli import --handle=user.bsky.social --password=app-password --lastfm=plays.csv",
		Flags:     joinFlags(importFlags, commonFlags),
		Action:    a.runImport,
		Before:    a.initLoggerBefore,
	}
}

// syncCommand describes "lazuli sync".
func (a *App) syncCommand() *cli.Command {
	return &cli.Command{
		Name:      "sync",
		Usage:     "Fetch existing records, show stats, filter new records",
		UsageText: " lazuli sync --handle=user.bsky.social --password=app-password",
		Flags:     joinFlags(syncFlags, commonFlags),
		Action:    a.runSync,
		Before:    a.initLoggerBefore,
	}
}

// dedupeCommand describes "lazuli dedupe".
func (a *App) dedupeCommand() *cli.Command {
	return &cli.Command{
		Name:      "dedupe",
		Usage:     "Find and remove duplicate records",
		UsageText: " lazuli dedupe --handle=user.bsky.social --password=app-password\n lazuli dedupe --handle=user.bsky.social --password=app-password --dry-run",
		Flags:     joinFlags(dedupeFlags, commonFlags),
		Action:    a.runDedupe,
		Before:    a.initLoggerBefore,
	}
}

// debugCommand describes "lazuli debug".
func (a *App) debugCommand() *cli.Command {
	return &cli.Command{
		Name:      "debug",
		Usage:     "Fetch and dump raw records for debugging",
		UsageText: " lazuli debug --handle=user.bsky.social --password=app-password",
		Flags:     joinFlags(commonFlags),
		Action:    a.runDebugFetch,
		Before:    a.initLoggerBefore,
	}
}

// statsCommand describes "lazuli stats".
func (a *App) statsCommand() *cli.Command {
	return &cli.Command{
		Name:      "stats",
		Usage:     "Display statistics about the local database and rate limits",
		UsageText: " lazuli stats",
		Flags:     joinFlags(commonFlags),
		Action:    a.runStats,
		Before:    a.initLoggerBefore,
	}
}

// failedCommand describes "lazuli failed".
func (a *App) failedCommand() *cli.Command {
	return &cli.Command{
		Name:      "failed",
		Usage:     "List records that failed to publish",
		UsageText: " lazuli failed --handle=user.bsky.social",
		Flags:     joinFlags(commonFlags),
		Action:    a.runFailed,
		Before:    a.initLoggerBefore,
	}
}

// retryCommand describes "lazuli retry". Its only command-specific flag,
// --dry-run, is listed after the common flags.
func (a *App) retryCommand() *cli.Command {
	dryRun := &cli.BoolFlag{
		Name:    "dry-run",
		Usage:   "Preview what will be retried",
		Sources: cli.EnvVars(EnvDryRun),
	}

	return &cli.Command{
		Name:      "retry",
		Usage:     "Retry failed records one by one",
		UsageText: " lazuli retry --handle=user.bsky.social",
		Flags:     joinFlags(commonFlags, []cli.Flag{dryRun}),
		Action:    a.runRetry,
		Before:    a.initLoggerBefore,
	}
}

// versionCommand describes "lazuli version", which prints Version. It has no
// Before hook, so the logger is not initialised for it.
func (a *App) versionCommand() *cli.Command {
	return &cli.Command{
		Name:  "version",
		Usage: "Print the version number",
		Action: func(ctx context.Context, cmd *cli.Command) error {
			fmt.Println(Version)
			return nil
		},
	}
}

// writeJSON writes v to w as indented JSON followed by a newline.
func writeJSON(w io.Writer, v any) error {
	data, err := json.MarshalIndent(v, "", " ")
	if err != nil {
		return fmt.Errorf("encode json: %w", err)
	}
	_, err = fmt.Fprintln(w, string(data))
	return err
}

// runStats prints local database statistics and today's rate-limit
// consumption, as JSON when the output format is "json" and as text otherwise.
func (a *App) runStats(ctx context.Context, cmd *cli.Command) error {
	// Use read-only storage for stats to allow viewing while main process has it open
	statsStorage, err := cache.NewBoltStorage(true)
	if err != nil {
		return fmt.Errorf("open read-only cache: %w", err)
	}
	defer statsStorage.Close()

	stats, err := statsStorage.Stats()
	if err != nil {
		return fmt.Errorf("failed to get database stats: %w", err)
	}

	limiter := sync.NewRateLimiter(statsStorage, 1)
	writes, global, err := limiter.Stats()
	if err != nil {
		return fmt.Errorf("failed to get rate limit stats: %w", err)
	}

	if a.outputFormat == "json" {
		return writeJSON(os.Stdout, map[string]any{
			"db": stats,
			"rateLimits": map[string]any{
				"writesConsumed":  writes,
				"globalConsumed":  global,
				"writesLimit":     sync.WriteLimitDay,
				"globalLimit":     sync.GlobalLimitDay,
				"writesRemaining": sync.WriteLimitDay - writes,
				"globalRemaining": sync.GlobalLimitDay - global,
			},
		})
	}

	fmt.Println("Database Statistics:")
	fmt.Printf(" Total Records: %d\n", stats.TotalRecords)
	fmt.Printf(" Marked Published: %d\n", stats.MarkedPublished)
	fmt.Printf(" Failed Count: %d\n", stats.FailedCount)
	fmt.Printf(" Unpublished Count: %d\n", stats.UnpublishedCount)

	if len(stats.UserStats) > 0 {
		fmt.Println("\nUser Statistics:")
		for did, s := range stats.UserStats {
			// Checked assertion: an entry of another shape is reported and
			// skipped instead of panicking the whole command.
			m, ok := s.(map[string]int)
			if !ok {
				a.log.Warn("Skipping user stats with unexpected type",
					slog.Any("did", did),
					slog.String("type", fmt.Sprintf("%T", s)))
				continue
			}
			fmt.Printf(" %s:\n", did)
			fmt.Printf(" Total: %d\n", m["total"])
			fmt.Printf(" Published: %d\n", m["published"])
			fmt.Printf(" Failed: %d\n", m["failed"])
			fmt.Printf(" Pending: %d\n", m["total"]-m["published"]-m["failed"])
		}
	}

	fmt.Println("\nRate Limit Consumption (Today):")
	fmt.Printf(" Writes: %d / %d (Remaining: %d)\n", writes, sync.WriteLimitDay, sync.WriteLimitDay-writes)
	fmt.Printf(" Global: %d / %d (Remaining: %d)\n", global, sync.GlobalLimitDay, sync.GlobalLimitDay-global)
	return nil
}

// runRetry re-publishes, one at a time, every record stored as failed for the
// authenticated user. With --dry-run it only prints what would be retried.
func (a *App) runRetry(ctx context.Context, cmd *cli.Command) error {
	storage, err := cache.NewBoltStorage(false)
	if err != nil {
		return fmt.Errorf("open cache: %w", err)
	}
	defer storage.Close()

	authClient, err := a.prepareAuth(ctx, cmd)
	if err != nil {
		return err
	}
	did := authClient.DID()
	dryRun := cmd.Bool("dry-run")

	limiter := sync.NewRateLimiter(storage, 0.9)
	repoClient := sync.NewRateClient(authClient.APIClient(), did, limiter)

	type failedEntry struct {
		key string
		rec sync.PlayRecord
	}
	var failedRecords []failedEntry

	iterateFailed := storage.IterateFailed(did)
	iterateFailed(func(key string, rec []byte, errMsg string) bool {
		var playRec sync.PlayRecord
		if err := json.Unmarshal(rec, &playRec); err != nil {
			// An undecodable entry cannot be retried; report it and move on.
			a.log.Warn("Skipping undecodable failed record", sync.ErrorAttr(err), slog.String("key", key))
			return true // continue
		}
		failedRecords = append(failedRecords, failedEntry{key: key, rec: playRec})
		return true // continue
	})

	if len(failedRecords) == 0 {
		fmt.Println("No failed records to retry.")
		return nil
	}

	fmt.Printf("Retrying %d failed records for %s...\n", len(failedRecords), did)

	successCount := 0
	errorCount := 0
	for i := range failedRecords {
		// Index into the slice so the pointer passed to PublishBatch refers
		// to the stored element rather than to a loop-variable copy.
		key := failedRecords[i].key
		rec := &failedRecords[i].rec

		if dryRun {
			fmt.Printf("[DRY-RUN] Would retry: %s - %s\n", rec.ArtistName(), rec.TrackName)
			successCount++
			continue
		}

		pubErr := sync.PublishBatch(ctx, repoClient, did, []*sync.PlayRecord{rec}, storage, sync.DefaultClientAgent)
		if pubErr != nil {
			fmt.Printf("Failed again: %s - %s: %v\n", rec.ArtistName(), rec.TrackName, pubErr)
			errorCount++
			continue
		}

		fmt.Printf("Successfully retried: %s - %s\n", rec.ArtistName(), rec.TrackName)
		if err := storage.MarkPublished(did, key); err != nil {
			a.log.Error("Failed to mark record as published", sync.ErrorAttr(err), slog.String("key", key))
		}
		if err := storage.RemoveFailed(did, key); err != nil {
			a.log.Error("Failed to remove record from failed list", sync.ErrorAttr(err), slog.String("key", key))
		}
		successCount++
	}

	fmt.Printf("\nRetry complete: %d succeeded, %d failed.\n", successCount, errorCount)
	return nil
}

// runFailed lists the records stored as failed for the authenticated user, as
// a JSON array when the output format is "json" and as text otherwise.
func (a *App) runFailed(ctx context.Context, cmd *cli.Command) error {
	storage, err := cache.NewBoltStorage(true)
	if err != nil {
		return fmt.Errorf("open read-only cache: %w", err)
	}
	defer storage.Close()

	authClient, err := a.prepareAuth(ctx, cmd)
	if err != nil {
		return err
	}
	did := authClient.DID()

	type FailedRecord struct {
		Key    string          `json:"key"`
		Error  string          `json:"error"`
		Record sync.PlayRecord `json:"record"`
	}

	// Non-nil so that "no failures" is encoded as [] rather than null.
	failed := []FailedRecord{}

	iterateFailed := storage.IterateFailed(did)
	iterateFailed(func(key string, rec []byte, errMsg string) bool {
		var playRec sync.PlayRecord
		if err := json.Unmarshal(rec, &playRec); err != nil {
			// Still list the entry (key and stored error remain useful), but
			// make it visible that its record payload could not be decoded.
			a.log.Warn("Failed record could not be decoded", sync.ErrorAttr(err), slog.String("key", key))
		}
		failed = append(failed, FailedRecord{
			Key:    key,
			Error:  errMsg,
			Record: playRec,
		})
		return true // continue
	})

	if a.outputFormat == "json" {
		return writeJSON(os.Stdout, failed)
	}

	if len(failed) == 0 {
		fmt.Println("No failed records found.")
		return nil
	}

	fmt.Printf("Failed Records for %s (%d):\n", did, len(failed))
	for _, f := range failed {
		fmt.Printf(" [%s] %s - %s: %s\n",
			f.Record.PlayedTime.Format(time.RFC3339),
			f.Record.ArtistName(),
			f.Record.TrackName,
			f.Error)
	}
	return nil
}

// runDebugFetch lists records of sync.RecordType (requesting 10) without a
// rate limiter and dumps each one to stdout as indented JSON.
func (a *App) runDebugFetch(ctx context.Context, cmd *cli.Command) error {
	authClient, err := a.prepareAuth(ctx, cmd)
	if err != nil {
		return fmt.Errorf("authentication failed: %w", err)
	}

	repoClient := sync.NewRateClient(authClient.APIClient(), authClient.DID(), nil)
	records, _, err := repoClient.ListRecords(ctx, sync.RecordType, 10, "")
	if err != nil {
		return fmt.Errorf("failed to fetch records from Bluesky: %w", err)
	}

	enc := json.NewEncoder(os.Stdout)
	enc.SetIndent("", " ")
	for _, r := range records {
		if err := enc.Encode(r); err != nil {
			return fmt.Errorf("failed to encode record: %w", err)
		}
	}
	return nil
}

// logLevel maps net verbosity (count of -v minus count of -q) to a log level,
// following the flag usage text: -v debug, -vv trace, -q warn, -qq errors,
// -qqq silent. slog has no trace or silent level, so those are represented by
// one step below Debug and one step above Error respectively.
func logLevel(verbosity int) slog.Level {
	switch {
	case verbosity >= 2:
		return slog.LevelDebug - 4
	case verbosity == 1:
		return slog.LevelDebug
	case verbosity == 0:
		return slog.LevelInfo
	case verbosity == -1:
		return slog.LevelWarn
	case verbosity == -2:
		return slog.LevelError
	default:
		return slog.LevelError + 4
	}
}

// initLoggerBefore is the Before hook shared by the subcommands. It records
// the output format and installs a stderr logger (JSON handler for the "json"
// format, text handler otherwise) as both a.log and the slog default.
func (a *App) initLoggerBefore(ctx context.Context, cmd *cli.Command) (context.Context, error) {
	a.outputFormat = cmd.String("output-format")

	opts := &slog.HandlerOptions{Level: logLevel(verboseCount - quietCount)}
	var handler slog.Handler = slog.NewTextHandler(os.Stderr, opts)
	if a.outputFormat == "json" {
		handler = slog.NewJSONHandler(os.Stderr, opts)
	}

	a.log = slog.New(handler)
	slog.SetDefault(a.log)
	return ctx, nil
}

// getCredentials returns the handle and password flag values, or an error
// naming the flag and environment variable to set when either is empty.
func (a *App) getCredentials(cmd *cli.Command) (string, string, error) {
	handle := cmd.String("handle")
	password := cmd.String("password")

	switch {
	case handle == "":
		return "", "", fmt.Errorf("bluesky handle is required (set --handle or set the %s environment variable)", EnvHandle)
	case password == "":
		return "", "", fmt.Errorf("app password is required (set --password or set the %s environment variable)", EnvPassword)
	}
	return handle, password, nil
}

// prepareAuth reads the credentials from cmd and creates a client with them.
func (a *App) prepareAuth(ctx context.Context, cmd *cli.Command) (*sync.Client, error) {
	handle, password, err := a.getCredentials(cmd)
	if err != nil {
		return nil, err
	}

	authClient, err := sync.NewClient(ctx, handle, password)
	if err != nil {
		return nil, fmt.Errorf("create auth client: %w", err)
	}
	return authClient, nil
}

// runExport merges the Last.fm and Spotify inputs and writes the result to
// the --output file, or to stdout when no output path is given.
func (a *App) runExport(ctx context.Context, cmd *cli.Command) error {
	a.log.Info("Starting export operation")

	lastfmPath := cmd.String("lastfm")
	spotifyPath := cmd.String("spotify")
	outputPath := cmd.String("output")
	reverse := cmd.Bool("reverse")
	tolerance := cmd.Duration("tolerance")

	records, _, err := loadRecordsMerge(ctx, lastfmPath, spotifyPath, tolerance)
	if err != nil {
		return fmt.Errorf("failed to deduplicate records: %w", err)
	}

	if reverse {
		slices.Reverse(records)
	}
	return a.outputRecords(records, outputPath)
}

// runImport loads and merges the local exports, drops records that already
// exist remotely or are marked published in the cache, saves the remainder to
// storage and publishes them. It returns an error when any record failed.
func (a *App) runImport(ctx context.Context, cmd *cli.Command) error {
	storage, err := cache.NewBoltStorage(false)
	if err != nil {
		return fmt.Errorf("open cache: %w", err)
	}
	defer storage.Close()

	// Credentials are validated up front so a missing flag is reported
	// before any input is parsed; the client itself is created later.
	handle, password, err := a.getCredentials(cmd)
	if err != nil {
		return err
	}

	a.log.Info("Starting import operation", sync.DIDAttr(handle))

	lastfmPath := cmd.String("lastfm")
	spotifyPath := cmd.String("spotify")
	dryRun := cmd.Bool("dry-run")
	reverse := cmd.Bool("reverse")
	fresh := cmd.Bool("fresh")
	clearCache := cmd.Bool("clear-cache")
	batchSize := cmd.Int("batch-size")
	tolerance := cmd.Duration("tolerance")

	if clearCache {
		// Best effort: a failed clear is reported but does not abort the import.
		if err := storage.ClearAll(); err != nil {
			a.log.Error("Failed to clear cache", sync.ErrorAttr(err))
		} else {
			a.log.Info("Cache cleared")
		}
	}

	records, totalCount, err := loadRecordsMerge(ctx, lastfmPath, spotifyPath, tolerance)
	if err != nil {
		return fmt.Errorf("load records: %w", err)
	}

	a.log.Info("Loaded records for import",
		slog.Int("total", totalCount),
		slog.Int("filtered", len(records)))

	if len(records) == 0 {
		a.log.Info("No new records to import")
		return nil
	}

	authClient, err := sync.NewClient(ctx, handle, password)
	if err != nil {
		return fmt.Errorf("create auth client: %w", err)
	}
	did := authClient.DID()

	a.log.Info("Authenticated", sync.DIDAttr(did), slog.String("pds", authClient.PDS()))

	limiter := sync.NewRateLimiter(storage, 0.9)
	repoClient := sync.NewRateClient(authClient.APIClient(), did, limiter)

	existingRecords, err := sync.FetchExisting(ctx, repoClient, did, storage, fresh)
	if err != nil {
		return fmt.Errorf("fetch existing records: %w", err)
	}
	a.log.Info("Fetched existing records", slog.Int("count", len(existingRecords)))

	published, err := storage.GetPublished(did)
	if err != nil {
		// Filtering still runs against the fetched remote records, so this
		// is reported rather than treated as fatal.
		a.log.Warn("Failed to load published keys from cache", sync.ErrorAttr(err))
	}

	newRecords := sync.FilterNew(records, existingRecords, published, tolerance)
	skippedCount := len(records) - len(newRecords)

	a.log.Info("Filtered to new records",
		slog.Int("count", len(newRecords)),
		slog.Int("skipped", skippedCount))

	if len(newRecords) == 0 {
		a.log.Info("All records already exist, nothing to import")
		return nil
	}

	if reverse {
		slices.Reverse(newRecords)
	}

	keys := sync.CreateRecordKeys(newRecords)
	newEntries := make(map[string][]byte, len(newRecords))
	for i, rec := range newRecords {
		value, err := json.Marshal(rec)
		if err != nil {
			return fmt.Errorf("marshal record %q: %w", keys[i], err)
		}
		newEntries[keys[i]] = value
	}
	if err := storage.SaveRecords(did, newEntries); err != nil {
		return fmt.Errorf("save new records to storage: %w", err)
	}

	publishOpts := sync.PublishOptions{
		BatchSize:     batchSize,
		DryRun:        dryRun,
		Reverse:       reverse,
		ATProtoClient: repoClient,
		ProgressLog:   a.createProgressLogger(),
		ClientAgent:   fmt.Sprintf("lazuli/%s", Version),
		Storage:       storage,
		Limiter:       limiter,
	}

	result := sync.Publish(ctx, authClient, publishOpts)

	a.log.Info("Import completed",
		slog.Int("success_count", result.SuccessCount),
		slog.Int("error_count", result.ErrorCount),
		slog.Bool("cancelled", result.Cancelled),
		slog.Duration("duration", result.Duration),
		slog.Float64("records_per_minute", result.RecordsPerMinute))

	if a.outputFormat == "json" {
		summary := map[string]any{
			"successCount":     result.SuccessCount,
			"errorCount":       result.ErrorCount,
			"cancelled":        result.Cancelled,
			"durationSeconds":  result.Duration.Seconds(),
			"recordsPerMinute": result.RecordsPerMinute,
			"totalRecords":     result.TotalRecords,
		}
		if err := writeJSON(os.Stderr, summary); err != nil {
			a.log.Warn("Failed to write import summary", sync.ErrorAttr(err))
		}
	}

	if result.ErrorCount > 0 {
		return fmt.Errorf("import completed with %d errors", result.ErrorCount)
	}
	return nil
}

// createProgressLogger returns the progress callback handed to sync.Publish.
// In "json" output mode each report is written to stderr as indented JSON;
// otherwise it is logged as a structured "sync progress" entry.
func (a *App) createProgressLogger() func(sync.ProgressReport) {
	return func(pr sync.ProgressReport) {
		if a.outputFormat == "json" {
			if err := writeJSON(os.Stderr, pr); err != nil {
				a.log.Warn("Failed to write progress report", sync.ErrorAttr(err))
			}
			return
		}

		a.log.Info("sync progress",
			slog.Int("completed", pr.Completed),
			slog.Int("total", pr.Total),
			slog.Float64("percent", pr.Percent),
			slog.String("elapsed", pr.Elapsed),
			slog.String("eta", pr.ETA),
			slog.String("rate", pr.Rate),
			slog.Int("errors", pr.Errors),
			slog.Int("writes", pr.WritesConsumed),
			slog.Int("global", pr.GlobalConsumed),
			slog.String("limited_rate", pr.ConstrainedRate),
			slog.String("reset_in", pr.TimeUntilReset))
	}
}

// runSync fetches the user's existing records (via CAR export when --car is
// set) and logs how many there are. With --fresh the user's cache is cleared
// first.
func (a *App) runSync(ctx context.Context, cmd *cli.Command) error {
	storage, err := cache.NewBoltStorage(false)
	if err != nil {
		return fmt.Errorf("open cache: %w", err)
	}
	defer storage.Close()

	authClient, err := a.prepareAuth(ctx, cmd)
	if err != nil {
		return err
	}
	did := authClient.DID()

	fresh := cmd.Bool("fresh")
	useCAR := cmd.Bool("car")

	a.log.Info("Starting sync operation",
		sync.DIDAttr(did),
		slog.Bool("fresh", fresh),
		slog.Bool("use_car", useCAR))

	limiter := sync.NewRateLimiter(storage, 0.85)
	repoClient := sync.NewRateClient(authClient.APIClient(), did, limiter)

	if fresh {
		if err := storage.Clear(did); err != nil {
			a.log.Error("Failed to clear cache", sync.ErrorAttr(err))
		} else {
			a.log.Info("Cache cleared")
		}
	}

	var existingRecords []sync.ExistingRecord
	if useCAR {
		existingRecords, err = sync.FetchExistingViaCAR(ctx, authClient, did, sync.RecordType, storage)
	} else {
		existingRecords, err = sync.FetchExisting(ctx, repoClient, did, storage, fresh)
	}
	if err != nil {
		return fmt.Errorf("fetch existing records: %w", err)
	}

	a.log.Info("Sync stats", slog.Int("total_records", len(existingRecords)))
	return nil
}

// runDedupe finds duplicate groups among the user's existing records and
// deletes every record of each group except the first. --dry-run only logs
// what would be removed; without --yes the user is asked to confirm on stdin.
// It returns an error when any deletion ultimately failed.
func (a *App) runDedupe(ctx context.Context, cmd *cli.Command) error {
	storage, err := cache.NewBoltStorage(false)
	if err != nil {
		return fmt.Errorf("open cache: %w", err)
	}
	defer storage.Close()

	authClient, err := a.prepareAuth(ctx, cmd)
	if err != nil {
		return fmt.Errorf("authentication failed: %w", err)
	}
	did := authClient.DID()

	dryRun := cmd.Bool("dry-run")
	fresh := cmd.Bool("fresh")
	yes := cmd.Bool("yes")
	useCAR := cmd.Bool("car")

	a.log.Info("Starting dedupe operation",
		sync.DIDAttr(did),
		slog.Bool("dry_run", dryRun),
		slog.Bool("fresh", fresh),
		slog.Bool("use_car", useCAR))

	limiter := sync.NewRateLimiter(storage, 0.9)
	repoClient := sync.NewRateClient(authClient.APIClient(), did, limiter)

	if fresh {
		if err := storage.Clear(did); err != nil {
			a.log.Error("Failed to clear cache", sync.ErrorAttr(err))
		} else {
			a.log.Info("Cache cleared")
		}
	}

	var existingRecords []sync.ExistingRecord
	if useCAR {
		existingRecords, err = sync.FetchExistingViaCAR(ctx, authClient, did, sync.RecordType, storage)
	} else {
		existingRecords, err = sync.FetchExisting(ctx, repoClient, did, storage, fresh)
	}
	if err != nil {
		return fmt.Errorf("failed to fetch existing records: %w", err)
	}

	duplicates := sync.FindDuplicates(existingRecords)

	totalDuplicates := 0
	for _, group := range duplicates {
		totalDuplicates += len(group) - 1
	}

	a.log.Info("Dedupe analysis",
		slog.Int("total_records", len(existingRecords)),
		slog.Int("duplicate_groups", len(duplicates)),
		slog.Int("total_duplicates", totalDuplicates))

	if totalDuplicates == 0 {
		a.log.Info("No duplicates found")
		return nil
	}

	if dryRun {
		a.log.InfoContext(ctx, "Dry run - would remove the following duplicates")
		for _, group := range duplicates {
			keep := group[0]
			for _, rec := range group[1:] {
				a.log.InfoContext(ctx, "Would remove",
					slog.String("uri", rec.URI),
					slog.String("track", keep.Value.TrackName),
					slog.String("artist", keep.Value.ArtistName()),
					slog.String("time", keep.Value.PlayedTime.Format(time.RFC3339)))
			}
		}
		return nil
	}

	if !yes {
		fmt.Fprintf(os.Stderr, "\nThis will permanently delete %d duplicate record(s). Continue? [y/N]: ", totalDuplicates)
		var response string
		// A read error (for example EOF on stdin) leaves response empty,
		// which falls through to the "cancelled" branch below.
		_, _ = fmt.Scanln(&response)
		if !strings.EqualFold(response, "y") {
			a.log.Info("Dedupe cancelled by user")
			return nil
		}
	}

	failedDeletes := 0
	for _, group := range duplicates {
		// Index 0 is the record that is kept.
		for i := 1; i < len(group); i++ {
			rec := group[i]
			uri := rec.URI
			// The record key is the last path segment of the URI.
			rkey := uri[strings.LastIndex(uri, "/")+1:]

			// Built per record because the retry callback logs this uri.
			retryPolicy := retrypolicy.NewBuilder[any]().
				WithMaxRetries(10).
				WithBackoff(sync.BaseRetryDelay, 5*time.Minute).
				HandleIf(func(_ any, err error) bool {
					return sync.IsTransientError(err)
				}).
				OnRetryScheduled(func(e failsafe.ExecutionScheduledEvent[any]) {
					a.log.Warn("Delete failed with transient error, retrying",
						slog.Duration("retryDelay", e.Delay),
						sync.ErrorAttr(e.LastError()),
						slog.Int("attempt", e.Attempts()),
						slog.String("uri", uri))
				}).
				Build()

			err := failsafe.With(retryPolicy).WithContext(ctx).Run(func() error {
				return repoClient.DeleteRecord(ctx, sync.RecordType, rkey)
			})
			if err != nil {
				failedDeletes++
				a.log.Error("Failed to delete record", sync.ErrorAttr(err), slog.String("uri", uri))
				continue
			}
			a.log.Info("Deleted duplicate", slog.String("uri", uri), slog.String("track", rec.Value.TrackName))
		}
	}

	// The cached records no longer match the remote state after deletions.
	if err := storage.Clear(did); err != nil {
		a.log.Error("Failed to clear cache", sync.ErrorAttr(err))
	}

	if failedDeletes > 0 {
		return fmt.Errorf("dedupe completed with %d errors", failedDeletes)
	}
	return nil
}

// encodeRecords writes each record to w as one JSON document per line.
func encodeRecords(w io.Writer, records []*sync.PlayRecord) error {
	enc := json.NewEncoder(w)
	for _, r := range records {
		if err := enc.Encode(r); err != nil {
			return fmt.Errorf("encode record: %w", err)
		}
	}
	return nil
}

// outputRecords writes records as JSON, one per line, to outputPath, or to
// stdout when outputPath is empty. The file's Close error is checked because
// a failed close can mean the written data never reached the file.
func (a *App) outputRecords(records []*sync.PlayRecord, outputPath string) error {
	if outputPath == "" {
		return encodeRecords(os.Stdout, records)
	}

	file, err := os.Create(outputPath)
	if err != nil {
		return fmt.Errorf("create output file: %w", err)
	}

	if err := encodeRecords(file, records); err != nil {
		_ = file.Close() // the encode error is the one worth reporting
		return err
	}
	if err := file.Close(); err != nil {
		return fmt.Errorf("close output file: %w", err)
	}
	return nil
}

// commonFlags are attached to every subcommand except version.
var commonFlags = []cli.Flag{
	&cli.StringFlag{
		Name:    "handle",
		Usage:   "Bluesky handle",
		Sources: cli.EnvVars(EnvHandle),
	},
	&cli.StringFlag{
		Name:    "password",
		Usage:   "App password",
		Sources: cli.EnvVars(EnvPassword),
	},
	&cli.BoolFlag{
		Name:    "verbose",
		Usage:   "Enable verbose logging (-v for debug, -vv for trace)",
		Aliases: []string{"v"},
		Sources: cli.EnvVars(EnvVerbose),
		Config:  cli.BoolConfig{Count: &verboseCount},
	},
	&cli.BoolFlag{
		Name:    "quiet",
		Usage:   "Suppress non-essential output (-q for warn, -qq for errors, -qqq for silent)",
		Aliases: []string{"q"},
		Sources: cli.EnvVars(EnvQuiet),
		Config:  cli.BoolConfig{Count: &quietCount},
	},
	&cli.StringFlag{
		Name:    "output-format",
		Usage:   "Output format: text or json",
		Value:   "text",
		Sources: cli.EnvVars("LAZULI_OUTPUT_FORMAT"),
	},
}

// Flags shared by the export and import commands.
var (
	lastfmFlag = &cli.StringFlag{
		Name:    "lastfm",
		Usage:   "Path to Last.fm CSV file or directory",
		Sources: cli.EnvVars("LAZULI_LASTFM"),
	}

	spotifyFlag = &cli.StringFlag{
		Name:    "spotify",
		Usage:   "Path to Spotify JSON/directory/zip",
		Sources: cli.EnvVars("LAZULI_SPOTIFY"),
	}

	toleranceFlag = &cli.DurationFlag{
		Name:    "tolerance",
		Usage:   "Time tolerance for cross-source deduplication (e.g., 5m, 10m)",
		Value:   sync.DefaultCrossSourceTolerance,
		Sources: cli.EnvVars("LAZULI_TOLERANCE"),
	}
)

// exportFlags are the flags specific to the export command.
var exportFlags = []cli.Flag{
	lastfmFlag,
	spotifyFlag,
	&cli.StringFlag{
		Name:    "output",
		Usage:   "Output file (stdout if not set)",
		Sources: cli.EnvVars("LAZULI_OUTPUT"),
	},
	&cli.BoolFlag{
		Name:    "reverse",
		Usage:   "Sort records reverse chronologically",
		Sources: cli.EnvVars(EnvReverse),
	},
	toleranceFlag,
}

// importFlags are the flags specific to the import command.
var importFlags = []cli.Flag{
	lastfmFlag,
	spotifyFlag,
	&cli.BoolFlag{
		Name:    "dry-run",
		Usage:   "Preview without publishing",
		Sources: cli.EnvVars(EnvDryRun),
	},
	&cli.BoolFlag{
		Name:    "reverse",
		Usage:   "Import in reverse order",
		Sources: cli.EnvVars(EnvReverse),
	},
	&cli.BoolFlag{
		Name:    "fresh",
		Usage:   "Don't use cached Bluesky records",
		Sources: cli.EnvVars(EnvFresh),
	},
	&cli.BoolFlag{
		Name:    "clear-cache",
		Usage:   "Clear cache before running",
		Sources: cli.EnvVars(EnvClearCache),
	},
	&cli.IntFlag{
		Name:    "batch-size",
		Usage:   "Records per batch (default: 20)",
		Value:   DefaultBatchSize,
		Sources: cli.EnvVars("LAZULI_BATCH_SIZE"),
	},
	toleranceFlag,
}

// syncFlags are the flags specific to the sync command.
var syncFlags = []cli.Flag{
	&cli.BoolFlag{
		Name:    "fresh",
		Usage:   "Force refresh cache",
		Sources: cli.EnvVars(EnvFresh),
	},
	&cli.BoolFlag{
		Name:    "car",
		Usage:   "Use CAR export (faster for large repos)",
		Sources: cli.EnvVars("LAZULI_USE_CAR"),
	},
}

// dedupeFlags are the flags specific to the dedupe command.
var dedupeFlags = []cli.Flag{
	&cli.BoolFlag{
		Name:    "dry-run",
		Usage:   "Preview without deleting",
		Sources: cli.EnvVars(EnvDryRun),
	},
	&cli.BoolFlag{
		Name:    "fresh",
		Usage:   "Force refresh cache",
		Sources: cli.EnvVars(EnvFresh),
	},
	&cli.BoolFlag{
		Name:    "yes",
		Usage:   "Skip confirmation prompt",
		Aliases: []string{"y"},
		Sources: cli.EnvVars(EnvYes),
	},
	&cli.BoolFlag{
		Name:    "car",
		Usage:   "Use CAR export (faster for large repos)",
		Sources: cli.EnvVars("LAZULI_USE_CAR"),
	},
}

// Parser turns a listening-history export into play records, either from a
// single file's contents or from a file system (a directory or a zip archive).
type Parser interface {
	ParseFile(ctx context.Context, r io.Reader) ([]*sync.PlayRecord, error)
	ParseFS(ctx context.Context, fsys fs.FS) ([]*sync.PlayRecord, error)
}

// parseInput parses path with parser. A directory is parsed as a file system,
// a path ending in ".zip" (in any letter case) as a zip archive, and anything
// else as a single file.
func parseInput(ctx context.Context, path string, parser Parser) ([]*sync.PlayRecord, error) {
	info, err := os.Stat(path)
	if err != nil {
		return nil, fmt.Errorf("stat path: %w", err)
	}

	if info.IsDir() {
		return parser.ParseFS(ctx, os.DirFS(path))
	}

	if strings.HasSuffix(strings.ToLower(path), ".zip") {
		zf, err := zip.OpenReader(path)
		if err != nil {
			return nil, fmt.Errorf("open zip: %w", err)
		}
		defer zf.Close()
		return parser.ParseFS(ctx, zf)
	}

	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("open file: %w", err)
	}
	defer file.Close()
	return parser.ParseFile(ctx, file)
}

// loadRecordsMerge parses whichever of the two inputs has a non-empty path,
// sorts each source by time and then artist name, and merges them with
// kway.Merge using tolerance. It returns the merged records and the total
// number of records parsed before merging.
func loadRecordsMerge(ctx context.Context, lastFMPath, spotifyPath string, tolerance time.Duration) ([]*sync.PlayRecord, int, error) {
	var lastfmRecords, spotifyRecords []*sync.PlayRecord
	var err error

	if lastFMPath != "" {
		lastfmRecords, err = parseInput(ctx, lastFMPath, lastfm.Parser{})
		if err != nil {
			return nil, 0, fmt.Errorf("parse lastfm: %w", err)
		}
	}

	if spotifyPath != "" {
		spotifyRecords, err = parseInput(ctx, spotifyPath, spotify.Parser{})
		if err != nil {
			return nil, 0, fmt.Errorf("parse spotify: %w", err)
		}
	}

	byTimeThenArtist := func(a, b *sync.PlayRecord) int {
		if v := a.Time().Compare(b.Time()); v != 0 {
			return v
		}
		return cmp.Compare(a.ArtistName(), b.ArtistName())
	}
	slices.SortFunc(spotifyRecords, byTimeThenArtist)
	slices.SortFunc(lastfmRecords, byTimeThenArtist)

	totalInput := len(lastfmRecords) + len(spotifyRecords)
	mergedRecords := kway.Merge([][]*sync.PlayRecord{lastfmRecords, spotifyRecords}, tolerance)
	return mergedRecords, totalInput, nil
}