Like malachite (atproto-lastfm-importer), but written in Go and bluer.
go
spotify
tealfm
lastfm
atproto
1package main
2
3import (
4 "archive/zip"
5 "cmp"
6 "context"
7 "encoding/json"
8 "fmt"
9 "io"
10 "io/fs"
11 "log/slog"
12 "os"
13 "slices"
14 "strings"
15 "time"
16
17 "tangled.org/karitham.dev/lazuli/cache"
18 "tangled.org/karitham.dev/lazuli/kway"
19 "tangled.org/karitham.dev/lazuli/sources/lastfm"
20 "tangled.org/karitham.dev/lazuli/sources/spotify"
21 "tangled.org/karitham.dev/lazuli/sync"
22
23 "github.com/failsafe-go/failsafe-go"
24 "github.com/failsafe-go/failsafe-go/retrypolicy"
25 "github.com/urfave/cli/v3"
26)
27
// Version is the build version reported by the "version" command.
// It defaults to "dev" and is intended to be overridden at link time
// (e.g. -ldflags "-X main.Version=v1.2.3").
var Version = "dev"

const (
	// DefaultBatchSize is the default number of records published per
	// batch by the import command (see the --batch-size flag).
	DefaultBatchSize = 20
)

// Environment variable names mirrored by the CLI flags of the same name.
const (
	EnvHandle     = "LAZULI_HANDLE"
	EnvPassword   = "LAZULI_PASSWORD"
	EnvVerbose    = "LAZULI_VERBOSE"
	EnvQuiet      = "LAZULI_QUIET"
	EnvReverse    = "LAZULI_REVERSE"
	EnvDryRun     = "LAZULI_DRY_RUN"
	EnvFresh      = "LAZULI_FRESH"
	EnvClearCache = "LAZULI_CLEAR_CACHE"
	EnvYes        = "LAZULI_YES"
)

// verboseCount and quietCount are incremented by the repeatable
// --verbose/-v and --quiet/-q flags (via cli.BoolConfig{Count: ...});
// their difference selects the log level in initLoggerBefore.
var (
	verboseCount int
	quietCount   int
)

// App holds per-invocation state shared by all subcommand handlers.
type App struct {
	log          *slog.Logger // configured in initLoggerBefore
	outputFormat string       // "text" or "json" (from --output-format)
}
55
56func main() {
57 var code int
58 if err := run(); err != nil {
59 code = 1
60 fmt.Fprintf(os.Stderr, "Error: %v\n", err)
61 }
62 os.Exit(code)
63}
64
65func run() error {
66 app := &App{}
67 cmd := &cli.Command{
68 Name: "lazuli",
69 Usage: "Import Last.fm and Spotify listening history to Bluesky",
70 Commands: []*cli.Command{
71 app.exportCommand(),
72 app.importCommand(),
73 app.syncCommand(),
74 app.statsCommand(),
75 app.failedCommand(),
76 app.retryCommand(),
77 app.dedupeCommand(),
78 app.debugCommand(),
79 app.versionCommand(),
80 },
81 }
82
83 return cmd.Run(context.Background(), os.Args)
84}
85
86func (a *App) exportCommand() *cli.Command {
87 flags := make([]cli.Flag, 0, len(exportFlags)+len(commonFlags))
88 flags = append(flags, exportFlags...)
89 flags = append(flags, commonFlags...)
90 return &cli.Command{
91 Name: "export",
92 Usage: "Parse and merge Last.fm/Spotify exports, output JSON",
93 UsageText: " lazuli export --lastfm=/path/to/lastfm.csv --spotify=/path/to/spotify.json -o merged.json",
94 Flags: flags,
95 Action: a.runExport,
96 Before: a.initLoggerBefore,
97 }
98}
99
100func (a *App) importCommand() *cli.Command {
101 flags := make([]cli.Flag, 0, len(importFlags)+len(commonFlags))
102 flags = append(flags, importFlags...)
103 flags = append(flags, commonFlags...)
104 return &cli.Command{
105 Name: "import",
106 Usage: "Import listening history to Bluesky",
107 UsageText: " lazuli import --handle=user.bsky.social --password=app-password --lastfm=plays.csv",
108 Flags: flags,
109 Action: a.runImport,
110 Before: a.initLoggerBefore,
111 }
112}
113
114func (a *App) syncCommand() *cli.Command {
115 flags := make([]cli.Flag, 0, len(syncFlags)+len(commonFlags))
116 flags = append(flags, syncFlags...)
117 flags = append(flags, commonFlags...)
118 return &cli.Command{
119 Name: "sync",
120 Usage: "Fetch existing records, show stats, filter new records",
121 UsageText: " lazuli sync --handle=user.bsky.social --password=app-password",
122 Flags: flags,
123 Action: a.runSync,
124 Before: a.initLoggerBefore,
125 }
126}
127
128func (a *App) dedupeCommand() *cli.Command {
129 flags := make([]cli.Flag, 0, len(dedupeFlags)+len(commonFlags))
130 flags = append(flags, dedupeFlags...)
131 flags = append(flags, commonFlags...)
132 return &cli.Command{
133 Name: "dedupe",
134 Usage: "Find and remove duplicate records",
135 UsageText: " lazuli dedupe --handle=user.bsky.social --password=app-password\n lazuli dedupe --handle=user.bsky.social --password=app-password --dry-run",
136 Flags: flags,
137 Action: a.runDedupe,
138 Before: a.initLoggerBefore,
139 }
140}
141
142func (a *App) debugCommand() *cli.Command {
143 flags := make([]cli.Flag, 0, len(commonFlags))
144 flags = append(flags, commonFlags...)
145 return &cli.Command{
146 Name: "debug",
147 Usage: "Fetch and dump raw records for debugging",
148 UsageText: " lazuli debug --handle=user.bsky.social --password=app-password",
149 Flags: flags,
150 Action: a.runDebugFetch,
151 Before: a.initLoggerBefore,
152 }
153}
154
155func (a *App) statsCommand() *cli.Command {
156 flags := make([]cli.Flag, 0, len(commonFlags))
157 flags = append(flags, commonFlags...)
158 return &cli.Command{
159 Name: "stats",
160 Usage: "Display statistics about the local database and rate limits",
161 UsageText: " lazuli stats",
162 Flags: flags,
163 Action: a.runStats,
164 Before: a.initLoggerBefore,
165 }
166}
167
168func (a *App) failedCommand() *cli.Command {
169 flags := make([]cli.Flag, 0, len(commonFlags))
170 flags = append(flags, commonFlags...)
171 return &cli.Command{
172 Name: "failed",
173 Usage: "List records that failed to publish",
174 UsageText: " lazuli failed --handle=user.bsky.social",
175 Flags: flags,
176 Action: a.runFailed,
177 Before: a.initLoggerBefore,
178 }
179}
180
181func (a *App) retryCommand() *cli.Command {
182 flags := make([]cli.Flag, 0, len(commonFlags)+1)
183 flags = append(flags, commonFlags...)
184 flags = append(flags, &cli.BoolFlag{
185 Name: "dry-run",
186 Usage: "Preview what will be retried",
187 Sources: cli.EnvVars(EnvDryRun),
188 })
189 return &cli.Command{
190 Name: "retry",
191 Usage: "Retry failed records one by one",
192 UsageText: " lazuli retry --handle=user.bsky.social",
193 Flags: flags,
194 Action: a.runRetry,
195 Before: a.initLoggerBefore,
196 }
197}
198
199func (a *App) versionCommand() *cli.Command {
200 return &cli.Command{
201 Name: "version",
202 Usage: "Print the version number",
203 Action: func(ctx context.Context, cmd *cli.Command) error {
204 fmt.Println(Version)
205 return nil
206 },
207 }
208}
209
210func (a *App) runStats(ctx context.Context, cmd *cli.Command) error {
211 // Use read-only storage for stats to allow viewing while main process has it open
212 statsStorage, err := cache.NewBoltStorage(true)
213 if err != nil {
214 return fmt.Errorf("open read-only cache: %w", err)
215 }
216 defer statsStorage.Close()
217
218 stats, err := statsStorage.Stats()
219 if err != nil {
220 return fmt.Errorf("failed to get database stats: %w", err)
221 }
222
223 limiter := sync.NewRateLimiter(statsStorage, 1)
224 writes, global, err := limiter.Stats()
225 if err != nil {
226 return fmt.Errorf("failed to get rate limit stats: %w", err)
227 }
228
229 if a.outputFormat == "json" {
230 out := map[string]any{
231 "db": stats,
232 "rateLimits": map[string]any{
233 "writesConsumed": writes,
234 "globalConsumed": global,
235 "writesLimit": sync.WriteLimitDay,
236 "globalLimit": sync.GlobalLimitDay,
237 "writesRemaining": sync.WriteLimitDay - writes,
238 "globalRemaining": sync.GlobalLimitDay - global,
239 },
240 }
241 data, _ := json.MarshalIndent(out, "", " ")
242 fmt.Println(string(data))
243 return nil
244 }
245
246 fmt.Println("Database Statistics:")
247 fmt.Printf(" Total Records: %d\n", stats.TotalRecords)
248 fmt.Printf(" Marked Published: %d\n", stats.MarkedPublished)
249 fmt.Printf(" Failed Count: %d\n", stats.FailedCount)
250 fmt.Printf(" Unpublished Count: %d\n", stats.UnpublishedCount)
251
252 if len(stats.UserStats) > 0 {
253 fmt.Println("\nUser Statistics:")
254 for did, s := range stats.UserStats {
255 m := s.(map[string]int)
256 fmt.Printf(" %s:\n", did)
257 fmt.Printf(" Total: %d\n", m["total"])
258 fmt.Printf(" Published: %d\n", m["published"])
259 fmt.Printf(" Failed: %d\n", m["failed"])
260 fmt.Printf(" Pending: %d\n", m["total"]-m["published"]-m["failed"])
261 }
262 }
263
264 fmt.Println("\nRate Limit Consumption (Today):")
265 fmt.Printf(" Writes: %d / %d (Remaining: %d)\n", writes, sync.WriteLimitDay, sync.WriteLimitDay-writes)
266 fmt.Printf(" Global: %d / %d (Remaining: %d)\n", global, sync.GlobalLimitDay, sync.GlobalLimitDay-global)
267
268 return nil
269}
270
// runRetry implements the "retry" command: it loads every record that
// previously failed to publish for the authenticated user and republishes
// them one at a time, or previews the list with --dry-run.
func (a *App) runRetry(ctx context.Context, cmd *cli.Command) error {
	storage, err := cache.NewBoltStorage(false)
	if err != nil {
		return fmt.Errorf("open cache: %w", err)
	}
	defer storage.Close()

	authClient, err := a.prepareAuth(ctx, cmd)
	if err != nil {
		return err
	}
	did := authClient.DID()
	dryRun := cmd.Bool("dry-run")

	// 0.9 burst factor — presumably keeps some daily write quota in
	// reserve; confirm against sync.NewRateLimiter's contract.
	limiter := sync.NewRateLimiter(storage, 0.9)
	repoClient := sync.NewRateClient(authClient.APIClient(), did, limiter)

	// Collect the failed records up front so storage iteration finishes
	// before we start writing back to storage in the loop below.
	var failedRecords []struct {
		key string
		rec sync.PlayRecord
	}

	iterateFailed := storage.IterateFailed(did)
	iterateFailed(func(key string, rec []byte, errMsg string) bool {
		var playRec sync.PlayRecord
		if err := json.Unmarshal(rec, &playRec); err != nil {
			return true // continue: silently skip undecodable entries
		}
		failedRecords = append(failedRecords, struct {
			key string
			rec sync.PlayRecord
		}{key, playRec})
		return true // continue
	})

	if len(failedRecords) == 0 {
		fmt.Println("No failed records to retry.")
		return nil
	}

	fmt.Printf("Retrying %d failed records for %s...\n", len(failedRecords), did)

	successCount := 0
	errorCount := 0

	for _, fr := range failedRecords {
		if dryRun {
			fmt.Printf("[DRY-RUN] Would retry: %s - %s\n", fr.rec.ArtistName(), fr.rec.TrackName)
			successCount++
			continue
		}

		// Publish a single-record batch; a nil result is treated as success.
		res := sync.PublishBatch(ctx, repoClient, did, []*sync.PlayRecord{&fr.rec}, storage, sync.DefaultClientAgent)

		if res == nil {
			fmt.Printf("Successfully retried: %s - %s\n", fr.rec.ArtistName(), fr.rec.TrackName)
			// Bookkeeping failures are logged but don't abort the loop:
			// the record itself was published, so keep going.
			if err := storage.MarkPublished(did, fr.key); err != nil {
				a.log.Error("Failed to mark record as published", sync.ErrorAttr(err), slog.String("key", fr.key))
			}
			if err := storage.RemoveFailed(did, fr.key); err != nil {
				a.log.Error("Failed to remove record from failed list", sync.ErrorAttr(err), slog.String("key", fr.key))
			}
			successCount++
		} else {
			fmt.Printf("Failed again: %s - %s: %v\n", fr.rec.ArtistName(), fr.rec.TrackName, res)
			errorCount++
		}
	}

	fmt.Printf("\nRetry complete: %d succeeded, %d failed.\n", successCount, errorCount)
	return nil
}
343
344func (a *App) runFailed(ctx context.Context, cmd *cli.Command) error {
345 storage, err := cache.NewBoltStorage(true)
346 if err != nil {
347 return fmt.Errorf("open read-only cache: %w", err)
348 }
349 defer storage.Close()
350
351 authClient, err := a.prepareAuth(ctx, cmd)
352 if err != nil {
353 return err
354 }
355 did := authClient.DID()
356
357 type FailedRecord struct {
358 Key string `json:"key"`
359 Error string `json:"error"`
360 Record sync.PlayRecord `json:"record"`
361 }
362
363 var failed []FailedRecord
364 iterateFailed := storage.IterateFailed(did)
365 iterateFailed(func(key string, rec []byte, errMsg string) bool {
366 var playRec sync.PlayRecord
367 _ = json.Unmarshal(rec, &playRec)
368 failed = append(failed, FailedRecord{
369 Key: key,
370 Error: errMsg,
371 Record: playRec,
372 })
373 return true // continue
374 })
375
376 if a.outputFormat == "json" {
377 data, _ := json.MarshalIndent(failed, "", " ")
378 fmt.Println(string(data))
379 return nil
380 }
381
382 if len(failed) == 0 {
383 fmt.Println("No failed records found.")
384 return nil
385 }
386
387 fmt.Printf("Failed Records for %s (%d):\n", did, len(failed))
388 for _, f := range failed {
389 fmt.Printf(" [%s] %s - %s: %s\n",
390 f.Record.PlayedTime.Format(time.RFC3339),
391 f.Record.ArtistName(),
392 f.Record.TrackName,
393 f.Error)
394 }
395
396 return nil
397}
398
399func (a *App) runDebugFetch(ctx context.Context, cmd *cli.Command) error {
400 authClient, err := a.prepareAuth(ctx, cmd)
401 if err != nil {
402 return fmt.Errorf("authentication failed: %w", err)
403 }
404
405 repoClient := sync.NewRateClient(authClient.APIClient(), authClient.DID(), nil)
406
407 records, _, err := repoClient.ListRecords(ctx, sync.RecordType, 10, "")
408 if err != nil {
409 return fmt.Errorf("failed to fetch records from Bluesky: %w", err)
410 }
411
412 enc := json.NewEncoder(os.Stdout)
413 enc.SetIndent("", " ")
414 for _, r := range records {
415 if err := enc.Encode(r); err != nil {
416 return fmt.Errorf("failed to encode record: %w", err)
417 }
418 }
419
420 return nil
421}
422
423func (a *App) initLoggerBefore(ctx context.Context, cmd *cli.Command) (context.Context, error) {
424 var level slog.Level
425
426 switch verbosity := verboseCount - quietCount; {
427 case verbosity >= 2:
428 level = slog.LevelDebug
429 case verbosity == 1:
430 level = slog.LevelInfo
431 case verbosity <= -1:
432 level = slog.LevelError
433 default:
434 level = slog.LevelInfo
435 }
436
437 a.outputFormat = cmd.String("output-format")
438
439 var handler slog.Handler = slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: level})
440 if a.outputFormat == "json" {
441 handler = slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: level})
442 }
443
444 a.log = slog.New(handler)
445 slog.SetDefault(a.log)
446 return ctx, nil
447}
448
449func (a *App) getCredentials(cmd *cli.Command) (string, string, error) {
450 handle := cmd.String("handle")
451 password := cmd.String("password")
452
453 if handle == "" {
454 return "", "", fmt.Errorf("bluesky handle is required (set --handle or set the LAZULI_HANDLE environment variable)")
455 }
456 if password == "" {
457 return "", "", fmt.Errorf("app password is required (set --password or set the LAZULI_PASSWORD environment variable)")
458 }
459
460 return handle, password, nil
461}
462
463func (a *App) prepareAuth(ctx context.Context, cmd *cli.Command) (*sync.Client, error) {
464 handle, password, err := a.getCredentials(cmd)
465 if err != nil {
466 return nil, err
467 }
468
469 authClient, err := sync.NewClient(ctx, handle, password)
470 if err != nil {
471 return nil, fmt.Errorf("create auth client: %w", err)
472 }
473
474 return authClient, nil
475}
476
477func (a *App) runExport(ctx context.Context, cmd *cli.Command) error {
478 a.log.Info("Starting export operation")
479
480 lastfmPath := cmd.String("lastfm")
481 spotifyPath := cmd.String("spotify")
482 outputPath := cmd.String("output")
483 reverse := cmd.Bool("reverse")
484 tolerance := cmd.Duration("tolerance")
485
486 records, _, err := loadRecordsMerge(ctx, lastfmPath, spotifyPath, tolerance)
487 if err != nil {
488 return fmt.Errorf("failed to deduplicate records: %w", err)
489 }
490
491 if reverse {
492 slices.Reverse(records)
493 }
494
495 return a.outputRecords(records, outputPath)
496}
497
// runImport implements the "import" command end-to-end: load and merge
// the local exports, authenticate, fetch the records already on Bluesky,
// filter out what is already published, persist the remaining records
// locally, then publish them in batches and report the result.
func (a *App) runImport(ctx context.Context, cmd *cli.Command) error {
	storage, err := cache.NewBoltStorage(false)
	if err != nil {
		return fmt.Errorf("open cache: %w", err)
	}
	defer storage.Close()

	handle, password, err := a.getCredentials(cmd)
	if err != nil {
		return err
	}

	// NOTE(review): DIDAttr is given the handle here, not a DID — confirm
	// that is intended (elsewhere it is called with authClient.DID()).
	a.log.Info("Starting import operation", sync.DIDAttr(handle))

	lastfmPath := cmd.String("lastfm")
	spotifyPath := cmd.String("spotify")
	dryRun := cmd.Bool("dry-run")
	reverse := cmd.Bool("reverse")
	fresh := cmd.Bool("fresh")
	clearCache := cmd.Bool("clear-cache")
	batchSize := cmd.Int("batch-size")
	tolerance := cmd.Duration("tolerance")

	// --clear-cache failures are logged but non-fatal: the import can
	// proceed with a stale cache.
	if clearCache {
		if err := storage.ClearAll(); err != nil {
			a.log.Error("Failed to clear cache", sync.ErrorAttr(err))
		} else {
			a.log.Info("Cache cleared")
		}
	}

	records, totalCount, err := loadRecordsMerge(ctx, lastfmPath, spotifyPath, tolerance)
	if err != nil {
		return fmt.Errorf("load records: %w", err)
	}
	a.log.Info("Loaded records for import", slog.Int("total", totalCount), slog.Int("filtered", len(records)))

	if len(records) == 0 {
		a.log.Info("No new records to import")
		return nil
	}

	authClient, err := sync.NewClient(ctx, handle, password)
	if err != nil {
		return fmt.Errorf("create auth client: %w", err)
	}
	a.log.Info("Authenticated", sync.DIDAttr(authClient.DID()), slog.String("pds", authClient.PDS()))

	// 0.9 burst factor — presumably keeps some daily write quota in
	// reserve; confirm against sync.NewRateLimiter's contract.
	limiter := sync.NewRateLimiter(storage, 0.9)
	repoClient := sync.NewRateClient(authClient.APIClient(), authClient.DID(), limiter)

	existingRecords, err := sync.FetchExisting(ctx, repoClient, authClient.DID(), storage, fresh)
	if err != nil {
		return fmt.Errorf("fetch existing records: %w", err)
	}
	a.log.Info("Fetched existing records", slog.Int("count", len(existingRecords)))

	// Drop records already on Bluesky or already marked published locally.
	published, _ := storage.GetPublished(authClient.DID())
	newRecords := sync.FilterNew(records, existingRecords, published, tolerance)
	skippedCount := len(records) - len(newRecords)
	a.log.Info("Filtered to new records",
		slog.Int("count", len(newRecords)),
		slog.Int("skipped", skippedCount))

	if len(newRecords) == 0 {
		a.log.Info("All records already exist, nothing to import")
		return nil
	}

	if reverse {
		slices.Reverse(newRecords)
	}

	// Persist the to-be-imported records so progress survives restarts.
	if len(newRecords) > 0 {
		newEntries := make(map[string][]byte)
		keys := sync.CreateRecordKeys(newRecords)
		for i, rec := range newRecords {
			key := keys[i]
			value, _ := json.Marshal(rec)
			newEntries[key] = value
		}
		if err := storage.SaveRecords(authClient.DID(), newEntries); err != nil {
			return fmt.Errorf("save new records to storage: %w", err)
		}
	}

	progressLog := a.createProgressLogger()

	publishOpts := sync.PublishOptions{
		BatchSize:     batchSize,
		DryRun:        dryRun,
		Reverse:       reverse,
		ATProtoClient: repoClient,
		ProgressLog:   progressLog,
		ClientAgent:   fmt.Sprintf("lazuli/%s", Version),
		Storage:       storage,
		Limiter:       limiter,
	}

	result := sync.Publish(ctx, authClient, publishOpts)

	a.log.Info("Import completed",
		slog.Int("success_count", result.SuccessCount),
		slog.Int("error_count", result.ErrorCount),
		slog.Bool("cancelled", result.Cancelled),
		slog.Duration("duration", result.Duration),
		slog.Float64("records_per_minute", result.RecordsPerMinute))

	// Machine-readable summary goes to stderr so stdout stays clean.
	if a.outputFormat == "json" {
		summary := map[string]any{
			"successCount":     result.SuccessCount,
			"errorCount":       result.ErrorCount,
			"cancelled":        result.Cancelled,
			"durationSeconds":  result.Duration.Seconds(),
			"recordsPerMinute": result.RecordsPerMinute,
			"totalRecords":     result.TotalRecords,
		}
		if data, err := json.MarshalIndent(summary, "", " "); err == nil {
			fmt.Fprintln(os.Stderr, string(data))
		}
	}

	if result.ErrorCount > 0 {
		return fmt.Errorf("import completed with %d errors", result.ErrorCount)
	}

	return nil
}
626
627func (a *App) createProgressLogger() func(sync.ProgressReport) {
628 return func(pr sync.ProgressReport) {
629 if a.outputFormat == "json" {
630 if data, err := json.MarshalIndent(pr, "", " "); err == nil {
631 fmt.Fprintln(os.Stderr, string(data))
632 }
633 } else {
634 a.log.Info("sync progress",
635 slog.Int("completed", pr.Completed),
636 slog.Int("total", pr.Total),
637 slog.Float64("percent", pr.Percent),
638 slog.String("elapsed", pr.Elapsed),
639 slog.String("eta", pr.ETA),
640 slog.String("rate", pr.Rate),
641 slog.Int("errors", pr.Errors),
642 slog.Int("writes", pr.WritesConsumed),
643 slog.Int("global", pr.GlobalConsumed),
644 slog.String("limited_rate", pr.ConstrainedRate),
645 slog.String("reset_in", pr.TimeUntilReset))
646 }
647 }
648}
649
650func (a *App) runSync(ctx context.Context, cmd *cli.Command) error {
651 storage, err := cache.NewBoltStorage(false)
652 if err != nil {
653 return fmt.Errorf("open cache: %w", err)
654 }
655 defer storage.Close()
656
657 authClient, err := a.prepareAuth(ctx, cmd)
658 if err != nil {
659 return err
660 }
661
662 fresh := cmd.Bool("fresh")
663 useCAR := cmd.Bool("car")
664 a.log.Info("Starting sync operation", sync.DIDAttr(authClient.DID()), slog.Bool("fresh", fresh), slog.Bool("use_car", useCAR))
665
666 limiter := sync.NewRateLimiter(storage, 0.85)
667 repoClient := sync.NewRateClient(authClient.APIClient(), authClient.DID(), limiter)
668
669 if fresh {
670 if err := storage.Clear(authClient.DID()); err != nil {
671 a.log.Error("Failed to clear cache", sync.ErrorAttr(err))
672 } else {
673 a.log.Info("Cache cleared")
674 }
675 }
676
677 var existingRecords []sync.ExistingRecord
678 if useCAR {
679 existingRecords, err = sync.FetchExistingViaCAR(ctx, authClient, authClient.DID(), sync.RecordType, storage)
680 } else {
681 existingRecords, err = sync.FetchExisting(ctx, repoClient, authClient.DID(), storage, fresh)
682 }
683 if err != nil {
684 return fmt.Errorf("fetch existing records: %w", err)
685 }
686
687 a.log.Info("Sync stats", slog.Int("total_records", len(existingRecords)))
688
689 return nil
690}
691
692func (a *App) runDedupe(ctx context.Context, cmd *cli.Command) error {
693 storage, err := cache.NewBoltStorage(false)
694 if err != nil {
695 return fmt.Errorf("open cache: %w", err)
696 }
697 defer storage.Close()
698
699 authClient, err := a.prepareAuth(ctx, cmd)
700 if err != nil {
701 return fmt.Errorf("authentication failed: %w", err)
702 }
703
704 dryRun := cmd.Bool("dry-run")
705 fresh := cmd.Bool("fresh")
706 yes := cmd.Bool("yes")
707 useCAR := cmd.Bool("car")
708 a.log.Info("Starting dedupe operation",
709 sync.DIDAttr(authClient.DID()),
710 slog.Bool("dry_run", dryRun),
711 slog.Bool("fresh", fresh),
712 slog.Bool("use_car", useCAR))
713
714 limiter := sync.NewRateLimiter(storage, 0.9)
715 repoClient := sync.NewRateClient(authClient.APIClient(), authClient.DID(), limiter)
716
717 if fresh {
718 if err := storage.Clear(authClient.DID()); err != nil {
719 a.log.Error("Failed to clear cache", sync.ErrorAttr(err))
720 } else {
721 a.log.Info("Cache cleared")
722 }
723 }
724
725 var existingRecords []sync.ExistingRecord
726 if useCAR {
727 existingRecords, err = sync.FetchExistingViaCAR(ctx, authClient, authClient.DID(), sync.RecordType, storage)
728 } else {
729 existingRecords, err = sync.FetchExisting(ctx, repoClient, authClient.DID(), storage, fresh)
730 }
731 if err != nil {
732 return fmt.Errorf("failed to fetch existing records: %w", err)
733 }
734
735 duplicates := sync.FindDuplicates(existingRecords)
736 totalDuplicates := 0
737 for _, group := range duplicates {
738 totalDuplicates += len(group) - 1
739 }
740
741 a.log.Info("Dedupe analysis",
742 slog.Int("total_records", len(existingRecords)),
743 slog.Int("duplicate_groups", len(duplicates)),
744 slog.Int("total_duplicates", totalDuplicates))
745
746 if totalDuplicates == 0 {
747 a.log.Info("No duplicates found")
748 return nil
749 }
750
751 if dryRun {
752 a.log.InfoContext(ctx, "Dry run - would remove the following duplicates")
753 for _, group := range duplicates {
754 keep := group[0]
755 for _, rec := range group[1:] {
756 a.log.InfoContext(ctx, "Would remove",
757 slog.String("uri", rec.URI),
758 slog.String("track", keep.Value.TrackName),
759 slog.String("artist", keep.Value.ArtistName()),
760 slog.String("time", keep.Value.PlayedTime.Format(time.RFC3339)))
761 }
762 }
763 return nil
764 }
765
766 if !yes {
767 fmt.Fprintf(os.Stderr, "\nThis will permanently delete %d duplicate record(s). Continue? [y/N]: ", totalDuplicates)
768 var response string
769 fmt.Scanln(&response)
770 if response != "y" && response != "Y" {
771 a.log.Info("Dedupe cancelled by user")
772 return nil
773 }
774 }
775
776 for _, group := range duplicates {
777 for i := 1; i < len(group); i++ {
778 rec := group[i]
779 uri := rec.URI
780 parts := strings.Split(uri, "/")
781 rkey := parts[len(parts)-1]
782
783 retryPolicy := retrypolicy.NewBuilder[any]().
784 WithMaxRetries(10).
785 WithBackoff(sync.BaseRetryDelay, 5*time.Minute).
786 HandleIf(func(_ any, err error) bool {
787 return sync.IsTransientError(err)
788 }).
789 OnRetryScheduled(func(e failsafe.ExecutionScheduledEvent[any]) {
790 a.log.Warn("Delete failed with transient error, retrying",
791 slog.Duration("retryDelay", e.Delay),
792 sync.ErrorAttr(e.LastError()),
793 slog.Int("attempt", e.Attempts()),
794 slog.String("uri", uri))
795 }).
796 Build()
797
798 err := failsafe.With(retryPolicy).WithContext(ctx).Run(func() error {
799 return repoClient.DeleteRecord(ctx, sync.RecordType, rkey)
800 })
801
802 if err != nil {
803 a.log.Error("Failed to delete record", sync.ErrorAttr(err), slog.String("uri", uri))
804 } else {
805 a.log.Info("Deleted duplicate", slog.String("uri", uri), slog.String("track", rec.Value.TrackName))
806 }
807 }
808 }
809
810 if err := storage.Clear(authClient.DID()); err != nil {
811 a.log.Error("Failed to clear cache", "err", err)
812 }
813
814 return nil
815}
816
817func (a *App) outputRecords(records []*sync.PlayRecord, outputPath string) error {
818 var output io.Writer = os.Stdout
819 if outputPath != "" {
820 file, err := os.Create(outputPath)
821 if err != nil {
822 return fmt.Errorf("create output file: %w", err)
823 }
824 defer file.Close()
825 output = file
826 }
827
828 enc := json.NewEncoder(output)
829 for _, r := range records {
830 if err := enc.Encode(r); err != nil {
831 return fmt.Errorf("encode record: %w", err)
832 }
833 }
834
835 return nil
836}
837
// commonFlags are attached to every subcommand: Bluesky credentials,
// repeatable verbosity controls, and the output format selector.
var commonFlags = []cli.Flag{
	&cli.StringFlag{
		Name:    "handle",
		Usage:   "Bluesky handle",
		Sources: cli.EnvVars(EnvHandle),
	},
	&cli.StringFlag{
		Name:    "password",
		Usage:   "App password",
		Sources: cli.EnvVars(EnvPassword),
	},
	&cli.BoolFlag{
		Name:    "verbose",
		Usage:   "Enable verbose logging (-v for debug, -vv for trace)",
		Aliases: []string{"v"},
		Sources: cli.EnvVars(EnvVerbose),
		// Count makes the flag repeatable; the tally lands in verboseCount.
		Config: cli.BoolConfig{Count: &verboseCount},
	},
	&cli.BoolFlag{
		Name:    "quiet",
		Usage:   "Suppress non-essential output (-q for warn, -qq for errors, -qqq for silent)",
		Aliases: []string{"q"},
		Sources: cli.EnvVars(EnvQuiet),
		// Repeatable; the tally lands in quietCount.
		Config: cli.BoolConfig{Count: &quietCount},
	},
	&cli.StringFlag{
		Name:    "output-format",
		Usage:   "Output format: text or json",
		Value:   "text",
		Sources: cli.EnvVars("LAZULI_OUTPUT_FORMAT"),
	},
}
870
// lastfmFlag selects the Last.fm input; shared by the export and import
// flag sets.
var lastfmFlag = &cli.StringFlag{
	Name:    "lastfm",
	Usage:   "Path to Last.fm CSV file or directory",
	Sources: cli.EnvVars("LAZULI_LASTFM"),
}

// spotifyFlag selects the Spotify input; shared by the export and import
// flag sets.
var spotifyFlag = &cli.StringFlag{
	Name:    "spotify",
	Usage:   "Path to Spotify JSON/directory/zip",
	Sources: cli.EnvVars("LAZULI_SPOTIFY"),
}
882
// exportFlags configure the "export" command: input paths, output file,
// ordering, and cross-source dedupe tolerance.
var exportFlags = []cli.Flag{
	lastfmFlag,
	spotifyFlag,
	&cli.StringFlag{
		Name:    "output",
		Usage:   "Output file (stdout if not set)",
		Sources: cli.EnvVars("LAZULI_OUTPUT"),
	},
	&cli.BoolFlag{
		Name:    "reverse",
		Usage:   "Sort records reverse chronologically",
		Sources: cli.EnvVars(EnvReverse),
	},
	&cli.DurationFlag{
		Name:    "tolerance",
		Usage:   "Time tolerance for cross-source deduplication (e.g., 5m, 10m)",
		Value:   sync.DefaultCrossSourceTolerance,
		Sources: cli.EnvVars("LAZULI_TOLERANCE"),
	},
}
903
// importFlags configure the "import" command: input paths plus publish
// behavior (dry-run, ordering, cache handling, batching, tolerance).
var importFlags = []cli.Flag{
	lastfmFlag,
	spotifyFlag,
	&cli.BoolFlag{
		Name:    "dry-run",
		Usage:   "Preview without publishing",
		Sources: cli.EnvVars(EnvDryRun),
	},
	&cli.BoolFlag{
		Name:    "reverse",
		Usage:   "Import in reverse order",
		Sources: cli.EnvVars(EnvReverse),
	},
	&cli.BoolFlag{
		Name:    "fresh",
		Usage:   "Don't use cached Bluesky records",
		Sources: cli.EnvVars(EnvFresh),
	},
	&cli.BoolFlag{
		Name:    "clear-cache",
		Usage:   "Clear cache before running",
		Sources: cli.EnvVars(EnvClearCache),
	},
	&cli.IntFlag{
		Name:    "batch-size",
		Usage:   "Records per batch (default: 20)",
		Value:   DefaultBatchSize,
		Sources: cli.EnvVars("LAZULI_BATCH_SIZE"),
	},
	&cli.DurationFlag{
		Name:    "tolerance",
		Usage:   "Time tolerance for cross-source deduplication (e.g., 5m, 10m)",
		Value:   sync.DefaultCrossSourceTolerance,
		Sources: cli.EnvVars("LAZULI_TOLERANCE"),
	},
}
940
// syncFlags configure the "sync" command: cache refresh and fetch
// strategy.
var syncFlags = []cli.Flag{
	&cli.BoolFlag{
		Name:    "fresh",
		Usage:   "Force refresh cache",
		Sources: cli.EnvVars(EnvFresh),
	},
	&cli.BoolFlag{
		Name:    "car",
		Usage:   "Use CAR export (faster for large repos)",
		Sources: cli.EnvVars("LAZULI_USE_CAR"),
	},
}
953
// dedupeFlags configure the "dedupe" command: preview mode, cache
// refresh, confirmation bypass, and fetch strategy.
var dedupeFlags = []cli.Flag{
	&cli.BoolFlag{
		Name:    "dry-run",
		Usage:   "Preview without deleting",
		Sources: cli.EnvVars(EnvDryRun),
	},
	&cli.BoolFlag{
		Name:    "fresh",
		Usage:   "Force refresh cache",
		Sources: cli.EnvVars(EnvFresh),
	},
	&cli.BoolFlag{
		Name:    "yes",
		Usage:   "Skip confirmation prompt",
		Aliases: []string{"y"},
		Sources: cli.EnvVars(EnvYes),
	},
	&cli.BoolFlag{
		Name:    "car",
		Usage:   "Use CAR export (faster for large repos)",
		Sources: cli.EnvVars("LAZULI_USE_CAR"),
	},
}
977
// Parser abstracts a source-specific export parser (implemented by
// lastfm.Parser and spotify.Parser).
type Parser interface {
	// ParseFile parses a single export file read from r.
	ParseFile(ctx context.Context, r io.Reader) ([]*sync.PlayRecord, error)
	// ParseFS parses all relevant files found in fsys (a directory tree
	// or an opened zip archive).
	ParseFS(ctx context.Context, fsys fs.FS) ([]*sync.PlayRecord, error)
}
982
983func parseInput(ctx context.Context, path string, parser Parser) ([]*sync.PlayRecord, error) {
984 info, err := os.Stat(path)
985 if err != nil {
986 return nil, fmt.Errorf("stat path: %w", err)
987 }
988
989 if info.IsDir() {
990 return parser.ParseFS(ctx, os.DirFS(path))
991 }
992
993 if strings.HasSuffix(path, ".zip") {
994 zf, err := zip.OpenReader(path)
995 if err != nil {
996 return nil, fmt.Errorf("open zip: %w", err)
997 }
998
999 defer zf.Close()
1000
1001 return parser.ParseFS(ctx, zf)
1002 }
1003
1004 file, err := os.Open(path)
1005 if err != nil {
1006 return nil, fmt.Errorf("open file: %w", err)
1007 }
1008
1009 defer file.Close()
1010
1011 return parser.ParseFile(ctx, file)
1012}
1013
1014func loadRecordsMerge(ctx context.Context, lastFMPath, spotifyPath string, tolerance time.Duration) ([]*sync.PlayRecord, int, error) {
1015 var lastfmRecords, spotifyRecords []*sync.PlayRecord
1016 var err error
1017
1018 if lastFMPath != "" {
1019 lastfmRecords, err = parseInput(ctx, lastFMPath, lastfm.Parser{})
1020 if err != nil {
1021 return nil, 0, fmt.Errorf("parse lastfm: %w", err)
1022 }
1023 }
1024
1025 if spotifyPath != "" {
1026 spotifyRecords, err = parseInput(ctx, spotifyPath, spotify.Parser{})
1027 if err != nil {
1028 return nil, 0, fmt.Errorf("parse spotify: %w", err)
1029 }
1030 }
1031
1032 f := func(a, b *sync.PlayRecord) int {
1033 if v := a.Time().Compare(b.Time()); v != 0 {
1034 return v
1035 }
1036
1037 return cmp.Compare(a.ArtistName(), b.ArtistName())
1038 }
1039 slices.SortFunc(spotifyRecords, f)
1040 slices.SortFunc(lastfmRecords, f)
1041
1042 totalInput := len(lastfmRecords) + len(spotifyRecords)
1043
1044 mergedRecords := kway.Merge([][]*sync.PlayRecord{lastfmRecords, spotifyRecords}, tolerance)
1045
1046 return mergedRecords, totalInput, nil
1047}