Signed-off-by: brookjeynes me@brookjeynes.dev
+35
-35
internal/consumer/ingester.go
+35
-35
internal/consumer/ingester.go
···
36
}
37
}()
38
39
-
l := i.Logger.With("kind", e.Kind)
40
switch e.Kind {
41
case models.EventKindCommit:
42
switch e.Commit.Collection {
43
case yoten.ActorProfileNSID:
44
l = l.With("handler", "ingestProfile")
45
-
err = i.ingestProfile(e)
46
case yoten.FeedSessionNSID:
47
l = l.With("handler", "ingestStudySession")
48
-
err = i.ingestStudySession(e)
49
case yoten.ActivityDefNSID:
50
l = l.With("handler", "ingestActivityDef")
51
-
err = i.ingestActivityDef(e)
52
case yoten.FeedResourceNSID:
53
l = l.With("handler", "ingestResource")
54
-
err = i.ingestResource(e)
55
case yoten.GraphFollowNSID:
56
l = l.With("handler", "ingestFollow")
57
-
err = i.ingestFollow(e)
58
case yoten.FeedReactionNSID:
59
l = l.With("handler", "ingestReaction")
60
-
err = i.ingestReaction(e)
61
case yoten.FeedCommentNSID:
62
l = l.With("handler", "ingestComment")
63
-
err = i.ingestComment(e)
64
}
65
-
l = i.Logger.With("nsid", e.Commit.Collection)
66
}
67
if err != nil {
68
l.Error("failed to ingest event", "err", err)
···
72
}
73
}
74
75
-
func (i *Ingester) ingestProfile(e *models.Event) error {
76
did := e.Did
77
78
if e.Commit.RKey != "self" {
···
139
return fmt.Errorf("failed to start transaction: %w", err)
140
}
141
142
-
i.Logger.Debug("upserting profile from pds request")
143
err = db.UpsertProfile(tx, &profile)
144
if err != nil {
145
tx.Rollback()
···
151
return nil
152
}
153
154
-
func (i *Ingester) ingestStudySession(e *models.Event) error {
155
did := e.Did
156
157
switch e.Commit.Operation {
···
170
171
date, err := time.Parse(time.RFC3339, record.Date)
172
if err != nil {
173
-
i.Logger.Error("invalid record", "err", err)
174
return err
175
}
176
···
237
return fmt.Errorf("failed to start transaction: %w", err)
238
}
239
240
-
i.Logger.Debug("upserting study session from pds request")
241
err = db.UpsertStudySession(tx, &studySession, e.Commit.RKey)
242
if err != nil {
243
tx.Rollback()
244
return fmt.Errorf("failed to upsert study session record: %w", err)
245
}
246
247
-
err = db.UpdateXPForSession(tx, &studySession)
248
if err != nil {
249
tx.Rollback()
250
return fmt.Errorf("failed to add xp for session: %w", err)
···
262
return fmt.Errorf("failed to start transaction: %w", err)
263
}
264
265
-
i.Logger.Debug("deleting study session from pds request")
266
err = db.RemoveXPForSession(tx, did, e.Commit.RKey, logger)
267
if err != nil {
268
tx.Rollback()
···
281
return nil
282
}
283
284
-
func (i *Ingester) ingestActivityDef(e *models.Event) error {
285
did := e.Did
286
var err error
287
···
354
return fmt.Errorf("failed to start transaction: %w", err)
355
}
356
357
-
i.Logger.Debug("upserting activity def from pds request")
358
err = db.UpsertActivityDef(tx, &activityDef, e.Commit.RKey)
359
if err != nil {
360
tx.Rollback()
···
362
}
363
return tx.Commit()
364
case models.CommitOperationDelete:
365
-
i.Logger.Debug("deleting activity def from pds request")
366
err = db.DeleteActivityDefByRkey(i.Db, did, e.Commit.RKey)
367
}
368
if err != nil {
···
372
return nil
373
}
374
375
-
func (i *Ingester) ingestFollow(e *models.Event) error {
376
var err error
377
did := e.Did
378
···
397
398
subjectDid := record.Subject
399
400
-
i.Logger.Debug("upserting follow from pds request")
401
err = db.AddFollow(tx, did, subjectDid, e.Commit.RKey)
402
if err != nil {
403
tx.Rollback()
···
407
subjectUri := fmt.Sprintf("at://%s/%s/%s", did, yoten.GraphFollowNSID, e.Commit.RKey)
408
err = db.CreateNotification(tx, subjectDid, did, subjectUri, db.NotificationTypeFollow)
409
if err != nil {
410
-
i.Logger.Error("failed to create notification record", "err", err)
411
}
412
413
return tx.Commit()
414
case models.CommitOperationDelete:
415
-
i.Logger.Debug("deleting follow from pds request")
416
err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
417
}
418
if err != nil {
···
422
return nil
423
}
424
425
-
func (i *Ingester) ingestReaction(e *models.Event) error {
426
var err error
427
did := e.Did
428
···
475
CreatedAt: createdAt,
476
}
477
478
-
i.Logger.Debug("upserting reaction from pds request")
479
err = db.UpsertReaction(i.Db, reactionEvent)
480
if err != nil {
481
tx.Rollback()
···
484
485
err = db.CreateNotification(tx, subjectDid.String(), did, subject.String(), db.NotificationTypeReaction)
486
if err != nil {
487
-
i.Logger.Error("failed to create notification record", "err", err)
488
}
489
490
return tx.Commit()
491
case models.CommitOperationDelete:
492
-
i.Logger.Debug("deleting reaction from pds request")
493
err = db.DeleteReactionByRkey(i.Db, did, e.Commit.RKey)
494
}
495
if err != nil {
···
499
return nil
500
}
501
502
-
func (i *Ingester) ingestResource(e *models.Event) error {
503
var err error
504
did := e.Did
505
···
556
return fmt.Errorf("invalid resource: %w", err)
557
}
558
559
-
i.Logger.Debug("upserting resource from pds request")
560
err = db.UpsertResource(i.Db, resource, resource.Rkey)
561
if err != nil {
562
tx.Rollback()
···
564
}
565
return tx.Commit()
566
case models.CommitOperationDelete:
567
-
i.Logger.Debug("deleting resource from pds request")
568
err = db.DeleteResourceByRkey(i.Db, did, e.Commit.RKey)
569
}
570
if err != nil {
···
574
return nil
575
}
576
577
-
func (i *Ingester) ingestComment(e *models.Event) error {
578
var err error
579
did := e.Did
580
···
636
CreatedAt: createdAt,
637
}
638
639
-
i.Logger.Debug("upserting comment from pds request")
640
err = db.UpsertComment(i.Db, comment)
641
if err != nil {
642
tx.Rollback()
···
647
if subjectDid.String() != did {
648
err = db.CreateNotification(tx, subjectDid.String(), did, subjectUri.String(), db.NotificationTypeComment)
649
if err != nil {
650
-
i.Logger.Error("failed to create notification record", "err", err)
651
}
652
}
653
···
655
if comment.ParentCommentUri != nil && comment.ParentCommentUri.Authority().String() != did {
656
err = db.CreateNotification(tx, comment.ParentCommentUri.Authority().String(), did, parentCommentUri.String(), db.NotificationTypeReply)
657
if err != nil {
658
-
i.Logger.Error("failed to create notification record", "err", err)
659
}
660
}
661
662
return tx.Commit()
663
case models.CommitOperationDelete:
664
-
i.Logger.Debug("deleting comment from pds request")
665
err = db.DeleteCommentByRkey(i.Db, did, e.Commit.RKey)
666
}
667
if err != nil {
···
36
}
37
}()
38
39
+
l := i.Logger.With("kind", e.Kind).With("did", e.Did)
40
switch e.Kind {
41
case models.EventKindCommit:
42
+
l = i.Logger.With("nsid", e.Commit.Collection).With("rkey", e.Commit.RKey)
43
switch e.Commit.Collection {
44
case yoten.ActorProfileNSID:
45
l = l.With("handler", "ingestProfile")
46
+
err = i.ingestProfile(e, l)
47
case yoten.FeedSessionNSID:
48
l = l.With("handler", "ingestStudySession")
49
+
err = i.ingestStudySession(e, l)
50
case yoten.ActivityDefNSID:
51
l = l.With("handler", "ingestActivityDef")
52
+
err = i.ingestActivityDef(e, l)
53
case yoten.FeedResourceNSID:
54
l = l.With("handler", "ingestResource")
55
+
err = i.ingestResource(e, l)
56
case yoten.GraphFollowNSID:
57
l = l.With("handler", "ingestFollow")
58
+
err = i.ingestFollow(e, l)
59
case yoten.FeedReactionNSID:
60
l = l.With("handler", "ingestReaction")
61
+
err = i.ingestReaction(e, l)
62
case yoten.FeedCommentNSID:
63
l = l.With("handler", "ingestComment")
64
+
err = i.ingestComment(e, l)
65
}
66
}
67
if err != nil {
68
l.Error("failed to ingest event", "err", err)
···
72
}
73
}
74
75
+
func (i *Ingester) ingestProfile(e *models.Event, logger *slog.Logger) error {
76
did := e.Did
77
78
if e.Commit.RKey != "self" {
···
139
return fmt.Errorf("failed to start transaction: %w", err)
140
}
141
142
+
logger.Debug("upserting profile from pds request")
143
err = db.UpsertProfile(tx, &profile)
144
if err != nil {
145
tx.Rollback()
···
151
return nil
152
}
153
154
+
func (i *Ingester) ingestStudySession(e *models.Event, logger *slog.Logger) error {
155
did := e.Did
156
157
switch e.Commit.Operation {
···
170
171
date, err := time.Parse(time.RFC3339, record.Date)
172
if err != nil {
173
+
logger.Error("invalid record", "err", err)
174
return err
175
}
176
···
237
return fmt.Errorf("failed to start transaction: %w", err)
238
}
239
240
+
logger.Debug("upserting study session from pds request")
241
err = db.UpsertStudySession(tx, &studySession, e.Commit.RKey)
242
if err != nil {
243
tx.Rollback()
244
return fmt.Errorf("failed to upsert study session record: %w", err)
245
}
246
247
+
err = db.UpdateXPForSession(tx, &studySession, logger)
248
if err != nil {
249
tx.Rollback()
250
return fmt.Errorf("failed to add xp for session: %w", err)
···
262
return fmt.Errorf("failed to start transaction: %w", err)
263
}
264
265
+
logger.Debug("deleting study session from pds request")
266
err = db.RemoveXPForSession(tx, did, e.Commit.RKey, logger)
267
if err != nil {
268
tx.Rollback()
···
281
return nil
282
}
283
284
+
func (i *Ingester) ingestActivityDef(e *models.Event, logger *slog.Logger) error {
285
did := e.Did
286
var err error
287
···
354
return fmt.Errorf("failed to start transaction: %w", err)
355
}
356
357
+
logger.Debug("upserting activity def from pds request")
358
err = db.UpsertActivityDef(tx, &activityDef, e.Commit.RKey)
359
if err != nil {
360
tx.Rollback()
···
362
}
363
return tx.Commit()
364
case models.CommitOperationDelete:
365
+
logger.Debug("deleting activity def from pds request")
366
err = db.DeleteActivityDefByRkey(i.Db, did, e.Commit.RKey)
367
}
368
if err != nil {
···
372
return nil
373
}
374
375
+
func (i *Ingester) ingestFollow(e *models.Event, logger *slog.Logger) error {
376
var err error
377
did := e.Did
378
···
397
398
subjectDid := record.Subject
399
400
+
logger.Debug("upserting follow from pds request")
401
err = db.AddFollow(tx, did, subjectDid, e.Commit.RKey)
402
if err != nil {
403
tx.Rollback()
···
407
subjectUri := fmt.Sprintf("at://%s/%s/%s", did, yoten.GraphFollowNSID, e.Commit.RKey)
408
err = db.CreateNotification(tx, subjectDid, did, subjectUri, db.NotificationTypeFollow)
409
if err != nil {
410
+
logger.Error("failed to create notification record", "err", err)
411
}
412
413
return tx.Commit()
414
case models.CommitOperationDelete:
415
+
logger.Debug("deleting follow from pds request")
416
err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
417
}
418
if err != nil {
···
422
return nil
423
}
424
425
+
func (i *Ingester) ingestReaction(e *models.Event, logger *slog.Logger) error {
426
var err error
427
did := e.Did
428
···
475
CreatedAt: createdAt,
476
}
477
478
+
logger.Debug("upserting reaction from pds request")
479
err = db.UpsertReaction(i.Db, reactionEvent)
480
if err != nil {
481
tx.Rollback()
···
484
485
err = db.CreateNotification(tx, subjectDid.String(), did, subject.String(), db.NotificationTypeReaction)
486
if err != nil {
487
+
logger.Error("failed to create notification record", "err", err)
488
}
489
490
return tx.Commit()
491
case models.CommitOperationDelete:
492
+
logger.Debug("deleting reaction from pds request")
493
err = db.DeleteReactionByRkey(i.Db, did, e.Commit.RKey)
494
}
495
if err != nil {
···
499
return nil
500
}
501
502
+
func (i *Ingester) ingestResource(e *models.Event, logger *slog.Logger) error {
503
var err error
504
did := e.Did
505
···
556
return fmt.Errorf("invalid resource: %w", err)
557
}
558
559
+
logger.Debug("upserting resource from pds request")
560
err = db.UpsertResource(i.Db, resource, resource.Rkey)
561
if err != nil {
562
tx.Rollback()
···
564
}
565
return tx.Commit()
566
case models.CommitOperationDelete:
567
+
logger.Debug("deleting resource from pds request")
568
err = db.DeleteResourceByRkey(i.Db, did, e.Commit.RKey)
569
}
570
if err != nil {
···
574
return nil
575
}
576
577
+
func (i *Ingester) ingestComment(e *models.Event, logger *slog.Logger) error {
578
var err error
579
did := e.Did
580
···
636
CreatedAt: createdAt,
637
}
638
639
+
logger.Debug("upserting comment from pds request")
640
err = db.UpsertComment(i.Db, comment)
641
if err != nil {
642
tx.Rollback()
···
647
if subjectDid.String() != did {
648
err = db.CreateNotification(tx, subjectDid.String(), did, subjectUri.String(), db.NotificationTypeComment)
649
if err != nil {
650
+
logger.Error("failed to create notification record", "err", err)
651
}
652
}
653
···
655
if comment.ParentCommentUri != nil && comment.ParentCommentUri.Authority().String() != did {
656
err = db.CreateNotification(tx, comment.ParentCommentUri.Authority().String(), did, parentCommentUri.String(), db.NotificationTypeReply)
657
if err != nil {
658
+
logger.Error("failed to create notification record", "err", err)
659
}
660
}
661
662
return tx.Commit()
663
case models.CommitOperationDelete:
664
+
logger.Debug("deleting comment from pds request")
665
err = db.DeleteCommentByRkey(i.Db, did, e.Commit.RKey)
666
}
667
if err != nil {
+18
-8
internal/db/xp.go
+18
-8
internal/db/xp.go
···
3
import (
4
"database/sql"
5
"fmt"
6
-
"log"
7
"math"
8
"time"
9
)
···
105
return nil
106
}
107
108
-
func GetXPEventsForUser(e Execer, did string) ([]XpEvent, error) {
109
rows, err := e.Query(`
110
select did, session_rkey, xp_gained, created_at
111
from xp_events
···
129
createdAtStr,
130
)
131
if err != nil {
132
-
log.Println("failed to scan xp event:", err)
133
continue
134
}
135
···
148
return xpEvents, nil
149
}
150
151
-
func UpdateXPForSession(e Execer, updatedSession *StudySession) error {
152
var oldXPGained int
153
err := e.QueryRow(`
154
select xp_gained from xp_events where did = ? and session_rkey = ?
155
`, updatedSession.Did, updatedSession.Rkey).Scan(&oldXPGained)
156
if err != nil {
157
if err == sql.ErrNoRows {
158
-
log.Println("adding xp for new session")
159
return AddXPForSession(e, updatedSession.Did, *updatedSession)
160
}
161
return fmt.Errorf("failed to get old xp for session: %w", err)
···
166
return nil
167
}
168
169
_, err = e.Exec(`
170
insert into xp_events (did, session_rkey, xp_gained)
171
values (?, ?, ?)
···
194
return nil
195
}
196
197
-
func RemoveXPForSession(e Execer, did string, sessionRkey string) error {
198
var xpToRemove int
199
err := e.QueryRow(`
200
select xp_gained from xp_events where did = ? and session_rkey = ?
201
`, did, sessionRkey).Scan(&xpToRemove)
202
-
203
if err != nil {
204
if err == sql.ErrNoRows {
205
-
log.Printf("no xp event found for session '%s', nothing to remove.", sessionRkey)
206
return nil
207
}
208
return fmt.Errorf("failed to get xp for session being deleted: %w", err)
···
213
return fmt.Errorf("failed to delete xp_event: %w", err)
214
}
215
216
var currentTotalXP int
217
err = e.QueryRow("select xp from profiles where did = ?", did).Scan(¤tTotalXP)
218
if err != nil {
···
3
import (
4
"database/sql"
5
"fmt"
6
+
"log/slog"
7
"math"
8
"time"
9
)
···
105
return nil
106
}
107
108
+
func GetXPEventsForUser(e Execer, did string, logger *slog.Logger) ([]XpEvent, error) {
109
+
l := logger.With("handler", "GetXPEventsForUser")
110
+
111
rows, err := e.Query(`
112
select did, session_rkey, xp_gained, created_at
113
from xp_events
···
131
createdAtStr,
132
)
133
if err != nil {
134
+
l.Error("failed to find xp_event", "err", err)
135
continue
136
}
137
···
150
return xpEvents, nil
151
}
152
153
+
func UpdateXPForSession(e Execer, updatedSession *StudySession, logger *slog.Logger) error {
154
+
l := logger.With("handler", "UpdateXPForSession")
155
+
156
var oldXPGained int
157
err := e.QueryRow(`
158
select xp_gained from xp_events where did = ? and session_rkey = ?
159
`, updatedSession.Did, updatedSession.Rkey).Scan(&oldXPGained)
160
if err != nil {
161
if err == sql.ErrNoRows {
162
+
l.Debug("adding xp for session")
163
return AddXPForSession(e, updatedSession.Did, *updatedSession)
164
}
165
return fmt.Errorf("failed to get old xp for session: %w", err)
···
170
return nil
171
}
172
173
+
l.Debug("updating xp for session")
174
+
175
_, err = e.Exec(`
176
insert into xp_events (did, session_rkey, xp_gained)
177
values (?, ?, ?)
···
200
return nil
201
}
202
203
+
func RemoveXPForSession(e Execer, did string, sessionRkey string, logger *slog.Logger) error {
204
+
l := logger.With("handler", "RemoveXPForSession")
205
+
206
var xpToRemove int
207
+
208
err := e.QueryRow(`
209
select xp_gained from xp_events where did = ? and session_rkey = ?
210
`, did, sessionRkey).Scan(&xpToRemove)
211
if err != nil {
212
if err == sql.ErrNoRows {
213
+
l.Debug("xp_event not found, nothing to remove")
214
return nil
215
}
216
return fmt.Errorf("failed to get xp for session being deleted: %w", err)
···
221
return fmt.Errorf("failed to delete xp_event: %w", err)
222
}
223
224
+
l.Debug("deleted xp_event")
225
+
226
var currentTotalXP int
227
err = e.QueryRow("select xp from profiles where did = ?", did).Scan(¤tTotalXP)
228
if err != nil {
History
1 round
0 comments
brookjeynes.dev
submitted
#0
1 commit
expand
collapse
feat(db/xp): improve logging
Signed-off-by: brookjeynes <me@brookjeynes.dev>
expand 0 comments
pull request successfully merged