···9696}
97979898// TODO make use of swap commit
9999-func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) {
9999+func (rm *RepoMan) applyWrites(ctx context.Context, urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) {
100100 rootcid, err := cid.Cast(urepo.Root)
101101 if err != nil {
102102 return nil, err
···106106 bs := recording_blockstore.New(dbs)
107107 r, err := repo.OpenRepo(context.TODO(), bs, rootcid)
108108109109- entries := []models.Record{}
110109 var results []ApplyWriteResult
111110111111+ entries := make([]models.Record, 0, len(writes))
112112 for i, op := range writes {
113113+ // updates or deletes must supply an rkey
113114 if op.Type != OpTypeCreate && op.Rkey == nil {
114115 return nil, fmt.Errorf("invalid rkey")
115116 } else if op.Type == OpTypeCreate && op.Rkey != nil {
116116- _, _, err := r.GetRecord(context.TODO(), op.Collection+"/"+*op.Rkey)
117117+ // we should conver this op to an update if the rkey already exists
118118+ _, _, err := r.GetRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey))
117119 if err == nil {
118120 op.Type = OpTypeUpdate
119121 }
120122 } else if op.Rkey == nil {
123123+ // creates that don't supply an rkey will have one generated for them
121124 op.Rkey = to.StringPtr(rm.clock.Next().String())
122125 writes[i].Rkey = op.Rkey
123126 }
124127128128+ // validate the record key is actually valid
125129 _, err := syntax.ParseRecordKey(*op.Rkey)
126130 if err != nil {
127131 return nil, err
···129133130134 switch op.Type {
131135 case OpTypeCreate:
132132- j, err := json.Marshal(*op.Record)
136136+ // HACK: this fixes some type conversions, mainly around integers
137137+ // first we convert to json bytes
138138+ b, err := json.Marshal(*op.Record)
133139 if err != nil {
134140 return nil, err
135141 }
136136- out, err := atdata.UnmarshalJSON(j)
142142+ // then we use atdata.UnmarshalJSON to convert it back to a map
143143+ out, err := atdata.UnmarshalJSON(b)
137144 if err != nil {
138145 return nil, err
139146 }
147147+ // finally we can cast to a MarshalableMap
140148 mm := MarshalableMap(out)
141149142150 // HACK: if a record doesn't contain a $type, we can manually set it here based on the op's collection
151151+ // i forget why this is actually necessary?
143152 if mm["$type"] == "" {
144153 mm["$type"] = op.Collection
145154 }
146155147147- nc, err := r.PutRecord(context.TODO(), op.Collection+"/"+*op.Rkey, &mm)
156156+ nc, err := r.PutRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey), &mm)
148157 if err != nil {
149158 return nil, err
150159 }
160160+151161 d, err := atdata.MarshalCBOR(mm)
152162 if err != nil {
153163 return nil, err
154164 }
165165+155166 entries = append(entries, models.Record{
156167 Did: urepo.Did,
157168 CreatedAt: rm.clock.Next().String(),
···160171 Cid: nc.String(),
161172 Value: d,
162173 })
174174+163175 results = append(results, ApplyWriteResult{
164176 Type: to.StringPtr(OpTypeCreate.String()),
165177 Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey),
···167179 ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol
168180 })
169181 case OpTypeDelete:
182182+ // try to find the old record in the database
170183 var old models.Record
171184 if err := rm.db.Raw("SELECT value FROM records WHERE did = ? AND nsid = ? AND rkey = ?", nil, urepo.Did, op.Collection, op.Rkey).Scan(&old).Error; err != nil {
172185 return nil, err
173186 }
187187+188188+ // TODO: this is really confusing, and looking at it i have no idea why i did this. below when we are doing deletes, we
189189+ // check if `cid` here is nil to indicate if we should delete. that really doesn't make much sense and its super illogical
190190+ // when reading this code. i dont feel like fixing right now though so
174191 entries = append(entries, models.Record{
175192 Did: urepo.Did,
176193 Nsid: op.Collection,
177194 Rkey: *op.Rkey,
178195 Value: old.Value,
179196 })
197197+198198+ // delete the record from the repo
180199 err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey)
181200 if err != nil {
182201 return nil, err
183202 }
203203+204204+ // add a result for the delete
184205 results = append(results, ApplyWriteResult{
185206 Type: to.StringPtr(OpTypeDelete.String()),
186207 })
187208 case OpTypeUpdate:
188188- j, err := json.Marshal(*op.Record)
209209+ // HACK: same hack as above for type fixes
210210+ b, err := json.Marshal(*op.Record)
189211 if err != nil {
190212 return nil, err
191213 }
192192- out, err := atdata.UnmarshalJSON(j)
214214+ out, err := atdata.UnmarshalJSON(b)
193215 if err != nil {
194216 return nil, err
195217 }
196218 mm := MarshalableMap(out)
197197- nc, err := r.UpdateRecord(context.TODO(), op.Collection+"/"+*op.Rkey, &mm)
219219+220220+ nc, err := r.UpdateRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey), &mm)
198221 if err != nil {
199222 return nil, err
200223 }
224224+201225 d, err := atdata.MarshalCBOR(mm)
202226 if err != nil {
203227 return nil, err
204228 }
229229+205230 entries = append(entries, models.Record{
206231 Did: urepo.Did,
207232 CreatedAt: rm.clock.Next().String(),
···210235 Cid: nc.String(),
211236 Value: d,
212237 })
238238+213239 results = append(results, ApplyWriteResult{
214240 Type: to.StringPtr(OpTypeUpdate.String()),
215241 Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey),
···219245 }
220246 }
221247222222- newroot, rev, err := r.Commit(context.TODO(), urepo.SignFor)
248248+ // commit and get the new root
249249+ newroot, rev, err := r.Commit(ctx, urepo.SignFor)
223250 if err != nil {
224251 return nil, err
225252 }
226253254254+ // create a buffer for dumping our new cbor into
227255 buf := new(bytes.Buffer)
228256257257+ // first write the car header to the buffer
229258 hb, err := cbor.DumpObject(&car.CarHeader{
230259 Roots: []cid.Cid{newroot},
231260 Version: 1,
232261 })
233233-234262 if _, err := carstore.LdWrite(buf, hb); err != nil {
235263 return nil, err
236264 }
237265266266+ // get a diff of the changes to the repo
238267 diffops, err := r.DiffSince(context.TODO(), rootcid)
239268 if err != nil {
240269 return nil, err
241270 }
242271272272+ // create the repo ops for the given diff
243273 ops := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(diffops))
244244-245274 for _, op := range diffops {
246275 var c cid.Cid
247276 switch op.Op {
···270299 })
271300 }
272301273273- blk, err := dbs.Get(context.TODO(), c)
302302+ blk, err := dbs.Get(ctx, c)
274303 if err != nil {
275304 return nil, err
276305 }
277306307307+ // write the block to the buffer
278308 if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
279309 return nil, err
280310 }
281311 }
282312313313+ // write the writelog to the buffer
283314 for _, op := range bs.GetWriteLog() {
284315 if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil {
285316 return nil, err
286317 }
287318 }
288319320320+ // blob blob blob blob blob :3
289321 var blobs []lexutil.LexLink
290322 for _, entry := range entries {
291323 var cids []cid.Cid
324324+ // whenever there is cid present, we know it's a create (dumb)
292325 if entry.Cid != "" {
293326 if err := rm.s.db.Create(&entry, []clause.Expression{clause.OnConflict{
294327 Columns: []clause.Column{{Name: "did"}, {Name: "nsid"}, {Name: "rkey"}},
···297330 return nil, err
298331 }
299332333333+ // increment the given blob refs, yay
300334 cids, err = rm.incrementBlobRefs(urepo, entry.Value)
301335 if err != nil {
302336 return nil, err
303337 }
304338 } else {
339339+ // as i noted above this is dumb. but we delete whenever the cid is nil. it works solely becaue the pkey
340340+ // is did + collection + rkey. i still really want to separate that out, or use a different type to make
341341+ // this less confusing/easy to read. alas, its 2 am and yea no
305342 if err := rm.s.db.Delete(&entry, nil).Error; err != nil {
306343 return nil, err
307344 }
345345+346346+ // TODO:
308347 cids, err = rm.decrementBlobRefs(urepo, entry.Value)
309348 if err != nil {
310349 return nil, err
311350 }
312351 }
313352353353+ // add all the relevant blobs to the blobs list of blobs. blob ^.^
314354 for _, c := range cids {
315355 blobs = append(blobs, lexutil.LexLink(c))
316356 }
317357 }
318358319319- rm.s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
359359+ // NOTE: using the request ctx seems a bit suss here, so using a background context. i'm not sure if this
360360+ // runs sync or not
361361+ rm.s.evtman.AddEvent(context.Background(), &events.XRPCStreamEvent{
320362 RepoCommit: &atproto.SyncSubscribeRepos_Commit{
321363 Repo: urepo.Did,
322364 Blocks: buf.Bytes(),
···330372 },
331373 })
332374333333- if err := rm.s.UpdateRepo(context.TODO(), urepo.Did, newroot, rev); err != nil {
375375+ if err := rm.s.UpdateRepo(ctx, urepo.Did, newroot, rev); err != nil {
334376 return nil, err
335377 }
336378···345387 return results, nil
346388}
347389348348-func (rm *RepoMan) getRecordProof(urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) {
390390+// this is a fun little guy. to get a proof, we need to read the record out of the blockstore and record how we actually
391391+// got to the guy. we'll wrap a new blockstore in a recording blockstore, then return the log for proof
392392+func (rm *RepoMan) getRecordProof(ctx context.Context, urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) {
349393 c, err := cid.Cast(urepo.Root)
350394 if err != nil {
351395 return cid.Undef, nil, err
···354398 dbs := rm.s.getBlockstore(urepo.Did)
355399 bs := recording_blockstore.New(dbs)
356400357357- r, err := repo.OpenRepo(context.TODO(), bs, c)
401401+ r, err := repo.OpenRepo(ctx, bs, c)
358402 if err != nil {
359403 return cid.Undef, nil, err
360404 }
361405362362- _, _, err = r.GetRecordBytes(context.TODO(), collection+"/"+rkey)
406406+ _, _, err = r.GetRecordBytes(ctx, fmt.Sprintf("%s/%s", collection, rkey))
363407 if err != nil {
364408 return cid.Undef, nil, err
365409 }
···397441 return nil, err
398442 }
399443444444+ // TODO: this does _not_ handle deletions of blobs that are on s3 storage!!!! we need to get the blob, see what
445445+ // storage it is in, and clean up s3!!!!
400446 if res.Count == 0 {
401447 if err := rm.db.Exec("DELETE FROM blobs WHERE id = ?", nil, res.ID).Error; err != nil {
402448 return nil, err