tangled
alpha
login
or
join now
stream.place
/
streamplace
74
fork
atom
Live video on the AT Protocol
74
fork
atom
overview
issues
1
pulls
pipelines
statedb: non-blockin' lockin'
Eli Mallon
6 months ago
660becdc
91f46eb2
+79
-32
2 changed files
expand all
collapse all
unified
split
pkg
statedb
locks.go
locks_test.go
+48
-10
pkg/statedb/locks.go
···
4
4
"context"
5
5
"crypto/sha256"
6
6
"encoding/binary"
7
7
+
"errors"
7
8
"fmt"
8
9
"sync"
9
10
11
11
+
"github.com/cenkalti/backoff"
10
12
"gorm.io/gorm"
11
13
"stream.place/streamplace/pkg/log"
12
14
)
···
20
22
}
21
23
panic("unsupported database type")
22
24
}
25
25
+
26
26
+
var ErrNoLock = fmt.Errorf("pg_try_advisory_lock returned false")
23
27
24
28
func (state *StatefulDB) getNamedLockPostgres(name string) (func(), error) {
25
29
// we also use a local lock here - whoever is locking wants exclusive access even within the node
26
30
lock := state.locks.GetLock(name)
27
31
lock.Lock()
28
28
-
state.pgLockConnMu.Lock()
29
29
-
defer state.pgLockConnMu.Unlock()
30
32
// Convert string to sha256 hash and use decimal value for advisory lock
31
33
h := sha256.Sum256([]byte(name))
32
34
nameInt := int64(binary.BigEndian.Uint64(h[:8]))
33
35
34
36
log.Debug(context.Background(), fmt.Sprintf("starting SELECT pg_advisory_lock(%d)", nameInt))
35
35
-
err := state.pgLockConn.Exec("SELECT pg_advisory_lock($1)", nameInt).Error
37
37
+
err := state.pgLockBackoff(nameInt)
36
38
if err != nil {
37
39
lock.Unlock()
38
40
return nil, err
39
41
}
40
42
return func() {
41
41
-
state.pgLockConnMu.Lock()
42
42
-
defer state.pgLockConnMu.Unlock()
43
43
log.Debug(context.Background(), fmt.Sprintf("starting SELECT pg_advisory_unlock(%d)", nameInt))
44
44
-
var unlocked bool
45
45
-
err := state.pgLockConn.Raw("SELECT pg_advisory_unlock($1)", nameInt).Scan(&unlocked).Error
46
46
-
if err == nil && !unlocked {
47
47
-
err = fmt.Errorf("pg_advisory_unlock returned false")
48
48
-
}
44
44
+
err := state.pgUnlock(nameInt)
49
45
if err != nil {
50
46
// unfortunate, but the risk is that we're holding on to the lock forever,
51
47
// so it's responsible to crash in this case
···
53
49
}
54
50
lock.Unlock()
55
51
}, nil
52
52
+
}
53
53
+
54
54
+
func (state *StatefulDB) pgLockBackoff(key int64) error {
55
55
+
ticker := backoff.NewTicker(backoff.NewExponentialBackOff())
56
56
+
defer ticker.Stop()
57
57
+
var err error
58
58
+
for i := 0; i < 10; i++ {
59
59
+
err = state.pgLock(key)
60
60
+
if err == nil {
61
61
+
return nil
62
62
+
}
63
63
+
if !errors.Is(err, ErrNoLock) {
64
64
+
return err
65
65
+
}
66
66
+
if i < 9 {
67
67
+
<-ticker.C
68
68
+
}
69
69
+
}
70
70
+
return fmt.Errorf("failed to lock after 10 attempts: %w", err)
71
71
+
}
72
72
+
73
73
+
func (state *StatefulDB) pgLock(key int64) error {
74
74
+
state.pgLockConnMu.Lock()
75
75
+
defer state.pgLockConnMu.Unlock()
76
76
+
var locked bool
77
77
+
err := state.pgLockConn.Raw("SELECT pg_try_advisory_lock($1)", key).Scan(&locked).Error
78
78
+
if err == nil && !locked {
79
79
+
log.Error(context.Background(), fmt.Sprintf("pg_try_advisory_lock returned false for key %d", key))
80
80
+
err = ErrNoLock
81
81
+
}
82
82
+
return err
83
83
+
}
84
84
+
85
85
+
func (state *StatefulDB) pgUnlock(key int64) error {
86
86
+
state.pgLockConnMu.Lock()
87
87
+
defer state.pgLockConnMu.Unlock()
88
88
+
var unlocked bool
89
89
+
err := state.pgLockConn.Raw("SELECT pg_advisory_unlock($1)", key).Scan(&unlocked).Error
90
90
+
if err == nil && !unlocked {
91
91
+
err = fmt.Errorf("pg_advisory_unlock returned false")
92
92
+
}
93
93
+
return err
56
94
}
57
95
58
96
// startLockerConn starts a dedicated connection to the database for locking
+31
-22
pkg/statedb/locks_test.go
···
89
89
return u.String()
90
90
}
91
91
92
92
-
var lockRuns = 50000
92
92
+
var lockRuns = 100
93
93
+
var nodeCount = 25
93
94
94
95
func TestPostgresLocks(t *testing.T) {
95
96
if postgresURL == "" {
···
100
101
cli := config.CLI{
101
102
DBURL: dburl,
102
103
}
103
103
-
mod, err := model.MakeDB(":memory:")
104
104
-
require.NoError(t, err)
105
105
-
state, err := MakeDB(context.Background(), &cli, nil, mod)
106
106
-
require.NoError(t, err)
107
107
-
104
104
+
ctx, cancel := context.WithCancel(context.Background())
105
105
+
defer cancel()
108
106
var g errgroup.Group
109
107
var count atomic.Uint64
108
108
+
start := make(chan struct{})
109
109
+
for i := 0; i < nodeCount; i++ {
110
110
+
mod, err := model.MakeDB(":memory:")
111
111
+
require.NoError(t, err)
112
112
+
state, err := MakeDB(ctx, &cli, nil, mod)
113
113
+
require.NoError(t, err)
110
114
111
111
-
doLock := func() error {
112
112
-
unlock, err := state.GetNamedLock("test")
113
113
-
require.NoError(t, err)
114
114
-
defer unlock()
115
115
-
count.Add(1)
116
116
-
return nil
117
117
-
}
115
115
+
defer func() {
116
116
+
sqlDB, err := state.DB.DB()
117
117
+
require.NoError(t, err)
118
118
+
err = sqlDB.Close()
119
119
+
require.NoError(t, err)
120
120
+
}()
121
121
+
122
122
+
doLock := func() error {
123
123
+
<-start
124
124
+
unlock, err := state.GetNamedLock("test")
125
125
+
require.NoError(t, err)
126
126
+
defer unlock()
127
127
+
count.Add(1)
128
128
+
return nil
129
129
+
}
118
130
119
119
-
for i := 0; i < lockRuns; i++ {
120
120
-
g.Go(doLock)
131
131
+
for i := 0; i < lockRuns; i++ {
132
132
+
g.Go(doLock)
133
133
+
}
121
134
}
135
135
+
close(start)
122
136
123
123
-
err = g.Wait()
137
137
+
err := g.Wait()
124
138
require.NoError(t, err)
125
125
-
require.Equal(t, int(count.Load()), int(uint64(lockRuns)))
139
139
+
require.Equal(t, int(count.Load()), int(uint64(lockRuns*nodeCount)))
126
140
127
127
-
sqlDB, err := state.DB.DB()
128
128
-
require.NoError(t, err)
129
129
-
130
130
-
// Close
131
131
-
sqlDB.Close()
132
141
}