its for when you want to get like notifications for your reposts

refactor: use cornelk hashmap instead of using a map with a mutex lol

ptr.pet b16df182 ad439ec8

verified
+23 -36
+1
go.mod
··· 5 require ( 6 github.com/bluesky-social/indigo v0.0.0-20250606055443-008e4ed915ad 7 github.com/bluesky-social/jetstream v0.0.0-20250414024304-d17bd81a945e 8 github.com/gorilla/mux v1.8.1 9 github.com/gorilla/websocket v1.5.3 10 )
··· 5 require ( 6 github.com/bluesky-social/indigo v0.0.0-20250606055443-008e4ed915ad 7 github.com/bluesky-social/jetstream v0.0.0-20250414024304-d17bd81a945e 8 + github.com/cornelk/hashmap v1.0.8 9 github.com/gorilla/mux v1.8.1 10 github.com/gorilla/websocket v1.5.3 11 )
+2
go.sum
··· 12 github.com/caseyho/jetstream v0.0.0-20250310034359-bee7b7fc4d0f/go.mod h1:WiYEeyJSdUwqoaZ71KJSpTblemUCpwJfh5oVXplK6T4= 13 github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= 14 github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 15 github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= 16 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 17 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
··· 12 github.com/caseyho/jetstream v0.0.0-20250310034359-bee7b7fc4d0f/go.mod h1:WiYEeyJSdUwqoaZ71KJSpTblemUCpwJfh5oVXplK6T4= 13 github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= 14 github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 15 + github.com/cornelk/hashmap v1.0.8 h1:nv0AWgw02n+iDcawr5It4CjQIAcdMMKRrs10HOJYlrc= 16 + github.com/cornelk/hashmap v1.0.8/go.mod h1:RfZb7JO3RviW/rT6emczVuC/oxpdz4UsSB2LJSclR1k= 17 github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= 18 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 19 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+20 -36
main.go
··· 6 "log" 7 "log/slog" 8 "net/http" 9 - "sync" 10 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/bluesky-social/indigo/xrpc" 13 "github.com/bluesky-social/jetstream/pkg/client" 14 "github.com/bluesky-social/jetstream/pkg/models" 15 "github.com/gorilla/mux" 16 "github.com/gorilla/websocket" 17 ) ··· 34 35 // Global state 36 var ( 37 - subscribers = make(map[string]*SubscriberData) 38 - subscribersMux sync.RWMutex 39 40 likeStream *client.Client 41 subscriberStream *client.Client ··· 50 ) 51 52 func getFollowsDids() []string { 53 - subscribersMux.RLock() 54 - defer subscribersMux.RUnlock() 55 - 56 var dids []string 57 - for _, subscriber := range subscribers { 58 - for follow, _ := range subscriber.ListenTo { 59 dids = append(dids, follow) 60 } 61 - } 62 - 63 return dids 64 } 65 66 func getSubscriberDids() []string { 67 - subscribersMux.RLock() 68 - defer subscribersMux.RUnlock() 69 - 70 - var dids []string 71 - for did := range subscribers { 72 - dids = append(dids, did) 73 - } 74 - 75 return dids 76 } 77 ··· 137 Reposts: reposts, 138 } 139 140 - subscribersMux.Lock() 141 - subscribers[did] = subscriber 142 - subscribersMux.Unlock() 143 updateSubscriberStreamOpts() 144 updateLikeStreamOpts() 145 // delete subscriber after we are done 146 defer func() { 147 - subscribersMux.Lock() 148 - delete(subscribers, did) 149 - subscribersMux.Unlock() 150 updateSubscriberStreamOpts() 151 updateLikeStreamOpts() 152 }() ··· 215 return nil 216 } 217 218 - subscribersMux.RLock() 219 - defer subscribersMux.RUnlock() 220 - 221 - for _, subscriber := range subscribers { 222 - for repostURI, _ := range subscriber.Reposts { 223 // (un)liked a post the subscriber reposted 224 if like.Subject.Uri == repostURI { 225 notification := NotificationMessage{ ··· 228 RepostURI: repostURI, 229 } 230 231 - if err := subscriber.Conn.WriteJSON(notification); err != nil { 232 - logger.Error("Failed to send notification", "subscriber", subscriber.DID, "error", err) 233 } 234 } 235 } 236 - } 237 238 return nil 239 } ··· 278 return nil 279 } 280 281 - subscribersMux.Lock() 282 - defer subscribersMux.Unlock() 283 - 284 - if subscriber, exists := subscribers[event.Did]; exists { 285 if event.Commit.Operation == models.CommitOperationDelete { 286 onDelete(subscriber, data) 287 } else {
··· 6 "log" 7 "log/slog" 8 "net/http" 9 10 "github.com/bluesky-social/indigo/api/bsky" 11 "github.com/bluesky-social/indigo/xrpc" 12 "github.com/bluesky-social/jetstream/pkg/client" 13 "github.com/bluesky-social/jetstream/pkg/models" 14 + "github.com/cornelk/hashmap" 15 "github.com/gorilla/mux" 16 "github.com/gorilla/websocket" 17 ) ··· 34 35 // Global state 36 var ( 37 + subscribers = hashmap.New[string, *SubscriberData]() 38 39 likeStream *client.Client 40 subscriberStream *client.Client ··· 49 ) 50 51 func getFollowsDids() []string { 52 var dids []string 53 + subscribers.Range(func(s string, sd *SubscriberData) bool { 54 + for follow, _ := range sd.ListenTo { 55 dids = append(dids, follow) 56 } 57 + return true 58 + }) 59 return dids 60 } 61 62 func getSubscriberDids() []string { 63 + dids := make([]string, 0, subscribers.Len()) 64 + subscribers.Range(func(s string, sd *SubscriberData) bool { 65 + dids = append(dids, s) 66 + return true 67 + }) 68 return dids 69 } 70 ··· 130 Reposts: reposts, 131 } 132 133 + subscribers.Set(did, subscriber) 134 updateSubscriberStreamOpts() 135 updateLikeStreamOpts() 136 // delete subscriber after we are done 137 defer func() { 138 + subscribers.Del(did) 139 updateSubscriberStreamOpts() 140 updateLikeStreamOpts() 141 }() ··· 204 return nil 205 } 206 207 + subscribers.Range(func(s string, sd *SubscriberData) bool { 208 + for repostURI, _ := range sd.Reposts { 209 // (un)liked a post the subscriber reposted 210 if like.Subject.Uri == repostURI { 211 notification := NotificationMessage{ ··· 214 RepostURI: repostURI, 215 } 216 217 + if err := sd.Conn.WriteJSON(notification); err != nil { 218 + logger.Error("Failed to send notification", "subscriber", sd.DID, "error", err) 219 } 220 } 221 } 222 + return true 223 + }) 224 225 return nil 226 } ··· 265 return nil 266 } 267 268 + if subscriber, exists := subscribers.Get(event.Did); exists { 269 if event.Commit.Operation == models.CommitOperationDelete { 270 onDelete(subscriber, data) 271 } else {