Y艒ten: A social tracker for your language learning journey built on the atproto.

feat(consumer/ingester): use slogger #18

merged opened by brookjeynes.dev targeting master from push-trrpxxyxxmot
Labels

None yet.

Participants 1
AT URI
at://did:plc:4mj54vc4ha3lh32ksxwunnbh/sh.tangled.repo.pull/3m3bmdlgbm222
+34 -22
Diff #0
+30 -20
internal/consumer/ingester.go
··· 4 4 "context" 5 5 "encoding/json" 6 6 "fmt" 7 - "log" 7 + "log/slog" 8 8 "strings" 9 9 "time" 10 10 ··· 20 20 type Ingester struct { 21 21 Db db.DbWrapper 22 22 Config *config.Config 23 + Logger *slog.Logger 23 24 } 24 25 25 26 type processFunc func(ctx context.Context, e *models.Event) error ··· 35 36 } 36 37 }() 37 38 39 + l := i.Logger.With("kind", e.Kind) 38 40 switch e.Kind { 39 41 case models.EventKindCommit: 40 42 switch e.Commit.Collection { 41 43 case yoten.ActorProfileNSID: 44 + l = l.With("handler", "ingestProfile") 42 45 err = i.ingestProfile(e) 43 46 case yoten.FeedSessionNSID: 47 + l = l.With("handler", "ingestStudySession") 44 48 err = i.ingestStudySession(e) 45 49 case yoten.ActivityDefNSID: 50 + l = l.With("handler", "ingestActivityDef") 46 51 err = i.ingestActivityDef(e) 47 52 case yoten.FeedResourceNSID: 53 + l = l.With("handler", "ingestResource") 48 54 err = i.ingestResource(e) 49 55 case yoten.GraphFollowNSID: 56 + l = l.With("handler", "ingestFollow") 50 57 err = i.ingestFollow(e) 51 58 case yoten.FeedReactionNSID: 59 + l = l.With("handler", "ingestReaction") 52 60 err = i.ingestReaction(e) 53 61 case yoten.FeedCommentNSID: 62 + l = l.With("handler", "ingestComment") 54 63 err = i.ingestComment(e) 55 64 } 65 + l = i.Logger.With("nsid", e.Commit.Collection) 56 66 } 57 67 if err != nil { 58 - log.Printf("failed to ingest event for collection %s: %v", e.Commit.Collection, err) 68 + l.Error("failed to ingest event", "err", err) 59 69 } 60 70 61 71 return nil ··· 129 139 return fmt.Errorf("failed to start transaction: %w", err) 130 140 } 131 141 132 - log.Printf("upserting profile '%s' from pds request", profile.Did) 142 + i.Logger.Debug("upserting profile from pds request") 133 143 err = db.UpsertProfile(tx, &profile) 134 144 if err != nil { 135 145 tx.Rollback() ··· 160 170 161 171 date, err := time.Parse(time.RFC3339, record.Date) 162 172 if err != nil { 163 - log.Printf("invalid record: %s", err) 173 + i.Logger.Error("invalid record", "err", err) 164 174 return err 165 175 } 166 176 ··· 227 237 return fmt.Errorf("failed to start transaction: %w", err) 228 238 } 229 239 230 - log.Println("upserting study session from pds request") 240 + i.Logger.Debug("upserting study session from pds request") 231 241 err = db.UpsertStudySession(tx, &studySession, e.Commit.RKey) 232 242 if err != nil { 233 243 tx.Rollback() ··· 252 262 return fmt.Errorf("failed to start transaction: %w", err) 253 263 } 254 264 255 - log.Println("deleting study session from pds request") 265 + i.Logger.Debug("deleting study session from pds request") 256 266 err = db.DeleteStudySessionByRkey(tx, did, e.Commit.RKey) 257 267 if err != nil { 258 268 tx.Rollback() ··· 344 354 return fmt.Errorf("failed to start transaction: %w", err) 345 355 } 346 356 347 - log.Println("upserting activity def from pds request") 357 + i.Logger.Debug("upserting activity def from pds request") 348 358 err = db.UpsertActivityDef(tx, &activityDef, e.Commit.RKey) 349 359 if err != nil { 350 360 tx.Rollback() ··· 352 362 } 353 363 return tx.Commit() 354 364 case models.CommitOperationDelete: 355 - log.Println("deleting activity def from pds request") 365 + i.Logger.Debug("deleting activity def from pds request") 356 366 err = db.DeleteActivityDefByRkey(i.Db, did, e.Commit.RKey) 357 367 } 358 368 if err != nil { ··· 387 397 388 398 subjectDid := record.Subject 389 399 390 - log.Println("upserting follow from pds request") 400 + i.Logger.Debug("upserting follow from pds request") 391 401 err = db.AddFollow(tx, did, subjectDid, e.Commit.RKey) 392 402 if err != nil { 393 403 tx.Rollback() ··· 397 407 subjectUri := fmt.Sprintf("at://%s/%s/%s", did, yoten.GraphFollowNSID, e.Commit.RKey) 398 408 err = db.CreateNotification(tx, subjectDid, did, subjectUri, db.NotificationTypeFollow) 399 409 if err != nil { 400 - log.Println("failed to create notification record:", err) 410 + i.Logger.Error("failed to create notification record", "err", err) 401 411 } 402 412 403 413 return tx.Commit() 404 414 case models.CommitOperationDelete: 405 - log.Println("deleting follow from pds request") 415 + i.Logger.Debug("deleting follow from pds request") 406 416 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 407 417 } 408 418 if err != nil { ··· 465 475 CreatedAt: createdAt, 466 476 } 467 477 468 - log.Println("upserting reaction from pds request") 478 + i.Logger.Debug("upserting reaction from pds request") 469 479 err = db.UpsertReaction(i.Db, reactionEvent) 470 480 if err != nil { 471 481 tx.Rollback() ··· 474 484 475 485 err = db.CreateNotification(tx, subjectDid.String(), did, subject.String(), db.NotificationTypeReaction) 476 486 if err != nil { 477 - log.Println("failed to create notification record:", err) 487 + i.Logger.Error("failed to create notification record", "err", err) 478 488 } 479 489 480 490 return tx.Commit() 481 491 case models.CommitOperationDelete: 482 - log.Println("deleting reaction from pds request") 492 + i.Logger.Debug("deleting reaction from pds request") 483 493 err = db.DeleteReactionByRkey(i.Db, did, e.Commit.RKey) 484 494 } 485 495 if err != nil { ··· 546 556 return fmt.Errorf("invalid resource: %w", err) 547 557 } 548 558 549 - log.Println("upserting resource from pds request") 559 + i.Logger.Debug("upserting resource from pds request") 550 560 err = db.UpsertResource(i.Db, resource, resource.Rkey) 551 561 if err != nil { 552 562 tx.Rollback() ··· 554 564 } 555 565 return tx.Commit() 556 566 case models.CommitOperationDelete: 557 - log.Println("deleting resource from pds request") 567 + i.Logger.Debug("deleting resource from pds request") 558 568 err = db.DeleteResourceByRkey(i.Db, did, e.Commit.RKey) 559 569 } 560 570 if err != nil { ··· 626 636 CreatedAt: createdAt, 627 637 } 628 638 629 - log.Println("upserting comment from pds request") 639 + i.Logger.Debug("upserting comment from pds request") 630 640 err = db.UpsertComment(i.Db, comment) 631 641 if err != nil { 632 642 tx.Rollback() ··· 637 647 if subjectDid.String() != did { 638 648 err = db.CreateNotification(tx, subjectDid.String(), did, subjectUri.String(), db.NotificationTypeComment) 639 649 if err != nil { 640 - log.Println("failed to create notification record:", err) 650 + i.Logger.Error("failed to create notification record", "err", err) 641 651 } 642 652 } 643 653 ··· 645 655 if comment.ParentCommentUri != nil && comment.ParentCommentUri.Authority().String() != did { 646 656 err = db.CreateNotification(tx, comment.ParentCommentUri.Authority().String(), did, parentCommentUri.String(), db.NotificationTypeReply) 647 657 if err != nil { 648 - log.Println("failed to create notification record:", err) 658 + i.Logger.Error("failed to create notification record", "err", err) 649 659 } 650 660 } 651 661 652 662 return tx.Commit() 653 663 case models.CommitOperationDelete: 654 - log.Println("deleting comment from pds request") 664 + i.Logger.Debug("deleting comment from pds request") 655 665 err = db.DeleteCommentByRkey(i.Db, did, e.Commit.RKey) 656 666 } 657 667 if err != nil {
+4 -2
internal/server/app.go
··· 85 85 yoten.GraphFollowNSID, 86 86 }, 87 87 nil, 88 - slog.Default(), 88 + log.SubLogger(logger, "jetstream"), 89 89 wrapper, 90 90 false, 91 91 ) ··· 95 95 96 96 ingester := consumer.Ingester{ 97 97 Db: wrapper, 98 - Config: config} 98 + Config: config, 99 + Logger: log.SubLogger(logger, "ingester"), 100 + } 99 101 err = jc.StartJetstream(ctx, ingester.Ingest()) 100 102 if err != nil { 101 103 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)

History

1 round 0 comments
sign up or login to add to the discussion
brookjeynes.dev submitted #0
1 commit
expand
feat(consumer/ingester): use slogger
expand 0 comments
pull request successfully merged