Yōten: A social tracker for your language learning journey built on the atproto.

feat(db/xp): improve logging #23

merged opened by brookjeynes.dev targeting master from push-xlxvwwpkrpwv
Labels

None yet.

Participants 1
AT URI
at://did:plc:4mj54vc4ha3lh32ksxwunnbh/sh.tangled.repo.pull/3m3cf3hayab22
+53 -43
Diff #0
+35 -35
internal/consumer/ingester.go
··· 36 } 37 }() 38 39 - l := i.Logger.With("kind", e.Kind) 40 switch e.Kind { 41 case models.EventKindCommit: 42 switch e.Commit.Collection { 43 case yoten.ActorProfileNSID: 44 l = l.With("handler", "ingestProfile") 45 - err = i.ingestProfile(e) 46 case yoten.FeedSessionNSID: 47 l = l.With("handler", "ingestStudySession") 48 - err = i.ingestStudySession(e) 49 case yoten.ActivityDefNSID: 50 l = l.With("handler", "ingestActivityDef") 51 - err = i.ingestActivityDef(e) 52 case yoten.FeedResourceNSID: 53 l = l.With("handler", "ingestResource") 54 - err = i.ingestResource(e) 55 case yoten.GraphFollowNSID: 56 l = l.With("handler", "ingestFollow") 57 - err = i.ingestFollow(e) 58 case yoten.FeedReactionNSID: 59 l = l.With("handler", "ingestReaction") 60 - err = i.ingestReaction(e) 61 case yoten.FeedCommentNSID: 62 l = l.With("handler", "ingestComment") 63 - err = i.ingestComment(e) 64 } 65 - l = i.Logger.With("nsid", e.Commit.Collection) 66 } 67 if err != nil { 68 l.Error("failed to ingest event", "err", err) ··· 72 } 73 } 74 75 - func (i *Ingester) ingestProfile(e *models.Event) error { 76 did := e.Did 77 78 if e.Commit.RKey != "self" { ··· 139 return fmt.Errorf("failed to start transaction: %w", err) 140 } 141 142 - i.Logger.Debug("upserting profile from pds request") 143 err = db.UpsertProfile(tx, &profile) 144 if err != nil { 145 tx.Rollback() ··· 151 return nil 152 } 153 154 - func (i *Ingester) ingestStudySession(e *models.Event) error { 155 did := e.Did 156 157 switch e.Commit.Operation { ··· 170 171 date, err := time.Parse(time.RFC3339, record.Date) 172 if err != nil { 173 - i.Logger.Error("invalid record", "err", err) 174 return err 175 } 176 ··· 237 return fmt.Errorf("failed to start transaction: %w", err) 238 } 239 240 - i.Logger.Debug("upserting study session from pds request") 241 err = db.UpsertStudySession(tx, &studySession, e.Commit.RKey) 242 if err != nil { 243 tx.Rollback() 244 return fmt.Errorf("failed to upsert study session record: %w", err) 245 } 246 247 - err = db.UpdateXPForSession(tx, &studySession) 248 if err != nil { 249 tx.Rollback() 250 return fmt.Errorf("failed to add xp for session: %w", err) ··· 262 return fmt.Errorf("failed to start transaction: %w", err) 263 } 264 265 - i.Logger.Debug("deleting study session from pds request") 266 err = db.RemoveXPForSession(tx, did, e.Commit.RKey, logger) 267 if err != nil { 268 tx.Rollback() ··· 281 return nil 282 } 283 284 - func (i *Ingester) ingestActivityDef(e *models.Event) error { 285 did := e.Did 286 var err error 287 ··· 354 return fmt.Errorf("failed to start transaction: %w", err) 355 } 356 357 - i.Logger.Debug("upserting activity def from pds request") 358 err = db.UpsertActivityDef(tx, &activityDef, e.Commit.RKey) 359 if err != nil { 360 tx.Rollback() ··· 362 } 363 return tx.Commit() 364 case models.CommitOperationDelete: 365 - i.Logger.Debug("deleting activity def from pds request") 366 err = db.DeleteActivityDefByRkey(i.Db, did, e.Commit.RKey) 367 } 368 if err != nil { ··· 372 return nil 373 } 374 375 - func (i *Ingester) ingestFollow(e *models.Event) error { 376 var err error 377 did := e.Did 378 ··· 397 398 subjectDid := record.Subject 399 400 - i.Logger.Debug("upserting follow from pds request") 401 err = db.AddFollow(tx, did, subjectDid, e.Commit.RKey) 402 if err != nil { 403 tx.Rollback() ··· 407 subjectUri := fmt.Sprintf("at://%s/%s/%s", did, yoten.GraphFollowNSID, e.Commit.RKey) 408 err = db.CreateNotification(tx, subjectDid, did, subjectUri, db.NotificationTypeFollow) 409 if err != nil { 410 - i.Logger.Error("failed to create notification record", "err", err) 411 } 412 413 return tx.Commit() 414 case models.CommitOperationDelete: 415 - i.Logger.Debug("deleting follow from pds request") 416 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 417 } 418 if err != nil { ··· 422 return nil 423 } 424 425 - func (i *Ingester) ingestReaction(e *models.Event) error { 426 var err error 427 did := e.Did 428 ··· 475 CreatedAt: createdAt, 476 } 477 478 - i.Logger.Debug("upserting reaction from pds request") 479 err = db.UpsertReaction(i.Db, reactionEvent) 480 if err != nil { 481 tx.Rollback() ··· 484 485 err = db.CreateNotification(tx, subjectDid.String(), did, subject.String(), db.NotificationTypeReaction) 486 if err != nil { 487 - i.Logger.Error("failed to create notification record", "err", err) 488 } 489 490 return tx.Commit() 491 case models.CommitOperationDelete: 492 - i.Logger.Debug("deleting reaction from pds request") 493 err = db.DeleteReactionByRkey(i.Db, did, e.Commit.RKey) 494 } 495 if err != nil { ··· 499 return nil 500 } 501 502 - func (i *Ingester) ingestResource(e *models.Event) error { 503 var err error 504 did := e.Did 505 ··· 556 return fmt.Errorf("invalid resource: %w", err) 557 } 558 559 - i.Logger.Debug("upserting resource from pds request") 560 err = db.UpsertResource(i.Db, resource, resource.Rkey) 561 if err != nil { 562 tx.Rollback() ··· 564 } 565 return tx.Commit() 566 case models.CommitOperationDelete: 567 - i.Logger.Debug("deleting resource from pds request") 568 err = db.DeleteResourceByRkey(i.Db, did, e.Commit.RKey) 569 } 570 if err != nil { ··· 574 return nil 575 } 576 577 - func (i *Ingester) ingestComment(e *models.Event) error { 578 var err error 579 did := e.Did 580 ··· 636 CreatedAt: createdAt, 637 } 638 639 - i.Logger.Debug("upserting comment from pds request") 640 err = db.UpsertComment(i.Db, comment) 641 if err != nil { 642 tx.Rollback() ··· 647 if subjectDid.String() != did { 648 err = db.CreateNotification(tx, subjectDid.String(), did, subjectUri.String(), db.NotificationTypeComment) 649 if err != nil { 650 - i.Logger.Error("failed to create notification record", "err", err) 651 } 652 } 653 ··· 655 if comment.ParentCommentUri != nil && comment.ParentCommentUri.Authority().String() != did { 656 err = db.CreateNotification(tx, comment.ParentCommentUri.Authority().String(), did, parentCommentUri.String(), db.NotificationTypeReply) 657 if err != nil { 658 - i.Logger.Error("failed to create notification record", "err", err) 659 } 660 } 661 662 return tx.Commit() 663 case models.CommitOperationDelete: 664 - i.Logger.Debug("deleting comment from pds request") 665 err = db.DeleteCommentByRkey(i.Db, did, e.Commit.RKey) 666 } 667 if err != nil {
··· 36 } 37 }() 38 39 + l := i.Logger.With("kind", e.Kind).With("did", e.Did) 40 switch e.Kind { 41 case models.EventKindCommit: 42 + l = i.Logger.With("nsid", e.Commit.Collection).With("rkey", e.Commit.RKey) 43 switch e.Commit.Collection { 44 case yoten.ActorProfileNSID: 45 l = l.With("handler", "ingestProfile") 46 + err = i.ingestProfile(e, l) 47 case yoten.FeedSessionNSID: 48 l = l.With("handler", "ingestStudySession") 49 + err = i.ingestStudySession(e, l) 50 case yoten.ActivityDefNSID: 51 l = l.With("handler", "ingestActivityDef") 52 + err = i.ingestActivityDef(e, l) 53 case yoten.FeedResourceNSID: 54 l = l.With("handler", "ingestResource") 55 + err = i.ingestResource(e, l) 56 case yoten.GraphFollowNSID: 57 l = l.With("handler", "ingestFollow") 58 + err = i.ingestFollow(e, l) 59 case yoten.FeedReactionNSID: 60 l = l.With("handler", "ingestReaction") 61 + err = i.ingestReaction(e, l) 62 case yoten.FeedCommentNSID: 63 l = l.With("handler", "ingestComment") 64 + err = i.ingestComment(e, l) 65 } 66 } 67 if err != nil { 68 l.Error("failed to ingest event", "err", err) ··· 72 } 73 } 74 75 + func (i *Ingester) ingestProfile(e *models.Event, logger *slog.Logger) error { 76 did := e.Did 77 78 if e.Commit.RKey != "self" { ··· 139 return fmt.Errorf("failed to start transaction: %w", err) 140 } 141 142 + logger.Debug("upserting profile from pds request") 143 err = db.UpsertProfile(tx, &profile) 144 if err != nil { 145 tx.Rollback() ··· 151 return nil 152 } 153 154 + func (i *Ingester) ingestStudySession(e *models.Event, logger *slog.Logger) error { 155 did := e.Did 156 157 switch e.Commit.Operation { ··· 170 171 date, err := time.Parse(time.RFC3339, record.Date) 172 if err != nil { 173 + logger.Error("invalid record", "err", err) 174 return err 175 } 176 ··· 237 return fmt.Errorf("failed to start transaction: %w", err) 238 } 239 240 + logger.Debug("upserting study session from pds request") 241 err = db.UpsertStudySession(tx, &studySession, e.Commit.RKey) 242 if err != nil { 243 tx.Rollback() 244 return fmt.Errorf("failed to upsert study session record: %w", err) 245 } 246 247 + err = db.UpdateXPForSession(tx, &studySession, logger) 248 if err != nil { 249 tx.Rollback() 250 return fmt.Errorf("failed to add xp for session: %w", err) ··· 262 return fmt.Errorf("failed to start transaction: %w", err) 263 } 264 265 + logger.Debug("deleting study session from pds request") 266 err = db.RemoveXPForSession(tx, did, e.Commit.RKey, logger) 267 if err != nil { 268 tx.Rollback() ··· 281 return nil 282 } 283 284 + func (i *Ingester) ingestActivityDef(e *models.Event, logger *slog.Logger) error { 285 did := e.Did 286 var err error 287 ··· 354 return fmt.Errorf("failed to start transaction: %w", err) 355 } 356 357 + logger.Debug("upserting activity def from pds request") 358 err = db.UpsertActivityDef(tx, &activityDef, e.Commit.RKey) 359 if err != nil { 360 tx.Rollback() ··· 362 } 363 return tx.Commit() 364 case models.CommitOperationDelete: 365 + logger.Debug("deleting activity def from pds request") 366 err = db.DeleteActivityDefByRkey(i.Db, did, e.Commit.RKey) 367 } 368 if err != nil { ··· 372 return nil 373 } 374 375 + func (i *Ingester) ingestFollow(e *models.Event, logger *slog.Logger) error { 376 var err error 377 did := e.Did 378 ··· 397 398 subjectDid := record.Subject 399 400 + logger.Debug("upserting follow from pds request") 401 err = db.AddFollow(tx, did, subjectDid, e.Commit.RKey) 402 if err != nil { 403 tx.Rollback() ··· 407 subjectUri := fmt.Sprintf("at://%s/%s/%s", did, yoten.GraphFollowNSID, e.Commit.RKey) 408 err = db.CreateNotification(tx, subjectDid, did, subjectUri, db.NotificationTypeFollow) 409 if err != nil { 410 + logger.Error("failed to create notification record", "err", err) 411 } 412 413 return tx.Commit() 414 case models.CommitOperationDelete: 415 + logger.Debug("deleting follow from pds request") 416 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 417 } 418 if err != nil { ··· 422 return nil 423 } 424 425 + func (i *Ingester) ingestReaction(e *models.Event, logger *slog.Logger) error { 426 var err error 427 did := e.Did 428 ··· 475 CreatedAt: createdAt, 476 } 477 478 + logger.Debug("upserting reaction from pds request") 479 err = db.UpsertReaction(i.Db, reactionEvent) 480 if err != nil { 481 tx.Rollback() ··· 484 485 err = db.CreateNotification(tx, subjectDid.String(), did, subject.String(), db.NotificationTypeReaction) 486 if err != nil { 487 + logger.Error("failed to create notification record", "err", err) 488 } 489 490 return tx.Commit() 491 case models.CommitOperationDelete: 492 + logger.Debug("deleting reaction from pds request") 493 err = db.DeleteReactionByRkey(i.Db, did, e.Commit.RKey) 494 } 495 if err != nil { ··· 499 return nil 500 } 501 502 + func (i *Ingester) ingestResource(e *models.Event, logger *slog.Logger) error { 503 var err error 504 did := e.Did 505 ··· 556 return fmt.Errorf("invalid resource: %w", err) 557 } 558 559 + logger.Debug("upserting resource from pds request") 560 err = db.UpsertResource(i.Db, resource, resource.Rkey) 561 if err != nil { 562 tx.Rollback() ··· 564 } 565 return tx.Commit() 566 case models.CommitOperationDelete: 567 + logger.Debug("deleting resource from pds request") 568 err = db.DeleteResourceByRkey(i.Db, did, e.Commit.RKey) 569 } 570 if err != nil { ··· 574 return nil 575 } 576 577 + func (i *Ingester) ingestComment(e *models.Event, logger *slog.Logger) error { 578 var err error 579 did := e.Did 580 ··· 636 CreatedAt: createdAt, 637 } 638 639 + logger.Debug("upserting comment from pds request") 640 err = db.UpsertComment(i.Db, comment) 641 if err != nil { 642 tx.Rollback() ··· 647 if subjectDid.String() != did { 648 err = db.CreateNotification(tx, subjectDid.String(), did, subjectUri.String(), db.NotificationTypeComment) 649 if err != nil { 650 + logger.Error("failed to create notification record", "err", err) 651 } 652 } 653 ··· 655 if comment.ParentCommentUri != nil && comment.ParentCommentUri.Authority().String() != did { 656 err = db.CreateNotification(tx, comment.ParentCommentUri.Authority().String(), did, parentCommentUri.String(), db.NotificationTypeReply) 657 if err != nil { 658 + logger.Error("failed to create notification record", "err", err) 659 } 660 } 661 662 return tx.Commit() 663 case models.CommitOperationDelete: 664 + logger.Debug("deleting comment from pds request") 665 err = db.DeleteCommentByRkey(i.Db, did, e.Commit.RKey) 666 } 667 if err != nil {
+18 -8
internal/db/xp.go
··· 3 import ( 4 "database/sql" 5 "fmt" 6 - "log" 7 "math" 8 "time" 9 ) ··· 105 return nil 106 } 107 108 - func GetXPEventsForUser(e Execer, did string) ([]XpEvent, error) { 109 rows, err := e.Query(` 110 select did, session_rkey, xp_gained, created_at 111 from xp_events ··· 129 createdAtStr, 130 ) 131 if err != nil { 132 - log.Println("failed to scan xp event:", err) 133 continue 134 } 135 ··· 148 return xpEvents, nil 149 } 150 151 - func UpdateXPForSession(e Execer, updatedSession *StudySession) error { 152 var oldXPGained int 153 err := e.QueryRow(` 154 select xp_gained from xp_events where did = ? and session_rkey = ? 155 `, updatedSession.Did, updatedSession.Rkey).Scan(&oldXPGained) 156 if err != nil { 157 if err == sql.ErrNoRows { 158 - log.Println("adding xp for new session") 159 return AddXPForSession(e, updatedSession.Did, *updatedSession) 160 } 161 return fmt.Errorf("failed to get old xp for session: %w", err) ··· 166 return nil 167 } 168 169 _, err = e.Exec(` 170 insert into xp_events (did, session_rkey, xp_gained) 171 values (?, ?, ?) ··· 194 return nil 195 } 196 197 - func RemoveXPForSession(e Execer, did string, sessionRkey string) error { 198 var xpToRemove int 199 err := e.QueryRow(` 200 select xp_gained from xp_events where did = ? and session_rkey = ? 201 `, did, sessionRkey).Scan(&xpToRemove) 202 - 203 if err != nil { 204 if err == sql.ErrNoRows { 205 - log.Printf("no xp event found for session '%s', nothing to remove.", sessionRkey) 206 return nil 207 } 208 return fmt.Errorf("failed to get xp for session being deleted: %w", err) ··· 213 return fmt.Errorf("failed to delete xp_event: %w", err) 214 } 215 216 var currentTotalXP int 217 err = e.QueryRow("select xp from profiles where did = ?", did).Scan(&currentTotalXP) 218 if err != nil {
··· 3 import ( 4 "database/sql" 5 "fmt" 6 + "log/slog" 7 "math" 8 "time" 9 ) ··· 105 return nil 106 } 107 108 + func GetXPEventsForUser(e Execer, did string, logger *slog.Logger) ([]XpEvent, error) { 109 + l := logger.With("handler", "GetXPEventsForUser") 110 + 111 rows, err := e.Query(` 112 select did, session_rkey, xp_gained, created_at 113 from xp_events ··· 131 createdAtStr, 132 ) 133 if err != nil { 134 + l.Error("failed to find xp_event", "err", err) 135 continue 136 } 137 ··· 150 return xpEvents, nil 151 } 152 153 + func UpdateXPForSession(e Execer, updatedSession *StudySession, logger *slog.Logger) error { 154 + l := logger.With("handler", "UpdateXPForSession") 155 + 156 var oldXPGained int 157 err := e.QueryRow(` 158 select xp_gained from xp_events where did = ? and session_rkey = ? 159 `, updatedSession.Did, updatedSession.Rkey).Scan(&oldXPGained) 160 if err != nil { 161 if err == sql.ErrNoRows { 162 + l.Debug("adding xp for session") 163 return AddXPForSession(e, updatedSession.Did, *updatedSession) 164 } 165 return fmt.Errorf("failed to get old xp for session: %w", err) ··· 170 return nil 171 } 172 173 + l.Debug("updating xp for session") 174 + 175 _, err = e.Exec(` 176 insert into xp_events (did, session_rkey, xp_gained) 177 values (?, ?, ?) ··· 200 return nil 201 } 202 203 + func RemoveXPForSession(e Execer, did string, sessionRkey string, logger *slog.Logger) error { 204 + l := logger.With("handler", "RemoveXPForSession") 205 + 206 var xpToRemove int 207 + 208 err := e.QueryRow(` 209 select xp_gained from xp_events where did = ? and session_rkey = ? 210 `, did, sessionRkey).Scan(&xpToRemove) 211 if err != nil { 212 if err == sql.ErrNoRows { 213 + l.Debug("xp_event not found, nothing to remove") 214 return nil 215 } 216 return fmt.Errorf("failed to get xp for session being deleted: %w", err) ··· 221 return fmt.Errorf("failed to delete xp_event: %w", err) 222 } 223 224 + l.Debug("deleted xp_event") 225 + 226 var currentTotalXP int 227 err = e.QueryRow("select xp from profiles where did = ?", did).Scan(&currentTotalXP) 228 if err != nil {

History

1 round 0 comments
sign up or login to add to the discussion
brookjeynes.dev submitted #0
1 commit
expand
feat(db/xp): improve logging
expand 0 comments
pull request successfully merged