···13 "sync"
14 "time"
150000016 "github.com/spf13/viper"
017 "github.com/teal-fm/piper/db"
18 "github.com/teal-fm/piper/models"
19- "github.com/teal-fm/piper/oauth/atproto"
20 "github.com/teal-fm/piper/service/musicbrainz"
21 "github.com/teal-fm/piper/session"
22)
2324type SpotifyService struct {
25 DB *db.DB
26- atprotoService *atproto.ATprotoAuthService // Added field
27 mb *musicbrainz.MusicBrainzService // Added field
28 userTracks map[int64]*models.Track
29 userTokens map[int64]string
30 mu sync.RWMutex
31}
3233-func NewSpotifyService(database *db.DB, atprotoService *atproto.ATprotoAuthService, musicBrainzService *musicbrainz.MusicBrainzService) *SpotifyService {
34 return &SpotifyService{
35 DB: database,
36 atprotoService: atprotoService,
···40 }
41}
420000000000000000000000000000000000000000000000000000000000000000000000043func (s *SpotifyService) SetAccessToken(token string, userId int64, hasSession bool) (int64, error) {
44 userID, err := s.identifyAndStoreUser(token, userId, hasSession)
45 if err != nil {
···347 var err error
348349 // Retry logic: try once, if 401, refresh and try again
350- for attempt := 0; attempt < 2; attempt++ {
351 // We need to be able to re-read the body if the request is retried,
352 // but since this is a GET request with no body, we don't need to worry about it.
353 resp, err = client.Do(req) // Use = instead of := inside loop
···552 s.mu.Lock()
553 s.userTracks[userID] = track
554 s.mu.Unlock()
00000000000000000000000000000000000000000555556 log.Printf("User %d is listening to: %s by %s", userID, track.Name, track.Artist)
557 }
···13 "sync"
14 "time"
1516+ "context" // Added for context.Context
17+18+ "github.com/bluesky-social/indigo/api/atproto" // Added for atproto.RepoCreateRecord_Input
19+ lexutil "github.com/bluesky-social/indigo/lex/util" // Added for lexutil.LexiconTypeDecoder
20+ "github.com/bluesky-social/indigo/xrpc" // Added for xrpc.Client
21 "github.com/spf13/viper"
22+ "github.com/teal-fm/piper/api/teal" // Added for teal.AlphaFeedPlay
23 "github.com/teal-fm/piper/db"
24 "github.com/teal-fm/piper/models"
25+ atprotoauth "github.com/teal-fm/piper/oauth/atproto"
26 "github.com/teal-fm/piper/service/musicbrainz"
27 "github.com/teal-fm/piper/session"
28)
2930type SpotifyService struct {
31 DB *db.DB
32+ atprotoService *atprotoauth.ATprotoAuthService // Added field
33 mb *musicbrainz.MusicBrainzService // Added field
34 userTracks map[int64]*models.Track
35 userTokens map[int64]string
36 mu sync.RWMutex
37}
3839+func NewSpotifyService(database *db.DB, atprotoService *atprotoauth.ATprotoAuthService, musicBrainzService *musicbrainz.MusicBrainzService) *SpotifyService {
40 return &SpotifyService{
41 DB: database,
42 atprotoService: atprotoService,
···46 }
47}
4849+func (s *SpotifyService) SubmitTrackToPDS(did string, track *models.Track, ctx context.Context) error {
50+ client, err := s.atprotoService.GetATProtoClient()
51+ if err != nil || client == nil {
52+ log.Printf("Error getting ATProto client: %v", err)
53+ return fmt.Errorf("failed to get ATProto client: %w", err)
54+ }
55+56+ xrpcClient := s.atprotoService.GetXrpcClient()
57+ if xrpcClient == nil {
58+ return errors.New("xrpc client is not available")
59+ }
60+61+ sess, err := s.DB.GetAtprotoSession(did, ctx, *client)
62+ if err != nil {
63+ return fmt.Errorf("couldn't get Atproto session for DID %s: %w", did, err)
64+ }
65+66+ artistArr := make([]string, 0, len(track.Artist))
67+ artistMbIdArr := make([]string, 0, len(track.Artist))
68+ for _, a := range track.Artist {
69+ artistArr = append(artistArr, a.Name)
70+ artistMbIdArr = append(artistMbIdArr, a.MBID)
71+ }
72+73+ var durationPtr *int64
74+ if track.DurationMs > 0 {
75+ durationSeconds := track.DurationMs / 1000
76+ durationPtr = &durationSeconds
77+ }
78+79+ playedTimeStr := track.Timestamp.Format(time.RFC3339)
80+ submissionAgent := viper.GetString("app.submission_agent")
81+ if submissionAgent == "" {
82+ submissionAgent = "piper/v0.0.1" // Default if not configured
83+ }
84+85+ tfmTrack := teal.AlphaFeedPlay{
86+ LexiconTypeID: "fm.teal.alpha.feed.play",
87+ Duration: durationPtr,
88+ TrackName: track.Name,
89+ PlayedTime: &playedTimeStr,
90+ ArtistNames: artistArr,
91+ ArtistMbIds: artistMbIdArr,
92+ ReleaseMbId: &track.ReleaseMBID,
93+ ReleaseName: &track.Album,
94+ RecordingMbId: &track.RecordingMBID,
95+ // Optional: Spotify specific data if your lexicon supports it
96+ // SpotifyTrackID: &track.ServiceID,
97+ // SpotifyAlbumID: &track.ServiceAlbumID,
98+ // SpotifyArtistIDs: track.ServiceArtistIDs, // Assuming this is a []string
99+ SubmissionClientAgent: &submissionAgent,
100+ }
101+102+ input := atproto.RepoCreateRecord_Input{
103+ Collection: "fm.teal.alpha.feed.play", // Ensure this collection is correct
104+ Repo: sess.DID,
105+ Record: &lexutil.LexiconTypeDecoder{Val: &tfmTrack},
106+ }
107+108+ authArgs := db.AtpSessionToAuthArgs(sess)
109+110+ var out atproto.RepoCreateRecord_Output
111+ if err := xrpcClient.Do(ctx, authArgs, xrpc.Procedure, "application/json", "com.atproto.repo.createRecord", nil, input, &out); err != nil {
112+ log.Printf("Error creating record for DID %s: %v. Input: %+v", did, err, input)
113+ return fmt.Errorf("failed to create record on PDS for DID %s: %w", did, err)
114+ }
115+116+ log.Printf("Successfully submitted track '%s' to PDS for DID %s. Record URI: %s", track.Name, did, out.Uri)
117+ return nil
118+}
119+120func (s *SpotifyService) SetAccessToken(token string, userId int64, hasSession bool) (int64, error) {
121 userID, err := s.identifyAndStoreUser(token, userId, hasSession)
122 if err != nil {
···424 var err error
425426 // Retry logic: try once, if 401, refresh and try again
427+ for attempt := range 2 {
428 // We need to be able to re-read the body if the request is retried,
429 // but since this is a GET request with no body, we don't need to worry about it.
430 resp, err = client.Do(req) // Use = instead of := inside loop
···629 s.mu.Lock()
630 s.userTracks[userID] = track
631 s.mu.Unlock()
632+633+ // Submit to ATProto PDS
634+ // The 'track' variable is *models.Track and has been saved to DB, PlayID is populated.
635+ dbUser, errUser := s.DB.GetUserByID(userID) // Fetch user by their internal ID
636+ if errUser != nil {
637+ log.Printf("User %d: Error fetching user details for PDS submission: %v", userID, errUser)
638+ } else if dbUser == nil {
639+ log.Printf("User %d: User not found in DB. Skipping PDS submission.", userID)
640+ } else if dbUser.ATProtoDID == nil || *dbUser.ATProtoDID == "" {
641+ log.Printf("User %d (%d): ATProto DID not set. Skipping PDS submission for track '%s'.", userID, dbUser.ATProtoDID, track.Name)
642+ } else {
643+ // User has a DID, proceed with hydration and submission
644+ var trackToSubmitToPDS *models.Track = track // Default to the original track (already *models.Track)
645+ if s.mb != nil { // Check if MusicBrainz service is available
646+ // musicbrainz.HydrateTrack expects models.Track as second argument, so we pass *track
647+ // and it returns *models.Track
648+ hydratedTrack, errHydrate := musicbrainz.HydrateTrack(s.mb, *track)
649+ if errHydrate != nil {
650+ log.Printf("User %d (%d): Error hydrating track '%s' with MusicBrainz: %v. Proceeding with original track data for PDS.", userID, dbUser.ATProtoDID, track.Name, errHydrate)
651+ } else {
652+ log.Printf("User %d (%d): Successfully hydrated track '%s' with MusicBrainz.", userID, dbUser.ATProtoDID, track.Name)
653+ trackToSubmitToPDS = hydratedTrack // hydratedTrack is *models.Track
654+ }
655+ } else {
656+ log.Printf("User %d (%d): MusicBrainz service not configured. Proceeding with original track data for PDS.", userID, dbUser.ATProtoDID)
657+ }
658+659+ artistName := "Unknown Artist"
660+ if len(trackToSubmitToPDS.Artist) > 0 {
661+ artistName = trackToSubmitToPDS.Artist[0].Name
662+ }
663+664+ log.Printf("User %d (%d): Attempting to submit track '%s' by %s to PDS (DID: %s)", userID, dbUser.ATProtoDID, trackToSubmitToPDS.Name, artistName, *dbUser.ATProtoDID)
665+ // Use context.Background() for now, or pass down a context if available
666+ if errPDS := s.SubmitTrackToPDS(*dbUser.ATProtoDID, trackToSubmitToPDS, context.Background()); errPDS != nil {
667+ log.Printf("User %d (%d): Error submitting track '%s' to PDS: %v", userID, dbUser.ATProtoDID, trackToSubmitToPDS.Name, errPDS)
668+ } else {
669+ log.Printf("User %d (%d): Successfully submitted track '%s' to PDS.", userID, dbUser.ATProtoDID, trackToSubmitToPDS.Name)
670+ }
671+ }
672+ // End of PDS submission block
673674 log.Printf("User %d is listening to: %s by %s", userID, track.Name, track.Artist)
675 }