Vow, uncensorable PDS written in Go

switch to use the new repo package (#71)

* Switch to the new repo lib

* Fix nil pointer

* caching

* clean

authored by hailey.at and committed by GitHub

ac830a5b ae5df424

+411 -181
+4 -5
server/handle_import_repo.go
··· 8 "strings" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 - "github.com/bluesky-social/indigo/repo" 12 "github.com/haileyok/cocoon/internal/helpers" 13 "github.com/haileyok/cocoon/models" 14 blocks "github.com/ipfs/go-block-format" ··· 60 return helpers.ServerError(e, nil) 61 } 62 63 - r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0]) 64 if err != nil { 65 logger.Error("could not open repo", "error", err) 66 return helpers.ServerError(e, nil) ··· 70 71 clock := syntax.NewTIDClock(0) 72 73 - if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error { 74 - pts := strings.Split(key, "/") 75 nsid := pts[0] 76 rkey := pts[1] 77 cidStr := cid.String() ··· 103 104 tx.Commit() 105 106 - root, rev, err := r.Commit(context.TODO(), urepo.SignFor) 107 if err != nil { 108 logger.Error("error committing", "error", err) 109 return helpers.ServerError(e, nil)
··· 8 "strings" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/haileyok/cocoon/internal/helpers" 12 "github.com/haileyok/cocoon/models" 13 blocks "github.com/ipfs/go-block-format" ··· 59 return helpers.ServerError(e, nil) 60 } 61 62 + r, err := openRepo(context.TODO(), bs, cs.Header.Roots[0], urepo.Repo.Did) 63 if err != nil { 64 logger.Error("could not open repo", "error", err) 65 return helpers.ServerError(e, nil) ··· 69 70 clock := syntax.NewTIDClock(0) 71 72 + if err := r.MST.Walk(func(key []byte, cid cid.Cid) error { 73 + pts := strings.Split(string(key), "/") 74 nsid := pts[0] 75 rkey := pts[1] 76 cidStr := cid.String() ··· 102 103 tx.Commit() 104 105 + root, rev, err := commitRepo(context.TODO(), bs, r, urepo.Repo.SigningKey) 106 if err != nil { 107 logger.Error("error committing", "error", err) 108 return helpers.ServerError(e, nil)
+12 -3
server/handle_server_create_account.go
··· 10 "github.com/Azure/go-autorest/autorest/to" 11 "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/atproto/atcrypto" 13 "github.com/bluesky-social/indigo/events" 14 - "github.com/bluesky-social/indigo/repo" 15 "github.com/bluesky-social/indigo/util" 16 "github.com/haileyok/cocoon/internal/helpers" 17 "github.com/haileyok/cocoon/models" ··· 220 221 if request.Did == nil || *request.Did == "" { 222 bs := s.getBlockstore(signupDid) 223 - r := repo.NewRepo(context.TODO(), signupDid, bs) 224 225 - root, rev, err := r.Commit(context.TODO(), urepo.SignFor) 226 if err != nil { 227 logger.Error("error committing", "error", err) 228 return helpers.ServerError(e, nil)
··· 10 "github.com/Azure/go-autorest/autorest/to" 11 "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/atproto/atcrypto" 13 + atp "github.com/bluesky-social/indigo/atproto/repo" 14 + "github.com/bluesky-social/indigo/atproto/repo/mst" 15 + "github.com/bluesky-social/indigo/atproto/syntax" 16 "github.com/bluesky-social/indigo/events" 17 "github.com/bluesky-social/indigo/util" 18 "github.com/haileyok/cocoon/internal/helpers" 19 "github.com/haileyok/cocoon/models" ··· 222 223 if request.Did == nil || *request.Did == "" { 224 bs := s.getBlockstore(signupDid) 225 + 226 + clk := syntax.NewTIDClock(0) 227 + r := &atp.Repo{ 228 + DID: syntax.DID(signupDid), 229 + Clock: clk, 230 + MST: mst.NewEmptyTree(), 231 + RecordStore: bs, 232 + } 233 234 + root, rev, err := commitRepo(context.TODO(), bs, r, urepo.SigningKey) 235 if err != nil { 236 logger.Error("error committing", "error", err) 237 return helpers.ServerError(e, nil)
+391 -167
server/repo.go
··· 6 "encoding/json" 7 "fmt" 8 "io" 9 "time" 10 11 "github.com/Azure/go-autorest/autorest/to" 12 "github.com/bluesky-social/indigo/api/atproto" 13 "github.com/bluesky-social/indigo/atproto/atdata" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/indigo/carstore" 16 "github.com/bluesky-social/indigo/events" 17 lexutil "github.com/bluesky-social/indigo/lex/util" 18 - "github.com/bluesky-social/indigo/repo" 19 "github.com/haileyok/cocoon/internal/db" 20 "github.com/haileyok/cocoon/metrics" 21 "github.com/haileyok/cocoon/models" 22 "github.com/haileyok/cocoon/recording_blockstore" 23 blocks "github.com/ipfs/go-block-format" 24 "github.com/ipfs/go-cid" 25 cbor "github.com/ipfs/go-ipld-cbor" 26 "github.com/ipld/go-car" 27 "gorm.io/gorm/clause" 28 ) 29 30 type RepoMan struct { 31 db *db.DB 32 s *Server 33 clock *syntax.TIDClock 34 } 35 36 func NewRepoMan(s *Server) *RepoMan { ··· 40 s: s, 41 db: s.db, 42 clock: clock, 43 } 44 } 45 46 type OpType string 47 48 var ( ··· 96 Rev string `json:"rev"` 97 } 98 99 // TODO make use of swap commit 100 func (rm *RepoMan) applyWrites(ctx context.Context, urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) { 101 rootcid, err := cid.Cast(urepo.Root) ··· 105 106 dbs := rm.s.getBlockstore(urepo.Did) 107 bs := recording_blockstore.New(dbs) 108 - r, err := repo.OpenRepo(ctx, bs, rootcid) 109 110 var results []ApplyWriteResult 111 112 - entries := make([]models.Record, 0, len(writes)) 113 - for i, op := range writes { 114 - // updates or deletes must supply an rkey 115 - if op.Type != OpTypeCreate && op.Rkey == nil { 116 - return nil, fmt.Errorf("invalid rkey") 117 - } else if op.Type == OpTypeCreate && op.Rkey != nil { 118 - // we should conver this op to an update if the rkey already exists 119 - _, _, err := r.GetRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey)) 120 - if err == nil { 121 - op.Type = OpTypeUpdate 122 } 123 - } else if op.Rkey == nil { 124 - // creates 
that don't supply an rkey will have one generated for them 125 - op.Rkey = to.StringPtr(rm.clock.Next().String()) 126 - writes[i].Rkey = op.Rkey 127 - } 128 129 - // validate the record key is actually valid 130 - _, err := syntax.ParseRecordKey(*op.Rkey) 131 - if err != nil { 132 - return nil, err 133 - } 134 135 - switch op.Type { 136 - case OpTypeCreate: 137 - // HACK: this fixes some type conversions, mainly around integers 138 - // first we convert to json bytes 139 - b, err := json.Marshal(*op.Record) 140 if err != nil { 141 - return nil, err 142 } 143 - // then we use atdata.UnmarshalJSON to convert it back to a map 144 - out, err := atdata.UnmarshalJSON(b) 145 - if err != nil { 146 - return nil, err 147 - } 148 - // finally we can cast to a MarshalableMap 149 - mm := MarshalableMap(out) 150 151 - // HACK: if a record doesn't contain a $type, we can manually set it here based on the op's collection 152 - // i forget why this is actually necessary? 153 - if mm["$type"] == "" { 154 - mm["$type"] = op.Collection 155 - } 156 157 - nc, err := r.PutRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey), &mm) 158 - if err != nil { 159 - return nil, err 160 - } 161 162 - d, err := atdata.MarshalCBOR(mm) 163 - if err != nil { 164 - return nil, err 165 - } 166 167 - entries = append(entries, models.Record{ 168 - Did: urepo.Did, 169 - CreatedAt: rm.clock.Next().String(), 170 - Nsid: op.Collection, 171 - Rkey: *op.Rkey, 172 - Cid: nc.String(), 173 - Value: d, 174 - }) 175 176 - results = append(results, ApplyWriteResult{ 177 - Type: to.StringPtr(OpTypeCreate.String()), 178 - Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey), 179 - Cid: to.StringPtr(nc.String()), 180 - ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol 181 - }) 182 - case OpTypeDelete: 183 - // try to find the old record in the database 184 - var old models.Record 185 - if err := rm.db.Raw(ctx, "SELECT value FROM records WHERE did = ? 
AND nsid = ? AND rkey = ?", nil, urepo.Did, op.Collection, op.Rkey).Scan(&old).Error; err != nil { 186 - return nil, err 187 - } 188 189 - // TODO: this is really confusing, and looking at it i have no idea why i did this. below when we are doing deletes, we 190 - // check if `cid` here is nil to indicate if we should delete. that really doesn't make much sense and its super illogical 191 - // when reading this code. i dont feel like fixing right now though so 192 - entries = append(entries, models.Record{ 193 - Did: urepo.Did, 194 - Nsid: op.Collection, 195 - Rkey: *op.Rkey, 196 - Value: old.Value, 197 - }) 198 199 - // delete the record from the repo 200 - err := r.DeleteRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey)) 201 - if err != nil { 202 - return nil, err 203 - } 204 205 - // add a result for the delete 206 - results = append(results, ApplyWriteResult{ 207 - Type: to.StringPtr(OpTypeDelete.String()), 208 - }) 209 - case OpTypeUpdate: 210 - // HACK: same hack as above for type fixes 211 - b, err := json.Marshal(*op.Record) 212 - if err != nil { 213 - return nil, err 214 - } 215 - out, err := atdata.UnmarshalJSON(b) 216 - if err != nil { 217 - return nil, err 218 - } 219 - mm := MarshalableMap(out) 220 221 - nc, err := r.UpdateRecord(ctx, fmt.Sprintf("%s/%s", op.Collection, *op.Rkey), &mm) 222 - if err != nil { 223 - return nil, err 224 - } 225 226 - d, err := atdata.MarshalCBOR(mm) 227 - if err != nil { 228 - return nil, err 229 - } 230 231 - entries = append(entries, models.Record{ 232 - Did: urepo.Did, 233 - CreatedAt: rm.clock.Next().String(), 234 - Nsid: op.Collection, 235 - Rkey: *op.Rkey, 236 - Cid: nc.String(), 237 - Value: d, 238 - }) 239 240 - results = append(results, ApplyWriteResult{ 241 - Type: to.StringPtr(OpTypeUpdate.String()), 242 - Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey), 243 - Cid: to.StringPtr(nc.String()), 244 - ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be 
true atm lol 245 - }) 246 } 247 - } 248 249 - // commit and get the new root 250 - newroot, rev, err := r.Commit(ctx, urepo.SignFor) 251 - if err != nil { 252 return nil, err 253 } 254 ··· 270 return nil, err 271 } 272 273 - // get a diff of the changes to the repo 274 - diffops, err := r.DiffSince(ctx, rootcid) 275 - if err != nil { 276 - return nil, err 277 - } 278 - 279 - // create the repo ops for the given diff 280 - ops := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(diffops)) 281 - for _, op := range diffops { 282 - var c cid.Cid 283 - switch op.Op { 284 - case "add", "mut": 285 kind := "create" 286 - if op.Op == "mut" { 287 kind = "update" 288 } 289 290 - c = op.NewCid 291 - ll := lexutil.LexLink(op.NewCid) 292 - ops = append(ops, &atproto.SyncSubscribeRepos_RepoOp{ 293 Action: kind, 294 - Path: op.Rpath, 295 Cid: &ll, 296 }) 297 298 - case "del": 299 - c = op.OldCid 300 - ll := lexutil.LexLink(op.OldCid) 301 - ops = append(ops, &atproto.SyncSubscribeRepos_RepoOp{ 302 Action: "delete", 303 - Path: op.Rpath, 304 Cid: nil, 305 Prev: &ll, 306 }) 307 - } 308 309 - blk, err := dbs.Get(ctx, c) 310 - if err != nil { 311 - return nil, err 312 - } 313 - 314 - // write the block to the buffer 315 - if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil { 316 - return nil, err 317 } 318 } 319 320 // write the writelog to the buffer 321 - for _, op := range bs.GetWriteLog() { 322 - if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil { 323 return nil, err 324 } 325 } ··· 374 Since: &urepo.Rev, 375 Commit: lexutil.LexLink(newroot), 376 Time: time.Now().Format(time.RFC3339Nano), 377 - Ops: ops, 378 TooBig: false, 379 }, 380 }) ··· 394 return results, nil 395 } 396 397 - // this is a fun little guy. to get a proof, we need to read the record out of the blockstore and record how we actually 398 - // got to the guy. 
we'll wrap a new blockstore in a recording blockstore, then return the log for proof 399 func (rm *RepoMan) getRecordProof(ctx context.Context, urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) { 400 - c, err := cid.Cast(urepo.Root) 401 if err != nil { 402 return cid.Undef, nil, err 403 } 404 405 dbs := rm.s.getBlockstore(urepo.Did) 406 - bs := recording_blockstore.New(dbs) 407 408 - r, err := repo.OpenRepo(ctx, bs, c) 409 - if err != nil { 410 return cid.Undef, nil, err 411 } 412 413 - _, _, err = r.GetRecordBytes(ctx, fmt.Sprintf("%s/%s", collection, rkey)) 414 - if err != nil { 415 - return cid.Undef, nil, err 416 } 417 418 - return c, bs.GetReadLog(), nil 419 } 420 421 func (rm *RepoMan) incrementBlobRefs(ctx context.Context, urepo models.Repo, cbor []byte) ([]cid.Cid, error) {
··· 6 "encoding/json" 7 "fmt" 8 "io" 9 + "sync" 10 "time" 11 12 "github.com/Azure/go-autorest/autorest/to" 13 "github.com/bluesky-social/indigo/api/atproto" 14 + "github.com/bluesky-social/indigo/atproto/atcrypto" 15 "github.com/bluesky-social/indigo/atproto/atdata" 16 + atp "github.com/bluesky-social/indigo/atproto/repo" 17 + "github.com/bluesky-social/indigo/atproto/repo/mst" 18 "github.com/bluesky-social/indigo/atproto/syntax" 19 "github.com/bluesky-social/indigo/carstore" 20 "github.com/bluesky-social/indigo/events" 21 lexutil "github.com/bluesky-social/indigo/lex/util" 22 "github.com/haileyok/cocoon/internal/db" 23 "github.com/haileyok/cocoon/metrics" 24 "github.com/haileyok/cocoon/models" 25 "github.com/haileyok/cocoon/recording_blockstore" 26 blocks "github.com/ipfs/go-block-format" 27 "github.com/ipfs/go-cid" 28 + blockstore "github.com/ipfs/go-ipfs-blockstore" 29 cbor "github.com/ipfs/go-ipld-cbor" 30 "github.com/ipld/go-car" 31 + "github.com/multiformats/go-multihash" 32 "gorm.io/gorm/clause" 33 ) 34 35 + type cachedRepo struct { 36 + mu sync.Mutex 37 + repo *atp.Repo 38 + root cid.Cid 39 + } 40 + 41 type RepoMan struct { 42 db *db.DB 43 s *Server 44 clock *syntax.TIDClock 45 + 46 + cacheMu sync.Mutex 47 + cache map[string]*cachedRepo 48 } 49 50 func NewRepoMan(s *Server) *RepoMan { ··· 54 s: s, 55 db: s.db, 56 clock: clock, 57 + cache: make(map[string]*cachedRepo), 58 } 59 } 60 61 + func (rm *RepoMan) withRepo(ctx context.Context, did string, rootCid cid.Cid, fn func(r *atp.Repo) (newRoot cid.Cid, err error)) error { 62 + rm.cacheMu.Lock() 63 + cr, ok := rm.cache[did] 64 + if !ok { 65 + cr = &cachedRepo{} 66 + rm.cache[did] = cr 67 + } 68 + rm.cacheMu.Unlock() 69 + 70 + cr.mu.Lock() 71 + defer cr.mu.Unlock() 72 + 73 + if cr.repo == nil || cr.root != rootCid { 74 + bs := rm.s.getBlockstore(did) 75 + r, err := openRepo(ctx, bs, rootCid, did) 76 + if err != nil { 77 + return err 78 + } 79 + cr.repo = r 80 + cr.root = rootCid 81 + } 82 + 83 + newRoot, err := 
fn(cr.repo) 84 + if err != nil { 85 + // invalidate on error since the tree may be partially mutated 86 + cr.repo = nil 87 + cr.root = cid.Undef 88 + return err 89 + } 90 + 91 + cr.root = newRoot 92 + return nil 93 + } 94 + 95 type OpType string 96 97 var ( ··· 145 Rev string `json:"rev"` 146 } 147 148 + func openRepo(ctx context.Context, bs blockstore.Blockstore, rootCid cid.Cid, did string) (*atp.Repo, error) { 149 + commitBlock, err := bs.Get(ctx, rootCid) 150 + if err != nil { 151 + return nil, fmt.Errorf("reading commit block: %w", err) 152 + } 153 + 154 + var commit atp.Commit 155 + if err := commit.UnmarshalCBOR(bytes.NewReader(commitBlock.RawData())); err != nil { 156 + return nil, fmt.Errorf("parsing commit block: %w", err) 157 + } 158 + 159 + tree, err := mst.LoadTreeFromStore(ctx, bs, commit.Data) 160 + if err != nil { 161 + return nil, fmt.Errorf("loading MST: %w", err) 162 + } 163 + 164 + clk := syntax.ClockFromTID(syntax.TID(commit.Rev)) 165 + return &atp.Repo{ 166 + DID: syntax.DID(did), 167 + Clock: &clk, 168 + MST: *tree, 169 + RecordStore: bs, 170 + }, nil 171 + } 172 + 173 + func commitRepo(ctx context.Context, bs blockstore.Blockstore, r *atp.Repo, signingKey []byte) (cid.Cid, string, error) { 174 + if _, err := r.MST.WriteDiffBlocks(ctx, bs); err != nil { 175 + return cid.Undef, "", fmt.Errorf("writing MST blocks: %w", err) 176 + } 177 + 178 + commit, err := r.Commit() 179 + if err != nil { 180 + return cid.Undef, "", fmt.Errorf("creating commit: %w", err) 181 + } 182 + 183 + privkey, err := atcrypto.ParsePrivateBytesK256(signingKey) 184 + if err != nil { 185 + return cid.Undef, "", fmt.Errorf("parsing signing key: %w", err) 186 + } 187 + if err := commit.Sign(privkey); err != nil { 188 + return cid.Undef, "", fmt.Errorf("signing commit: %w", err) 189 + } 190 + 191 + buf := new(bytes.Buffer) 192 + if err := commit.MarshalCBOR(buf); err != nil { 193 + return cid.Undef, "", fmt.Errorf("marshaling commit: %w", err) 194 + } 195 + 196 + pref := 
cid.NewPrefixV1(cid.DagCBOR, multihash.SHA2_256) 197 + commitCid, err := pref.Sum(buf.Bytes()) 198 + if err != nil { 199 + return cid.Undef, "", fmt.Errorf("computing commit CID: %w", err) 200 + } 201 + 202 + blk, err := blocks.NewBlockWithCid(buf.Bytes(), commitCid) 203 + if err != nil { 204 + return cid.Undef, "", fmt.Errorf("creating commit block: %w", err) 205 + } 206 + if err := bs.Put(ctx, blk); err != nil { 207 + return cid.Undef, "", fmt.Errorf("writing commit block: %w", err) 208 + } 209 + 210 + return commitCid, commit.Rev, nil 211 + } 212 + 213 + func putRecordBlock(ctx context.Context, bs blockstore.Blockstore, rec *MarshalableMap) (cid.Cid, error) { 214 + buf := new(bytes.Buffer) 215 + if err := rec.MarshalCBOR(buf); err != nil { 216 + return cid.Undef, err 217 + } 218 + 219 + pref := cid.NewPrefixV1(cid.DagCBOR, multihash.SHA2_256) 220 + c, err := pref.Sum(buf.Bytes()) 221 + if err != nil { 222 + return cid.Undef, err 223 + } 224 + 225 + blk, err := blocks.NewBlockWithCid(buf.Bytes(), c) 226 + if err != nil { 227 + return cid.Undef, err 228 + } 229 + if err := bs.Put(ctx, blk); err != nil { 230 + return cid.Undef, err 231 + } 232 + 233 + return c, nil 234 + } 235 + 236 // TODO make use of swap commit 237 func (rm *RepoMan) applyWrites(ctx context.Context, urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) { 238 rootcid, err := cid.Cast(urepo.Root) ··· 242 243 dbs := rm.s.getBlockstore(urepo.Did) 244 bs := recording_blockstore.New(dbs) 245 246 var results []ApplyWriteResult 247 + var ops []*atp.Operation 248 + var entries []models.Record 249 + var newroot cid.Cid 250 + var rev string 251 252 + if err := rm.withRepo(ctx, urepo.Did, rootcid, func(r *atp.Repo) (cid.Cid, error) { 253 + entries = make([]models.Record, 0, len(writes)) 254 + for i, op := range writes { 255 + // updates or deletes must supply an rkey 256 + if op.Type != OpTypeCreate && op.Rkey == nil { 257 + return cid.Undef, fmt.Errorf("invalid rkey") 258 + } else 
if op.Type == OpTypeCreate && op.Rkey != nil { 259 + // we should convert this op to an update if the rkey already exists 260 + path := fmt.Sprintf("%s/%s", op.Collection, *op.Rkey) 261 + existing, _ := r.MST.Get([]byte(path)) 262 + if existing != nil { 263 + op.Type = OpTypeUpdate 264 + } 265 + } else if op.Rkey == nil { 266 + // creates that don't supply an rkey will have one generated for them 267 + op.Rkey = to.StringPtr(rm.clock.Next().String()) 268 + writes[i].Rkey = op.Rkey 269 } 270 271 + path := fmt.Sprintf("%s/%s", op.Collection, *op.Rkey) 272 273 + // validate the record key is actually valid 274 + _, err := syntax.ParseRecordKey(*op.Rkey) 275 if err != nil { 276 + return cid.Undef, err 277 } 278 279 + switch op.Type { 280 + case OpTypeCreate: 281 + // HACK: this fixes some type conversions, mainly around integers 282 + b, err := json.Marshal(*op.Record) 283 + if err != nil { 284 + return cid.Undef, err 285 + } 286 + out, err := atdata.UnmarshalJSON(b) 287 + if err != nil { 288 + return cid.Undef, err 289 + } 290 + mm := MarshalableMap(out) 291 292 + // HACK: if a record doesn't contain a $type, we can manually set it here based on the op's collection 293 + if mm["$type"] == "" { 294 + mm["$type"] = op.Collection 295 + } 296 297 + nc, err := putRecordBlock(ctx, bs, &mm) 298 + if err != nil { 299 + return cid.Undef, err 300 + } 301 302 + atpOp, err := atp.ApplyOp(&r.MST, path, &nc) 303 + if err != nil { 304 + return cid.Undef, err 305 + } 306 + ops = append(ops, atpOp) 307 308 + d, err := atdata.MarshalCBOR(mm) 309 + if err != nil { 310 + return cid.Undef, err 311 + } 312 313 + entries = append(entries, models.Record{ 314 + Did: urepo.Did, 315 + CreatedAt: rm.clock.Next().String(), 316 + Nsid: op.Collection, 317 + Rkey: *op.Rkey, 318 + Cid: nc.String(), 319 + Value: d, 320 + }) 321 322 + results = append(results, ApplyWriteResult{ 323 + Type: to.StringPtr(OpTypeCreate.String()), 324 + Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + 
*op.Rkey), 325 + Cid: to.StringPtr(nc.String()), 326 + ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol 327 + }) 328 + case OpTypeDelete: 329 + // try to find the old record in the database 330 + var old models.Record 331 + if err := rm.db.Raw(ctx, "SELECT value FROM records WHERE did = ? AND nsid = ? AND rkey = ?", nil, urepo.Did, op.Collection, op.Rkey).Scan(&old).Error; err != nil { 332 + return cid.Undef, err 333 + } 334 335 + // TODO: this is really confusing, and looking at it i have no idea why i did this. below when we are doing deletes, we 336 + // check if `cid` here is nil to indicate if we should delete. that really doesn't make much sense and its super illogical 337 + // when reading this code. i dont feel like fixing right now though so 338 + entries = append(entries, models.Record{ 339 + Did: urepo.Did, 340 + Nsid: op.Collection, 341 + Rkey: *op.Rkey, 342 + Value: old.Value, 343 + }) 344 345 + atpOp, err := atp.ApplyOp(&r.MST, path, nil) 346 + if err != nil { 347 + return cid.Undef, err 348 + } 349 + ops = append(ops, atpOp) 350 351 + results = append(results, ApplyWriteResult{ 352 + Type: to.StringPtr(OpTypeDelete.String()), 353 + }) 354 + case OpTypeUpdate: 355 + // HACK: same hack as above for type fixes 356 + b, err := json.Marshal(*op.Record) 357 + if err != nil { 358 + return cid.Undef, err 359 + } 360 + out, err := atdata.UnmarshalJSON(b) 361 + if err != nil { 362 + return cid.Undef, err 363 + } 364 + mm := MarshalableMap(out) 365 366 + nc, err := putRecordBlock(ctx, bs, &mm) 367 + if err != nil { 368 + return cid.Undef, err 369 + } 370 + 371 + atpOp, err := atp.ApplyOp(&r.MST, path, &nc) 372 + if err != nil { 373 + return cid.Undef, err 374 + } 375 + ops = append(ops, atpOp) 376 + 377 + d, err := atdata.MarshalCBOR(mm) 378 + if err != nil { 379 + return cid.Undef, err 380 + } 381 + 382 + entries = append(entries, models.Record{ 383 + Did: urepo.Did, 384 + CreatedAt: rm.clock.Next().String(), 385 + 
Nsid: op.Collection, 386 + Rkey: *op.Rkey, 387 + Cid: nc.String(), 388 + Value: d, 389 + }) 390 391 + results = append(results, ApplyWriteResult{ 392 + Type: to.StringPtr(OpTypeUpdate.String()), 393 + Uri: to.StringPtr("at://" + urepo.Did + "/" + op.Collection + "/" + *op.Rkey), 394 + Cid: to.StringPtr(nc.String()), 395 + ValidationStatus: to.StringPtr("valid"), // TODO: obviously this might not be true atm lol 396 + }) 397 + } 398 } 399 + 400 + // commit and get the new root 401 + var commitErr error 402 + newroot, rev, commitErr = commitRepo(ctx, bs, r, urepo.SigningKey) 403 + if commitErr != nil { 404 + return cid.Undef, commitErr 405 + } 406 407 + return newroot, nil 408 + }); err != nil { 409 return nil, err 410 } 411 ··· 427 return nil, err 428 } 429 430 + // create the repo ops for the firehose from the tracked operations 431 + repoOps := make([]*atproto.SyncSubscribeRepos_RepoOp, 0, len(ops)) 432 + for _, op := range ops { 433 + if op.IsCreate() || op.IsUpdate() { 434 kind := "create" 435 + if op.IsUpdate() { 436 kind = "update" 437 } 438 439 + ll := lexutil.LexLink(*op.Value) 440 + repoOps = append(repoOps, &atproto.SyncSubscribeRepos_RepoOp{ 441 Action: kind, 442 + Path: op.Path, 443 Cid: &ll, 444 }) 445 446 + blk, err := dbs.Get(ctx, *op.Value) 447 + if err != nil { 448 + return nil, err 449 + } 450 + if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil { 451 + return nil, err 452 + } 453 + } else if op.IsDelete() { 454 + ll := lexutil.LexLink(*op.Prev) 455 + repoOps = append(repoOps, &atproto.SyncSubscribeRepos_RepoOp{ 456 Action: "delete", 457 + Path: op.Path, 458 Cid: nil, 459 Prev: &ll, 460 }) 461 462 + blk, err := dbs.Get(ctx, *op.Prev) 463 + if err != nil { 464 + return nil, err 465 + } 466 + if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil { 467 + return nil, err 468 + } 469 } 470 } 471 472 // write the writelog to the buffer 473 + for _, blk := range bs.GetWriteLog() { 474 + if _, err := 
carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil { 475 return nil, err 476 } 477 } ··· 526 Since: &urepo.Rev, 527 Commit: lexutil.LexLink(newroot), 528 Time: time.Now().Format(time.RFC3339Nano), 529 + Ops: repoOps, 530 TooBig: false, 531 }, 532 }) ··· 546 return results, nil 547 } 548 549 func (rm *RepoMan) getRecordProof(ctx context.Context, urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) { 550 + commitCid, err := cid.Cast(urepo.Root) 551 if err != nil { 552 return cid.Undef, nil, err 553 } 554 555 dbs := rm.s.getBlockstore(urepo.Did) 556 + 557 + var proofBlocks []blocks.Block 558 + var recordCid *cid.Cid 559 + 560 + if err := rm.withRepo(ctx, urepo.Did, commitCid, func(r *atp.Repo) (cid.Cid, error) { 561 + path := collection + "/" + rkey 562 + 563 + // walk the cached in-memory tree to find the record and collect MST node CIDs on the path 564 + nodeCIDs := collectPathNodeCIDs(r.MST.Root, []byte(path)) 565 + 566 + rc, getErr := r.MST.Get([]byte(path)) 567 + if getErr != nil { 568 + return cid.Undef, getErr 569 + } 570 + if rc == nil { 571 + return cid.Undef, fmt.Errorf("record not found: %s", path) 572 + } 573 + recordCid = rc 574 + 575 + // read the commit block 576 + commitBlk, err := dbs.Get(ctx, commitCid) 577 + if err != nil { 578 + return cid.Undef, fmt.Errorf("reading commit block for proof: %w", err) 579 + } 580 + proofBlocks = append(proofBlocks, commitBlk) 581 + 582 + // read the MST nodes on the path 583 + for _, nc := range nodeCIDs { 584 + blk, err := dbs.Get(ctx, nc) 585 + if err != nil { 586 + return cid.Undef, fmt.Errorf("reading MST node for proof: %w", err) 587 + } 588 + proofBlocks = append(proofBlocks, blk) 589 + } 590 + 591 + // read the record block 592 + recordBlk, err := dbs.Get(ctx, *recordCid) 593 + if err != nil { 594 + return cid.Undef, fmt.Errorf("reading record block for proof: %w", err) 595 + } 596 + proofBlocks = append(proofBlocks, recordBlk) 597 598 + // read-only, return same root 
599 + return commitCid, nil 600 + }); err != nil { 601 return cid.Undef, nil, err 602 } 603 604 + return commitCid, proofBlocks, nil 605 + } 606 + 607 + func collectPathNodeCIDs(n *mst.Node, key []byte) []cid.Cid { 608 + if n == nil { 609 + return nil 610 + } 611 + 612 + var cids []cid.Cid 613 + if n.CID != nil { 614 + cids = append(cids, *n.CID) 615 + } 616 + 617 + height := mst.HeightForKey(key) 618 + if height >= n.Height { 619 + // key is at or above this level, no need to descend 620 + return cids 621 } 622 623 + // find the child node that covers this key 624 + childIdx := -1 625 + for i, e := range n.Entries { 626 + if e.IsChild() { 627 + childIdx = i 628 + continue 629 + } 630 + if e.IsValue() { 631 + if bytes.Compare(key, e.Key) <= 0 { 632 + break 633 + } 634 + childIdx = -1 635 + } 636 + } 637 + 638 + if childIdx >= 0 && n.Entries[childIdx].Child != nil { 639 + cids = append(cids, collectPathNodeCIDs(n.Entries[childIdx].Child, key)...) 640 + } 641 + 642 + return cids 643 } 644 645 func (rm *RepoMan) incrementBlobRefs(ctx context.Context, urepo models.Repo, cbor []byte) ([]cid.Cid, error) {
+4 -6
test.go
··· 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/bluesky-social/indigo/events" 15 "github.com/bluesky-social/indigo/events/schedulers/parallel" 16 lexutil "github.com/bluesky-social/indigo/lex/util" 17 - "github.com/bluesky-social/indigo/repo" 18 "github.com/bluesky-social/indigo/repomgr" 19 "github.com/gorilla/websocket" 20 ) ··· 82 panic(err) 83 } 84 85 - rr, err := repo.ReadRepoFromCar(context.TODO(), bytes.NewReader(evt.Blocks)) 86 if err != nil { 87 panic(err) 88 } ··· 98 go func() { 99 switch ek { 100 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 101 - rc, recordCBOR, err := rr.GetRecordBytes(context.TODO(), op.Path) 102 if err != nil { 103 panic(err) 104 } 105 106 - if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid { 107 panic("nocid") 108 } 109 110 - _ = collection 111 - _ = rkey 112 _ = recordCBOR 113 _ = did 114
··· 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/bluesky-social/indigo/events" 15 "github.com/bluesky-social/indigo/events/schedulers/parallel" 16 + atp "github.com/bluesky-social/indigo/atproto/repo" 17 lexutil "github.com/bluesky-social/indigo/lex/util" 18 "github.com/bluesky-social/indigo/repomgr" 19 "github.com/gorilla/websocket" 20 ) ··· 82 panic(err) 83 } 84 85 + _, rr, err := atp.LoadRepoFromCAR(context.TODO(), bytes.NewReader(evt.Blocks)) 86 if err != nil { 87 panic(err) 88 } ··· 98 go func() { 99 switch ek { 100 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 101 + recordCBOR, rc, err := rr.GetRecordBytes(context.TODO(), collection, rkey) 102 if err != nil { 103 panic(err) 104 } 105 106 + if op.Cid == nil || rc == nil || lexutil.LexLink(*rc) != *op.Cid { 107 panic("nocid") 108 } 109 110 _ = recordCBOR 111 _ = did 112