objective categorical abstract machine language personal data server

Cache db pools, run pragma statements in separate queries

futur.blue c11a3dfd 4ea081a0

verified
+315 -244
+1 -1
pegasus/lib/api/admin_/users.ml
··· 183 183 ~perm:0o644 184 184 in 185 185 let%lwt repo = 186 - Repository.load ~write:true ~create:true 186 + Repository.load ~create:true 187 187 ~ds:ctx.db new_did 188 188 in 189 189 let%lwt _ =
+1 -1
pegasus/lib/api/repo/describeRepo.ml
··· 25 25 "at://%s" 26 26 (fun h -> h) 27 27 in 28 - let%lwt repo = Repository.load ~write:false did in 28 + let%lwt repo = Repository.load did in 29 29 let%lwt collections = Repository.list_collections repo in 30 30 Dream.json @@ Yojson.Safe.to_string 31 31 @@ response_to_yojson
+1 -3
pegasus/lib/api/repo/importRepo.ml
··· 13 13 let did = Auth.get_authed_did_exn ctx.auth in 14 14 let bytes_stream = Dream.body_stream ctx.req in 15 15 let car_stream = stream_to_seq bytes_stream in 16 - let%lwt repo = 17 - Repository.load did ~ds:ctx.db ~ensure_active:true ~write:true 18 - in 16 + let%lwt repo = Repository.load did ~ds:ctx.db ~ensure_active:true in 19 17 let%lwt result = Repository.import_car repo car_stream in 20 18 match result with 21 19 | Ok _ ->
+1 -1
pegasus/lib/api/server/checkAccountStatus.ml
··· 18 18 Errors.internal_error ~msg:"actor not found" () 19 19 | Some actor -> ( 20 20 let%lwt {db= us; commit; _} = 21 - Repository.load ~write:false ~ds:db did 21 + Repository.load ~ds:db did 22 22 in 23 23 let%lwt cid, commit = 24 24 match commit with
+1 -1
pegasus/lib/api/server/createAccount.ml
··· 120 120 ~perm:0o644 121 121 in 122 122 let%lwt repo = 123 - Repository.load ~write:true ~create:true ~ds:db did 123 + Repository.load ~create:true ~ds:db did 124 124 in 125 125 let%lwt _ = Repository.put_initial_commit repo in 126 126 let%lwt _ = Sequencer.sequence_identity db ~did ~handle () in
+1 -1
pegasus/lib/api/sync/getBlob.ml
··· 5 5 let {did; cid} = Xrpc.parse_query ctx.req query_of_yojson in 6 6 let cid_parsed = Cid.as_cid cid in 7 7 let%lwt {db; _} = 8 - Repository.load did ~ensure_active:true ~write:false ~ds:ctx.db 8 + Repository.load did ~ensure_active:true ~ds:ctx.db 9 9 in 10 10 let%lwt blob_meta = User_store.get_blob_metadata db cid_parsed in 11 11 match blob_meta with
+1 -1
pegasus/lib/api/sync/getBlocks.ml
··· 5 5 Xrpc.handler (fun ctx -> 6 6 let {did; cids} : query = Xrpc.parse_query ctx.req query_of_yojson in 7 7 let%lwt {db; commit; _} = 8 - Repository.load did ~ensure_active:true ~write:false ~ds:ctx.db 8 + Repository.load did ~ensure_active:true ~ds:ctx.db 9 9 in 10 10 let commit_cid, commit_signed = Option.get commit in 11 11 let commit_block =
+1 -1
pegasus/lib/api/sync/getLatestCommit.ml
··· 6 6 Xrpc.handler (fun ctx -> 7 7 let {did} : query = Xrpc.parse_query ctx.req query_of_yojson in 8 8 match%lwt 9 - Repository.load did ~ensure_active:true ~write:false ~ds:ctx.db 9 + Repository.load did ~ensure_active:true ~ds:ctx.db 10 10 with 11 11 | {commit= Some (cid, {rev; _}); _} -> 12 12 let cid = Cid.to_string cid in
+1 -1
pegasus/lib/api/sync/getRecord.ml
··· 10 10 in 11 11 let path = collection ^ "/" ^ rkey in 12 12 let%lwt repo = 13 - Repository.load did ~ensure_active:true ~write:false ~ds:ctx.db 13 + Repository.load did ~ensure_active:true ~ds:ctx.db 14 14 in 15 15 match%lwt Repository.get_record repo path with 16 16 | None ->
+1 -1
pegasus/lib/api/sync/getRepo.ml
··· 3 3 let handler = 4 4 Xrpc.handler (fun ctx -> 5 5 let {did} : query = Xrpc.parse_query ctx.req query_of_yojson in 6 - let%lwt repo = Repository.load did ~ensure_active:true ~write:false in 6 + let%lwt repo = Repository.load did ~ensure_active:true in 7 7 let%lwt car_stream = Repository.export_car repo in 8 8 Dream.stream 9 9 ~headers:[("Content-Type", "application/vnd.ipld.car")]
+1 -1
pegasus/lib/api/sync/getRepoStatus.ml
··· 15 15 Errors.invalid_request ~name:"RepoNotFound" 16 16 "couldn't find a repo with that did" 17 17 in 18 - let%lwt {db= user_db; _} = Repository.load did ~write:false ~ds:ctx.db in 18 + let%lwt {db= user_db; _} = Repository.load did ~ds:ctx.db in 19 19 let%lwt _, commit = 20 20 match%lwt User_store.get_commit user_db with 21 21 | Some c ->
+1 -1
pegasus/lib/api/sync/listBlobs.ml
··· 22 22 1000 23 23 in 24 24 let%lwt {db; _} = 25 - Repository.load did ~ensure_active:true ~write:false ~ds:ctx.db 25 + Repository.load did ~ensure_active:true ~ds:ctx.db 26 26 in 27 27 let%lwt cids = User_store.list_blobs db ~limit ~cursor ?since in 28 28 let cids = List.map Cid.to_string cids in
+19 -8
pegasus/lib/data_store.ml
··· 318 318 319 319 type t = Util.caqti_pool 320 320 321 - let connect ?create ?write () : t Lwt.t = 322 - if create = Some true then 323 - Util.mkfile_p Util.Constants.pegasus_db_filepath ~perm:0o644 ; 324 - let%lwt db = 325 - Util.connect_sqlite ?create ?write Util.Constants.pegasus_db_location 326 - in 327 - let%lwt () = Migrations.run_migrations Data_store db in 328 - Lwt.return db 321 + let pool : t option ref = ref None 322 + 323 + let pool_mutex = Lwt_mutex.create () 324 + 325 + let connect ?create () : t Lwt.t = 326 + Lwt_mutex.with_lock pool_mutex (fun () -> 327 + match !pool with 328 + | Some pool -> 329 + Lwt.return pool 330 + | None -> 331 + if create = Some true then 332 + Util.mkfile_p Util.Constants.pegasus_db_filepath ~perm:0o644 ; 333 + let%lwt db = 334 + Util.connect_sqlite ?create ~write:true 335 + Util.Constants.pegasus_db_location 336 + in 337 + let%lwt () = Migrations.run_migrations Data_store db in 338 + pool := Some db ; 339 + Lwt.return db ) 329 340 330 341 let create_actor ~did ~handle ~email ~password ~signing_key conn = 331 342 let password_hash = Bcrypt.hash password |> Bcrypt.string_of_hash in
+242 -194
pegasus/lib/repository.ml
··· 8 8 module String_map = Lex.String_map 9 9 module Tid = Mist.Tid 10 10 11 + let write_locks : (string, Lwt_mutex.t) Hashtbl.t = Hashtbl.create 100 12 + 13 + let write_lock_mutex = Lwt_mutex.create () 14 + 15 + let with_write_lock did f = 16 + let%lwt lock = 17 + Lwt_mutex.with_lock write_lock_mutex (fun () -> 18 + match Hashtbl.find_opt write_locks did with 19 + | Some l -> 20 + Lwt.return l 21 + | None -> 22 + let l = Lwt_mutex.create () in 23 + Hashtbl.add write_locks did l ; 24 + Lwt.return l ) 25 + in 26 + Lwt_mutex.with_lock lock f 27 + 11 28 module Write_op = struct 12 29 let create = "com.atproto.repo.applyWrites#create" 13 30 ··· 244 261 245 262 let apply_writes (t : t) (writes : repo_write list) (swap_commit : Cid.t option) 246 263 : write_result Lwt.t = 247 - let open Sequencer.Types in 248 - let%lwt prev_commit = 249 - match%lwt User_store.get_commit t.db with 250 - | Some (_, commit) -> 251 - Lwt.return commit 252 - | None -> 253 - failwith ("failed to retrieve commit for " ^ t.did) 254 - in 255 - if swap_commit <> None && swap_commit <> Option.map fst t.commit then 256 - Errors.invalid_request ~name:"InvalidSwap" 257 - (Format.sprintf "swapCommit cid %s did not match last commit cid %s" 258 - (Cid.to_string (Option.get swap_commit)) 259 - (match t.commit with Some (c, _) -> Cid.to_string c | None -> "null") ) ; 260 - let cached_store = Cached_store.create t.db in 261 - let mst : Cached_mst.t ref = 262 - ref (Cached_mst.create cached_store prev_commit.data) 263 - in 264 - t.block_map <- None ; 265 - (* ops to emit, built in loop because prev_data (previous cid) is otherwise inaccessible *) 266 - let commit_ops : commit_evt_op list ref = ref [] in 267 - let added_leaves = ref Block_map.empty in 268 - let%lwt results = 269 - Lwt_list.map_s 270 - (fun (w : repo_write) -> 271 - match w with 272 - | Create {collection; rkey; value; _} -> 273 - let rkey = Option.value rkey ~default:(Tid.now ()) in 274 - let path = Format.sprintf "%s/%s" collection rkey in 275 - let uri = Format.sprintf "at://%s/%s" t.did path in 276 - let%lwt () = 277 - match%lwt User_store.get_record_cid t.db path with 278 - | Some cid -> 279 - Errors.invalid_request ~name:"InvalidSwap" 280 - (Format.sprintf 281 - "attempted to write record %s that already exists \ 282 - with cid %s" 283 - path (Cid.to_string cid) ) 284 - | None -> 285 - Lwt.return () 286 - in 287 - let record_with_type : Lex.repo_record = 288 - if String_map.mem "$type" value then value 289 - else String_map.add "$type" (`String collection) value 290 - in 291 - let%lwt cid, block = 292 - User_store.put_record t.db (`LexMap record_with_type) path 293 - in 294 - added_leaves := Block_map.set cid block !added_leaves ; 295 - commit_ops := 296 - !commit_ops @ [{action= `Create; path; cid= Some cid; prev= None}] ; 297 - let%lwt new_mst = Cached_mst.add !mst path cid in 298 - mst := new_mst ; 299 - let refs = 300 - Util.find_blob_refs value 301 - |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 302 - in 303 - let%lwt () = 304 - match%lwt User_store.put_blob_refs t.db path refs with 305 - | Ok () -> 306 - Lwt.return () 307 - | Error err -> 308 - raise err 309 - in 310 - Lwt.return 311 - (Create 312 - { type'= "com.atproto.repo.applyWrites#createResult" 313 - ; uri 314 - ; cid } ) 315 - | Update {collection; rkey; value; swap_record; _} -> 316 - let path = Format.sprintf "%s/%s" collection rkey in 317 - let uri = Format.sprintf "at://%s/%s" t.did path in 318 - let%lwt old_cid = User_store.get_record_cid t.db path in 319 - ( if 320 - (swap_record <> None && swap_record <> old_cid) 321 - || (swap_record = None && old_cid = None) 322 - then 323 - let cid_str = 264 + with_write_lock t.did (fun () -> 265 + Dream.debug (fun l -> l "lock acquired") ; 266 + let open Sequencer.Types in 267 + let module Inductive = Mist.Mst.Inductive (Mst) in 268 + let%lwt prev_commit = 269 + match%lwt User_store.get_commit t.db with 270 + | Some (_, commit) -> 271 + Lwt.return commit 272 + | None -> 273 + failwith ("failed to retrieve commit for " ^ t.did) 274 + in 275 + Dream.debug (fun l -> l "commit retrieved") ; 276 + if swap_commit <> None && swap_commit <> Option.map fst t.commit then 277 + Errors.invalid_request ~name:"InvalidSwap" 278 + (Format.sprintf "swapCommit cid %s did not match last commit cid %s" 279 + (Cid.to_string (Option.get swap_commit)) 280 + ( match t.commit with 281 + | Some (c, _) -> 282 + Cid.to_string c 283 + | None -> 284 + "null" ) ) ; 285 + let%lwt block_map = Lwt.map ref (get_map t) in 286 + let cached_store = Cached_store.create t.db in 287 + let mst : Cached_mst.t ref = 288 + ref (Cached_mst.create cached_store prev_commit.data) 289 + in 290 + (* ops to emit, built in loop because prev_data (previous cid) is otherwise inaccessible *) 291 + let commit_ops : commit_evt_op list ref = ref [] in 292 + let added_leaves = ref Block_map.empty in 293 + let%lwt results = 294 + Lwt_list.map_s 295 + (fun (w : repo_write) -> 296 + match w with 297 + | Create {collection; rkey; value; _} -> 298 + let rkey = Option.value rkey ~default:(Tid.now ()) in 299 + let path = Format.sprintf "%s/%s" collection rkey in 300 + let uri = Format.sprintf "at://%s/%s" t.did path in 301 + let%lwt () = 302 + match String_map.find_opt path !block_map with 303 + | Some cid -> 304 + Errors.invalid_request ~name:"InvalidSwap" 305 + (Format.sprintf 306 + "attempted to write record %s that already exists \ 307 + with cid %s" 308 + path (Cid.to_string cid) ) 309 + | None -> 310 + Lwt.return () 311 + in 312 + let record_with_type : Lex.repo_record = 313 + if String_map.mem "$type" value then value 314 + else String_map.add "$type" (`String collection) value 315 + in 316 + let%lwt cid, block = 317 + User_store.put_record t.db (`LexMap record_with_type) path 318 + in 319 + block_map := String_map.add path cid !block_map ; 320 + added_leaves := Block_map.set cid block !added_leaves ; 321 + commit_ops := 322 + !commit_ops 323 + @ [{action= `Create; path; cid= Some cid; prev= None}] ; 324 + let%lwt new_mst = Cached_mst.add !mst path cid in 325 + mst := new_mst ; 326 + let refs = 327 + Util.find_blob_refs value 328 + |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 329 + in 330 + let%lwt () = 331 + match%lwt User_store.put_blob_refs t.db path refs with 332 + | Ok () -> 333 + Lwt.return () 334 + | Error err -> 335 + raise err 336 + in 337 + Lwt.return 338 + (Create 339 + { type'= "com.atproto.repo.applyWrites#createResult" 340 + ; uri 341 + ; cid } ) 342 + | Update {collection; rkey; value; swap_record; _} -> 343 + let path = Format.sprintf "%s/%s" collection rkey in 344 + let uri = Format.sprintf "at://%s/%s" t.did path in 345 + let old_cid = String_map.find_opt path !block_map in 346 + ( if 347 + (swap_record <> None && swap_record <> old_cid) 348 + || (swap_record = None && old_cid = None) 349 + then 350 + let cid_str = 351 + match old_cid with 352 + | Some cid -> 353 + Cid.to_string cid 354 + | None -> 355 + "null" 356 + in 357 + Errors.invalid_request ~name:"InvalidSwap" 358 + (Format.sprintf 359 + "attempted to update record %s with cid %s" path 360 + cid_str ) ) ; 361 + let%lwt () = 324 362 match old_cid with 325 - | Some cid -> 326 - Cid.to_string cid 363 + | Some _ -> ( 364 + match%lwt User_store.get_record t.db path with 365 + | Some record -> 366 + let refs = 367 + Util.find_blob_refs record.value 368 + |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 369 + in 370 + if not (List.is_empty refs) then 371 + let%lwt _ = 372 + User_store.delete_orphaned_blobs_by_record_path t.db 373 + path 374 + in 375 + Lwt.return_unit 376 + else Lwt.return_unit 377 + | None -> 378 + Lwt.return_unit ) 327 379 | None -> 328 - "null" 380 + Lwt.return_unit 329 381 in 330 - Errors.invalid_request ~name:"InvalidSwap" 331 - (Format.sprintf "attempted to update record %s with cid %s" 332 - path cid_str ) ) ; 333 - let%lwt () = 334 - match old_cid with 335 - | Some _ -> ( 382 + let record_with_type : Lex.repo_record = 383 + if String_map.mem "$type" value then value 384 + else String_map.add "$type" (`String collection) value 385 + in 386 + let%lwt new_cid, new_block = 387 + User_store.put_record t.db (`LexMap record_with_type) path 388 + in 389 + added_leaves := Block_map.set new_cid new_block !added_leaves ; 390 + block_map := String_map.add path new_cid !block_map ; 391 + commit_ops := 392 + !commit_ops 393 + @ [{action= `Update; path; cid= Some new_cid; prev= old_cid}] ; 394 + let%lwt new_mst = Cached_mst.add !mst path new_cid in 395 + mst := new_mst ; 396 + let refs = 397 + Util.find_blob_refs value 398 + |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 399 + in 400 + let%lwt () = 401 + match%lwt User_store.put_blob_refs t.db path refs with 402 + | Ok () -> 403 + Lwt.return () 404 + | Error err -> 405 + raise err 406 + in 407 + Lwt.return 408 + (Update 409 + { type'= "com.atproto.repo.applyWrites#updateResult" 410 + ; uri 411 + ; cid= new_cid } ) 412 + | Delete {collection; rkey; swap_record; _} -> 413 + let path = Format.sprintf "%s/%s" collection rkey in 414 + let cid = String_map.find_opt path !block_map in 415 + ( if cid = None || (swap_record <> None && swap_record <> cid) 416 + then 417 + let cid_str = 418 + match cid with 419 + | Some cid -> 420 + Cid.to_string cid 421 + | None -> 422 + "null" 423 + in 424 + Errors.invalid_request ~name:"InvalidSwap" 425 + (Format.sprintf 426 + "attempted to delete record %s with cid %s" path 427 + cid_str ) ) ; 428 + let%lwt () = 336 429 match%lwt User_store.get_record t.db path with 337 430 | Some record -> 338 431 let refs = ··· 347 440 Lwt.return_unit 348 441 else Lwt.return_unit 349 442 | None -> 350 - Lwt.return_unit ) 351 - | None -> 352 - Lwt.return_unit 353 - in 354 - let record_with_type : Lex.repo_record = 355 - if String_map.mem "$type" value then value 356 - else String_map.add "$type" (`String collection) value 357 - in 358 - let%lwt new_cid, new_block = 359 - User_store.put_record t.db (`LexMap record_with_type) path 360 - in 361 - added_leaves := Block_map.set new_cid new_block !added_leaves ; 362 - commit_ops := 363 - !commit_ops 364 - @ [{action= `Update; path; cid= Some new_cid; prev= old_cid}] ; 365 - let%lwt new_mst = Cached_mst.add !mst path new_cid in 366 - mst := new_mst ; 367 - let refs = 368 - Util.find_blob_refs value 369 - |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 370 - in 371 - let%lwt () = 372 - match%lwt User_store.put_blob_refs t.db path refs with 373 - | Ok () -> 374 - Lwt.return () 375 - | Error err -> 376 - raise err 377 - in 378 - Lwt.return 379 - (Update 380 - { type'= "com.atproto.repo.applyWrites#updateResult" 381 - ; uri 382 - ; cid= new_cid } ) 383 - | Delete {collection; rkey; swap_record; _} -> 384 - let path = Format.sprintf "%s/%s" collection rkey in 385 - let%lwt cid = User_store.get_record_cid t.db path in 386 - ( if cid = None || (swap_record <> None && swap_record <> cid) then 387 - let cid_str = 388 - match cid with 389 - | Some cid -> 390 - Cid.to_string cid 391 - | None -> 392 - "null" 443 + Lwt.return_unit 393 444 in 394 - Errors.invalid_request ~name:"InvalidSwap" 395 - (Format.sprintf "attempted to delete record %s with cid %s" 396 - path cid_str ) ) ; 397 - let%lwt () = 398 - match%lwt User_store.get_record t.db path with 399 - | Some record -> 400 - let refs = 401 - Util.find_blob_refs record.value 402 - |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 403 - in 404 - if not (List.is_empty refs) then 405 - let%lwt _ = 406 - User_store.delete_orphaned_blobs_by_record_path t.db path 407 - in 408 - Lwt.return_unit 409 - else Lwt.return_unit 410 - | None -> 411 - Lwt.return_unit 412 - in 413 - let%lwt () = User_store.delete_record t.db path in 414 - commit_ops := 415 - !commit_ops @ [{action= `Delete; path; cid= None; prev= cid}] ; 416 - let%lwt new_mst = Cached_mst.delete !mst path in 417 - mst := new_mst ; 418 - Lwt.return 419 - (Delete {type'= "com.atproto.repo.applyWrites#deleteResult"}) ) 420 - writes 421 - in 422 - let new_mst = !mst in 423 - let%lwt new_commit = put_commit t new_mst.root ~previous:(Some prev_commit) in 424 - let new_commit_cid, new_commit_signed = new_commit in 425 - let commit_block = 426 - new_commit_signed |> signed_commit_to_yojson |> Dag_cbor.encode_yojson 427 - in 428 - let%lwt proof_blocks = 429 - Lwt_list.fold_left_s 430 - (fun acc ({path; _} : commit_evt_op) -> 431 - let%lwt key_proof = 432 - Cached_mst.proof_for_key new_mst new_mst.root path 433 - in 434 - Lwt.return (Block_map.merge acc key_proof) ) 435 - Block_map.empty !commit_ops 436 - in 437 - let proof_blocks = Block_map.merge proof_blocks !added_leaves in 438 - let block_stream = 439 - proof_blocks |> Block_map.entries |> Lwt_seq.of_list 440 - |> Lwt_seq.cons (new_commit_cid, commit_block) 441 - in 442 - let%lwt blocks = 443 - Car.blocks_to_stream new_commit_cid block_stream |> Car.collect_stream 444 - in 445 - let%lwt ds = Data_store.connect () in 446 - let%lwt _ = 447 - Sequencer.sequence_commit ds ~did:t.did ~commit:new_commit_cid 448 - ~rev:new_commit_signed.rev ~blocks ~ops:!commit_ops 449 - ~since:prev_commit.rev ~prev_data:prev_commit.data () 450 - in 451 - Lwt.return {commit= new_commit; results} 445 + let%lwt () = User_store.delete_record t.db path in 446 + block_map := String_map.remove path !block_map ; 447 + commit_ops := 448 + !commit_ops @ [{action= `Delete; path; cid= None; prev= cid}] ; 449 + let%lwt new_mst = Cached_mst.delete !mst path in 450 + mst := new_mst ; 451 + Lwt.return 452 + (Delete {type'= "com.atproto.repo.applyWrites#deleteResult"}) ) 453 + writes 454 + in 455 + Dream.debug (fun l -> l "writes processed") ; 456 + let new_mst = !mst in 457 + let%lwt new_commit = 458 + put_commit t new_mst.root ~previous:(Some prev_commit) 459 + in 460 + Dream.debug (fun l -> l "commit inserted") ; 461 + let new_commit_cid, new_commit_signed = new_commit in 462 + let commit_block = 463 + new_commit_signed |> signed_commit_to_yojson |> Dag_cbor.encode_yojson 464 + in 465 + let diff : Inductive.diff list = 466 + List.fold_left 467 + (fun (acc : Inductive.diff list) 468 + ({action; path; cid; prev} : commit_evt_op) -> 469 + match action with 470 + | `Create -> 471 + acc @ [Add {key= path; cid= Option.get cid}] 472 + | `Update -> 473 + acc @ [Update {key= path; cid= Option.get cid; prev}] 474 + | `Delete -> 475 + acc @ [Delete {key= path; prev= Option.get prev}] ) 476 + [] !commit_ops 477 + in 478 + let%lwt proof_blocks = 479 + match%lwt 480 + Inductive.generate_proof !block_map diff ~new_root:new_mst.root 481 + ~prev_root:prev_commit.data 482 + with 483 + | Ok blocks -> 484 + Lwt.return (Block_map.merge blocks !added_leaves) 485 + | Error err -> 486 + raise err 487 + in 488 + Dream.debug (fun l -> l "proof generated") ; 489 + let block_stream = 490 + proof_blocks |> Block_map.entries |> Lwt_seq.of_list 491 + |> Lwt_seq.cons (new_commit_cid, commit_block) 492 + in 493 + let%lwt blocks = 494 + Car.blocks_to_stream new_commit_cid block_stream |> Car.collect_stream 495 + in 496 + let%lwt ds = Data_store.connect () in 497 + let%lwt _ = 498 + Sequencer.sequence_commit ds ~did:t.did ~commit:new_commit_cid 499 + ~rev:new_commit_signed.rev ~blocks ~ops:!commit_ops 500 + ~since:prev_commit.rev ~prev_data:prev_commit.data () 501 + in 502 + Dream.debug (fun l -> l "commit sequenced") ; 503 + Lwt.return {commit= new_commit; results} ) 452 504 453 - let load ?write ?create ?(ensure_active = false) ?ds did : t Lwt.t = 505 + let load ?create ?(ensure_active = false) ?ds did : t Lwt.t = 454 506 let%lwt data_store_conn = 455 - match ds with 456 - | Some ds -> 457 - Lwt.return ds 458 - | None -> 459 - Data_store.connect ?write () 507 + match ds with Some ds -> Lwt.return ds | None -> Data_store.connect () 460 508 in 461 509 let%lwt user_db = 462 - try%lwt User_store.connect ?create ~write:true did 510 + try%lwt User_store.connect ?create did 463 511 with _ -> 464 512 Errors.invalid_request ~name:"RepoNotFound" 465 513 "your princess is in another castle"
+18 -6
pegasus/lib/user_store.ml
··· 310 310 311 311 type t = {did: string; db: Util.caqti_pool} 312 312 313 - let connect ?create ?write did : t Lwt.t = 314 - let%lwt db = 315 - Util.connect_sqlite ?create ?write (Util.Constants.user_db_location did) 316 - in 317 - let%lwt () = Migrations.run_migrations User_store db in 318 - Lwt.return {did; db} 313 + let pool_cache : (string, t) Hashtbl.t = Hashtbl.create 64 314 + 315 + let pool_cache_mutex = Lwt_mutex.create () 316 + 317 + let connect ?create did : t Lwt.t = 318 + Lwt_mutex.with_lock pool_cache_mutex (fun () -> 319 + match Hashtbl.find_opt pool_cache did with 320 + | Some cached -> 321 + Lwt.return cached 322 + | None -> 323 + let%lwt db = 324 + Util.connect_sqlite ?create ~write:true 325 + (Util.Constants.user_db_location did) 326 + in 327 + let%lwt () = Migrations.run_migrations User_store db in 328 + let t = {did; db} in 329 + Hashtbl.replace pool_cache did t ; 330 + Lwt.return t ) 319 331 320 332 (* mst blocks; implements Writable_blockstore *) 321 333
+24 -22
pegasus/lib/util.ml
··· 183 183 | Error caqti_err -> 184 184 Error (Caqti_error.Exn caqti_err) 185 185 186 - let _init_connection conn = 187 - match%lwt 188 - [%rapper 189 - execute 190 - {sql| 191 - PRAGMA journal_mode=WAL; 192 - PRAGMA foreign_keys=ON; 193 - PRAGMA synchronous=NORMAL; 194 - PRAGMA busy_timeout=5000; 195 - |sql} 196 - syntax_off] 197 - () conn 198 - with 199 - | Ok conn -> 200 - Lwt.return conn 201 - | Error e -> 202 - raise (Caqti_error.Exn e) 186 + let _init_connection (module Db : Rapper_helper.CONNECTION) : 187 + (unit, Caqti_error.t) Lwt_result.t = 188 + let open Lwt_result.Syntax in 189 + let open Caqti_request.Infix in 190 + let open Caqti_type in 191 + let* _ = 192 + Db.find (((unit ->! string) ~oneshot:true) "PRAGMA journal_mode=WAL") () 193 + in 194 + let* _ = 195 + Db.exec (((unit ->. unit) ~oneshot:true) "PRAGMA foreign_keys=ON") () 196 + in 197 + let* _ = 198 + Db.exec (((unit ->. unit) ~oneshot:true) "PRAGMA synchronous=NORMAL") () 199 + in 200 + let* _ = 201 + Db.find (((unit ->! int) ~oneshot:true) "PRAGMA busy_timeout=5000") () 202 + in 203 + Lwt.return_ok () 203 204 204 205 (* creates an sqlite pool *) 205 206 let connect_sqlite ?(create = false) ?(write = true) db_uri : caqti_pool Lwt.t = ··· 209 210 in 210 211 let pool_config = Caqti_pool_config.create ~max_size:16 ~max_idle_size:4 () in 211 212 match 212 - Caqti_lwt_unix.connect_pool ~pool_config 213 - ~post_connect:(fun conn -> Lwt_result.ok @@ _init_connection conn) 214 - uri 213 + Caqti_lwt_unix.connect_pool ~pool_config ~post_connect:_init_connection uri 215 214 with 216 215 | Ok pool -> 217 216 Lwt.return pool ··· 221 220 let with_connection db_uri f = 222 221 match%lwt 223 222 Caqti_lwt_unix.with_connection db_uri (fun conn -> 224 - let%lwt _ = _init_connection conn in 225 - f conn ) 223 + match%lwt _init_connection conn with 224 + | Ok () -> 225 + f conn 226 + | Error e -> 227 + Lwt.return_error e ) 226 228 with 227 229 | Ok result -> 228 230 Lwt.return result