Signed-off-by: brookjeynes me@brookjeynes.dev
+35
-35
internal/consumer/ingester.go
+35
-35
internal/consumer/ingester.go
···
36
36
}
37
37
}()
38
38
39
-
l := i.Logger.With("kind", e.Kind)
39
+
l := i.Logger.With("kind", e.Kind).With("did", e.Did)
40
40
switch e.Kind {
41
41
case models.EventKindCommit:
42
+
l = i.Logger.With("nsid", e.Commit.Collection).With("rkey", e.Commit.RKey)
42
43
switch e.Commit.Collection {
43
44
case yoten.ActorProfileNSID:
44
45
l = l.With("handler", "ingestProfile")
45
-
err = i.ingestProfile(e)
46
+
err = i.ingestProfile(e, l)
46
47
case yoten.FeedSessionNSID:
47
48
l = l.With("handler", "ingestStudySession")
48
-
err = i.ingestStudySession(e)
49
+
err = i.ingestStudySession(e, l)
49
50
case yoten.ActivityDefNSID:
50
51
l = l.With("handler", "ingestActivityDef")
51
-
err = i.ingestActivityDef(e)
52
+
err = i.ingestActivityDef(e, l)
52
53
case yoten.FeedResourceNSID:
53
54
l = l.With("handler", "ingestResource")
54
-
err = i.ingestResource(e)
55
+
err = i.ingestResource(e, l)
55
56
case yoten.GraphFollowNSID:
56
57
l = l.With("handler", "ingestFollow")
57
-
err = i.ingestFollow(e)
58
+
err = i.ingestFollow(e, l)
58
59
case yoten.FeedReactionNSID:
59
60
l = l.With("handler", "ingestReaction")
60
-
err = i.ingestReaction(e)
61
+
err = i.ingestReaction(e, l)
61
62
case yoten.FeedCommentNSID:
62
63
l = l.With("handler", "ingestComment")
63
-
err = i.ingestComment(e)
64
+
err = i.ingestComment(e, l)
64
65
}
65
-
l = i.Logger.With("nsid", e.Commit.Collection)
66
66
}
67
67
if err != nil {
68
68
l.Error("failed to ingest event", "err", err)
···
72
72
}
73
73
}
74
74
75
-
func (i *Ingester) ingestProfile(e *models.Event) error {
75
+
func (i *Ingester) ingestProfile(e *models.Event, logger *slog.Logger) error {
76
76
did := e.Did
77
77
78
78
if e.Commit.RKey != "self" {
···
139
139
return fmt.Errorf("failed to start transaction: %w", err)
140
140
}
141
141
142
-
i.Logger.Debug("upserting profile from pds request")
142
+
logger.Debug("upserting profile from pds request")
143
143
err = db.UpsertProfile(tx, &profile)
144
144
if err != nil {
145
145
tx.Rollback()
···
151
151
return nil
152
152
}
153
153
154
-
func (i *Ingester) ingestStudySession(e *models.Event) error {
154
+
func (i *Ingester) ingestStudySession(e *models.Event, logger *slog.Logger) error {
155
155
did := e.Did
156
156
157
157
switch e.Commit.Operation {
···
170
170
171
171
date, err := time.Parse(time.RFC3339, record.Date)
172
172
if err != nil {
173
-
i.Logger.Error("invalid record", "err", err)
173
+
logger.Error("invalid record", "err", err)
174
174
return err
175
175
}
176
176
···
237
237
return fmt.Errorf("failed to start transaction: %w", err)
238
238
}
239
239
240
-
i.Logger.Debug("upserting study session from pds request")
240
+
logger.Debug("upserting study session from pds request")
241
241
err = db.UpsertStudySession(tx, &studySession, e.Commit.RKey)
242
242
if err != nil {
243
243
tx.Rollback()
244
244
return fmt.Errorf("failed to upsert study session record: %w", err)
245
245
}
246
246
247
-
err = db.UpdateXPForSession(tx, &studySession)
247
+
err = db.UpdateXPForSession(tx, &studySession, logger)
248
248
if err != nil {
249
249
tx.Rollback()
250
250
return fmt.Errorf("failed to add xp for session: %w", err)
···
262
262
return fmt.Errorf("failed to start transaction: %w", err)
263
263
}
264
264
265
-
i.Logger.Debug("deleting study session from pds request")
265
+
logger.Debug("deleting study session from pds request")
266
266
err = db.RemoveXPForSession(tx, did, e.Commit.RKey, logger)
267
267
if err != nil {
268
268
tx.Rollback()
···
281
281
return nil
282
282
}
283
283
284
-
func (i *Ingester) ingestActivityDef(e *models.Event) error {
284
+
func (i *Ingester) ingestActivityDef(e *models.Event, logger *slog.Logger) error {
285
285
did := e.Did
286
286
var err error
287
287
···
354
354
return fmt.Errorf("failed to start transaction: %w", err)
355
355
}
356
356
357
-
i.Logger.Debug("upserting activity def from pds request")
357
+
logger.Debug("upserting activity def from pds request")
358
358
err = db.UpsertActivityDef(tx, &activityDef, e.Commit.RKey)
359
359
if err != nil {
360
360
tx.Rollback()
···
362
362
}
363
363
return tx.Commit()
364
364
case models.CommitOperationDelete:
365
-
i.Logger.Debug("deleting activity def from pds request")
365
+
logger.Debug("deleting activity def from pds request")
366
366
err = db.DeleteActivityDefByRkey(i.Db, did, e.Commit.RKey)
367
367
}
368
368
if err != nil {
···
372
372
return nil
373
373
}
374
374
375
-
func (i *Ingester) ingestFollow(e *models.Event) error {
375
+
func (i *Ingester) ingestFollow(e *models.Event, logger *slog.Logger) error {
376
376
var err error
377
377
did := e.Did
378
378
···
397
397
398
398
subjectDid := record.Subject
399
399
400
-
i.Logger.Debug("upserting follow from pds request")
400
+
logger.Debug("upserting follow from pds request")
401
401
err = db.AddFollow(tx, did, subjectDid, e.Commit.RKey)
402
402
if err != nil {
403
403
tx.Rollback()
···
407
407
subjectUri := fmt.Sprintf("at://%s/%s/%s", did, yoten.GraphFollowNSID, e.Commit.RKey)
408
408
err = db.CreateNotification(tx, subjectDid, did, subjectUri, db.NotificationTypeFollow)
409
409
if err != nil {
410
-
i.Logger.Error("failed to create notification record", "err", err)
410
+
logger.Error("failed to create notification record", "err", err)
411
411
}
412
412
413
413
return tx.Commit()
414
414
case models.CommitOperationDelete:
415
-
i.Logger.Debug("deleting follow from pds request")
415
+
logger.Debug("deleting follow from pds request")
416
416
err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
417
417
}
418
418
if err != nil {
···
422
422
return nil
423
423
}
424
424
425
-
func (i *Ingester) ingestReaction(e *models.Event) error {
425
+
func (i *Ingester) ingestReaction(e *models.Event, logger *slog.Logger) error {
426
426
var err error
427
427
did := e.Did
428
428
···
475
475
CreatedAt: createdAt,
476
476
}
477
477
478
-
i.Logger.Debug("upserting reaction from pds request")
478
+
logger.Debug("upserting reaction from pds request")
479
479
err = db.UpsertReaction(i.Db, reactionEvent)
480
480
if err != nil {
481
481
tx.Rollback()
···
484
484
485
485
err = db.CreateNotification(tx, subjectDid.String(), did, subject.String(), db.NotificationTypeReaction)
486
486
if err != nil {
487
-
i.Logger.Error("failed to create notification record", "err", err)
487
+
logger.Error("failed to create notification record", "err", err)
488
488
}
489
489
490
490
return tx.Commit()
491
491
case models.CommitOperationDelete:
492
-
i.Logger.Debug("deleting reaction from pds request")
492
+
logger.Debug("deleting reaction from pds request")
493
493
err = db.DeleteReactionByRkey(i.Db, did, e.Commit.RKey)
494
494
}
495
495
if err != nil {
···
499
499
return nil
500
500
}
501
501
502
-
func (i *Ingester) ingestResource(e *models.Event) error {
502
+
func (i *Ingester) ingestResource(e *models.Event, logger *slog.Logger) error {
503
503
var err error
504
504
did := e.Did
505
505
···
556
556
return fmt.Errorf("invalid resource: %w", err)
557
557
}
558
558
559
-
i.Logger.Debug("upserting resource from pds request")
559
+
logger.Debug("upserting resource from pds request")
560
560
err = db.UpsertResource(i.Db, resource, resource.Rkey)
561
561
if err != nil {
562
562
tx.Rollback()
···
564
564
}
565
565
return tx.Commit()
566
566
case models.CommitOperationDelete:
567
-
i.Logger.Debug("deleting resource from pds request")
567
+
logger.Debug("deleting resource from pds request")
568
568
err = db.DeleteResourceByRkey(i.Db, did, e.Commit.RKey)
569
569
}
570
570
if err != nil {
···
574
574
return nil
575
575
}
576
576
577
-
func (i *Ingester) ingestComment(e *models.Event) error {
577
+
func (i *Ingester) ingestComment(e *models.Event, logger *slog.Logger) error {
578
578
var err error
579
579
did := e.Did
580
580
···
636
636
CreatedAt: createdAt,
637
637
}
638
638
639
-
i.Logger.Debug("upserting comment from pds request")
639
+
logger.Debug("upserting comment from pds request")
640
640
err = db.UpsertComment(i.Db, comment)
641
641
if err != nil {
642
642
tx.Rollback()
···
647
647
if subjectDid.String() != did {
648
648
err = db.CreateNotification(tx, subjectDid.String(), did, subjectUri.String(), db.NotificationTypeComment)
649
649
if err != nil {
650
-
i.Logger.Error("failed to create notification record", "err", err)
650
+
logger.Error("failed to create notification record", "err", err)
651
651
}
652
652
}
653
653
···
655
655
if comment.ParentCommentUri != nil && comment.ParentCommentUri.Authority().String() != did {
656
656
err = db.CreateNotification(tx, comment.ParentCommentUri.Authority().String(), did, parentCommentUri.String(), db.NotificationTypeReply)
657
657
if err != nil {
658
-
i.Logger.Error("failed to create notification record", "err", err)
658
+
logger.Error("failed to create notification record", "err", err)
659
659
}
660
660
}
661
661
662
662
return tx.Commit()
663
663
case models.CommitOperationDelete:
664
-
i.Logger.Debug("deleting comment from pds request")
664
+
logger.Debug("deleting comment from pds request")
665
665
err = db.DeleteCommentByRkey(i.Db, did, e.Commit.RKey)
666
666
}
667
667
if err != nil {
+18
-8
internal/db/xp.go
+18
-8
internal/db/xp.go
···
3
3
import (
4
4
"database/sql"
5
5
"fmt"
6
-
"log"
6
+
"log/slog"
7
7
"math"
8
8
"time"
9
9
)
···
105
105
return nil
106
106
}
107
107
108
-
func GetXPEventsForUser(e Execer, did string) ([]XpEvent, error) {
108
+
func GetXPEventsForUser(e Execer, did string, logger *slog.Logger) ([]XpEvent, error) {
109
+
l := logger.With("handler", "GetXPEventsForUser")
110
+
109
111
rows, err := e.Query(`
110
112
select did, session_rkey, xp_gained, created_at
111
113
from xp_events
···
129
131
createdAtStr,
130
132
)
131
133
if err != nil {
132
-
log.Println("failed to scan xp event:", err)
134
+
l.Error("failed to find xp_event", "err", err)
133
135
continue
134
136
}
135
137
···
148
150
return xpEvents, nil
149
151
}
150
152
151
-
func UpdateXPForSession(e Execer, updatedSession *StudySession) error {
153
+
func UpdateXPForSession(e Execer, updatedSession *StudySession, logger *slog.Logger) error {
154
+
l := logger.With("handler", "UpdateXPForSession")
155
+
152
156
var oldXPGained int
153
157
err := e.QueryRow(`
154
158
select xp_gained from xp_events where did = ? and session_rkey = ?
155
159
`, updatedSession.Did, updatedSession.Rkey).Scan(&oldXPGained)
156
160
if err != nil {
157
161
if err == sql.ErrNoRows {
158
-
log.Println("adding xp for new session")
162
+
l.Debug("adding xp for session")
159
163
return AddXPForSession(e, updatedSession.Did, *updatedSession)
160
164
}
161
165
return fmt.Errorf("failed to get old xp for session: %w", err)
···
166
170
return nil
167
171
}
168
172
173
+
l.Debug("updating xp for session")
174
+
169
175
_, err = e.Exec(`
170
176
insert into xp_events (did, session_rkey, xp_gained)
171
177
values (?, ?, ?)
···
194
200
return nil
195
201
}
196
202
197
-
func RemoveXPForSession(e Execer, did string, sessionRkey string) error {
203
+
func RemoveXPForSession(e Execer, did string, sessionRkey string, logger *slog.Logger) error {
204
+
l := logger.With("handler", "RemoveXPForSession")
205
+
198
206
var xpToRemove int
207
+
199
208
err := e.QueryRow(`
200
209
select xp_gained from xp_events where did = ? and session_rkey = ?
201
210
`, did, sessionRkey).Scan(&xpToRemove)
202
-
203
211
if err != nil {
204
212
if err == sql.ErrNoRows {
205
-
log.Printf("no xp event found for session '%s', nothing to remove.", sessionRkey)
213
+
l.Debug("xp_event not found, nothing to remove")
206
214
return nil
207
215
}
208
216
return fmt.Errorf("failed to get xp for session being deleted: %w", err)
···
213
221
return fmt.Errorf("failed to delete xp_event: %w", err)
214
222
}
215
223
224
+
l.Debug("deleted xp_event")
225
+
216
226
var currentTotalXP int
217
227
err = e.QueryRow("select xp from profiles where did = ?", did).Scan(¤tTotalXP)
218
228
if err != nil {
History
1 round
0 comments
brookjeynes.dev
submitted
#0
1 commit
expand
collapse
feat(db/xp): improve logging
Signed-off-by: brookjeynes <me@brookjeynes.dev>
expand 0 comments
pull request successfully merged