···4 "context"
5 "fmt"
67- "github.com/bluesky-social/indigo/atproto/syntax"
8 blocks "github.com/ipfs/go-block-format"
9 "github.com/ipfs/go-cid"
10 "gorm.io/gorm/clause"
···17type SqliteBlockstore struct {
18 db *db.DB
19 did string
020 readonly bool
21 inserts map[cid.Cid]blocks.Block
22}
···39 }
40}
41000000042func (bs *SqliteBlockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
43 var block models.Block
44···69 b := models.Block{
70 Did: bs.did,
71 Cid: block.Cid().Bytes(),
72- Rev: syntax.NewTIDNow(0).String(), // TODO: WARN, this is bad. don't do this
73 Value: block.RawData(),
74 }
75···108 b := models.Block{
109 Did: bs.did,
110 Cid: block.Cid().Bytes(),
111- Rev: syntax.NewTIDNow(0).String(), // TODO: WARN, this is bad. don't do this
112 Value: block.RawData(),
113 }
114
···4 "context"
5 "fmt"
607 blocks "github.com/ipfs/go-block-format"
8 "github.com/ipfs/go-cid"
9 "gorm.io/gorm/clause"
···16type SqliteBlockstore struct {
17 db *db.DB
18 did string
19+ rev string
20 readonly bool
21 inserts map[cid.Cid]blocks.Block
22}
···39 }
40}
4142+// SetRev sets the revision that will be stamped on every block written to the
43+// store. It should be called with the new repo revision before any Put/PutMany
44+// calls for a given commit.
45+func (bs *SqliteBlockstore) SetRev(rev string) {
46+ bs.rev = rev
47+}
48+49func (bs *SqliteBlockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
50 var block models.Block
51···76 b := models.Block{
77 Did: bs.did,
78 Cid: block.Cid().Bytes(),
79+ Rev: bs.rev,
80 Value: block.RawData(),
81 }
82···115 b := models.Block{
116 Did: bs.did,
117 Cid: block.Cid().Bytes(),
118+ Rev: bs.rev,
119 Value: block.RawData(),
120 }
121
+63-21
oauth/client/manager.go
···18 "pkg.rbrt.fr/vow/internal/helpers"
19)
2000021type Manager struct {
22 cli *http.Client
23 logger *slog.Logger
···59 var jwks jwk.Key
60 if metadata.TokenEndpointAuthMethod == "private_key_jwt" {
61 if metadata.JWKS != nil && len(metadata.JWKS.Keys) > 0 {
62- // TODO: this is kinda bad but whatever for now. there could obviously be more than one jwk, and we need to
63- // make sure we use the right one
64- b, err := json.Marshal(metadata.JWKS.Keys[0])
65- if err != nil {
66- return nil, err
67- }
68-69- k, err := helpers.ParseJWKFromBytes(b)
70 if err != nil {
71 return nil, err
72 }
73-74 jwks = k
75 } else if metadata.JWKSURI != nil {
76 maybeJwks, err := cm.getClientJwks(ctx, clientId, *metadata.JWKSURI)
···147 }
148149 type Keys struct {
150- Keys []map[string]any `json:"keys"`
151 }
152153 var keys Keys
···159 return nil, errors.New("no keys in jwks response")
160 }
161162- // TODO: this is again bad, we should be figuring out which one we need to use...
163- b, err := json.Marshal(keys.Keys[0])
164- if err != nil {
165- return nil, fmt.Errorf("could not marshal key: %w", err)
166- }
167-168- k, err := helpers.ParseJWKFromBytes(b)
169 if err != nil {
170 return nil, err
171 }
···176 return jwks, nil
177}
178000000000000000000000000000000000000000000000000000179func validateAndParseMetadata(clientId string, b []byte) (*Metadata, error) {
180 var metadataMap map[string]any
181 if err := json.Unmarshal(b, &metadataMap); err != nil {
···246 return nil, fmt.Errorf("duplicate scope `%s`", scope)
247 }
248249- // TODO: check for unsupported scopes
00250251 scopesMap[scope] = true
252 }
···259260 switch gt {
261 case "implicit":
262- return nil, errors.New("grantg type `implicit` is not allowed")
263 case "authorization_code", "refresh_token":
264- // TODO check if this grant type is supported
265 default:
266- return nil, fmt.Errorf("grant tyhpe `%s` is not supported", gt)
267 }
268269 grantTypesMap[gt] = true
···18 "pkg.rbrt.fr/vow/internal/helpers"
19)
2021+// supportedScopes lists the OAuth scopes this server accepts.
22+var supportedScopes = []string{"atproto", "transition:generic", "transition:chat.bsky"}
23+24type Manager struct {
25 cli *http.Client
26 logger *slog.Logger
···62 var jwks jwk.Key
63 if metadata.TokenEndpointAuthMethod == "private_key_jwt" {
64 if metadata.JWKS != nil && len(metadata.JWKS.Keys) > 0 {
65+ k, err := selectKey(metadata.JWKS.Keys, metadata.TokenEndpointAuthSigningAlg)
000000066 if err != nil {
67 return nil, err
68 }
069 jwks = k
70 } else if metadata.JWKSURI != nil {
71 maybeJwks, err := cm.getClientJwks(ctx, clientId, *metadata.JWKSURI)
···142 }
143144 type Keys struct {
145+ Keys []any `json:"keys"`
146 }
147148 var keys Keys
···154 return nil, errors.New("no keys in jwks response")
155 }
156157+ k, err := selectKey(keys.Keys, "")
000000158 if err != nil {
159 return nil, err
160 }
···165 return jwks, nil
166}
167168+// selectKey picks the best signing key from a raw JWKS key list.
169+// It prefers a key whose "kid" matches the hint (if non-empty), then any key
170+// with "use"="sig", and finally falls back to the first key in the set.
171+func selectKey(keys []any, kidHint string) (jwk.Key, error) {
172+ if len(keys) == 0 {
173+ return nil, errors.New("empty jwks")
174+ }
175+176+ asMap := func(v any) map[string]any {
177+ m, _ := v.(map[string]any)
178+ return m
179+ }
180+181+ var chosen map[string]any
182+183+ if kidHint != "" {
184+ for _, k := range keys {
185+ m := asMap(k)
186+ if m["kid"] == kidHint {
187+ chosen = m
188+ break
189+ }
190+ }
191+ }
192+193+ if chosen == nil {
194+ for _, k := range keys {
195+ m := asMap(k)
196+ if use, _ := m["use"].(string); use == "sig" {
197+ chosen = m
198+ break
199+ }
200+ }
201+ }
202+203+ if chosen == nil {
204+ chosen = asMap(keys[0])
205+ }
206+207+ if chosen == nil {
208+ return nil, errors.New("jwks contains no usable keys")
209+ }
210+211+ b, err := json.Marshal(chosen)
212+ if err != nil {
213+ return nil, fmt.Errorf("could not marshal key: %w", err)
214+ }
215+216+ return helpers.ParseJWKFromBytes(b)
217+}
218+219func validateAndParseMetadata(clientId string, b []byte) (*Metadata, error) {
220 var metadataMap map[string]any
221 if err := json.Unmarshal(b, &metadataMap); err != nil {
···286 return nil, fmt.Errorf("duplicate scope `%s`", scope)
287 }
288289+ if !slices.Contains(supportedScopes, scope) {
290+ return nil, fmt.Errorf("unsupported scope %q", scope)
291+ }
292293 scopesMap[scope] = true
294 }
···301302 switch gt {
303 case "implicit":
304+ return nil, errors.New("grant type `implicit` is not allowed")
305 case "authorization_code", "refresh_token":
306+ // supported
307 default:
308+ return nil, fmt.Errorf("grant type `%s` is not supported", gt)
309 }
310311 grantTypesMap[gt] = true
+6-6
readme.md
···534. **Start the services**
5455 ```bash
56- docker-compose pull
57- docker-compose up -d
58 ```
59605. **Get your invite code**
···62 On first run, an invite code is automatically created. View it with:
6364 ```bash
65- docker-compose logs create-invite
66 ```
6768 Or check the saved file:
···73746. **Monitor the services**
75 ```bash
76- docker-compose logs -f
77 ```
7879### What Gets Set Up
···146## Updating
147148```bash
149-docker-compose pull
150-docker-compose up -d
151```
152153## Implemented Endpoints
···534. **Start the services**
5455 ```bash
56+ docker compose pull
57+ docker compose up -d
58 ```
59605. **Get your invite code**
···62 On first run, an invite code is automatically created. View it with:
6364 ```bash
65+ docker compose logs create-invite
66 ```
6768 Or check the saved file:
···73746. **Monitor the services**
75 ```bash
76+ docker compose logs -f
77 ```
7879### What Gets Set Up
···146## Updating
147148```bash
149+docker compose build
150+docker compose up -d
151```
152153## Implemented Endpoints
···62 return
63 }
6465- // TODO: this seems wrong. should be a way to get the entire request url i believe, but this will work for now
66- dpopProof, err := s.oauthProvider.DpopManager.CheckProof(r.Method, "https://"+s.config.Hostname+r.URL.String(), r.Header, nil)
0000067 if err != nil {
68 if errors.Is(err, dpop.ErrUseDpopNonce) {
69 nonce := s.oauthProvider.NextNonce()
···62 return
63 }
6465+ scheme := "https"
66+ if r.TLS == nil && r.Header.Get("X-Forwarded-Proto") == "" {
67+ scheme = "http"
68+ } else if proto := r.Header.Get("X-Forwarded-Proto"); proto != "" {
69+ scheme = proto
70+ }
71+ dpopProof, err := s.oauthProvider.DpopManager.CheckProof(r.Method, scheme+"://"+r.Host+r.URL.String(), r.Header, nil)
72 if err != nil {
73 if errors.Is(err, dpop.ErrUseDpopNonce) {
74 nonce := s.oauthProvider.NextNonce()
-6
server/handle_oauth_token.go
···100 return
101 }
102103- // TODO: this should come from an oauth provider config
104- if !slices.Contains([]string{"authorization_code", "refresh_token"}, req.GrantType) {
105- helpers.InputError(w, new(fmt.Sprintf(`"%s" grant type is not supported by the server`, req.GrantType)))
106- return
107- }
108-109 if !slices.Contains(client.Metadata.GrantTypes, req.GrantType) {
110 helpers.InputError(w, new(fmt.Sprintf(`"%s" grant type is not supported by the client`, req.GrantType)))
111 return
···100 return
101 }
102000000103 if !slices.Contains(client.Metadata.GrantTypes, req.GrantType) {
104 helpers.InputError(w, new(fmt.Sprintf(`"%s" grant type is not supported by the client`, req.GrantType)))
105 return
+30-6
server/handle_repo_list_repos.go
···23import (
4 "net/http"
056- "pkg.rbrt.fr/vow/models"
7 "github.com/ipfs/go-cid"
008)
910type ComAtprotoSyncListReposResponse struct {
···20 Status *string `json:"status,omitempty"`
21}
2223-// TODO: paginate this bitch
24func (s *Server) handleListRepos(w http.ResponseWriter, r *http.Request) {
25 ctx := r.Context()
260000000000000000027 var repos []models.Repo
28- if err := s.db.Raw(ctx, "SELECT * FROM repos ORDER BY created_at DESC LIMIT 500", nil).Scan(&repos).Error; err != nil {
29- http.Error(w, err.Error(), http.StatusInternalServerError)
30 return
31 }
3200000033 items := make([]ComAtprotoSyncListReposRepoItem, 0, len(repos))
34 for _, repo := range repos {
35 c, err := cid.Cast(repo.Root)
36 if err != nil {
37- http.Error(w, err.Error(), http.StatusInternalServerError)
38 return
39 }
40···48 }
4950 s.writeJSON(w, 200, ComAtprotoSyncListReposResponse{
51- Cursor: nil,
52 Repos: items,
53 })
54}
···23import (
4 "net/http"
5+ "strconv"
607 "github.com/ipfs/go-cid"
8+ "pkg.rbrt.fr/vow/internal/helpers"
9+ "pkg.rbrt.fr/vow/models"
10)
1112type ComAtprotoSyncListReposResponse struct {
···22 Status *string `json:"status,omitempty"`
23}
24025func (s *Server) handleListRepos(w http.ResponseWriter, r *http.Request) {
26 ctx := r.Context()
2728+ limit := 500
29+ if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
30+ if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 1000 {
31+ limit = l
32+ }
33+ }
34+35+ cursor := r.URL.Query().Get("cursor")
36+37+ params := []any{}
38+ cursorClause := ""
39+ if cursor != "" {
40+ cursorClause = "WHERE did > ?"
41+ params = append(params, cursor)
42+ }
43+ params = append(params, limit+1)
44+45 var repos []models.Repo
46+ if err := s.db.Raw(ctx, "SELECT * FROM repos "+cursorClause+" ORDER BY did ASC LIMIT ?", nil, params...).Scan(&repos).Error; err != nil {
47+ helpers.ServerError(w, nil)
48 return
49 }
5051+ var nextCursor *string
52+ if len(repos) > limit {
53+ repos = repos[:limit]
54+ nextCursor = &repos[len(repos)-1].Did
55+ }
56+57 items := make([]ComAtprotoSyncListReposRepoItem, 0, len(repos))
58 for _, repo := range repos {
59 c, err := cid.Cast(repo.Root)
60 if err != nil {
61+ helpers.ServerError(w, nil)
62 return
63 }
64···72 }
7374 s.writeJSON(w, 200, ComAtprotoSyncListReposResponse{
75+ Cursor: nextCursor,
76 Repos: items,
77 })
78}
···24 return
25 }
26027 cursor := r.URL.Query().Get("cursor")
2829 limit := 50
···3738 params := []any{did}
39 if cursor != "" {
40+ // cursor is the CID string of the last blob on the previous page;
41+ // convert to bytes for the comparison against the binary cid column.
42+ cursorCid, err := cid.Decode(cursor)
43+ if err != nil {
44+ helpers.InputError(w, new("invalid cursor"))
45+ return
46+ }
47+ params = append(params, cursorCid.Bytes())
48+ cursorquery = "AND cid > ?"
49 }
50+ params = append(params, limit+1)
5152 urepo, err := s.getRepoActorByDid(ctx, did)
53 if err != nil {
···65 }
6667 var blobs []models.Blob
68+ if err := s.db.Raw(ctx, "SELECT * FROM blobs WHERE did = ? "+cursorquery+" ORDER BY cid ASC LIMIT ?", nil, params...).Scan(&blobs).Error; err != nil {
69 logger.Error("error getting records", "error", err)
70 helpers.ServerError(w, nil)
71 return
···86 }
8788 var newcursor *string
89+ if len(blobs) > limit {
90+ blobs = blobs[:limit]
91+ lastCid, err := cid.Cast(blobs[len(blobs)-1].Cid)
92+ if err == nil {
93+ s := lastCid.String()
94+ newcursor = &s
95+ }
96 }
9798 s.writeJSON(w, http.StatusOK, ComAtprotoSyncListBlobsResponse{
+18-8
server/repo.go
···169 }, nil
170}
171172-func commitRepo(ctx context.Context, bs blockstore.Blockstore, r *atp.Repo, signingKey []byte) (cid.Cid, string, error) {
173- if _, err := r.MST.WriteDiffBlocks(ctx, bs.(legacyblockstore.Blockstore)); err != nil { //nolint:staticcheck
174- return cid.Undef, "", fmt.Errorf("writing MST blocks: %w", err)
175- }
01760177 commit, err := r.Commit()
178 if err != nil {
179 return cid.Undef, "", fmt.Errorf("creating commit: %w", err)
···187 return cid.Undef, "", fmt.Errorf("signing commit: %w", err)
188 }
1890000000000190 buf := new(bytes.Buffer)
191 if err := commit.MarshalCBOR(buf); err != nil {
192 return cid.Undef, "", fmt.Errorf("marshaling commit: %w", err)
···331 return cid.Undef, err
332 }
333334- // TODO: this is really confusing, and looking at it i have no idea why i did this. below when we are doing deletes, we
335- // check if `cid` here is nil to indicate if we should delete. that really doesn't make much sense and its super illogical
336- // when reading this code. i dont feel like fixing right now though so
337 entries = append(entries, models.Record{
338 Did: urepo.Did,
339 Nsid: op.Collection,
···504 return nil, err
505 }
506507- // TODO:
508 cids, err = rm.decrementBlobRefs(ctx, urepo, entry.Value)
509 if err != nil {
510 return nil, err
···169 }, nil
170}
171172+// revSetter is implemented by blockstores that can be told the current repo
173+// revision before blocks are written (so the Rev column is stamped correctly).
174+type revSetter interface {
175+ SetRev(rev string)
176+}
177178+func commitRepo(ctx context.Context, bs blockstore.Blockstore, r *atp.Repo, signingKey []byte) (cid.Cid, string, error) {
179 commit, err := r.Commit()
180 if err != nil {
181 return cid.Undef, "", fmt.Errorf("creating commit: %w", err)
···189 return cid.Undef, "", fmt.Errorf("signing commit: %w", err)
190 }
191192+ // Stamp the revision on the blockstore before writing any blocks so that
193+ // every block persisted for this commit carries the correct Rev value.
194+ if rs, ok := bs.(revSetter); ok {
195+ rs.SetRev(commit.Rev)
196+ }
197+198+ if _, err := r.MST.WriteDiffBlocks(ctx, bs.(legacyblockstore.Blockstore)); err != nil { //nolint:staticcheck
199+ return cid.Undef, "", fmt.Errorf("writing MST blocks: %w", err)
200+ }
201+202 buf := new(bytes.Buffer)
203 if err := commit.MarshalCBOR(buf); err != nil {
204 return cid.Undef, "", fmt.Errorf("marshaling commit: %w", err)
···343 return cid.Undef, err
344 }
345346+ // A nil Cid on the entry is the sentinel used later in the
347+ // batch-upsert loop to distinguish deletes from creates/updates.
0348 entries = append(entries, models.Record{
349 Did: urepo.Did,
350 Nsid: op.Collection,
···515 return nil, err
516 }
5170518 cids, err = rm.decrementBlobRefs(ctx, urepo, entry.Value)
519 if err != nil {
520 return nil, err