this repo has no description

feat(bsky-users): use a queue for incoming events

+76 -27
+76 -27
cmd/bsky-users/main.go
··· 4 4 "context" 5 5 "database/sql" 6 6 _ "embed" 7 - "encoding/json" 8 7 "log" 9 8 "os/signal" 9 + "sync" 10 10 "syscall" 11 11 "time" 12 12 ··· 21 21 Transferred int 22 22 } 23 23 24 + type Queue struct { 25 + lk sync.Mutex 26 + events []jetstream.Event 27 + } 28 + 29 + func NewQueue(capacity int) *Queue { 30 + return &Queue{ 31 + events: make([]jetstream.Event, 0, capacity), 32 + } 33 + } 34 + 35 + func (q *Queue) Enqueue(event jetstream.Event) { 36 + q.lk.Lock() 37 + defer q.lk.Unlock() 38 + 39 + q.events = append(q.events, event) 40 + } 41 + 42 + func (q *Queue) Dequeue() (jetstream.Event, bool) { 43 + q.lk.Lock() 44 + defer q.lk.Unlock() 45 + 46 + if len(q.events) == 0 { 47 + var e jetstream.Event 48 + return e, false 49 + } 50 + 51 + event := q.events[0] 52 + q.events = q.events[1:] 53 + return event, true 54 + } 55 + 56 + func (q *Queue) Size() int { 57 + q.lk.Lock() 58 + defer q.lk.Unlock() 59 + 60 + return len(q.events) 61 + } 62 + 24 63 var AppBskyAllowlist = map[string]bool{ 25 64 "app.bsky.actor.profile": true, 26 65 "app.bsky.feed.generator": true, ··· 46 85 //go:embed schema.sql 47 86 var ddl string 48 87 49 - func handler(ctx context.Context, events <-chan []byte, dbCnx *sql.DB) { 88 + func handler(ctx context.Context, queue *Queue, dbCnx *sql.DB) { 50 89 if _, err := dbCnx.ExecContext(ctx, ddl); err != nil { 51 90 log.Printf("could not create tables: %v\n", err) 52 91 } ··· 60 99 eventCount int 61 100 ) 62 101 63 - for evt := range events { 102 + for { 103 + event, ok := queue.Dequeue() 104 + if !ok { 105 + time.Sleep(100 * time.Millisecond) 106 + continue 107 + } 108 + 64 109 if dbTx == nil { 65 110 dbTx, err = dbCnx.BeginTx(ctx, nil) 66 111 if err != nil { ··· 68 113 } 69 114 } 70 115 71 - var event jetstream.Event 72 - if err := json.Unmarshal(evt, &event); err != nil { 73 - continue 74 - } 75 - 76 116 if event.Kind != jetstream.EventKindCommit { 77 117 continue 78 118 } ··· 93 133 dbTx.ExecContext(ctx, userTimestampUpdate, did, ts, ts) 94 134 95 135 eventCount += 1 96 - if eventCount%1000 == 0 { 97 - if err := dbTx.Commit(); err != nil { 98 - log.Printf("commit failed: %v\n") 136 + if eventCount%2500 == 0 { 137 + if err = dbTx.Commit(); err != nil { 138 + log.Printf("commit failed: %v\n", err) 139 + } else { 140 + log.Printf("commit successful\n") 99 141 } 100 142 101 - var results CheckpointResults 102 - err := dbCnx.QueryRowContext(ctx, "PRAGMA wal_checkpoint(RESTART)").Scan(&results.Blocked, &results.Pages, &results.Transferred) 103 - switch { 104 - case err != nil: 105 - log.Printf("failed checkpoint: %v\n", err) 106 - case results.Blocked == 1: 107 - log.Printf("checkpoint: blocked\n") 108 - case results.Pages == results.Transferred: 109 - log.Printf("checkpoint: %d pages transferred\n", results.Transferred) 110 - case results.Pages != results.Transferred: 111 - log.Printf("checkpoint: %d pages, %d transferred\n", results.Pages, results.Transferred) 143 + if eventCount%25_000 == 0 { 144 + var results CheckpointResults 145 + err = dbCnx.QueryRowContext(ctx, "PRAGMA wal_checkpoint(RESTART)").Scan(&results.Blocked, &results.Pages, &results.Transferred) 146 + switch { 147 + case err != nil: 148 + log.Printf("failed checkpoint: %v\n", err) 149 + case results.Blocked == 1: 150 + log.Printf("checkpoint: blocked\n") 151 + case results.Pages == results.Transferred: 152 + log.Printf("checkpoint: %d pages transferred\n", results.Transferred) 153 + case results.Pages != results.Transferred: 154 + log.Printf("checkpoint: %d pages, %d transferred\n", results.Pages, results.Transferred) 155 + } 112 156 } 113 157 114 158 dbTx, err = dbCnx.BeginTx(ctx, nil) 115 159 if err != nil { 116 160 log.Printf("failed to begin transaction: %v\n", err) 117 161 } 162 + 163 + log.Printf("queue size: %d\n", queue.Size()) 118 164 } 119 165 } 120 166 } ··· 123 169 ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) 124 170 defer stop() 125 171 126 - conn, _, err := websocket.DefaultDialer.Dial(JetstreamUrl, nil) 172 + conn, _, err := websocket.DefaultDialer.DialContext(ctx, JetstreamUrl, nil) 127 173 if err != nil { 128 174 log.Fatalf("failed to open websocket: %v\n", err) 129 175 } ··· 149 195 log.Printf("db closed\n") 150 196 }() 151 197 152 - jetstreamEvents := make(chan []byte) 153 - go handler(ctx, jetstreamEvents, dbCnx) 198 + queue := NewQueue(100_000) 199 + go handler(ctx, queue, dbCnx) 154 200 155 201 log.Printf("starting up\n") 156 202 go func() { 157 203 for { 158 - _, message, err := conn.ReadMessage() 204 + var event jetstream.Event 205 + err := conn.ReadJSON(&event) 159 206 if err != nil { 207 + log.Printf("ReadJSON error: %v\n", err) 160 208 stop() 209 + break 161 210 } 162 - jetstreamEvents <- message 211 + queue.Enqueue(event) 163 212 } 164 213 }() 165 214