Signed-off-by: oppiliappan me@oppi.li
+20
-31
appview/notify/merged_notifier.go
+20
-31
appview/notify/merged_notifier.go
···
2
2
3
3
import (
4
4
"context"
5
-
"log/slog"
6
-
"reflect"
7
5
"sync"
8
6
9
7
"github.com/bluesky-social/indigo/atproto/syntax"
10
8
"tangled.org/core/appview/models"
11
-
"tangled.org/core/log"
12
9
)
13
10
14
11
type mergedNotifier struct {
15
12
notifiers []Notifier
16
-
logger *slog.Logger
17
13
}
18
14
19
-
func NewMergedNotifier(notifiers []Notifier, logger *slog.Logger) Notifier {
20
-
return &mergedNotifier{notifiers, logger}
15
+
func NewMergedNotifier(notifiers []Notifier) Notifier {
16
+
return &mergedNotifier{notifiers}
21
17
}
22
18
23
19
var _ Notifier = &mergedNotifier{}
24
20
25
21
// fanout calls the same method on all notifiers concurrently
26
-
func (m *mergedNotifier) fanout(method string, ctx context.Context, args ...any) {
27
-
ctx = log.IntoContext(ctx, m.logger.With("method", method))
22
+
func (m *mergedNotifier) fanout(callback func(Notifier)) {
28
23
var wg sync.WaitGroup
29
24
for _, n := range m.notifiers {
30
25
wg.Add(1)
31
26
go func(notifier Notifier) {
32
27
defer wg.Done()
33
-
v := reflect.ValueOf(notifier).MethodByName(method)
34
-
in := make([]reflect.Value, len(args)+1)
35
-
in[0] = reflect.ValueOf(ctx)
36
-
for i, arg := range args {
37
-
in[i+1] = reflect.ValueOf(arg)
38
-
}
39
-
v.Call(in)
28
+
callback(n)
40
29
}(n)
41
30
}
42
31
}
43
32
44
33
func (m *mergedNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
45
-
m.fanout("NewRepo", ctx, repo)
34
+
m.fanout(func(n Notifier) { n.NewRepo(ctx, repo) })
46
35
}
47
36
48
37
func (m *mergedNotifier) NewStar(ctx context.Context, star *models.Star) {
49
-
m.fanout("NewStar", ctx, star)
38
+
m.fanout(func(n Notifier) { n.NewStar(ctx, star) })
50
39
}
51
40
52
41
func (m *mergedNotifier) DeleteStar(ctx context.Context, star *models.Star) {
53
-
m.fanout("DeleteStar", ctx, star)
42
+
m.fanout(func(n Notifier) { n.DeleteStar(ctx, star) })
54
43
}
55
44
56
45
func (m *mergedNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
57
-
m.fanout("NewIssue", ctx, issue, mentions)
46
+
m.fanout(func(n Notifier) { n.NewIssue(ctx, issue, mentions) })
58
47
}
59
48
60
49
func (m *mergedNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) {
61
-
m.fanout("NewIssueComment", ctx, comment, mentions)
50
+
m.fanout(func(n Notifier) { n.NewIssueComment(ctx, comment, mentions) })
62
51
}
63
52
64
53
func (m *mergedNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
65
-
m.fanout("NewIssueState", ctx, actor, issue)
54
+
m.fanout(func(n Notifier) { n.NewIssueState(ctx, actor, issue) })
66
55
}
67
56
68
57
func (m *mergedNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {
69
-
m.fanout("DeleteIssue", ctx, issue)
58
+
m.fanout(func(n Notifier) { n.DeleteIssue(ctx, issue) })
70
59
}
71
60
72
61
func (m *mergedNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
73
-
m.fanout("NewFollow", ctx, follow)
62
+
m.fanout(func(n Notifier) { n.NewFollow(ctx, follow) })
74
63
}
75
64
76
65
func (m *mergedNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
77
-
m.fanout("DeleteFollow", ctx, follow)
66
+
m.fanout(func(n Notifier) { n.DeleteFollow(ctx, follow) })
78
67
}
79
68
80
69
func (m *mergedNotifier) NewPull(ctx context.Context, pull *models.Pull) {
81
-
m.fanout("NewPull", ctx, pull)
70
+
m.fanout(func(n Notifier) { n.NewPull(ctx, pull) })
82
71
}
83
72
84
73
func (m *mergedNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) {
85
-
m.fanout("NewPullComment", ctx, comment, mentions)
74
+
m.fanout(func(n Notifier) { n.NewPullComment(ctx, comment, mentions) })
86
75
}
87
76
88
77
func (m *mergedNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
89
-
m.fanout("NewPullState", ctx, actor, pull)
78
+
m.fanout(func(n Notifier) { n.NewPullState(ctx, actor, pull) })
90
79
}
91
80
92
81
func (m *mergedNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
93
-
m.fanout("UpdateProfile", ctx, profile)
82
+
m.fanout(func(n Notifier) { n.UpdateProfile(ctx, profile) })
94
83
}
95
84
96
85
func (m *mergedNotifier) NewString(ctx context.Context, s *models.String) {
97
-
m.fanout("NewString", ctx, s)
86
+
m.fanout(func(n Notifier) { n.NewString(ctx, s) })
98
87
}
99
88
100
89
func (m *mergedNotifier) EditString(ctx context.Context, s *models.String) {
101
-
m.fanout("EditString", ctx, s)
90
+
m.fanout(func(n Notifier) { n.EditString(ctx, s) })
102
91
}
103
92
104
93
func (m *mergedNotifier) DeleteString(ctx context.Context, did, rkey string) {
105
-
m.fanout("DeleteString", ctx, did, rkey)
94
+
m.fanout(func(n Notifier) { n.DeleteString(ctx, did, rkey) })
106
95
}
+42
-14
appview/notify/db/db.go
+42
-14
appview/notify/db/db.go
···
2
2
3
3
import (
4
4
"context"
5
-
"log"
6
5
"slices"
7
6
8
7
"github.com/bluesky-social/indigo/atproto/syntax"
···
11
10
"tangled.org/core/appview/models"
12
11
"tangled.org/core/appview/notify"
13
12
"tangled.org/core/idresolver"
13
+
"tangled.org/core/log"
14
14
"tangled.org/core/orm"
15
15
"tangled.org/core/sets"
16
16
)
···
38
38
}
39
39
40
40
func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) {
41
+
l := log.FromContext(ctx)
42
+
41
43
if star.RepoAt.Collection().String() != tangled.RepoNSID {
42
44
// skip string stars for now
43
45
return
···
45
47
var err error
46
48
repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(star.RepoAt)))
47
49
if err != nil {
48
-
log.Printf("NewStar: failed to get repos: %v", err)
50
+
l.Error("failed to get repos", "err", err)
49
51
return
50
52
}
51
53
···
59
61
var pullId *int64
60
62
61
63
n.notifyEvent(
64
+
ctx,
62
65
actorDid,
63
66
recipients,
64
67
eventType,
···
75
78
}
76
79
77
80
func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
81
+
l := log.FromContext(ctx)
82
+
78
83
collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt()))
79
84
if err != nil {
80
-
log.Printf("failed to fetch collaborators: %v", err)
85
+
l.Error("failed to fetch collaborators", "err", err)
81
86
return
82
87
}
83
88
···
101
106
var pullId *int64
102
107
103
108
n.notifyEvent(
109
+
ctx,
104
110
actorDid,
105
111
recipients,
106
112
models.NotificationTypeIssueCreated,
···
111
117
pullId,
112
118
)
113
119
n.notifyEvent(
120
+
ctx,
114
121
actorDid,
115
122
sets.Collect(slices.Values(mentions)),
116
123
models.NotificationTypeUserMentioned,
···
123
130
}
124
131
125
132
func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) {
133
+
l := log.FromContext(ctx)
134
+
126
135
issues, err := db.GetIssues(n.db, orm.FilterEq("at_uri", comment.IssueAt))
127
136
if err != nil {
128
-
log.Printf("NewIssueComment: failed to get issues: %v", err)
137
+
l.Error("failed to get issues", "err", err)
129
138
return
130
139
}
131
140
if len(issues) == 0 {
132
-
log.Printf("NewIssueComment: no issue found for %s", comment.IssueAt)
141
+
l.Error("no issue found for", "err", comment.IssueAt)
133
142
return
134
143
}
135
144
issue := issues[0]
···
170
179
var pullId *int64
171
180
172
181
n.notifyEvent(
182
+
ctx,
173
183
actorDid,
174
184
recipients,
175
185
models.NotificationTypeIssueCommented,
···
180
190
pullId,
181
191
)
182
192
n.notifyEvent(
193
+
ctx,
183
194
actorDid,
184
195
sets.Collect(slices.Values(mentions)),
185
196
models.NotificationTypeUserMentioned,
···
204
215
var repoId, issueId, pullId *int64
205
216
206
217
n.notifyEvent(
218
+
ctx,
207
219
actorDid,
208
220
recipients,
209
221
eventType,
···
220
232
}
221
233
222
234
func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {
235
+
l := log.FromContext(ctx)
236
+
223
237
repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt)))
224
238
if err != nil {
225
-
log.Printf("NewPull: failed to get repos: %v", err)
239
+
l.Error("failed to get repos", "err", err)
226
240
return
227
241
}
228
242
collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt()))
229
243
if err != nil {
230
-
log.Printf("failed to fetch collaborators: %v", err)
244
+
l.Error("failed to fetch collaborators", "err", err)
231
245
return
232
246
}
233
247
···
249
263
pullId := &p
250
264
251
265
n.notifyEvent(
266
+
ctx,
252
267
actorDid,
253
268
recipients,
254
269
eventType,
···
261
276
}
262
277
263
278
func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) {
279
+
l := log.FromContext(ctx)
280
+
264
281
pull, err := db.GetPull(n.db,
265
282
syntax.ATURI(comment.RepoAt),
266
283
comment.PullId,
267
284
)
268
285
if err != nil {
269
-
log.Printf("NewPullComment: failed to get pulls: %v", err)
286
+
l.Error("failed to get pulls", "err", err)
270
287
return
271
288
}
272
289
273
290
repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", comment.RepoAt))
274
291
if err != nil {
275
-
log.Printf("NewPullComment: failed to get repos: %v", err)
292
+
l.Error("failed to get repos", "err", err)
276
293
return
277
294
}
278
295
···
298
315
pullId := &p
299
316
300
317
n.notifyEvent(
318
+
ctx,
301
319
actorDid,
302
320
recipients,
303
321
eventType,
···
308
326
pullId,
309
327
)
310
328
n.notifyEvent(
329
+
ctx,
311
330
actorDid,
312
331
sets.Collect(slices.Values(mentions)),
313
332
models.NotificationTypeUserMentioned,
···
336
355
}
337
356
338
357
func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
358
+
l := log.FromContext(ctx)
359
+
339
360
collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt()))
340
361
if err != nil {
341
-
log.Printf("failed to fetch collaborators: %v", err)
362
+
l.Error("failed to fetch collaborators", "err", err)
342
363
return
343
364
}
344
365
···
368
389
}
369
390
370
391
n.notifyEvent(
392
+
ctx,
371
393
actor,
372
394
recipients,
373
395
eventType,
···
380
402
}
381
403
382
404
func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
405
+
l := log.FromContext(ctx)
406
+
383
407
// Get repo details
384
408
repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt)))
385
409
if err != nil {
386
-
log.Printf("NewPullState: failed to get repos: %v", err)
410
+
l.Error("failed to get repos", "err", err)
387
411
return
388
412
}
389
413
390
414
collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt()))
391
415
if err != nil {
392
-
log.Printf("failed to fetch collaborators: %v", err)
416
+
l.Error("failed to fetch collaborators", "err", err)
393
417
return
394
418
}
395
419
···
417
441
case models.PullMerged:
418
442
eventType = models.NotificationTypePullMerged
419
443
default:
420
-
log.Println("NewPullState: unexpected new PR state:", pull.State)
444
+
l.Error("unexpected new PR state", "state", pull.State)
421
445
return
422
446
}
423
447
p := int64(pull.ID)
424
448
pullId := &p
425
449
426
450
n.notifyEvent(
451
+
ctx,
427
452
actor,
428
453
recipients,
429
454
eventType,
···
436
461
}
437
462
438
463
func (n *databaseNotifier) notifyEvent(
464
+
ctx context.Context,
439
465
actorDid syntax.DID,
440
466
recipients sets.Set[syntax.DID],
441
467
eventType models.NotificationType,
···
445
471
issueId *int64,
446
472
pullId *int64,
447
473
) {
474
+
l := log.FromContext(ctx)
475
+
448
476
// if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody
449
477
if eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions {
450
478
return
···
494
522
}
495
523
496
524
if err := db.CreateNotification(tx, notif); err != nil {
497
-
log.Printf("notifyEvent: failed to create notification for %s: %v", recipientDid, err)
525
+
l.Error("failed to create notification", "recipientDid", recipientDid, "err", err)
498
526
}
499
527
}
500
528
+105
appview/notify/logging_notifier.go
+105
appview/notify/logging_notifier.go
···
1
+
package notify
2
+
3
+
import (
4
+
"context"
5
+
"log/slog"
6
+
7
+
"tangled.org/core/appview/models"
8
+
tlog "tangled.org/core/log"
9
+
10
+
"github.com/bluesky-social/indigo/atproto/syntax"
11
+
)
12
+
13
+
type loggingNotifier struct {
14
+
inner Notifier
15
+
logger *slog.Logger
16
+
}
17
+
18
+
func NewLoggingNotifier(inner Notifier, logger *slog.Logger) Notifier {
19
+
return &loggingNotifier{
20
+
inner,
21
+
logger,
22
+
}
23
+
}
24
+
25
+
var _ Notifier = &loggingNotifier{}
26
+
27
+
func (l *loggingNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
28
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewRepo"))
29
+
l.inner.NewRepo(ctx, repo)
30
+
}
31
+
32
+
func (l *loggingNotifier) NewStar(ctx context.Context, star *models.Star) {
33
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewStar"))
34
+
l.inner.NewStar(ctx, star)
35
+
}
36
+
37
+
func (l *loggingNotifier) DeleteStar(ctx context.Context, star *models.Star) {
38
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "DeleteStar"))
39
+
l.inner.DeleteStar(ctx, star)
40
+
}
41
+
42
+
func (l *loggingNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
43
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewIssue"))
44
+
l.inner.NewIssue(ctx, issue, mentions)
45
+
}
46
+
47
+
func (l *loggingNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) {
48
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewIssueComment"))
49
+
l.inner.NewIssueComment(ctx, comment, mentions)
50
+
}
51
+
52
+
func (l *loggingNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
53
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewIssueState"))
54
+
l.inner.NewIssueState(ctx, actor, issue)
55
+
}
56
+
57
+
func (l *loggingNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {
58
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "DeleteIssue"))
59
+
l.inner.DeleteIssue(ctx, issue)
60
+
}
61
+
62
+
func (l *loggingNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
63
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewFollow"))
64
+
l.inner.NewFollow(ctx, follow)
65
+
}
66
+
67
+
func (l *loggingNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
68
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "DeleteFollow"))
69
+
l.inner.DeleteFollow(ctx, follow)
70
+
}
71
+
72
+
func (l *loggingNotifier) NewPull(ctx context.Context, pull *models.Pull) {
73
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewPull"))
74
+
l.inner.NewPull(ctx, pull)
75
+
}
76
+
77
+
func (l *loggingNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) {
78
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewPullComment"))
79
+
l.inner.NewPullComment(ctx, comment, mentions)
80
+
}
81
+
82
+
func (l *loggingNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
83
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewPullState"))
84
+
l.inner.NewPullState(ctx, actor, pull)
85
+
}
86
+
87
+
func (l *loggingNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
88
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "UpdateProfile"))
89
+
l.inner.UpdateProfile(ctx, profile)
90
+
}
91
+
92
+
func (l *loggingNotifier) NewString(ctx context.Context, s *models.String) {
93
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewString"))
94
+
l.inner.NewString(ctx, s)
95
+
}
96
+
97
+
func (l *loggingNotifier) EditString(ctx context.Context, s *models.String) {
98
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "EditString"))
99
+
l.inner.EditString(ctx, s)
100
+
}
101
+
102
+
func (l *loggingNotifier) DeleteString(ctx context.Context, did, rkey string) {
103
+
ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "DeleteString"))
104
+
l.inner.DeleteString(ctx, did, rkey)
105
+
}
+2
-1
appview/state/state.go
+2
-1
appview/state/state.go
···
173
173
notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
174
174
}
175
175
notifiers = append(notifiers, indexer)
176
-
notifier := notify.NewMergedNotifier(notifiers, tlog.SubLogger(logger, "notify"))
176
+
notifier := notify.NewMergedNotifier(notifiers)
177
+
notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
177
178
178
179
state := &State{
179
180
d,
History
1 round
0 comments
oppi.li
submitted
#0
2 commits
expand
collapse
appview/notify: remove reflection usage in mergedNotifier
Signed-off-by: oppiliappan <me@oppi.li>
appview/notify: introduce loggingNotifier as a separate notifier
this was previously clubbed into mergedNotifier. this changeset also
employes the new context-logger in every notifier.
Signed-off-by: oppiliappan <me@oppi.li>
1/3 failed, 2/3 success
expand
collapse
expand 0 comments
closed without merging