···96}
9798// TODO make use of swap commit
99+func (rm *RepoMan) applyWrites(ctx context.Context, urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) {
100 rootcid, err := cid.Cast(urepo.Root)
101 if err != nil {
102 return nil, err
···106 bs := recording_blockstore.New(dbs)
107 r, err := repo.OpenRepo(context.TODO(), bs, rootcid)
1080109 var results []ApplyWriteResult
110111+ entries := make([]models.Record, 0, len(writes))
112 for i, op := range writes {
113+ // updates or deletes must supply an rkey
114 if op.Type != OpTypeCreate && op.Rkey == nil {
115 return nil, fmt.Errorf("invalid rkey")
116 } else if op.Type == OpTypeCreate && op.Rkey != nil {
117+ // we should conver this op to an update if the rkey already exists
118+ _, _, err := r.GetRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey))
119 if err == nil {
120 op.Type = OpTypeUpdate
121 }
122 } else if op.Rkey == nil {
123+ // creates that don't supply an rkey will have one generated for them
124 op.Rkey = to.StringPtr(rm.clock.Next().String())
125 writes[i].Rkey = op.Rkey
126 }
127128+ // validate the record key is actually valid
129 _, err := syntax.ParseRecordKey(*op.Rkey)
130 if err != nil {
131 return nil, err
···133134 switch op.Type {
135 case OpTypeCreate:
136+ // HACK: this fixes some type conversions, mainly around integers
137+ // first we convert to json bytes
138+ b, err := json.Marshal(*op.Record)
139 if err != nil {
140 return nil, err
141 }
142+ // then we use atdata.UnmarshalJSON to convert it back to a map
143+ out, err := atdata.UnmarshalJSON(b)
144 if err != nil {
145 return nil, err
146 }
147+ // finally we can cast to a MarshalableMap
148 mm := MarshalableMap(out)
149150 // HACK: if a record doesn't contain a $type, we can manually set it here based on the op's collection
151+ // i forget why this is actually necessary?
152 if mm["$type"] == "" {
153 mm["$type"] = op.Collection
154 }
155156+ nc, err := r.PutRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey), &mm)
157 if err != nil {
158 return nil, err
159 }
160+161 d, err := atdata.MarshalCBOR(mm)
162 if err != nil {
163 return nil, err
164 }
165+166 entries = append(entries, models.Record{
167 Did: urepo.Did,
168 CreatedAt: rm.clock.Next().String(),
···171 Cid: nc.String(),
172 Value: d,
173 })
174+175 results = append(results, ApplyWriteResult{
176 Type: to.StringPtr(OpTypeCreate.String()),
177 Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey),
···179 ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol
180 })
181 case OpTypeDelete:
182+ // try to find the old record in the database
183 var old models.Record
184 if err := rm.db.Raw("SELECT value FROM records WHERE did = ? AND nsid = ? AND rkey = ?", nil, urepo.Did, op.Collection, op.Rkey).Scan(&old).Error; err != nil {
185 return nil, err
186 }
187+188+ // TODO: this is really confusing, and looking at it i have no idea why i did this. below when we are doing deletes, we
189+ // check if `cid` here is nil to indicate if we should delete. that really doesn't make much sense and its super illogical
190+ // when reading this code. i dont feel like fixing right now though so
191 entries = append(entries, models.Record{
192 Did: urepo.Did,
193 Nsid: op.Collection,
194 Rkey: *op.Rkey,
195 Value: old.Value,
196 })
197+198+ // delete the record from the repo
199 err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey)
200 if err != nil {
201 return nil, err
202 }
203+204+ // add a result for the delete
205 results = append(results, ApplyWriteResult{
206 Type: to.StringPtr(OpTypeDelete.String()),
207 })
208 case OpTypeUpdate:
209+ // HACK: same hack as above for type fixes
210+ b, err := json.Marshal(*op.Record)
211 if err != nil {
212 return nil, err
213 }
214+ out, err := atdata.UnmarshalJSON(b)
215 if err != nil {
216 return nil, err
217 }
218 mm := MarshalableMap(out)
219+220+ nc, err := r.UpdateRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey), &mm)
221 if err != nil {
222 return nil, err
223 }
224+225 d, err := atdata.MarshalCBOR(mm)
226 if err != nil {
227 return nil, err
228 }
229+230 entries = append(entries, models.Record{
231 Did: urepo.Did,
232 CreatedAt: rm.clock.Next().String(),
···235 Cid: nc.String(),
236 Value: d,
237 })
238+239 results = append(results, ApplyWriteResult{
240 Type: to.StringPtr(OpTypeUpdate.String()),
241 Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey),
···245 }
246 }
247248+ // commit and get the new root
249+ newroot, rev, err := r.Commit(ctx, urepo.SignFor)
250 if err != nil {
251 return nil, err
252 }
253254+ // create a buffer for dumping our new cbor into
255 buf := new(bytes.Buffer)
256257+ // first write the car header to the buffer
258 hb, err := cbor.DumpObject(&car.CarHeader{
259 Roots: []cid.Cid{newroot},
260 Version: 1,
261 })
0262 if _, err := carstore.LdWrite(buf, hb); err != nil {
263 return nil, err
264 }
265266+ // get a diff of the changes to the repo
267 diffops, err := r.DiffSince(context.TODO(), rootcid)
268 if err != nil {
269 return nil, err
270 }
271272+ // create the repo ops for the given diff
273 ops := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(diffops))
0274 for _, op := range diffops {
275 var c cid.Cid
276 switch op.Op {
···299 })
300 }
301302+ blk, err := dbs.Get(ctx, c)
303 if err != nil {
304 return nil, err
305 }
306307+ // write the block to the buffer
308 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
309 return nil, err
310 }
311 }
312313+ // write the writelog to the buffer
314 for _, op := range bs.GetWriteLog() {
315 if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil {
316 return nil, err
317 }
318 }
319320+ // blob blob blob blob blob :3
321 var blobs []lexutil.LexLink
322 for _, entry := range entries {
323 var cids []cid.Cid
324+ // whenever there is cid present, we know it's a create (dumb)
325 if entry.Cid != "" {
326 if err := rm.s.db.Create(&entry, []clause.Expression{clause.OnConflict{
327 Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}},
···330 return nil, err
331 }
332333+ // increment the given blob refs, yay
334 cids, err = rm.incrementBlobRefs(urepo, entry.Value)
335 if err != nil {
336 return nil, err
337 }
338 } else {
339+ // as i noted above this is dumb. but we delete whenever the cid is nil. it works solely becaue the pkey
340+ // is did + collection + rkey. i still really want to separate that out, or use a different type to make
341+ // this less confusing/easy to read. alas, its 2 am and yea no
342 if err := rm.s.db.Delete(&entry, nil).Error; err != nil {
343 return nil, err
344 }
345+346+ // TODO:
347 cids, err = rm.decrementBlobRefs(urepo, entry.Value)
348 if err != nil {
349 return nil, err
350 }
351 }
352353+ // add all the relevant blobs to the blobs list of blobs. blob ^.^
354 for _, c := range cids {
355 blobs = append(blobs, lexutil.LexLink(c))
356 }
357 }
358359+ // NOTE: using the request ctx seems a bit suss here, so using a background context. i'm not sure if this
360+ // runs sync or not
361+ rm.s.evtman.AddEvent(context.Background(), &events.XRPCStreamEvent{
362 RepoCommit: &atproto.SyncSubscribeRepos_Commit{
363 Repo: urepo.Did,
364 Blocks: buf.Bytes(),
···372 },
373 })
374375+ if err := rm.s.UpdateRepo(ctx, urepo.Did, newroot, rev); err != nil {
376 return nil, err
377 }
378···387 return results, nil
388}
389390+// this is a fun little guy. to get a proof, we need to read the record out of the blockstore and record how we actually
391+// got to the guy. we'll wrap a new blockstore in a recording blockstore, then return the log for proof
392+func (rm *RepoMan) getRecordProof(ctx context.Context, urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) {
393 c, err := cid.Cast(urepo.Root)
394 if err != nil {
395 return cid.Undef, nil, err
···398 dbs := rm.s.getBlockstore(urepo.Did)
399 bs := recording_blockstore.New(dbs)
400401+ r, err := repo.OpenRepo(ctx, bs, c)
402 if err != nil {
403 return cid.Undef, nil, err
404 }
405406+ _, _, err = r.GetRecordBytes(ctx, fmt.Sprintf("%s/%s", collection, rkey))
407 if err != nil {
408 return cid.Undef, nil, err
409 }
···441 return nil, err
442 }
443444+ // TODO: this does _not_ handle deletions of blobs that are on s3 storage!!!! we need to get the blob, see what
445+ // storage it is in, and clean up s3!!!!
446 if res.Count == 0 {
447 if err := rm.db.Exec("DELETE FROM blobs WHERE id = ?", nil, res.ID).Error; err != nil {
448 return nil, err