Live video on the AT Protocol

statedb: much better postgres distributed locking

+79 -40
+1 -1
pkg/atproto/chat_message_test.go
··· 35 35 cli.DataDir = t.TempDir() 36 36 mod, err := model.MakeDB(":memory:") 37 37 require.NoError(t, err) 38 - state, err := statedb.MakeDB(&cli, nil, mod) 38 + state, err := statedb.MakeDB(context.Background(), &cli, nil, mod) 39 39 require.NoError(t, err) 40 40 atsync := &ATProtoSynchronizer{ 41 41 CLI: &cli,
+1 -1
pkg/atproto/lexicon_repo_test.go
··· 22 22 cli.DataDir = t.TempDir() 23 23 mod, err := model.MakeDB(":memory:") 24 24 require.NoError(t, err) 25 - state, err := statedb.MakeDB(&cli, nil, mod) 25 + state, err := statedb.MakeDB(context.Background(), &cli, nil, mod) 26 26 require.NoError(t, err) 27 27 28 28 // creating a new repo
+3 -2
pkg/cmd/streamplace.go
··· 318 318 } 319 319 } 320 320 321 + group, ctx := TimeoutGroupWithContext(ctx) 322 + 321 323 out := carstore.SQLiteStore{} 322 324 err = out.Open(":memory:") 323 325 if err != nil { 324 326 return err 325 327 } 326 - state, err := statedb.MakeDB(&cli, noter, mod) 328 + state, err := statedb.MakeDB(ctx, &cli, noter, mod) 327 329 if err != nil { 328 330 return err 329 331 } ··· 394 396 return err 395 397 } 396 398 397 - group, ctx := TimeoutGroupWithContext(ctx) 398 399 ctx = log.WithLogValues(ctx, "version", build.Version) 399 400 400 401 group.Go(func() error {
+37 -3
pkg/statedb/locks.go
··· 1 1 package statedb 2 2 3 3 import ( 4 + "context" 4 5 "crypto/sha256" 5 6 "encoding/binary" 6 7 "fmt" 7 8 "sync" 9 + 10 + "gorm.io/gorm" 11 + "stream.place/streamplace/pkg/log" 8 12 ) 9 13 10 14 func (state *StatefulDB) GetNamedLock(name string) (func(), error) { ··· 21 25 // we also use a local lock here - whoever is locking wants exclusive access even within the node 22 26 lock := state.locks.GetLock(name) 23 27 lock.Lock() 28 + state.pgLockConnMu.Lock() 29 + defer state.pgLockConnMu.Unlock() 24 30 // Convert string to sha256 hash and use decimal value for advisory lock 25 31 h := sha256.Sum256([]byte(name)) 26 32 nameInt := int64(binary.BigEndian.Uint64(h[:8])) 27 33 28 - err := state.DB.Exec("SELECT pg_advisory_lock($1)", nameInt).Error 34 + log.Debug(context.Background(), fmt.Sprintf("starting SELECT pg_advisory_lock(%d)", nameInt)) 35 + err := state.pgLockConn.Exec("SELECT pg_advisory_lock($1)", nameInt).Error 29 36 if err != nil { 30 37 lock.Unlock() 31 38 return nil, err 32 39 } 33 40 return func() { 34 - err := state.DB.Exec("SELECT pg_advisory_unlock($1)", nameInt).Error 35 - lock.Unlock() 41 + state.pgLockConnMu.Lock() 42 + defer state.pgLockConnMu.Unlock() 43 + log.Debug(context.Background(), fmt.Sprintf("starting SELECT pg_advisory_unlock(%d)", nameInt)) 44 + var unlocked bool 45 + err := state.pgLockConn.Raw("SELECT pg_advisory_unlock($1)", nameInt).Scan(&unlocked).Error 46 + if err == nil && !unlocked { 47 + err = fmt.Errorf("pg_advisory_unlock returned false") 48 + } 36 49 if err != nil { 37 50 // unfortunate, but the risk is that we're holding on to the lock forever, 38 51 // so it's responsible to crash in this case 39 52 panic(fmt.Errorf("error unlocking named lock: %w", err)) 40 53 } 54 + lock.Unlock() 41 55 }, nil 56 + } 57 + 58 + // startLockerConn starts a dedicated connection to the database for locking 59 + func (state *StatefulDB) startPostgresLockerConn(ctx context.Context) error { 60 + done := make(chan struct{}) 61 + var err error 62 + go func() { 63 + err = state.DB.Connection(func(tx *gorm.DB) error { 64 + state.pgLockConn = tx 65 + close(done) 66 + // hold this open until the context is done 67 + <-ctx.Done() 68 + return nil 69 + }) 70 + if err != nil { 71 + close(done) 72 + } 73 + }() 74 + <-done 75 + return err 42 76 } 43 77 44 78 func (state *StatefulDB) getNamedLockSQLite(name string) (func(), error) {
+20 -27
pkg/statedb/locks_test.go
··· 1 1 package statedb 2 2 3 3 import ( 4 + "context" 4 5 "fmt" 5 6 "net/url" 6 7 "os" 7 8 "os/exec" 8 9 "strings" 10 + "sync/atomic" 9 11 "testing" 10 12 "time" 11 13 12 14 "github.com/google/uuid" 13 15 "github.com/stretchr/testify/require" 16 + "golang.org/x/sync/errgroup" 14 17 "gorm.io/driver/postgres" 15 18 "stream.place/streamplace/pkg/config" 16 19 "stream.place/streamplace/pkg/model" ··· 86 89 return u.String() 87 90 } 88 91 92 + var lockRuns = 50000 93 + 89 94 func TestPostgresLocks(t *testing.T) { 90 95 if postgresURL == "" { 91 96 t.Skip("no postgres url, skipping postgres tests") ··· 97 102 } 98 103 mod, err := model.MakeDB(":memory:") 99 104 require.NoError(t, err) 100 - state, err := MakeDB(&cli, nil, mod) 105 + state, err := MakeDB(context.Background(), &cli, nil, mod) 101 106 require.NoError(t, err) 102 107 103 - unlock, err := state.GetNamedLock("test") 104 - t.Log("got lock") 105 - require.NoError(t, err) 106 - require.NotNil(t, unlock) 107 - 108 - shouldBeLocked := true 109 - 110 - done := make(chan struct{}) 108 + var g errgroup.Group 109 + var count atomic.Uint64 111 110 112 - go func() { 113 - unlock2, err := state.GetNamedLock("test") 114 - t.Log("got lock 2") 115 - require.Equal(t, shouldBeLocked, false) 111 + doLock := func() error { 112 + unlock, err := state.GetNamedLock("test") 116 113 require.NoError(t, err) 117 - require.NotNil(t, unlock2) 118 - unlock2() 119 - close(done) 120 - }() 114 + defer unlock() 115 + count.Add(1) 116 + return nil 117 + } 121 118 122 - time.Sleep(1 * time.Second) 119 + for i := 0; i < lockRuns; i++ { 120 + g.Go(doLock) 121 + } 123 122 124 - t.Log("unlocking") 125 - shouldBeLocked = false 126 - unlock() 127 - t.Log("unlocked") 123 + err = g.Wait() 124 + require.NoError(t, err) 125 + require.Equal(t, int(count.Load()), int(uint64(lockRuns))) 128 126 129 - select { 130 - case <-done: 131 - case <-time.After(1 * time.Second): 132 - require.Fail(t, "lock not released") 133 - } 134 127 sqlDB, err := state.DB.DB() 135 128 require.NoError(t, err) 136 129
+1 -1
pkg/statedb/migrate.go
··· 23 23 // slogGorm.WithTraceAll(), 24 24 ) 25 25 26 - newDB, err := MakeDB(cli, nil, nil) 26 + newDB, err := MakeDB(context.Background(), cli, nil, nil) 27 27 if err != nil { 28 28 return err 29 29 }
+16 -5
pkg/statedb/statedb.go
··· 6 6 "net/url" 7 7 "os" 8 8 "strings" 9 + "sync" 9 10 "time" 10 11 11 12 "github.com/lmittmann/tint" ··· 36 37 model model.Model 37 38 // pokeQueue is used to wake up the queue processor when a new task is enqueued 38 39 pokeQueue chan struct{} 40 + // pgLockConn is used to hold a connection to the database for locking 41 + pgLockConn *gorm.DB 42 + pgLockConnMu sync.Mutex 39 43 } 40 44 41 45 // list tables here so we can migrate them ··· 51 55 var NoPostgresDatabaseCode = "3D000" 52 56 53 57 // Stateful database for storing private streamplace state 54 - func MakeDB(cli *config.CLI, noter notificationpkg.FirebaseNotifier, model model.Model) (*StatefulDB, error) { 58 + func MakeDB(ctx context.Context, cli *config.CLI, noter notificationpkg.FirebaseNotifier, model model.Model) (*StatefulDB, error) { 55 59 dbURL := cli.DBURL 56 - log.Log(context.Background(), "starting stateful database", "dbURL", redactDBURL(dbURL)) 60 + log.Log(ctx, "starting stateful database", "dbURL", redactDBURL(dbURL)) 57 61 var dial gorm.Dialector 58 62 var dbType DBType 59 63 if dbURL == ":memory:" { ··· 98 102 return nil, err 99 103 } 100 104 } 101 - return &StatefulDB{ 105 + state := &StatefulDB{ 102 106 DB: db, 103 107 CLI: cli, 104 108 Type: dbType, 105 109 locks: NewNamedLocks(), 106 110 model: model, 107 111 pokeQueue: make(chan struct{}, 1), 108 - }, nil 112 + } 113 + if state.Type == DBTypePostgres { 114 + err = state.startPostgresLockerConn(ctx) 115 + if err != nil { 116 + return nil, fmt.Errorf("error starting postgres locker connection: %w", err) 117 + } 118 + } 119 + return state, nil 109 120 } 110 121 111 122 func openDB(dial gorm.Dialector) (*gorm.DB, error) { ··· 113 124 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 114 125 TimeFormat: time.RFC3339, 115 126 })), 116 - // slogGorm.WithTraceAll(), 127 + slogGorm.WithTraceAll(), 117 128 ) 118 129 119 130 return gorm.Open(dial, &gorm.Config{