Write-ahead log with fsync batching for Eio
at main 219 lines 7.9 kB view raw
(** Write-Ahead Log implementation.

    Based on LevelDB's log format:
    https://github.com/google/leveldb/blob/main/doc/log_format.md

    The file is a sequence of [block_size]-byte blocks. Each block holds
    physical records made of a [header_size]-byte header (CRC, payload length,
    record type) followed by the payload. A logical record that does not fit
    in the current block is split into [First] / [Middle] / [Last] fragments;
    one that fits is written as a single [Full] record. *)

(* Int63 comes transitively from eio -> optint *)
module Int63 = Optint.Int63

(** [err_invalid_record_type n] is the [Error] reported when the type byte [n]
    of a record header is not a known record type. *)
let err_invalid_record_type n =
  Error (`Msg (Fmt.str "invalid record type: %d" n))

(** Kind of a physical record: a whole logical record ([Full]) or one fragment
    of a logical record split across blocks. *)
type record_type = Full | First | Middle | Last

(** On-disk encoding of a record type. *)
let record_type_to_int = function
  | Full -> 1
  | First -> 2
  | Middle -> 3
  | Last -> 4

(** Inverse of {!record_type_to_int}; [Error] for any other value. *)
let record_type_of_int = function
  | 1 -> Ok Full
  | 2 -> Ok First
  | 3 -> Ok Middle
  | 4 -> Ok Last
  | n -> err_invalid_record_type n

(* Constants *)
let block_size = 32 * 1024 (* 32KB *)
let header_size = 4 + 2 + 1 (* CRC32 + length + type *)
let max_record_size = block_size - header_size

(** [crc32c data] is the CRC32C of [data], computed with checkseum and
    converted from [Optint.t] to an unsigned native [int]. *)
let crc32c data =
  let v =
    Checkseum.Crc32c.digest_string data 0 (String.length data)
      Checkseum.Crc32c.default
  in
  Optint.to_unsigned_int v

(* Encode one physical record. Unlike [encode_record] this accepts an empty
   payload: the writer needs a zero-length fragment when exactly [header_size]
   bytes are left in a block.

   Layout: bytes 0-3 CRC (little endian) over type byte + payload,
   bytes 4-5 payload length (little endian), byte 6 type, then the payload. *)
let encode_fragment typ data =
  let data_len = String.length data in
  if data_len > max_record_size then
    Fmt.invalid_arg "record too large: %d > %d" data_len max_record_size;
  let buf = Bytes.create (header_size + data_len) in
  let type_byte = record_type_to_int typ in
  (* The checksum covers the type byte followed by the payload. *)
  let crc_input = String.make 1 (Char.chr type_byte) ^ data in
  let crc = crc32c crc_input in
  Wire.UInt32.set_le buf 0 crc;
  Bytes.set_uint16_le buf 4 data_len;
  Bytes.set buf 6 (Char.chr type_byte);
  Bytes.blit_string data 0 buf header_size data_len;
  Bytes.to_string buf

(** [encode_record typ data] is the on-disk form (header followed by [data])
    of a single physical record of type [typ].

    @raise Invalid_argument
      if [data] is empty or longer than [max_record_size]. *)
let encode_record typ data =
  if String.length data = 0 then invalid_arg "empty record";
  encode_fragment typ data

(** [decode_record bytes] parses the physical record at the start of [bytes]
    and returns its type and payload. Bytes after the record are ignored.

    Returns [Error (`Msg _)] when [bytes] is shorter than a header, shorter
    than the length announced by the header, carries an unknown type byte, or
    fails the CRC check. *)
let decode_record bytes =
  let len = String.length bytes in
  if len < header_size then Error (`Msg "record too short")
  else
    let buf = Bytes.of_string bytes in
    let stored_crc = Wire.UInt32.get_le buf 0 in
    let data_len = Bytes.get_uint16_le buf 4 in
    let type_byte = Char.code (Bytes.get buf 6) in
    if header_size + data_len > len then Error (`Msg "incomplete record")
    else
      match record_type_of_int type_byte with
      | Error _ as e -> e
      | Ok typ ->
          let data = String.sub bytes header_size data_len in
          let crc_input = String.make 1 (Char.chr type_byte) ^ data in
          let computed_crc = crc32c crc_input in
          if stored_crc <> computed_crc then Error (`Msg "CRC mismatch")
          else Ok (typ, data)

(** WAL handle: the open file plus the current write position. *)
type t = {
  mutable file : Eio.File.rw_ty Eio.Resource.t;
  mutable block_offset : int; (* Current offset within block *)
  mutable file_offset : Optint.Int63.t; (* Current file offset *)
}

(** Pretty-printer showing the current write position of a handle. *)
let pp ppf t =
  Fmt.pf ppf "wal(block_offset=%d,file_offset=%s)" t.block_offset
    (Int63.to_string t.file_offset)

(** [v ~sw path] creates the WAL file at [path] if it is missing, or opens the
    existing one, and returns a handle positioned at the end of the file so
    that {!append} adds after the existing content. The file is attached to
    the switch [sw]. *)
let v ~sw path =
  (* [`If_missing], not [`Or_truncate]: reopening a log must keep its
     content, otherwise the size computed below would always be zero. *)
  let file = Eio.Path.open_out ~sw ~create:(`If_missing 0o644) path in
  (* Get file size to append at end *)
  let stat = Eio.Path.stat ~follow:true path in
  let file_size = stat.size in
  let block_offset = Int63.(to_int (rem file_size (of_int block_size))) in
  {
    file = (file :> Eio.File.rw_ty Eio.Resource.t);
    block_offset;
    file_offset = file_size;
  }

(** [append t data] writes [data] as one logical record at the current end of
    the log, splitting it into fragments wherever it crosses a block boundary,
    and advances the write position of [t]. The data is not synced; see
    {!sync}.

    @raise Invalid_argument if [data] is empty. *)
let append t data =
  if String.length data = 0 then invalid_arg "empty record";
  (* Write [s] at the current position and advance both offsets, wrapping
     the block offset when a block has been filled. *)
  let write s =
    Eio.File.pwrite_all t.file ~file_offset:t.file_offset
      [ Cstruct.of_string s ];
    let n = String.length s in
    t.file_offset <- Int63.add t.file_offset (Int63.of_int n);
    let block_offset = t.block_offset + n in
    t.block_offset <- (if block_offset >= block_size then 0 else block_offset)
  in
  let total = String.length data in
  let rec write_fragments data_offset is_first =
    let remaining = total - data_offset in
    if remaining > 0 then begin
      (* Space left in current block *)
      let block_remaining = block_size - t.block_offset in
      if block_remaining < header_size then begin
        (* Not enough space for header, pad and move to next block *)
        write (String.make block_remaining '\x00');
        write_fragments data_offset is_first
      end
      else begin
        let avail = block_remaining - header_size in
        let is_last = remaining <= avail in
        (* [avail] is 0 when exactly [header_size] bytes are left: a
           zero-length fragment then fills the block and the payload
           continues in the next one. *)
        let fragment_len = min remaining avail in
        let typ =
          match (is_first, is_last) with
          | true, true -> Full
          | true, false -> First
          | false, true -> Last
          | false, false -> Middle
        in
        write (encode_fragment typ (String.sub data data_offset fragment_len));
        write_fragments (data_offset + fragment_len) false
      end
    end
  in
  write_fragments 0 true

(** Sync to disk. *)
let sync t = Eio.File.sync t.file

(** Sync, then close the log. *)
let close t =
  sync t;
  Eio.Resource.close t.file

(** [read_record_from file ~file_offset ~block_offset] reads the physical
    record stored at [file_offset], which lies [block_offset] bytes into its
    block. The result is one of:
    - [`Skip n]: no record here; the remaining [n] bytes of the block must be
      skipped (trailer too small for a header, or a length running past the
      end of the block);
    - [`Eof]: the file ends before a complete record could be read;
    - [`Corrupted len]: [len] bytes were read but failed to decode;
    - [`Valid (typ, data, len)]: a record of type [typ] with payload [data]
      occupying [len] bytes. *)
let read_record_from file ~file_offset ~block_offset =
  let block_remaining = block_size - block_offset in
  if block_remaining < header_size then `Skip block_remaining
  else
    let header_buf = Cstruct.create header_size in
    match Eio.File.pread_exact file ~file_offset [ header_buf ] with
    | exception End_of_file -> `Eof
    | () ->
        (* Payload length lives in header bytes 4-5, little endian. *)
        let data_len = Cstruct.LE.get_uint16 header_buf 4 in
        let record_len = header_size + data_len in
        if block_offset + record_len > block_size then `Skip block_remaining
        else
          let record_buf = Cstruct.create record_len in
          (* A record cut short by the end of the file is reported as [`Eof]
             instead of decoding a partially filled buffer. *)
          match Eio.File.pread_exact file ~file_offset [ record_buf ] with
          | exception End_of_file -> `Eof
          | () -> (
              match decode_record (Cstruct.to_string record_buf) with
              | Error _ -> `Corrupted record_len
              | Ok (typ, data) -> `Valid (typ, data, record_len))

(** [fold path ~init ~f] reads the log at [path] from the start and folds [f]
    over its logical records in file order, reassembling fragmented records.
    Corrupted physical records are skipped, and any logical record that cannot
    be reassembled completely (a fragment is missing or corrupted) is dropped
    rather than passed to [f] truncated. *)
let fold path ~init ~f =
  Eio.Path.with_open_in path @@ fun file ->
  let stat = Eio.Path.stat ~follow:true path in
  let file_size = stat.size in
  let advance offset n = Int63.add offset (Int63.of_int n) in
  let wrap n = if n >= block_size then 0 else n in
  (* [pending] is [None] outside a fragmented record, and [Some fragments]
     (most recent first) once a [First] fragment has been seen. *)
  let rec read_records acc file_offset block_offset pending =
    if Int63.compare file_offset file_size >= 0 then acc
    else
      match read_record_from file ~file_offset ~block_offset with
      | `Eof -> acc
      | `Skip skip -> read_records acc (advance file_offset skip) 0 None
      | `Corrupted record_len ->
          read_records acc
            (advance file_offset record_len)
            (wrap (block_offset + record_len))
            None
      | `Valid (typ, data, record_len) -> (
          let next_offset = advance file_offset record_len in
          let next_block_offset = wrap (block_offset + record_len) in
          match (typ, pending) with
          | Full, _ -> read_records (f acc data) next_offset next_block_offset None
          | First, _ ->
              read_records acc next_offset next_block_offset (Some [ data ])
          | Middle, Some fragments ->
              read_records acc next_offset next_block_offset
                (Some (data :: fragments))
          | Last, Some fragments ->
              let full_data = String.concat "" (List.rev (data :: fragments)) in
              read_records (f acc full_data) next_offset next_block_offset None
          | (Middle | Last), None ->
              (* Continuation without its [First] fragment: the start of the
                 record was lost, so drop the remainder as well. *)
              read_records acc next_offset next_block_offset None)
  in
  read_records init Int63.zero 0 None

(** [read_all path] is the list of logical records stored in the log at
    [path], in the order they were appended. *)
let read_all path =
  List.rev (fold path ~init:[] ~f:(fun acc data -> data :: acc))