Signed-off-by: brookjeynes me@brookjeynes.dev
+30
-20
internal/consumer/ingester.go
+30
-20
internal/consumer/ingester.go
···
4
4
"context"
5
5
"encoding/json"
6
6
"fmt"
7
-
"log"
7
+
"log/slog"
8
8
"strings"
9
9
"time"
10
10
···
20
20
type Ingester struct {
21
21
Db db.DbWrapper
22
22
Config *config.Config
23
+
Logger *slog.Logger
23
24
}
24
25
25
26
type processFunc func(ctx context.Context, e *models.Event) error
···
35
36
}
36
37
}()
37
38
39
+
l := i.Logger.With("kind", e.Kind)
38
40
switch e.Kind {
39
41
case models.EventKindCommit:
40
42
switch e.Commit.Collection {
41
43
case yoten.ActorProfileNSID:
44
+
l = l.With("handler", "ingestProfile")
42
45
err = i.ingestProfile(e)
43
46
case yoten.FeedSessionNSID:
47
+
l = l.With("handler", "ingestStudySession")
44
48
err = i.ingestStudySession(e)
45
49
case yoten.ActivityDefNSID:
50
+
l = l.With("handler", "ingestActivityDef")
46
51
err = i.ingestActivityDef(e)
47
52
case yoten.FeedResourceNSID:
53
+
l = l.With("handler", "ingestResource")
48
54
err = i.ingestResource(e)
49
55
case yoten.GraphFollowNSID:
56
+
l = l.With("handler", "ingestFollow")
50
57
err = i.ingestFollow(e)
51
58
case yoten.FeedReactionNSID:
59
+
l = l.With("handler", "ingestReaction")
52
60
err = i.ingestReaction(e)
53
61
case yoten.FeedCommentNSID:
62
+
l = l.With("handler", "ingestComment")
54
63
err = i.ingestComment(e)
55
64
}
65
+
l = i.Logger.With("nsid", e.Commit.Collection)
56
66
}
57
67
if err != nil {
58
-
log.Printf("failed to ingest event for collection %s: %v", e.Commit.Collection, err)
68
+
l.Error("failed to ingest event", "err", err)
59
69
}
60
70
61
71
return nil
···
129
139
return fmt.Errorf("failed to start transaction: %w", err)
130
140
}
131
141
132
-
log.Printf("upserting profile '%s' from pds request", profile.Did)
142
+
i.Logger.Debug("upserting profile from pds request")
133
143
err = db.UpsertProfile(tx, &profile)
134
144
if err != nil {
135
145
tx.Rollback()
···
160
170
161
171
date, err := time.Parse(time.RFC3339, record.Date)
162
172
if err != nil {
163
-
log.Printf("invalid record: %s", err)
173
+
i.Logger.Error("invalid record", "err", err)
164
174
return err
165
175
}
166
176
···
227
237
return fmt.Errorf("failed to start transaction: %w", err)
228
238
}
229
239
230
-
log.Println("upserting study session from pds request")
240
+
i.Logger.Debug("upserting study session from pds request")
231
241
err = db.UpsertStudySession(tx, &studySession, e.Commit.RKey)
232
242
if err != nil {
233
243
tx.Rollback()
···
252
262
return fmt.Errorf("failed to start transaction: %w", err)
253
263
}
254
264
255
-
log.Println("deleting study session from pds request")
265
+
i.Logger.Debug("deleting study session from pds request")
256
266
err = db.DeleteStudySessionByRkey(tx, did, e.Commit.RKey)
257
267
if err != nil {
258
268
tx.Rollback()
···
344
354
return fmt.Errorf("failed to start transaction: %w", err)
345
355
}
346
356
347
-
log.Println("upserting activity def from pds request")
357
+
i.Logger.Debug("upserting activity def from pds request")
348
358
err = db.UpsertActivityDef(tx, &activityDef, e.Commit.RKey)
349
359
if err != nil {
350
360
tx.Rollback()
···
352
362
}
353
363
return tx.Commit()
354
364
case models.CommitOperationDelete:
355
-
log.Println("deleting activity def from pds request")
365
+
i.Logger.Debug("deleting activity def from pds request")
356
366
err = db.DeleteActivityDefByRkey(i.Db, did, e.Commit.RKey)
357
367
}
358
368
if err != nil {
···
387
397
388
398
subjectDid := record.Subject
389
399
390
-
log.Println("upserting follow from pds request")
400
+
i.Logger.Debug("upserting follow from pds request")
391
401
err = db.AddFollow(tx, did, subjectDid, e.Commit.RKey)
392
402
if err != nil {
393
403
tx.Rollback()
···
397
407
subjectUri := fmt.Sprintf("at://%s/%s/%s", did, yoten.GraphFollowNSID, e.Commit.RKey)
398
408
err = db.CreateNotification(tx, subjectDid, did, subjectUri, db.NotificationTypeFollow)
399
409
if err != nil {
400
-
log.Println("failed to create notification record:", err)
410
+
i.Logger.Error("failed to create notification record", "err", err)
401
411
}
402
412
403
413
return tx.Commit()
404
414
case models.CommitOperationDelete:
405
-
log.Println("deleting follow from pds request")
415
+
i.Logger.Debug("deleting follow from pds request")
406
416
err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
407
417
}
408
418
if err != nil {
···
465
475
CreatedAt: createdAt,
466
476
}
467
477
468
-
log.Println("upserting reaction from pds request")
478
+
i.Logger.Debug("upserting reaction from pds request")
469
479
err = db.UpsertReaction(i.Db, reactionEvent)
470
480
if err != nil {
471
481
tx.Rollback()
···
474
484
475
485
err = db.CreateNotification(tx, subjectDid.String(), did, subject.String(), db.NotificationTypeReaction)
476
486
if err != nil {
477
-
log.Println("failed to create notification record:", err)
487
+
i.Logger.Error("failed to create notification record", "err", err)
478
488
}
479
489
480
490
return tx.Commit()
481
491
case models.CommitOperationDelete:
482
-
log.Println("deleting reaction from pds request")
492
+
i.Logger.Debug("deleting reaction from pds request")
483
493
err = db.DeleteReactionByRkey(i.Db, did, e.Commit.RKey)
484
494
}
485
495
if err != nil {
···
546
556
return fmt.Errorf("invalid resource: %w", err)
547
557
}
548
558
549
-
log.Println("upserting resource from pds request")
559
+
i.Logger.Debug("upserting resource from pds request")
550
560
err = db.UpsertResource(i.Db, resource, resource.Rkey)
551
561
if err != nil {
552
562
tx.Rollback()
···
554
564
}
555
565
return tx.Commit()
556
566
case models.CommitOperationDelete:
557
-
log.Println("deleting resource from pds request")
567
+
i.Logger.Debug("deleting resource from pds request")
558
568
err = db.DeleteResourceByRkey(i.Db, did, e.Commit.RKey)
559
569
}
560
570
if err != nil {
···
626
636
CreatedAt: createdAt,
627
637
}
628
638
629
-
log.Println("upserting comment from pds request")
639
+
i.Logger.Debug("upserting comment from pds request")
630
640
err = db.UpsertComment(i.Db, comment)
631
641
if err != nil {
632
642
tx.Rollback()
···
637
647
if subjectDid.String() != did {
638
648
err = db.CreateNotification(tx, subjectDid.String(), did, subjectUri.String(), db.NotificationTypeComment)
639
649
if err != nil {
640
-
log.Println("failed to create notification record:", err)
650
+
i.Logger.Error("failed to create notification record", "err", err)
641
651
}
642
652
}
643
653
···
645
655
if comment.ParentCommentUri != nil && comment.ParentCommentUri.Authority().String() != did {
646
656
err = db.CreateNotification(tx, comment.ParentCommentUri.Authority().String(), did, parentCommentUri.String(), db.NotificationTypeReply)
647
657
if err != nil {
648
-
log.Println("failed to create notification record:", err)
658
+
i.Logger.Error("failed to create notification record", "err", err)
649
659
}
650
660
}
651
661
652
662
return tx.Commit()
653
663
case models.CommitOperationDelete:
654
-
log.Println("deleting comment from pds request")
664
+
i.Logger.Debug("deleting comment from pds request")
655
665
err = db.DeleteCommentByRkey(i.Db, did, e.Commit.RKey)
656
666
}
657
667
if err != nil {
+4
-2
internal/server/app.go
+4
-2
internal/server/app.go
···
85
85
yoten.GraphFollowNSID,
86
86
},
87
87
nil,
88
-
slog.Default(),
88
+
log.SubLogger(logger, "jetstream"),
89
89
wrapper,
90
90
false,
91
91
)
···
95
95
96
96
ingester := consumer.Ingester{
97
97
Db: wrapper,
98
-
Config: config}
98
+
Config: config,
99
+
Logger: log.SubLogger(logger, "ingester"),
100
+
}
99
101
err = jc.StartJetstream(ctx, ingester.Ingest())
100
102
if err != nil {
101
103
return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
History
1 round
0 comments
brookjeynes.dev
submitted
#0
1 commit
expand
collapse
feat(consumer/ingester): use slogger
Signed-off-by: brookjeynes <me@brookjeynes.dev>
expand 0 comments
pull request successfully merged