+11
-2
server/handle_repo_apply_writes.go
+11
-2
server/handle_repo_apply_writes.go
···
20
Value *MarshalableMap `json:"value,omitempty"`
21
}
22
23
func (s *Server) handleApplyWrites(e echo.Context) error {
24
repo := e.Get("repo").(*models.RepoActor)
25
···
49
})
50
}
51
52
-
if err := s.repoman.applyWrites(repo.Repo, ops, req.SwapCommit); err != nil {
53
s.logger.Error("error applying writes", "error", err)
54
return helpers.ServerError(e, nil)
55
}
56
57
-
return nil
58
}
···
20
Value *MarshalableMap `json:"value,omitempty"`
21
}
22
23
+
type ComAtprotoRepoApplyWritesResponse struct {
24
+
Commit RepoCommit `json:"commit"`
25
+
Results []ApplyWriteResult `json:"results"`
26
+
}
27
+
28
func (s *Server) handleApplyWrites(e echo.Context) error {
29
repo := e.Get("repo").(*models.RepoActor)
30
···
54
})
55
}
56
57
+
results, err := s.repoman.applyWrites(repo.Repo, ops, req.SwapCommit)
58
+
if err != nil {
59
s.logger.Error("error applying writes", "error", err)
60
return helpers.ServerError(e, nil)
61
}
62
63
+
return e.JSON(200, ComAtprotoRepoApplyWritesResponse{
64
+
Commit: *results[0].Commit,
65
+
Results: results,
66
+
})
67
}
+5
-2
server/handle_repo_create_record.go
+5
-2
server/handle_repo_create_record.go
···
40
optype = OpTypeUpdate
41
}
42
43
-
if err := s.repoman.applyWrites(repo.Repo, []Op{
44
{
45
Type: optype,
46
Collection: req.Collection,
···
49
Record: &req.Record,
50
SwapRecord: req.SwapRecord,
51
},
52
-
}, req.SwapCommit); err != nil {
53
s.logger.Error("error applying writes", "error", err)
54
return helpers.ServerError(e, nil)
55
}
56
57
return nil
58
}
···
40
optype = OpTypeUpdate
41
}
42
43
+
results, err := s.repoman.applyWrites(repo.Repo, []Op{
44
{
45
Type: optype,
46
Collection: req.Collection,
···
49
Record: &req.Record,
50
SwapRecord: req.SwapRecord,
51
},
52
+
}, req.SwapCommit)
53
+
if err != nil {
54
s.logger.Error("error applying writes", "error", err)
55
return helpers.ServerError(e, nil)
56
}
57
+
58
+
return e.JSON(200, results[0])
59
60
return nil
61
}
+4
-3
server/handle_repo_put_record.go
+4
-3
server/handle_repo_put_record.go
···
40
optype = OpTypeUpdate
41
}
42
43
-
if err := s.repoman.applyWrites(repo.Repo, []Op{
44
{
45
Type: optype,
46
Collection: req.Collection,
···
49
Record: &req.Record,
50
SwapRecord: req.SwapRecord,
51
},
52
-
}, req.SwapCommit); err != nil {
53
s.logger.Error("error applying writes", "error", err)
54
return helpers.ServerError(e, nil)
55
}
56
57
-
return nil
58
}
···
40
optype = OpTypeUpdate
41
}
42
43
+
results, err := s.repoman.applyWrites(repo.Repo, []Op{
44
{
45
Type: optype,
46
Collection: req.Collection,
···
49
Record: &req.Record,
50
SwapRecord: req.SwapRecord,
51
},
52
+
}, req.SwapCommit)
53
+
if err != nil {
54
s.logger.Error("error applying writes", "error", err)
55
return helpers.ServerError(e, nil)
56
}
57
58
+
return e.JSON(200, results[0])
59
}
+41
-17
server/repo.go
+41
-17
server/repo.go
···
82
return nil
83
}
84
85
// TODO make use of swap commit
86
-
func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *string) error {
87
rootcid, err := cid.Cast(urepo.Root)
88
if err != nil {
89
-
return err
90
}
91
92
dbs := blockstore.New(urepo.Did, rm.db)
···
96
97
for i, op := range writes {
98
if op.Type != OpTypeCreate && op.Rkey == nil {
99
-
return fmt.Errorf("invalid rkey")
100
} else if op.Rkey == nil {
101
op.Rkey = to.StringPtr(rm.clock.Next().String())
102
writes[i].Rkey = op.Rkey
···
104
105
_, err := syntax.ParseRecordKey(*op.Rkey)
106
if err != nil {
107
-
return err
108
}
109
110
switch op.Type {
111
case OpTypeCreate:
112
nc, err := r.PutRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record)
113
if err != nil {
114
-
return err
115
}
116
117
d, _ := data.MarshalCBOR(*op.Record)
···
126
case OpTypeDelete:
127
err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey)
128
if err != nil {
129
-
return err
130
}
131
case OpTypeUpdate:
132
nc, err := r.UpdateRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record)
133
if err != nil {
134
-
return err
135
}
136
137
d, _ := data.MarshalCBOR(*op.Record)
···
148
149
newroot, rev, err := r.Commit(context.TODO(), urepo.SignFor)
150
if err != nil {
151
-
return err
152
}
153
154
buf := new(bytes.Buffer)
···
159
})
160
161
if _, err := carstore.LdWrite(buf, hb); err != nil {
162
-
return err
163
}
164
165
diffops, err := r.DiffSince(context.TODO(), rootcid)
166
if err != nil {
167
-
return err
168
}
169
170
ops := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(diffops))
···
194
195
blk, err := dbs.Get(context.TODO(), op.NewCid)
196
if err != nil {
197
-
return err
198
}
199
200
if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
201
-
return err
202
}
203
}
204
205
for _, op := range dbs.GetLog() {
206
if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil {
207
-
return err
208
}
209
}
210
211
var blobs []lexutil.LexLink
212
for _, entry := range entries {
···
214
Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}},
215
UpdateAll: true,
216
}).Create(&entry).Error; err != nil {
217
-
return err
218
}
219
220
// we should actually check the type (i.e. delete, create,., update) here but we'll do it later
221
cids, err := rm.incrementBlobRefs(urepo, entry.Value)
222
if err != nil {
223
-
return err
224
}
225
226
for _, c := range cids {
227
blobs = append(blobs, lexutil.LexLink(c))
228
}
229
}
230
231
rm.s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
···
243
})
244
245
if err := dbs.UpdateRepo(context.TODO(), newroot, rev); err != nil {
246
-
return err
247
}
248
249
-
return nil
250
}
251
252
func (rm *RepoMan) getRecordProof(urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) {
···
82
return nil
83
}
84
85
+
type ApplyWriteResult struct {
86
+
Uri string `json:"string"`
87
+
Cid string `json:"cid"`
88
+
Commit *RepoCommit `json:"commit"`
89
+
ValidationStatus *string `json:"validationStatus"`
90
+
}
91
+
92
+
type RepoCommit struct {
93
+
Cid string `json:"cid"`
94
+
Rev string `json:"rev"`
95
+
}
96
+
97
// TODO make use of swap commit
98
+
func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) {
99
rootcid, err := cid.Cast(urepo.Root)
100
if err != nil {
101
+
return nil, err
102
}
103
104
dbs := blockstore.New(urepo.Did, rm.db)
···
108
109
for i, op := range writes {
110
if op.Type != OpTypeCreate && op.Rkey == nil {
111
+
return nil, fmt.Errorf("invalid rkey")
112
} else if op.Rkey == nil {
113
op.Rkey = to.StringPtr(rm.clock.Next().String())
114
writes[i].Rkey = op.Rkey
···
116
117
_, err := syntax.ParseRecordKey(*op.Rkey)
118
if err != nil {
119
+
return nil, err
120
}
121
122
switch op.Type {
123
case OpTypeCreate:
124
nc, err := r.PutRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record)
125
if err != nil {
126
+
return nil, err
127
}
128
129
d, _ := data.MarshalCBOR(*op.Record)
···
138
case OpTypeDelete:
139
err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey)
140
if err != nil {
141
+
return nil, err
142
}
143
case OpTypeUpdate:
144
nc, err := r.UpdateRecord(context.TODO(), op.Collection+"/"+*op.Rkey, op.Record)
145
if err != nil {
146
+
return nil, err
147
}
148
149
d, _ := data.MarshalCBOR(*op.Record)
···
160
161
newroot, rev, err := r.Commit(context.TODO(), urepo.SignFor)
162
if err != nil {
163
+
return nil, err
164
}
165
166
buf := new(bytes.Buffer)
···
171
})
172
173
if _, err := carstore.LdWrite(buf, hb); err != nil {
174
+
return nil, err
175
}
176
177
diffops, err := r.DiffSince(context.TODO(), rootcid)
178
if err != nil {
179
+
return nil, err
180
}
181
182
ops := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(diffops))
···
206
207
blk, err := dbs.Get(context.TODO(), op.NewCid)
208
if err != nil {
209
+
return nil, err
210
}
211
212
if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
213
+
return nil, err
214
}
215
}
216
217
for _, op := range dbs.GetLog() {
218
if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil {
219
+
return nil, err
220
}
221
}
222
+
223
+
var results []ApplyWriteResult
224
225
var blobs []lexutil.LexLink
226
for _, entry := range entries {
···
228
Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}},
229
UpdateAll: true,
230
}).Create(&entry).Error; err != nil {
231
+
return nil, err
232
}
233
234
// we should actually check the type (i.e. delete, create,., update) here but we'll do it later
235
cids, err := rm.incrementBlobRefs(urepo, entry.Value)
236
if err != nil {
237
+
return nil, err
238
}
239
240
for _, c := range cids {
241
blobs = append(blobs, lexutil.LexLink(c))
242
}
243
+
244
+
results = append(results, ApplyWriteResult{
245
+
Uri: "at://" + urepo.Did + "/" + entry.Nsid + "/" + entry.Rkey,
246
+
Cid: entry.Cid,
247
+
Commit: &RepoCommit{
248
+
Cid: newroot.String(),
249
+
Rev: rev,
250
+
},
251
+
ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol
252
+
})
253
}
254
255
rm.s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
···
267
})
268
269
if err := dbs.UpdateRepo(context.TODO(), newroot, rev); err != nil {
270
+
return nil, err
271
}
272
273
+
return results, nil
274
}
275
276
func (rm *RepoMan) getRecordProof(urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) {