this repo has no description

feat: do manual checkpoints at shutdown and every 1k events

- for the Feed, don't bother doing the read-only thing anymore
- log checkpoint results, as well

+29 -9
+4 -1
cmd/mostliked/main.go
··· 36 36 log.Fatalf("failed to open database: %v\n", err) 37 37 } 38 38 defer func() { 39 + if _, err := dbCnx.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil { 40 + log.Printf("error doing final WAL checkpoint: %v\n", err) 41 + } 39 42 if err := dbCnx.Close(); err != nil { 40 - log.Printf("failed to close db: %v", err) 43 + log.Printf("failed to close db: %v\n", err) 41 44 } 42 45 log.Println("db closed") 43 46 }()
+1 -1
pkg/mostliked/generator.go
··· 57 57 58 58 func Feed(params feeds.FeedgenParams) appbsky.FeedGetFeedSkeleton_Output { 59 59 ctx := context.Background() 60 - dbCnx, err := sql.Open("sqlite3", "data/mostliked.db?_journal=WAL&_fk=on&mode=ro") 60 + dbCnx, err := sql.Open("sqlite3", "data/mostliked.db?_journal=WAL&_fk=on") 61 61 if err != nil { 62 62 log.Printf("error opening db: %v\n", err) 63 63 }
+24 -7
pkg/mostliked/handler.go
··· 30 30 Likes int64 31 31 } 32 32 33 + type CheckpointResults struct { 34 + Blocked int 35 + Pages int 36 + Transferred int 37 + } 38 + 33 39 func trimPostsTable(ctx context.Context, queries *db.Queries) { 34 40 ticker := time.NewTicker(1 * time.Minute) 35 41 defer ticker.Stop() ··· 64 70 65 71 func Handler(ctx context.Context, events <-chan []byte, dbCnx *sql.DB) { 66 72 if _, err := dbCnx.ExecContext(ctx, ddl); err != nil { 67 - log.Printf("couldn't create tables: %v\n", err) 73 + log.Printf("could not create tables: %v\n", err) 74 + } 75 + if _, err := dbCnx.ExecContext(ctx, "PRAGMA wal_autocheckpoint = 0"); err != nil { 76 + log.Printf("could not set PRAGMA wal_autocheckpoint: %v\n", err) 68 77 } 69 78 queries := db.New(dbCnx) 70 79 ··· 163 172 Likes: draftPost.Likes, 164 173 }) 165 174 if err != nil { 166 - log.Println("error inserting post") 175 + log.Printf("error inserting post: %v\n", err) 167 176 } 168 177 for _, lang := range draftPost.Languages { 169 178 err = queriesTx.InsertLang(ctx, db.InsertLangParams{ ··· 171 180 Lang: strings.ToLower(lang.IsoCode639_1().String()), 172 181 }) 173 182 if err != nil { 174 - log.Println("error inserting lang") 183 + log.Printf("error inserting lang: %v\n", err) 175 184 } 176 185 } 177 186 } else { 178 187 err := queriesTx.UpdateLikes(ctx, like.Subject.Uri) 179 188 if err != nil { 180 - log.Println("error updating likes") 189 + log.Printf("error updating likes: %v\n", err) 181 190 } 182 191 } 183 192 ··· 185 194 if eventCount%1000 == 0 { 186 195 if err := dbTx.Commit(); err != nil { 187 196 log.Printf("commit failed: %v\n", err) 188 - } else { 189 - txOpen = false 190 197 } 191 - // log.Println("db committed") 198 + 199 + var results CheckpointResults 200 + err := dbCnx.QueryRowContext(ctx, "PRAGMA wal_checkpoint(RESTART)").Scan(&results.Blocked, &results.Pages, &results.Transferred) 201 + switch { 202 + case err != nil: 203 + log.Printf("failed checkpoint: %v\n", err) 204 + default: 205 + log.Printf("checkpoint: %+v\n", results) 206 + } 207 + 208 + txOpen = false 192 209 } 193 210 } 194 211 }