objective categorical abstract machine language personal data server

Don't make all writes wait on global sequencer mutex

futur.blue b39e41cf 8339d18d

verified
+248 -218
+244 -214
pegasus/lib/repository.ml
··· 8 8 module String_map = Lex.String_map 9 9 module Tid = Mist.Tid 10 10 11 + let write_locks : (string, Lwt_mutex.t) Hashtbl.t = Hashtbl.create 100 12 + 13 + let write_lock_mutex = Lwt_mutex.create () 14 + 15 + let with_write_lock did f = 16 + let%lwt lock = 17 + Lwt_mutex.with_lock write_lock_mutex (fun () -> 18 + match Hashtbl.find_opt write_locks did with 19 + | Some l -> 20 + Lwt.return l 21 + | None -> 22 + let l = Lwt_mutex.create () in 23 + Hashtbl.add write_locks did l ; 24 + Lwt.return l ) 25 + in 26 + Lwt_mutex.with_lock lock f 27 + 11 28 module Write_op = struct 12 29 let create = "com.atproto.repo.applyWrites#create" 13 30 ··· 244 261 245 262 let apply_writes (t : t) (writes : repo_write list) (swap_commit : Cid.t option) 246 263 : write_result Lwt.t = 247 - let open Sequencer.Types in 248 - let module Inductive = Mist.Mst.Inductive (Mst) in 249 - let%lwt prev_commit = 250 - match%lwt User_store.get_commit t.db with 251 - | Some (_, commit) -> 252 - Lwt.return commit 253 - | None -> 254 - failwith ("failed to retrieve commit for " ^ t.did) 255 - in 256 - if swap_commit <> None && swap_commit <> Option.map fst t.commit then 257 - Errors.invalid_request ~name:"InvalidSwap" 258 - (Format.sprintf "swapCommit cid %s did not match last commit cid %s" 259 - (Cid.to_string (Option.get swap_commit)) 260 - (match t.commit with Some (c, _) -> Cid.to_string c | None -> "null") ) ; 261 - let%lwt block_map = Lwt.map ref (get_map t) in 262 - let cached_store = Cached_store.create t.db in 263 - let mst : Cached_mst.t ref = 264 - ref (Cached_mst.create cached_store prev_commit.data) 265 - in 266 - (* ops to emit, built in loop because prev_data (previous cid) is otherwise inaccessible *) 267 - let commit_ops : commit_evt_op list ref = ref [] in 268 - let added_leaves = ref Block_map.empty in 269 - let%lwt results = 270 - Lwt_list.map_s 271 - (fun (w : repo_write) -> 272 - match w with 273 - | Create {collection; rkey; value; _} -> 274 - let rkey = Option.value rkey ~default:(Tid.now ()) in 275 - let path = Format.sprintf "%s/%s" collection rkey in 276 - let uri = Format.sprintf "at://%s/%s" t.did path in 277 - let%lwt () = 278 - match String_map.find_opt path !block_map with 279 - | Some cid -> 280 - Errors.invalid_request ~name:"InvalidSwap" 281 - (Format.sprintf 282 - "attempted to write record %s that already exists with \ 283 - cid %s" 284 - path (Cid.to_string cid) ) 285 - | None -> 286 - Lwt.return () 287 - in 288 - let record_with_type : Lex.repo_record = 289 - if String_map.mem "$type" value then value 290 - else String_map.add "$type" (`String collection) value 291 - in 292 - let%lwt cid, block = 293 - User_store.put_record t.db (`LexMap record_with_type) path 294 - in 295 - block_map := String_map.add path cid !block_map ; 296 - added_leaves := Block_map.set cid block !added_leaves ; 297 - commit_ops := 298 - !commit_ops @ [{action= `Create; path; cid= Some cid; prev= None}] ; 299 - let%lwt new_mst = Cached_mst.add !mst path cid in 300 - mst := new_mst ; 301 - let refs = 302 - Util.find_blob_refs value 303 - |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 304 - in 305 - let%lwt () = 306 - match%lwt User_store.put_blob_refs t.db path refs with 307 - | Ok () -> 308 - Lwt.return () 309 - | Error err -> 310 - raise err 311 - in 312 - Lwt.return 313 - (Create 314 - {type'= "com.atproto.repo.applyWrites#createResult"; uri; cid} 315 - ) 316 - | Update {collection; rkey; value; swap_record; _} -> 317 - let path = Format.sprintf "%s/%s" collection rkey in 318 - let uri = Format.sprintf "at://%s/%s" t.did path in 319 - let old_cid = String_map.find_opt path !block_map in 320 - ( if 321 - (swap_record <> None && swap_record <> old_cid) 322 - || (swap_record = None && old_cid = None) 323 - then 324 - let cid_str = 325 - match old_cid with 264 + with_write_lock t.did (fun () -> 265 + let open Sequencer.Types in 266 + let module Inductive = Mist.Mst.Inductive (Mst) in 267 + let%lwt prev_commit = 268 + match%lwt User_store.get_commit t.db with 269 + | Some (_, commit) -> 270 + Lwt.return commit 271 + | None -> 272 + failwith ("failed to retrieve commit for " ^ t.did) 273 + in 274 + if swap_commit <> None && swap_commit <> Option.map fst t.commit then 275 + Errors.invalid_request ~name:"InvalidSwap" 276 + (Format.sprintf "swapCommit cid %s did not match last commit cid %s" 277 + (Cid.to_string (Option.get swap_commit)) 278 + ( match t.commit with 279 + | Some (c, _) -> 280 + Cid.to_string c 281 + | None -> 282 + "null" ) ) ; 283 + let%lwt block_map = Lwt.map ref (get_map t) in 284 + let cached_store = Cached_store.create t.db in 285 + let mst : Cached_mst.t ref = 286 + ref (Cached_mst.create cached_store prev_commit.data) 287 + in 288 + (* ops to emit, built in loop because prev_data (previous cid) is otherwise inaccessible *) 289 + let commit_ops : commit_evt_op list ref = ref [] in 290 + let added_leaves = ref Block_map.empty in 291 + let%lwt results = 292 + Lwt_list.map_s 293 + (fun (w : repo_write) -> 294 + match w with 295 + | Create {collection; rkey; value; _} -> 296 + let rkey = Option.value rkey ~default:(Tid.now ()) in 297 + let path = Format.sprintf "%s/%s" collection rkey in 298 + let uri = Format.sprintf "at://%s/%s" t.did path in 299 + let%lwt () = 300 + match String_map.find_opt path !block_map with 326 301 | Some cid -> 327 - Cid.to_string cid 302 + Errors.invalid_request ~name:"InvalidSwap" 303 + (Format.sprintf 304 + "attempted to write record %s that already exists \ 305 + with cid %s" 306 + path (Cid.to_string cid) ) 328 307 | None -> 329 - "null" 308 + Lwt.return () 309 + in 310 + let record_with_type : Lex.repo_record = 311 + if String_map.mem "$type" value then value 312 + else String_map.add "$type" (`String collection) value 313 + in 314 + let%lwt cid, block = 315 + User_store.put_record t.db (`LexMap record_with_type) path 316 + in 317 + block_map := String_map.add path cid !block_map ; 318 + added_leaves := Block_map.set cid block !added_leaves ; 319 + commit_ops := 320 + !commit_ops 321 + @ [{action= `Create; path; cid= Some cid; prev= None}] ; 322 + let%lwt new_mst = Cached_mst.add !mst path cid in 323 + mst := new_mst ; 324 + let refs = 325 + Util.find_blob_refs value 326 + |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 330 327 in 331 - Errors.invalid_request ~name:"InvalidSwap" 332 - (Format.sprintf "attempted to update record %s with cid %s" 333 - path cid_str ) ) ; 334 - let%lwt () = 335 - match old_cid with 336 - | Some _ -> ( 337 - match%lwt User_store.get_record t.db path with 338 - | Some record -> 339 - let refs = 340 - Util.find_blob_refs record.value 341 - |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 328 + let%lwt () = 329 + match%lwt User_store.put_blob_refs t.db path refs with 330 + | Ok () -> 331 + Lwt.return () 332 + | Error err -> 333 + raise err 334 + in 335 + Lwt.return 336 + (Create 337 + { type'= "com.atproto.repo.applyWrites#createResult" 338 + ; uri 339 + ; cid } ) 340 + | Update {collection; rkey; value; swap_record; _} -> 341 + let path = Format.sprintf "%s/%s" collection rkey in 342 + let uri = Format.sprintf "at://%s/%s" t.did path in 343 + let old_cid = String_map.find_opt path !block_map in 344 + ( if 345 + (swap_record <> None && swap_record <> old_cid) 346 + || (swap_record = None && old_cid = None) 347 + then 348 + let cid_str = 349 + match old_cid with 350 + | Some cid -> 351 + Cid.to_string cid 352 + | None -> 353 + "null" 342 354 in 343 - if not (List.is_empty refs) then 344 - let%lwt _ = 345 - User_store.delete_orphaned_blobs_by_record_path t.db 346 - path 347 - in 348 - Lwt.return_unit 349 - else Lwt.return_unit 350 - | None -> 351 - Lwt.return_unit ) 352 - | None -> 353 - Lwt.return_unit 354 - in 355 - let record_with_type : Lex.repo_record = 356 - if String_map.mem "$type" value then value 357 - else String_map.add "$type" (`String collection) value 358 - in 359 - let%lwt new_cid, new_block = 360 - User_store.put_record t.db (`LexMap record_with_type) path 361 - in 362 - added_leaves := Block_map.set new_cid new_block !added_leaves ; 363 - block_map := String_map.add path new_cid !block_map ; 364 - commit_ops := 365 - !commit_ops 366 - @ [{action= `Update; path; cid= Some new_cid; prev= old_cid}] ; 367 - let%lwt new_mst = Cached_mst.add !mst path new_cid in 368 - mst := new_mst ; 369 - let refs = 370 - Util.find_blob_refs value 371 - |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 372 - in 373 - let%lwt () = 374 - match%lwt User_store.put_blob_refs t.db path refs with 375 - | Ok () -> 376 - Lwt.return () 377 - | Error err -> 378 - raise err 379 - in 380 - Lwt.return 381 - (Update 382 - { type'= "com.atproto.repo.applyWrites#updateResult" 383 - ; uri 384 - ; cid= new_cid } ) 385 - | Delete {collection; rkey; swap_record; _} -> 386 - let path = Format.sprintf "%s/%s" collection rkey in 387 - let cid = String_map.find_opt path !block_map in 388 - ( if cid = None || (swap_record <> None && swap_record <> cid) then 389 - let cid_str = 390 - match cid with 391 - | Some cid -> 392 - Cid.to_string cid 355 + Errors.invalid_request ~name:"InvalidSwap" 356 + (Format.sprintf 357 + "attempted to update record %s with cid %s" path 358 + cid_str ) ) ; 359 + let%lwt () = 360 + match old_cid with 361 + | Some _ -> ( 362 + match%lwt User_store.get_record t.db path with 363 + | Some record -> 364 + let refs = 365 + Util.find_blob_refs record.value 366 + |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 367 + in 368 + if not (List.is_empty refs) then 369 + let%lwt _ = 370 + User_store.delete_orphaned_blobs_by_record_path t.db 371 + path 372 + in 373 + Lwt.return_unit 374 + else Lwt.return_unit 375 + | None -> 376 + Lwt.return_unit ) 393 377 | None -> 394 - "null" 378 + Lwt.return_unit 379 + in 380 + let record_with_type : Lex.repo_record = 381 + if String_map.mem "$type" value then value 382 + else String_map.add "$type" (`String collection) value 383 + in 384 + let%lwt new_cid, new_block = 385 + User_store.put_record t.db (`LexMap record_with_type) path 386 + in 387 + added_leaves := Block_map.set new_cid new_block !added_leaves ; 388 + block_map := String_map.add path new_cid !block_map ; 389 + commit_ops := 390 + !commit_ops 391 + @ [{action= `Update; path; cid= Some new_cid; prev= old_cid}] ; 392 + let%lwt new_mst = Cached_mst.add !mst path new_cid in 393 + mst := new_mst ; 394 + let refs = 395 + Util.find_blob_refs value 396 + |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 397 + in 398 + let%lwt () = 399 + match%lwt User_store.put_blob_refs t.db path refs with 400 + | Ok () -> 401 + Lwt.return () 402 + | Error err -> 403 + raise err 395 404 in 396 - Errors.invalid_request ~name:"InvalidSwap" 397 - (Format.sprintf "attempted to delete record %s with cid %s" 398 - path cid_str ) ) ; 399 - let%lwt () = 400 - match%lwt User_store.get_record t.db path with 401 - | Some record -> 402 - let refs = 403 - Util.find_blob_refs record.value 404 - |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 405 - in 406 - if not (List.is_empty refs) then 407 - let%lwt _ = 408 - User_store.delete_orphaned_blobs_by_record_path t.db path 405 + Lwt.return 406 + (Update 407 + { type'= "com.atproto.repo.applyWrites#updateResult" 408 + ; uri 409 + ; cid= new_cid } ) 410 + | Delete {collection; rkey; swap_record; _} -> 411 + let path = Format.sprintf "%s/%s" collection rkey in 412 + let cid = String_map.find_opt path !block_map in 413 + ( if cid = None || (swap_record <> None && swap_record <> cid) 414 + then 415 + let cid_str = 416 + match cid with 417 + | Some cid -> 418 + Cid.to_string cid 419 + | None -> 420 + "null" 409 421 in 410 - Lwt.return_unit 411 - else Lwt.return_unit 412 - | None -> 413 - Lwt.return_unit 414 - in 415 - let%lwt () = User_store.delete_record t.db path in 416 - block_map := String_map.remove path !block_map ; 417 - commit_ops := 418 - !commit_ops @ [{action= `Delete; path; cid= None; prev= cid}] ; 419 - let%lwt new_mst = Cached_mst.delete !mst path in 420 - mst := new_mst ; 421 - Lwt.return 422 - (Delete {type'= "com.atproto.repo.applyWrites#deleteResult"}) ) 423 - writes 424 - in 425 - let new_mst = !mst in 426 - let%lwt new_commit = put_commit t new_mst.root ~previous:(Some prev_commit) in 427 - let new_commit_cid, new_commit_signed = new_commit in 428 - let commit_block = 429 - new_commit_signed |> signed_commit_to_yojson |> Dag_cbor.encode_yojson 430 - in 431 - let diff : Inductive.diff list = 432 - List.fold_left 433 - (fun (acc : Inductive.diff list) 434 - ({action; path; cid; prev} : commit_evt_op) -> 435 - match action with 436 - | `Create -> 437 - acc @ [Add {key= path; cid= Option.get cid}] 438 - | `Update -> 439 - acc @ [Update {key= path; cid= Option.get cid; prev}] 440 - | `Delete -> 441 - acc @ [Delete {key= path; prev= Option.get prev}] ) 442 - [] !commit_ops 443 - in 444 - let%lwt proof_blocks = 445 - match%lwt 446 - Inductive.generate_proof !block_map diff ~new_root:new_mst.root 447 - ~prev_root:prev_commit.data 448 - with 449 - | Ok blocks -> 450 - Lwt.return (Block_map.merge blocks !added_leaves) 451 - | Error err -> 452 - raise err 453 - in 454 - let block_stream = 455 - proof_blocks |> Block_map.entries |> Lwt_seq.of_list 456 - |> Lwt_seq.cons (new_commit_cid, commit_block) 457 - in 458 - let%lwt blocks = 459 - Car.blocks_to_stream new_commit_cid block_stream |> Car.collect_stream 460 - in 461 - let%lwt ds = Data_store.connect () in 462 - let%lwt _ = 463 - Sequencer.sequence_commit ds ~did:t.did ~commit:new_commit_cid 464 - ~rev:new_commit_signed.rev ~blocks ~ops:!commit_ops ~since:prev_commit.rev 465 - ~prev_data:prev_commit.data () 466 - in 467 - Lwt.return {commit= new_commit; results} 422 + Errors.invalid_request ~name:"InvalidSwap" 423 + (Format.sprintf 424 + "attempted to delete record %s with cid %s" path 425 + cid_str ) ) ; 426 + let%lwt () = 427 + match%lwt User_store.get_record t.db path with 428 + | Some record -> 429 + let refs = 430 + Util.find_blob_refs record.value 431 + |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 432 + in 433 + if not (List.is_empty refs) then 434 + let%lwt _ = 435 + User_store.delete_orphaned_blobs_by_record_path t.db 436 + path 437 + in 438 + Lwt.return_unit 439 + else Lwt.return_unit 440 + | None -> 441 + Lwt.return_unit 442 + in 443 + let%lwt () = User_store.delete_record t.db path in 444 + block_map := String_map.remove path !block_map ; 445 + commit_ops := 446 + !commit_ops @ [{action= `Delete; path; cid= None; prev= cid}] ; 447 + let%lwt new_mst = Cached_mst.delete !mst path in 448 + mst := new_mst ; 449 + Lwt.return 450 + (Delete {type'= "com.atproto.repo.applyWrites#deleteResult"}) ) 451 + writes 452 + in 453 + let new_mst = !mst in 454 + let%lwt new_commit = 455 + put_commit t new_mst.root ~previous:(Some prev_commit) 456 + in 457 + let new_commit_cid, new_commit_signed = new_commit in 458 + let commit_block = 459 + new_commit_signed |> signed_commit_to_yojson |> Dag_cbor.encode_yojson 460 + in 461 + let diff : Inductive.diff list = 462 + List.fold_left 463 + (fun (acc : Inductive.diff list) 464 + ({action; path; cid; prev} : commit_evt_op) -> 465 + match action with 466 + | `Create -> 467 + acc @ [Add {key= path; cid= Option.get cid}] 468 + | `Update -> 469 + acc @ [Update {key= path; cid= Option.get cid; prev}] 470 + | `Delete -> 471 + acc @ [Delete {key= path; prev= Option.get prev}] ) 472 + [] !commit_ops 473 + in 474 + let%lwt proof_blocks = 475 + match%lwt 476 + Inductive.generate_proof !block_map diff ~new_root:new_mst.root 477 + ~prev_root:prev_commit.data 478 + with 479 + | Ok blocks -> 480 + Lwt.return (Block_map.merge blocks !added_leaves) 481 + | Error err -> 482 + raise err 483 + in 484 + let block_stream = 485 + proof_blocks |> Block_map.entries |> Lwt_seq.of_list 486 + |> Lwt_seq.cons (new_commit_cid, commit_block) 487 + in 488 + let%lwt blocks = 489 + Car.blocks_to_stream new_commit_cid block_stream |> Car.collect_stream 490 + in 491 + let%lwt ds = Data_store.connect () in 492 + let%lwt _ = 493 + Sequencer.sequence_commit ds ~did:t.did ~commit:new_commit_cid 494 + ~rev:new_commit_signed.rev ~blocks ~ops:!commit_ops 495 + ~since:prev_commit.rev ~prev_data:prev_commit.data () 496 + in 497 + Lwt.return {commit= new_commit; results} ) 468 498 469 499 let load ?write ?create ?(ensure_active = false) ?ds did : t Lwt.t = 470 500 let%lwt data_store_conn =
+4 -4
pegasus/lib/sequencer.ml
··· 743 743 let raw = Dag_cbor.encode_yojson @@ Encode.format_commit evt in 744 744 let%lwt seq = DB.append_event conn ~t:`Commit ~time:time_ms ~data:raw in 745 745 let frame = Frame.encode_message ~seq ~time:time_iso (Commit evt) in 746 - let%lwt () = Bus.publish {seq; bytes= frame} in 746 + Lwt.async (fun () -> Bus.publish {seq; bytes= frame}) ; 747 747 Lwt.return seq 748 748 749 749 let sequence_sync (conn : Data_store.t) ~(did : string) ~(rev : string) ··· 754 754 let raw = Dag_cbor.encode_yojson @@ Encode.format_sync evt in 755 755 let%lwt seq = DB.append_event conn ~t:`Sync ~time:time_ms ~data:raw in 756 756 let frame = Frame.encode_message ~seq ~time:time_iso (Sync evt) in 757 - let%lwt () = Bus.publish {seq; bytes= frame} in 757 + Lwt.async (fun () -> Bus.publish {seq; bytes= frame}) ; 758 758 Lwt.return seq 759 759 760 760 let sequence_identity (conn : Data_store.t) ~(did : string) ··· 765 765 let raw = Dag_cbor.encode_yojson @@ Encode.format_identity evt in 766 766 let%lwt seq = DB.append_event conn ~t:`Identity ~time:time_ms ~data:raw in 767 767 let frame = Frame.encode_message ~seq ~time:time_iso (Identity evt) in 768 - let%lwt () = Bus.publish {seq; bytes= frame} in 768 + Lwt.async (fun () -> Bus.publish {seq; bytes= frame}) ; 769 769 Lwt.return seq 770 770 771 771 let sequence_account (conn : Data_store.t) ~(did : string) ~(active : bool) ··· 776 776 let raw = Dag_cbor.encode_yojson @@ Encode.format_account evt in 777 777 let%lwt seq = DB.append_event conn ~t:`Account ~time:time_ms ~data:raw in 778 778 let frame = Frame.encode_message ~seq ~time:time_iso (Account evt) in 779 - let%lwt () = Bus.publish {seq; bytes= frame} in 779 + Lwt.async (fun () -> Bus.publish {seq; bytes= frame}) ; 780 780 Lwt.return seq