tangled
alpha
login
or
join now
futur.blue
/
pegasus
57
fork
atom
objective categorical abstract machine language personal data server
57
fork
atom
overview
issues
2
pulls
pipelines
Fix overly aggressive blob ref cleanup on update record
futur.blue
1 month ago
04f0b026
0d72fb31
verified
This commit was signed with the committer's
known signature
.
futur.blue
SSH Key Fingerprint:
SHA256:QHGqHWNpqYyw9bt8KmPuJIyeZX9SZewBZ0PR1COtKQ0=
+76
-46
2 changed files
expand all
collapse all
unified
split
pegasus
lib
repository.ml
user_store.ml
+22
-30
pegasus/lib/repository.ml
···
291
Errors.invalid_request ~name:"InvalidSwap"
292
(Format.sprintf "attempted to update record %s with cid %s"
293
path cid_str ) ) ;
294
-
let%lwt () =
295
match existing_record with
296
| Some record ->
297
-
let refs =
298
-
Util.find_blob_refs record.value
299
-
|> List.map (fun (r : Mist.Blob_ref.t) -> r.ref)
300
-
in
301
-
if not (List.is_empty refs) then
302
-
let%lwt _ =
303
-
User_store.delete_orphaned_blobs_by_record_path t.db path
304
-
in
305
-
Lwt.return_unit
306
-
else Lwt.return_unit
307
| None ->
308
-
Lwt.return_unit
309
in
310
let record_with_type : Lex.repo_record =
311
if String_map.mem "$type" value then value
···
320
:: !commit_ops_rev ;
321
let%lwt new_mst = Cached_mst.add !mst path new_cid in
322
mst := new_mst ;
323
-
let refs =
324
Util.find_blob_refs value
325
|> List.map (fun (r : Mist.Blob_ref.t) -> r.ref)
326
in
0
327
let%lwt () =
328
-
match%lwt User_store.put_blob_refs t.db path refs with
329
| Ok () ->
330
Lwt.return ()
331
| Error err ->
332
raise err
333
in
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
334
Lwt.return
335
(Update
336
{ type'= "com.atproto.repo.applyWrites#updateResult"
···
351
Errors.invalid_request ~name:"InvalidSwap"
352
(Format.sprintf "attempted to delete record %s with cid %s"
353
path cid_str ) ) ;
354
-
let%lwt () =
355
-
match existing_record with
356
-
| Some record ->
357
-
let refs =
358
-
Util.find_blob_refs record.value
359
-
|> List.map (fun (r : Mist.Blob_ref.t) -> r.ref)
360
-
in
361
-
if not (List.is_empty refs) then
362
-
let%lwt _ =
363
-
User_store.delete_orphaned_blobs_by_record_path t.db path
364
-
in
365
-
Lwt.return_unit
366
-
else Lwt.return_unit
367
-
| None ->
368
-
Lwt.return_unit
369
-
in
370
let%lwt () = User_store.delete_record t.db path in
371
commit_ops_rev :=
372
{action= `Delete; path; cid= None; prev= cid} :: !commit_ops_rev ;
···
291
Errors.invalid_request ~name:"InvalidSwap"
292
(Format.sprintf "attempted to update record %s with cid %s"
293
path cid_str ) ) ;
294
+
let old_blob_refs =
295
match existing_record with
296
| Some record ->
297
+
Util.find_blob_refs record.value
298
+
|> List.map (fun (r : Mist.Blob_ref.t) -> r.ref)
0
0
0
0
0
0
0
0
299
| None ->
300
+
[]
301
in
302
let record_with_type : Lex.repo_record =
303
if String_map.mem "$type" value then value
···
312
:: !commit_ops_rev ;
313
let%lwt new_mst = Cached_mst.add !mst path new_cid in
314
mst := new_mst ;
315
+
let new_blob_refs =
316
Util.find_blob_refs value
317
|> List.map (fun (r : Mist.Blob_ref.t) -> r.ref)
318
in
319
+
let%lwt () = User_store.delete_blob_refs_for_path t.db path in
320
let%lwt () =
321
+
match%lwt User_store.put_blob_refs t.db path new_blob_refs with
322
| Ok () ->
323
Lwt.return ()
324
| Error err ->
325
raise err
326
in
327
+
let removed_blob_refs =
328
+
List.filter
329
+
(* include old refs such that *)
330
+
(fun old_ref ->
331
+
(* there isn't a new ref such that *)
332
+
not
333
+
(List.exists
334
+
(* the new ref equals the old ref *)
335
+
(fun new_ref -> Cid.equal old_ref new_ref )
336
+
new_blob_refs ) )
337
+
old_blob_refs
338
+
in
339
+
let%lwt () =
340
+
User_store.delete_unreferenced_blobs t.db removed_blob_refs
341
+
in
342
Lwt.return
343
(Update
344
{ type'= "com.atproto.repo.applyWrites#updateResult"
···
359
Errors.invalid_request ~name:"InvalidSwap"
360
(Format.sprintf "attempted to delete record %s with cid %s"
361
path cid_str ) ) ;
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
362
let%lwt () = User_store.delete_record t.db path in
363
commit_ops_rev :=
364
{action= `Delete; path; cid= None; prev= cid} :: !commit_ops_rev ;
+54
-16
pegasus/lib/user_store.ml
···
270
LIMIT %int{limit}
271
|sql}]
272
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
273
let delete_orphaned_blobs_by_record_path path =
274
[%rapper
275
get_many
···
545
546
let delete_block t cid : (bool, exn) Lwt_result.t =
547
Lwt_result.catch
548
-
@@ fun () -> Util.Sqlite.use_pool t.db @@ Queries.delete_block cid >|= fun _ -> true
0
549
550
let delete_many t cids : (int, exn) Lwt_result.t =
551
Lwt_result.catch
552
-
@@ fun () -> Util.Sqlite.use_pool t.db @@ Queries.delete_blocks cids >|= List.length
0
553
554
let clear_mst t : unit Lwt.t =
555
let%lwt () = Util.Sqlite.use_pool t.db Queries.clear_mst in
···
557
558
(* mst misc *)
559
560
-
let count_blocks t : int Lwt.t = Util.Sqlite.use_pool t.db @@ Queries.count_blocks ()
0
561
562
(* repo commit *)
563
···
606
>|= List.map (fun (path, cid, data, since) ->
607
{path; cid; value= Lex.of_cbor data; since} )
608
609
-
let count_records t : int Lwt.t = Util.Sqlite.use_pool t.db @@ Queries.count_records ()
0
610
611
let list_collections t : string list Lwt.t =
612
Util.Sqlite.use_pool t.db @@ Queries.list_collections
···
625
let delete_record t path : unit Lwt.t =
626
Util.Sqlite.use_pool t.db (fun conn ->
627
Util.Sqlite.transact conn (fun () ->
628
-
let del = Queries.delete_record path conn in
629
-
let$! () = del in
630
let$! deleted_blobs =
631
Queries.delete_orphaned_blobs_by_record_path path conn
632
in
633
-
let () =
634
-
List.iter
635
-
(fun (cid, storage_str) ->
636
-
let storage = Blob_store.storage_of_string storage_str in
637
-
delete_blob_file ~did:t.did ~cid ~storage )
638
-
deleted_blobs
639
-
in
640
-
del ) )
641
642
(* blobs *)
643
···
675
(string * Cid.t) list Lwt.t =
676
Util.Sqlite.use_pool t.db @@ Queries.list_missing_blobs ~limit ~cursor
677
678
-
let count_blobs t : int Lwt.t = Util.Sqlite.use_pool t.db @@ Queries.count_blobs ()
0
679
680
let count_referenced_blobs t : int Lwt.t =
681
Util.Sqlite.use_pool t.db @@ Queries.count_referenced_blobs ()
···
697
let delete_orphaned_blobs_by_record_path t path :
698
(Cid.t * Blob_store.storage) list Lwt.t =
699
let%lwt results =
700
-
Util.Sqlite.use_pool t.db @@ Queries.delete_orphaned_blobs_by_record_path path
0
701
in
702
Lwt.return
703
@@ List.map
···
721
let clear_blob_refs t path cids : unit Lwt.t =
722
if List.is_empty cids then Lwt.return_unit
723
else Util.Sqlite.use_pool t.db @@ Queries.clear_blob_refs path cids
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
724
725
let update_blob_storage t cid storage : unit Lwt.t =
726
let storage_str = Blob_store.storage_to_string storage in
···
270
LIMIT %int{limit}
271
|sql}]
272
273
+
let delete_blob_refs_for_path path =
274
+
[%rapper
275
+
execute
276
+
{sql| DELETE FROM blobs_records WHERE record_path = %string{path} |sql}]
277
+
~path
278
+
279
+
let delete_unreferenced_blobs cids =
280
+
[%rapper
281
+
get_many
282
+
{sql| DELETE FROM blobs
283
+
WHERE cid IN (%list{%CID{cids}})
284
+
AND NOT EXISTS (
285
+
SELECT 1 FROM blobs_records
286
+
WHERE blob_cid = blobs.cid
287
+
)
288
+
RETURNING @CID{cid}, @string{storage}
289
+
|sql}]
290
+
~cids
291
+
292
let delete_orphaned_blobs_by_record_path path =
293
[%rapper
294
get_many
···
564
565
let delete_block t cid : (bool, exn) Lwt_result.t =
566
Lwt_result.catch
567
+
@@ fun () ->
568
+
Util.Sqlite.use_pool t.db @@ Queries.delete_block cid >|= fun _ -> true
569
570
let delete_many t cids : (int, exn) Lwt_result.t =
571
Lwt_result.catch
572
+
@@ fun () ->
573
+
Util.Sqlite.use_pool t.db @@ Queries.delete_blocks cids >|= List.length
574
575
let clear_mst t : unit Lwt.t =
576
let%lwt () = Util.Sqlite.use_pool t.db Queries.clear_mst in
···
578
579
(* mst misc *)
580
581
+
let count_blocks t : int Lwt.t =
582
+
Util.Sqlite.use_pool t.db @@ Queries.count_blocks ()
583
584
(* repo commit *)
585
···
628
>|= List.map (fun (path, cid, data, since) ->
629
{path; cid; value= Lex.of_cbor data; since} )
630
631
+
let count_records t : int Lwt.t =
632
+
Util.Sqlite.use_pool t.db @@ Queries.count_records ()
633
634
let list_collections t : string list Lwt.t =
635
Util.Sqlite.use_pool t.db @@ Queries.list_collections
···
648
let delete_record t path : unit Lwt.t =
649
Util.Sqlite.use_pool t.db (fun conn ->
650
Util.Sqlite.transact conn (fun () ->
0
0
651
let$! deleted_blobs =
652
Queries.delete_orphaned_blobs_by_record_path path conn
653
in
654
+
let$! () = Queries.delete_record path conn in
655
+
List.iter
656
+
(fun (cid, storage_str) ->
657
+
let storage = Blob_store.storage_of_string storage_str in
658
+
delete_blob_file ~did:t.did ~cid ~storage )
659
+
deleted_blobs ;
660
+
Lwt.return_ok () ) )
0
661
662
(* blobs *)
663
···
695
(string * Cid.t) list Lwt.t =
696
Util.Sqlite.use_pool t.db @@ Queries.list_missing_blobs ~limit ~cursor
697
698
+
let count_blobs t : int Lwt.t =
699
+
Util.Sqlite.use_pool t.db @@ Queries.count_blobs ()
700
701
let count_referenced_blobs t : int Lwt.t =
702
Util.Sqlite.use_pool t.db @@ Queries.count_referenced_blobs ()
···
718
let delete_orphaned_blobs_by_record_path t path :
719
(Cid.t * Blob_store.storage) list Lwt.t =
720
let%lwt results =
721
+
Util.Sqlite.use_pool t.db
722
+
@@ Queries.delete_orphaned_blobs_by_record_path path
723
in
724
Lwt.return
725
@@ List.map
···
743
let clear_blob_refs t path cids : unit Lwt.t =
744
if List.is_empty cids then Lwt.return_unit
745
else Util.Sqlite.use_pool t.db @@ Queries.clear_blob_refs path cids
746
+
747
+
let delete_blob_refs_for_path t path : unit Lwt.t =
748
+
Util.Sqlite.use_pool t.db @@ Queries.delete_blob_refs_for_path path
749
+
750
+
let delete_unreferenced_blobs t cids : unit Lwt.t =
751
+
if List.is_empty cids then Lwt.return_unit
752
+
else
753
+
let%lwt results =
754
+
Util.Sqlite.use_pool t.db @@ Queries.delete_unreferenced_blobs cids
755
+
in
756
+
List.iter
757
+
(fun (cid, storage_str) ->
758
+
let storage = Blob_store.storage_of_string storage_str in
759
+
delete_blob_file ~did:t.did ~cid ~storage )
760
+
results ;
761
+
Lwt.return_unit
762
763
let update_blob_storage t cid storage : unit Lwt.t =
764
let storage_str = Blob_store.storage_to_string storage in