Monorepo for Tangled

appview: upsert star/reaction/follow records

Most service flow will be:

1. start db transaction
2. run db operation
3. run PDS operation
4. rollback db if anything above failed
5. commit transaction

If PDS operation succeed, don't try rollback anymore. The ingester will
backfill the missed db operations.

Signed-off-by: Seongmin Lee <git@boltless.me>

boltless.me c193338d 35de823f

verified
+143 -63
+12 -3
appview/db/follow.go
··· 11 11 "tangled.org/core/orm" 12 12 ) 13 13 14 - func AddFollow(e Execer, follow *models.Follow) error { 15 - query := `insert or ignore into follows (did, subject_did, rkey) values (?, ?, ?)` 16 - _, err := e.Exec(query, follow.UserDid, follow.SubjectDid, follow.Rkey) 14 + func UpsertFollow(e Execer, follow models.Follow) error { 15 + _, err := e.Exec( 16 + `insert into follows (did, rkey, subject_did, created) 17 + values (?, ?, ?, ?) 18 + on conflict(did, rkey) do update set 19 + subject_did = excluded.subject_did, 20 + created = excluded.created`, 21 + follow.UserDid, 22 + follow.Rkey, 23 + follow.SubjectDid, 24 + follow.FollowedAt.Format(time.RFC3339), 25 + ) 17 26 return err 18 27 } 19 28
+14 -3
appview/db/reaction.go
··· 9 9 "tangled.org/core/appview/models" 10 10 ) 11 11 12 - func AddReaction(e Execer, did string, subjectAt syntax.ATURI, kind models.ReactionKind, rkey string) error { 13 - query := `insert or ignore into reactions (did, subject_at, kind, rkey) values (?, ?, ?, ?)` 14 - _, err := e.Exec(query, did, subjectAt, kind, rkey) 12 + func UpsertReaction(e Execer, reaction models.Reaction) error { 13 + _, err := e.Exec( 14 + `insert into reactions (did, rkey, subject_at, kind, created) 15 + values (?, ?, ?, ?, ?) 16 + on conflict(did, rkey) do update set 17 + subject_at = excluded.subject_at, 18 + kind = excluded.kind, 19 + created = excluded.created`, 20 + reaction.ReactedByDid, 21 + reaction.Rkey, 22 + reaction.ThreadAt, 23 + reaction.Kind, 24 + reaction.Created.Format(time.RFC3339), 25 + ) 15 26 return err 16 27 } 17 28
+8 -4
appview/db/star.go
··· 13 13 "tangled.org/core/orm" 14 14 ) 15 15 16 - func AddStar(e Execer, star *models.Star) error { 17 - query := `insert or ignore into stars (did, subject_at, rkey) values (?, ?, ?)` 16 + func UpsertStar(e Execer, star models.Star) error { 18 17 _, err := e.Exec( 19 - query, 18 + `insert into stars (did, rkey, subject_at, created) 19 + values (?, ?, ?, ?) 20 + on conflict(did, rkey) do update set 21 + subject_at = excluded.subject_at, 22 + created = excluded.created`, 20 23 star.Did, 21 - star.RepoAt.String(), 22 24 star.Rkey, 25 + star.RepoAt, 26 + star.Created.Format(time.RFC3339), 23 27 ) 24 28 return err 25 29 }
+2 -2
appview/ingester.go
··· 119 119 l.Error("invalid record", "err", err) 120 120 return err 121 121 } 122 - err = db.AddStar(i.Db, &models.Star{ 122 + err = db.UpsertStar(i.Db, models.Star{ 123 123 Did: did, 124 124 RepoAt: subjectUri, 125 125 Rkey: e.Commit.RKey, ··· 152 152 return err 153 153 } 154 154 155 - err = db.AddFollow(i.Db, &models.Follow{ 155 + err = db.UpsertFollow(i.Db, models.Follow{ 156 156 UserDid: did, 157 157 SubjectDid: record.Subject, 158 158 Rkey: e.Commit.RKey,
+9
appview/models/follow.go
··· 2 2 3 3 import ( 4 4 "time" 5 + 6 + "tangled.org/core/api/tangled" 5 7 ) 6 8 7 9 type Follow struct { ··· 9 11 SubjectDid string 10 12 FollowedAt time.Time 11 13 Rkey string 14 + } 15 + 16 + func (f *Follow) AsRecord() tangled.GraphFollow { 17 + return tangled.GraphFollow{ 18 + Subject: f.SubjectDid, 19 + CreatedAt: f.FollowedAt.Format(time.RFC3339), 20 + } 12 21 } 13 22 14 23 type FollowStats struct {
+9
appview/models/reaction.go
··· 4 4 "time" 5 5 6 6 "github.com/bluesky-social/indigo/atproto/syntax" 7 + "tangled.org/core/api/tangled" 7 8 ) 8 9 9 10 type ReactionKind string ··· 54 55 Created time.Time 55 56 Rkey string 56 57 Kind ReactionKind 58 + } 59 + 60 + func (r *Reaction) AsRecord() tangled.FeedReaction { 61 + return tangled.FeedReaction{ 62 + Subject: r.ThreadAt.String(), 63 + Reaction: r.Kind.String(), 64 + CreatedAt: r.Created.Format(time.RFC3339), 65 + } 57 66 } 58 67 59 68 type ReactionDisplayData struct {
+8
appview/models/star.go
··· 4 4 "time" 5 5 6 6 "github.com/bluesky-social/indigo/atproto/syntax" 7 + "tangled.org/core/api/tangled" 7 8 ) 8 9 9 10 type Star struct { ··· 11 12 RepoAt syntax.ATURI 12 13 Created time.Time 13 14 Rkey string 15 + } 16 + 17 + func (s *Star) AsRecord() tangled.FeedStar { 18 + return tangled.FeedStar{ 19 + Subject: s.RepoAt.String(), 20 + CreatedAt: s.Created.Format(time.RFC3339), 21 + } 14 22 } 15 23 16 24 // RepoStar is used for reverse mapping to repos
+27 -19
appview/state/follow.go
··· 43 43 44 44 switch r.Method { 45 45 case http.MethodPost: 46 - createdAt := time.Now().Format(time.RFC3339) 47 - rkey := tid.TID() 46 + follow := models.Follow{ 47 + UserDid: currentUser.Active.Did, 48 + SubjectDid: subjectIdent.DID.String(), 49 + Rkey: tid.TID(), 50 + FollowedAt: time.Now(), 51 + } 52 + 53 + tx, err := s.db.BeginTx(r.Context(), nil) 54 + if err != nil { 55 + s.logger.Error("failed to start transaction", "err", err) 56 + return 57 + } 58 + defer tx.Rollback() 59 + 60 + if err := db.UpsertFollow(tx, follow); err != nil { 61 + s.logger.Error("failed to follow", "err", err) 62 + return 63 + } 64 + 65 + record := follow.AsRecord() 48 66 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 49 67 Collection: tangled.GraphFollowNSID, 50 68 Repo: currentUser.Active.Did, 51 - Rkey: rkey, 69 + Rkey: follow.Rkey, 52 70 Record: &lexutil.LexiconTypeDecoder{ 53 - Val: &tangled.GraphFollow{ 54 - Subject: subjectIdent.DID.String(), 55 - CreatedAt: createdAt, 56 - }}, 71 + Val: &record, 72 + }, 57 73 }) 58 74 if err != nil { 59 75 log.Println("failed to create atproto record", err) 60 76 return 61 77 } 62 - 63 78 log.Println("created atproto record: ", resp.Uri) 64 79 65 - follow := &models.Follow{ 66 - UserDid: currentUser.Active.Did, 67 - SubjectDid: subjectIdent.DID.String(), 68 - Rkey: rkey, 80 + if err := tx.Commit(); err != nil { 81 + s.logger.Error("failed to commit transaction", "err", err) 82 + // DB op failed but record is created in PDS. Ingester will backfill the missed operation 69 83 } 70 84 71 - err = db.AddFollow(s.db, follow) 72 - if err != nil { 73 - log.Println("failed to follow", err) 74 - return 75 - } 76 - 77 - s.notifier.NewFollow(r.Context(), follow) 85 + s.notifier.NewFollow(r.Context(), &follow) 78 86 79 87 followStats, err := db.GetFollowerFollowingCount(s.db, subjectIdent.DID.String()) 80 88 if err != nil {
+27 -14
appview/state/reaction.go
··· 45 45 46 46 switch r.Method { 47 47 case http.MethodPost: 48 - createdAt := time.Now().Format(time.RFC3339) 49 - rkey := tid.TID() 48 + reaction := models.Reaction{ 49 + ReactedByDid: currentUser.Active.Did, 50 + Rkey: tid.TID(), 51 + Kind: reactionKind, 52 + ThreadAt: subjectUri, 53 + Created: time.Now(), 54 + } 55 + 56 + tx, err := s.db.BeginTx(r.Context(), nil) 57 + if err != nil { 58 + s.logger.Error("failed to start transaction", "err", err) 59 + return 60 + } 61 + defer tx.Rollback() 62 + 63 + if err := db.UpsertReaction(tx, reaction); err != nil { 64 + log.Println("failed to react", err) 65 + return 66 + } 67 + 68 + record := reaction.AsRecord() 50 69 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 51 70 Collection: tangled.FeedReactionNSID, 52 71 Repo: currentUser.Active.Did, 53 - Rkey: rkey, 72 + Rkey: reaction.Rkey, 54 73 Record: &lexutil.LexiconTypeDecoder{ 55 - Val: &tangled.FeedReaction{ 56 - Subject: subjectUri.String(), 57 - Reaction: reactionKind.String(), 58 - CreatedAt: createdAt, 59 - }, 74 + Val: &record, 60 75 }, 61 76 }) 62 77 if err != nil { 63 78 log.Println("failed to create atproto record", err) 64 79 return 65 80 } 81 + log.Println("created atproto record: ", resp.Uri) 66 82 67 - err = db.AddReaction(s.db, currentUser.Active.Did, subjectUri, reactionKind, rkey) 68 - if err != nil { 69 - log.Println("failed to react", err) 70 - return 83 + if err := tx.Commit(); err != nil { 84 + s.logger.Error("failed to commit transaction", "err", err) 85 + // DB op failed but record is created in PDS. Ingester will backfill the missed operation 71 86 } 72 87 73 88 reactionMap, err := db.GetReactionMap(s.db, 20, subjectUri) 74 89 if err != nil { 75 90 log.Println("failed to get reactions for ", subjectUri) 76 91 } 77 - 78 - log.Println("created atproto record: ", resp.Uri) 79 92 80 93 s.pages.ThreadReactionFragment(w, pages.ThreadReactionFragmentParams{ 81 94 ThreadAt: subjectUri,
+27 -18
appview/state/star.go
··· 38 38 39 39 switch r.Method { 40 40 case http.MethodPost: 41 - createdAt := time.Now().Format(time.RFC3339) 42 - rkey := tid.TID() 41 + star := models.Star{ 42 + Did: currentUser.Active.Did, 43 + Rkey: tid.TID(), 44 + RepoAt: subjectUri, 45 + Created: time.Now(), 46 + } 47 + 48 + tx, err := s.db.BeginTx(r.Context(), nil) 49 + if err != nil { 50 + s.logger.Error("failed to start transaction", "err", err) 51 + return 52 + } 53 + defer tx.Rollback() 54 + 55 + if err := db.UpsertStar(tx, star); err != nil { 56 + s.logger.Error("failed to star", "err", err) 57 + return 58 + } 59 + 60 + record := star.AsRecord() 43 61 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 44 62 Collection: tangled.FeedStarNSID, 45 63 Repo: currentUser.Active.Did, 46 - Rkey: rkey, 64 + Rkey: star.Rkey, 47 65 Record: &lexutil.LexiconTypeDecoder{ 48 - Val: &tangled.FeedStar{ 49 - Subject: subjectUri.String(), 50 - CreatedAt: createdAt, 51 - }}, 66 + Val: &record, 67 + }, 52 68 }) 53 69 if err != nil { 54 70 log.Println("failed to create atproto record", err) ··· 56 72 } 57 73 log.Println("created atproto record: ", resp.Uri) 58 74 59 - star := &models.Star{ 60 - Did: currentUser.Active.Did, 61 - RepoAt: subjectUri, 62 - Rkey: rkey, 75 + if err := tx.Commit(); err != nil { 76 + s.logger.Error("failed to commit transaction", "err", err) 77 + // DB op failed but record is created in PDS. Ingester will backfill the missed operation 63 78 } 64 79 65 - err = db.AddStar(s.db, star) 66 - if err != nil { 67 - log.Println("failed to star", err) 68 - return 69 - } 80 + s.notifier.NewStar(r.Context(), &star) 70 81 71 82 starCount, err := db.GetStarCount(s.db, subjectUri) 72 83 if err != nil { 73 84 log.Println("failed to get star count for ", subjectUri) 74 85 } 75 - 76 - s.notifier.NewStar(r.Context(), star) 77 86 78 87 s.pages.StarBtnFragment(w, pages.StarBtnFragmentParams{ 79 88 IsStarred: true,