objective categorical abstract machine language personal data server

Undo async sequencer

futur.blue 4ea081a0 d765faf6

verified
+190 -219
+186 -215
pegasus/lib/repository.ml
··· 8 8 module String_map = Lex.String_map 9 9 module Tid = Mist.Tid 10 10 11 - let write_locks : (string, Lwt_mutex.t) Hashtbl.t = Hashtbl.create 100 12 - 13 - let write_lock_mutex = Lwt_mutex.create () 14 - 15 - let with_write_lock did f = 16 - let%lwt lock = 17 - Lwt_mutex.with_lock write_lock_mutex (fun () -> 18 - match Hashtbl.find_opt write_locks did with 19 - | Some l -> 20 - Lwt.return l 21 - | None -> 22 - let l = Lwt_mutex.create () in 23 - Hashtbl.add write_locks did l ; 24 - Lwt.return l ) 25 - in 26 - Lwt_mutex.with_lock lock f 27 - 28 11 module Write_op = struct 29 12 let create = "com.atproto.repo.applyWrites#create" 30 13 ··· 261 244 262 245 let apply_writes (t : t) (writes : repo_write list) (swap_commit : Cid.t option) 263 246 : write_result Lwt.t = 264 - with_write_lock t.did (fun () -> 265 - let open Sequencer.Types in 266 - let%lwt prev_commit = 267 - match%lwt User_store.get_commit t.db with 268 - | Some (_, commit) -> 269 - Lwt.return commit 270 - | None -> 271 - failwith ("failed to retrieve commit for " ^ t.did) 272 - in 273 - if swap_commit <> None && swap_commit <> Option.map fst t.commit then 274 - Errors.invalid_request ~name:"InvalidSwap" 275 - (Format.sprintf "swapCommit cid %s did not match last commit cid %s" 276 - (Cid.to_string (Option.get swap_commit)) 277 - ( match t.commit with 278 - | Some (c, _) -> 279 - Cid.to_string c 280 - | None -> 281 - "null" ) ) ; 282 - let cached_store = Cached_store.create t.db in 283 - let mst : Cached_mst.t ref = 284 - ref (Cached_mst.create cached_store prev_commit.data) 285 - in 286 - t.block_map <- None ; 287 - (* ops to emit, built in loop because prev_data (previous cid) is otherwise inaccessible *) 288 - let commit_ops : commit_evt_op list ref = ref [] in 289 - let added_leaves = ref Block_map.empty in 290 - let%lwt results = 291 - Lwt_list.map_s 292 - (fun (w : repo_write) -> 293 - match w with 294 - | Create {collection; rkey; value; _} -> 295 - let rkey = Option.value rkey ~default:(Tid.now ()) in 296 - let path = Format.sprintf "%s/%s" collection rkey in 297 - let uri = Format.sprintf "at://%s/%s" t.did path in 298 - let%lwt () = 299 - match%lwt User_store.get_record_cid t.db path with 300 - | Some cid -> 301 - Errors.invalid_request ~name:"InvalidSwap" 302 - (Format.sprintf 303 - "attempted to write record %s that already exists \ 304 - with cid %s" 305 - path (Cid.to_string cid) ) 306 - | None -> 307 - Lwt.return () 308 - in 309 - let record_with_type : Lex.repo_record = 310 - if String_map.mem "$type" value then value 311 - else String_map.add "$type" (`String collection) value 312 - in 313 - let%lwt cid, block = 314 - User_store.put_record t.db (`LexMap record_with_type) path 315 - in 316 - added_leaves := Block_map.set cid block !added_leaves ; 317 - commit_ops := 318 - !commit_ops 319 - @ [{action= `Create; path; cid= Some cid; prev= None}] ; 320 - let%lwt new_mst = Cached_mst.add !mst path cid in 321 - mst := new_mst ; 322 - let refs = 323 - Util.find_blob_refs value 324 - |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 325 - in 326 - let%lwt () = 327 - match%lwt User_store.put_blob_refs t.db path refs with 328 - | Ok () -> 329 - Lwt.return () 330 - | Error err -> 331 - raise err 332 - in 333 - Lwt.return 334 - (Create 335 - { type'= "com.atproto.repo.applyWrites#createResult" 336 - ; uri 337 - ; cid } ) 338 - | Update {collection; rkey; value; swap_record; _} -> 339 - let path = Format.sprintf "%s/%s" collection rkey in 340 - let uri = Format.sprintf "at://%s/%s" t.did path in 341 - let%lwt old_cid = User_store.get_record_cid t.db path in 342 - ( if 343 - (swap_record <> None && swap_record <> old_cid) 344 - || (swap_record = None && old_cid = None) 345 - then 346 - let cid_str = 347 - match old_cid with 348 - | Some cid -> 349 - Cid.to_string cid 350 - | None -> 351 - "null" 352 - in 353 - Errors.invalid_request ~name:"InvalidSwap" 354 - (Format.sprintf 355 - "attempted to update record %s with cid %s" path 356 - cid_str ) ) ; 357 - let%lwt () = 247 + let open Sequencer.Types in 248 + let%lwt prev_commit = 249 + match%lwt User_store.get_commit t.db with 250 + | Some (_, commit) -> 251 + Lwt.return commit 252 + | None -> 253 + failwith ("failed to retrieve commit for " ^ t.did) 254 + in 255 + if swap_commit <> None && swap_commit <> Option.map fst t.commit then 256 + Errors.invalid_request ~name:"InvalidSwap" 257 + (Format.sprintf "swapCommit cid %s did not match last commit cid %s" 258 + (Cid.to_string (Option.get swap_commit)) 259 + (match t.commit with Some (c, _) -> Cid.to_string c | None -> "null") ) ; 260 + let cached_store = Cached_store.create t.db in 261 + let mst : Cached_mst.t ref = 262 + ref (Cached_mst.create cached_store prev_commit.data) 263 + in 264 + t.block_map <- None ; 265 + (* ops to emit, built in loop because prev_data (previous cid) is otherwise inaccessible *) 266 + let commit_ops : commit_evt_op list ref = ref [] in 267 + let added_leaves = ref Block_map.empty in 268 + let%lwt results = 269 + Lwt_list.map_s 270 + (fun (w : repo_write) -> 271 + match w with 272 + | Create {collection; rkey; value; _} -> 273 + let rkey = Option.value rkey ~default:(Tid.now ()) in 274 + let path = Format.sprintf "%s/%s" collection rkey in 275 + let uri = Format.sprintf "at://%s/%s" t.did path in 276 + let%lwt () = 277 + match%lwt User_store.get_record_cid t.db path with 278 + | Some cid -> 279 + Errors.invalid_request ~name:"InvalidSwap" 280 + (Format.sprintf 281 + "attempted to write record %s that already exists \ 282 + with cid %s" 283 + path (Cid.to_string cid) ) 284 + | None -> 285 + Lwt.return () 286 + in 287 + let record_with_type : Lex.repo_record = 288 + if String_map.mem "$type" value then value 289 + else String_map.add "$type" (`String collection) value 290 + in 291 + let%lwt cid, block = 292 + User_store.put_record t.db (`LexMap record_with_type) path 293 + in 294 + added_leaves := Block_map.set cid block !added_leaves ; 295 + commit_ops := 296 + !commit_ops @ [{action= `Create; path; cid= Some cid; prev= None}] ; 297 + let%lwt new_mst = Cached_mst.add !mst path cid in 298 + mst := new_mst ; 299 + let refs = 300 + Util.find_blob_refs value 301 + |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 302 + in 303 + let%lwt () = 304 + match%lwt User_store.put_blob_refs t.db path refs with 305 + | Ok () -> 306 + Lwt.return () 307 + | Error err -> 308 + raise err 309 + in 310 + Lwt.return 311 + (Create 312 + { type'= "com.atproto.repo.applyWrites#createResult" 313 + ; uri 314 + ; cid } ) 315 + | Update {collection; rkey; value; swap_record; _} -> 316 + let path = Format.sprintf "%s/%s" collection rkey in 317 + let uri = Format.sprintf "at://%s/%s" t.did path in 318 + let%lwt old_cid = User_store.get_record_cid t.db path in 319 + ( if 320 + (swap_record <> None && swap_record <> old_cid) 321 + || (swap_record = None && old_cid = None) 322 + then 323 + let cid_str = 358 324 match old_cid with 359 - | Some _ -> ( 360 - match%lwt User_store.get_record t.db path with 361 - | Some record -> 362 - let refs = 363 - Util.find_blob_refs record.value 364 - |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 365 - in 366 - if not (List.is_empty refs) then 367 - let%lwt _ = 368 - User_store.delete_orphaned_blobs_by_record_path t.db 369 - path 370 - in 371 - Lwt.return_unit 372 - else Lwt.return_unit 373 - | None -> 374 - Lwt.return_unit ) 325 + | Some cid -> 326 + Cid.to_string cid 375 327 | None -> 376 - Lwt.return_unit 377 - in 378 - let record_with_type : Lex.repo_record = 379 - if String_map.mem "$type" value then value 380 - else String_map.add "$type" (`String collection) value 381 - in 382 - let%lwt new_cid, new_block = 383 - User_store.put_record t.db (`LexMap record_with_type) path 384 - in 385 - added_leaves := Block_map.set new_cid new_block !added_leaves ; 386 - commit_ops := 387 - !commit_ops 388 - @ [{action= `Update; path; cid= Some new_cid; prev= old_cid}] ; 389 - let%lwt new_mst = Cached_mst.add !mst path new_cid in 390 - mst := new_mst ; 391 - let refs = 392 - Util.find_blob_refs value 393 - |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 394 - in 395 - let%lwt () = 396 - match%lwt User_store.put_blob_refs t.db path refs with 397 - | Ok () -> 398 - Lwt.return () 399 - | Error err -> 400 - raise err 328 + "null" 401 329 in 402 - Lwt.return 403 - (Update 404 - { type'= "com.atproto.repo.applyWrites#updateResult" 405 - ; uri 406 - ; cid= new_cid } ) 407 - | Delete {collection; rkey; swap_record; _} -> 408 - let path = Format.sprintf "%s/%s" collection rkey in 409 - let%lwt cid = User_store.get_record_cid t.db path in 410 - ( if cid = None || (swap_record <> None && swap_record <> cid) 411 - then 412 - let cid_str = 413 - match cid with 414 - | Some cid -> 415 - Cid.to_string cid 416 - | None -> 417 - "null" 418 - in 419 - Errors.invalid_request ~name:"InvalidSwap" 420 - (Format.sprintf 421 - "attempted to delete record %s with cid %s" path 422 - cid_str ) ) ; 423 - let%lwt () = 330 + Errors.invalid_request ~name:"InvalidSwap" 331 + (Format.sprintf "attempted to update record %s with cid %s" 332 + path cid_str ) ) ; 333 + let%lwt () = 334 + match old_cid with 335 + | Some _ -> ( 424 336 match%lwt User_store.get_record t.db path with 425 337 | Some record -> 426 338 let refs = ··· 435 347 Lwt.return_unit 436 348 else Lwt.return_unit 437 349 | None -> 438 - Lwt.return_unit 350 + Lwt.return_unit ) 351 + | None -> 352 + Lwt.return_unit 353 + in 354 + let record_with_type : Lex.repo_record = 355 + if String_map.mem "$type" value then value 356 + else String_map.add "$type" (`String collection) value 357 + in 358 + let%lwt new_cid, new_block = 359 + User_store.put_record t.db (`LexMap record_with_type) path 360 + in 361 + added_leaves := Block_map.set new_cid new_block !added_leaves ; 362 + commit_ops := 363 + !commit_ops 364 + @ [{action= `Update; path; cid= Some new_cid; prev= old_cid}] ; 365 + let%lwt new_mst = Cached_mst.add !mst path new_cid in 366 + mst := new_mst ; 367 + let refs = 368 + Util.find_blob_refs value 369 + |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 370 + in 371 + let%lwt () = 372 + match%lwt User_store.put_blob_refs t.db path refs with 373 + | Ok () -> 374 + Lwt.return () 375 + | Error err -> 376 + raise err 377 + in 378 + Lwt.return 379 + (Update 380 + { type'= "com.atproto.repo.applyWrites#updateResult" 381 + ; uri 382 + ; cid= new_cid } ) 383 + | Delete {collection; rkey; swap_record; _} -> 384 + let path = Format.sprintf "%s/%s" collection rkey in 385 + let%lwt cid = User_store.get_record_cid t.db path in 386 + ( if cid = None || (swap_record <> None && swap_record <> cid) then 387 + let cid_str = 388 + match cid with 389 + | Some cid -> 390 + Cid.to_string cid 391 + | None -> 392 + "null" 439 393 in 440 - let%lwt () = User_store.delete_record t.db path in 441 - commit_ops := 442 - !commit_ops @ [{action= `Delete; path; cid= None; prev= cid}] ; 443 - let%lwt new_mst = Cached_mst.delete !mst path in 444 - mst := new_mst ; 445 - Lwt.return 446 - (Delete {type'= "com.atproto.repo.applyWrites#deleteResult"}) ) 447 - writes 448 - in 449 - let new_mst = !mst in 450 - let%lwt new_commit = 451 - put_commit t new_mst.root ~previous:(Some prev_commit) 452 - in 453 - let new_commit_cid, new_commit_signed = new_commit in 454 - let commit_block = 455 - new_commit_signed |> signed_commit_to_yojson |> Dag_cbor.encode_yojson 456 - in 457 - let%lwt proof_blocks = 458 - Lwt_list.fold_left_s 459 - (fun acc ({path; _} : commit_evt_op) -> 460 - let%lwt key_proof = 461 - Cached_mst.proof_for_key new_mst new_mst.root path 394 + Errors.invalid_request ~name:"InvalidSwap" 395 + (Format.sprintf "attempted to delete record %s with cid %s" 396 + path cid_str ) ) ; 397 + let%lwt () = 398 + match%lwt User_store.get_record t.db path with 399 + | Some record -> 400 + let refs = 401 + Util.find_blob_refs record.value 402 + |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 403 + in 404 + if not (List.is_empty refs) then 405 + let%lwt _ = 406 + User_store.delete_orphaned_blobs_by_record_path t.db path 407 + in 408 + Lwt.return_unit 409 + else Lwt.return_unit 410 + | None -> 411 + Lwt.return_unit 462 412 in 463 - Lwt.return (Block_map.merge acc key_proof) ) 464 - Block_map.empty !commit_ops 465 - in 466 - let proof_blocks = Block_map.merge proof_blocks !added_leaves in 467 - let block_stream = 468 - proof_blocks |> Block_map.entries |> Lwt_seq.of_list 469 - |> Lwt_seq.cons (new_commit_cid, commit_block) 470 - in 471 - let%lwt blocks = 472 - Car.blocks_to_stream new_commit_cid block_stream |> Car.collect_stream 473 - in 474 - let%lwt ds = Data_store.connect () in 475 - let%lwt _ = 476 - Sequencer.sequence_commit ds ~did:t.did ~commit:new_commit_cid 477 - ~rev:new_commit_signed.rev ~blocks ~ops:!commit_ops 478 - ~since:prev_commit.rev ~prev_data:prev_commit.data () 479 - in 480 - Lwt.return {commit= new_commit; results} ) 413 + let%lwt () = User_store.delete_record t.db path in 414 + commit_ops := 415 + !commit_ops @ [{action= `Delete; path; cid= None; prev= cid}] ; 416 + let%lwt new_mst = Cached_mst.delete !mst path in 417 + mst := new_mst ; 418 + Lwt.return 419 + (Delete {type'= "com.atproto.repo.applyWrites#deleteResult"}) ) 420 + writes 421 + in 422 + let new_mst = !mst in 423 + let%lwt new_commit = put_commit t new_mst.root ~previous:(Some prev_commit) in 424 + let new_commit_cid, new_commit_signed = new_commit in 425 + let commit_block = 426 + new_commit_signed |> signed_commit_to_yojson |> Dag_cbor.encode_yojson 427 + in 428 + let%lwt proof_blocks = 429 + Lwt_list.fold_left_s 430 + (fun acc ({path; _} : commit_evt_op) -> 431 + let%lwt key_proof = 432 + Cached_mst.proof_for_key new_mst new_mst.root path 433 + in 434 + Lwt.return (Block_map.merge acc key_proof) ) 435 + Block_map.empty !commit_ops 436 + in 437 + let proof_blocks = Block_map.merge proof_blocks !added_leaves in 438 + let block_stream = 439 + proof_blocks |> Block_map.entries |> Lwt_seq.of_list 440 + |> Lwt_seq.cons (new_commit_cid, commit_block) 441 + in 442 + let%lwt blocks = 443 + Car.blocks_to_stream new_commit_cid block_stream |> Car.collect_stream 444 + in 445 + let%lwt ds = Data_store.connect () in 446 + let%lwt _ = 447 + Sequencer.sequence_commit ds ~did:t.did ~commit:new_commit_cid 448 + ~rev:new_commit_signed.rev ~blocks ~ops:!commit_ops 449 + ~since:prev_commit.rev ~prev_data:prev_commit.data () 450 + in 451 + Lwt.return {commit= new_commit; results} 481 452 482 453 let load ?write ?create ?(ensure_active = false) ?ds did : t Lwt.t = 483 454 let%lwt data_store_conn =
+4 -4
pegasus/lib/sequencer.ml
··· 743 743 let raw = Dag_cbor.encode_yojson @@ Encode.format_commit evt in 744 744 let%lwt seq = DB.append_event conn ~t:`Commit ~time:time_ms ~data:raw in 745 745 let frame = Frame.encode_message ~seq ~time:time_iso (Commit evt) in 746 - Lwt.async (fun () -> Bus.publish {seq; bytes= frame}) ; 746 + let%lwt () = Bus.publish {seq; bytes= frame} in 747 747 Lwt.return seq 748 748 749 749 let sequence_sync (conn : Data_store.t) ~(did : string) ~(rev : string) ··· 754 754 let raw = Dag_cbor.encode_yojson @@ Encode.format_sync evt in 755 755 let%lwt seq = DB.append_event conn ~t:`Sync ~time:time_ms ~data:raw in 756 756 let frame = Frame.encode_message ~seq ~time:time_iso (Sync evt) in 757 - Lwt.async (fun () -> Bus.publish {seq; bytes= frame}) ; 757 + let%lwt () = Bus.publish {seq; bytes= frame} in 758 758 Lwt.return seq 759 759 760 760 let sequence_identity (conn : Data_store.t) ~(did : string) ··· 765 765 let raw = Dag_cbor.encode_yojson @@ Encode.format_identity evt in 766 766 let%lwt seq = DB.append_event conn ~t:`Identity ~time:time_ms ~data:raw in 767 767 let frame = Frame.encode_message ~seq ~time:time_iso (Identity evt) in 768 - Lwt.async (fun () -> Bus.publish {seq; bytes= frame}) ; 768 + let%lwt () = Bus.publish {seq; bytes= frame} in 769 769 Lwt.return seq 770 770 771 771 let sequence_account (conn : Data_store.t) ~(did : string) ~(active : bool) ··· 776 776 let raw = Dag_cbor.encode_yojson @@ Encode.format_account evt in 777 777 let%lwt seq = DB.append_event conn ~t:`Account ~time:time_ms ~data:raw in 778 778 let frame = Frame.encode_message ~seq ~time:time_iso (Account evt) in 779 - Lwt.async (fun () -> Bus.publish {seq; bytes= frame}) ; 779 + let%lwt () = Bus.publish {seq; bytes= frame} in 780 780 Lwt.return seq