(** Write-Ahead Log implementation.

    Based on LevelDB's log format:
    https://github.com/google/leveldb/blob/main/doc/log_format.md

    The file is divided into blocks of [block_size] bytes. A logical record
    is stored as one [Full] physical record, or as a [First] fragment,
    zero or more [Middle] fragments and a [Last] fragment. A physical record
    never crosses a block boundary. *)

(* Int63 comes transitively from eio -> optint *)
module Int63 = Optint.Int63

let err_invalid_record_type n =
  Error (`Msg (Fmt.str "invalid record type: %d" n))

(* Record types *)
type record_type = Full | First | Middle | Last

(** [record_type_to_int typ] is the on-disk tag of [typ] (1 to 4). *)
let record_type_to_int = function
  | Full -> 1
  | First -> 2
  | Middle -> 3
  | Last -> 4

(** [record_type_of_int n] is the inverse of {!record_type_to_int}. Any value
    outside 1..4 yields an [Error]. *)
let record_type_of_int = function
  | 1 -> Ok Full
  | 2 -> Ok First
  | 3 -> Ok Middle
  | 4 -> Ok Last
  | n -> err_invalid_record_type n

(* Constants *)
let block_size = 32 * 1024 (* 32KB *)
let header_size = 4 + 2 + 1 (* CRC32 + length + type *)
let max_record_size = block_size - header_size

(* CRC32C using checkseum - returns Optint.t which we convert to unboxed int *)
let crc32c data =
  let v =
    Checkseum.Crc32c.digest_string data 0 (String.length data)
      Checkseum.Crc32c.default
  in
  Optint.to_unsigned_int v

(** [encode_record typ data] serialises one physical record.

    Layout: checksum at offset 0 (4 bytes), payload length at offset 4
    (2 bytes, little-endian), type tag at offset 6 (1 byte), then the payload.
    The checksum covers the type byte followed by the payload.

    @raise Invalid_argument
      if [data] is empty or longer than [max_record_size]. *)
let encode_record typ data =
  let data_len = String.length data in
  if data_len = 0 then invalid_arg "empty record";
  if data_len > max_record_size then
    Fmt.invalid_arg "record too large: %d > %d" data_len max_record_size;
  let buf = Bytes.create (header_size + data_len) in
  (* Type + data for CRC *)
  let type_byte = record_type_to_int typ in
  let crc_input = String.make 1 (Char.chr type_byte) ^ data in
  let crc = crc32c crc_input in
  Wire.UInt32.set_le buf 0 crc;
  Bytes.set_uint16_le buf 4 data_len;
  Bytes.set buf 6 (Char.chr type_byte);
  Bytes.blit_string data 0 buf 7 data_len;
  Bytes.to_string buf

(** [decode_record bytes] parses one physical record located at the start of
    [bytes]. Returns [Ok (typ, payload)], or an [Error] when [bytes] is
    shorter than a header, shorter than the length announced in the header,
    carries an unknown type tag, or fails the checksum. *)
let decode_record bytes =
  let len = String.length bytes in
  if len < header_size then Error (`Msg "record too short")
  else
    let buf = Bytes.of_string bytes in
    let stored_crc = Wire.UInt32.get_le buf 0 in
    let data_len = Bytes.get_uint16_le buf 4 in
    let type_byte = Char.code (Bytes.get buf 6) in
    if header_size + data_len > len then Error (`Msg "incomplete record")
    else
      match record_type_of_int type_byte with
      | Error _ as e -> e
      | Ok typ ->
          let data = String.sub bytes header_size data_len in
          let crc_input = String.make 1 (Char.chr type_byte) ^ data in
          let computed_crc = crc32c crc_input in
          if stored_crc <> computed_crc then Error (`Msg "CRC mismatch")
          else Ok (typ, data)

(* WAL handle *)
type t = {
  mutable file : Eio.File.rw_ty Eio.Resource.t;
  mutable block_offset : int; (* Current offset within block *)
  mutable file_offset : Optint.Int63.t; (* Current file offset *)
}

let pp ppf t =
  Fmt.pf ppf "wal(block_offset=%d,file_offset=%s)" t.block_offset
    (Int63.to_string t.file_offset)

(** [v ~sw path] opens the log at [path] for writing, creating it with mode
    [0o644] when it does not exist, and positions the handle at the current
    end of the file so that {!append} extends the existing log.

    The file is deliberately not truncated: the size read below is what
    determines where the next record goes, and truncating on open would
    discard every previously logged record. *)
let v ~sw path =
  let file = Eio.Path.open_out ~sw ~create:(`If_missing 0o644) path in
  (* Get file size to append at end *)
  let stat = Eio.Path.stat ~follow:true path in
  let file_size = stat.size in
  let block_offset = Int63.(to_int (rem file_size (of_int block_size))) in
  {
    file :> Eio.File.rw_ty Eio.Resource.t;
    block_offset;
    file_offset = file_size;
  }

(** [append t data] writes [data] as one logical record at the end of the
    log, splitting it into fragments wherever it would cross a block
    boundary, and advances the offsets stored in [t].

    @raise Invalid_argument if [data] is empty. *)
let append t data =
  let data_len = String.length data in
  if data_len = 0 then invalid_arg "empty record";
  let advance n = t.file_offset <- Int63.add t.file_offset (Int63.of_int n) in
  let rec write_fragments data_offset is_first =
    let remaining = data_len - data_offset in
    if remaining > 0 then begin
      (* Space left in current block *)
      let block_remaining = block_size - t.block_offset in
      if block_remaining <= header_size then begin
        (* A fragment needs a header plus at least one payload byte, because
           [encode_record] rejects empty payloads. With exactly [header_size]
           bytes left there is no room for that byte, so this case is padded
           with zero bytes as well instead of producing an empty fragment. *)
        let padding = String.make block_remaining '\x00' in
        Eio.File.pwrite_all t.file ~file_offset:t.file_offset
          [ Cstruct.of_string padding ];
        advance block_remaining;
        t.block_offset <- 0;
        write_fragments data_offset is_first
      end
      else begin
        let avail = block_remaining - header_size in
        let is_last = remaining <= avail in
        let fragment_len = Int.min remaining avail in
        let typ =
          match (is_first, is_last) with
          | true, true -> Full
          | true, false -> First
          | false, true -> Last
          | false, false -> Middle
        in
        let fragment = String.sub data data_offset fragment_len in
        let record = encode_record typ fragment in
        Eio.File.pwrite_all t.file ~file_offset:t.file_offset
          [ Cstruct.of_string record ];
        let record_len = String.length record in
        advance record_len;
        t.block_offset <- t.block_offset + record_len;
        (* A fragment that fills the block exactly starts a new block. *)
        if t.block_offset >= block_size then t.block_offset <- 0;
        write_fragments (data_offset + fragment_len) false
      end
    end
  in
  write_fragments 0 true

(* Sync to disk *)
let sync t = Eio.File.sync t.file

(* Close the log *)
let close t =
  sync t;
  Eio.Resource.close t.file

(** [read_record_from file ~file_offset ~block_offset] reads the physical
    record that starts at [file_offset], which lies [block_offset] bytes into
    its block. The result is one of:
    - [`Skip n]: no record can start here (fewer than [header_size] bytes
      are left in the block, or the announced length would cross the block
      boundary); [n] is the number of bytes left in the block;
    - [`Eof]: the file ends before the header or the payload is complete;
    - [`Corrupted n]: an [n]-byte record was read but did not decode;
    - [`Valid (typ, data, n)]: a well-formed record occupying [n] bytes. *)
let read_record_from file ~file_offset ~block_offset =
  let block_remaining = block_size - block_offset in
  if block_remaining < header_size then `Skip block_remaining
  else
    let header_buf = Cstruct.create header_size in
    match Eio.File.pread_exact file ~file_offset [ header_buf ] with
    | exception End_of_file -> `Eof
    | () -> (
        let data_len = Cstruct.LE.get_uint16 header_buf 4 in
        let record_len = header_size + data_len in
        if block_offset + record_len > block_size then `Skip block_remaining
        else
          let record_buf = Cstruct.create record_len in
          (* Re-read header and payload together so [decode_record] sees the
             whole record. A short read means the tail of the file holds an
             incomplete record; report end of file rather than decoding a
             partly filled buffer. *)
          match Eio.File.pread_exact file ~file_offset [ record_buf ] with
          | exception End_of_file -> `Eof
          | () -> (
              match decode_record (Cstruct.to_string record_buf) with
              | Error _ -> `Corrupted record_len
              | Ok (typ, data) -> `Valid (typ, data, record_len)))

(** [fold path ~init ~f] reads the log at [path] from the beginning and folds
    [f] over every logical record, in file order.

    Fragmented records are reassembled before being passed to [f]. A
    corrupted physical record discards the logical record being assembled,
    and [Middle] or [Last] fragments that do not follow a [First] fragment
    are dropped instead of being reported as records. *)
let fold path ~init ~f =
  Eio.Path.with_open_in path @@ fun file ->
  let stat = Eio.Path.stat ~follow:true path in
  let file_size = stat.size in
  let ( +! ) offset n = Int63.add offset (Int63.of_int n) in
  (* [pending] is [Some fragments] (most recent first) while a fragmented
     record is being assembled, and [None] otherwise. *)
  let rec read_records acc file_offset block_offset pending =
    if Int63.compare file_offset file_size >= 0 then acc
    else
      match read_record_from file ~file_offset ~block_offset with
      | `Eof -> acc
      | `Skip skip -> read_records acc (file_offset +! skip) 0 pending
      | `Corrupted record_len ->
          (* [block_offset + record_len] is at most [block_size] here; when it
             equals [block_size] the next iteration yields [`Skip 0], which
             resets the block offset. *)
          read_records acc
            (file_offset +! record_len)
            (block_offset + record_len)
            None
      | `Valid (typ, data, record_len) -> (
          let next_offset = file_offset +! record_len in
          let next_block_offset =
            let n = block_offset + record_len in
            if n >= block_size then 0 else n
          in
          match (typ, pending) with
          | Full, _ ->
              read_records (f acc data) next_offset next_block_offset None
          | First, _ ->
              read_records acc next_offset next_block_offset (Some [ data ])
          | Middle, Some fragments ->
              read_records acc next_offset next_block_offset
                (Some (data :: fragments))
          | Last, Some fragments ->
              let full_data =
                String.concat "" (List.rev (data :: fragments))
              in
              read_records (f acc full_data) next_offset next_block_offset
                None
          | (Middle | Last), None ->
              (* Orphan fragment: its [First] was never seen or was lost to
                 corruption, so there is no complete record to report. *)
              read_records acc next_offset next_block_offset None)
  in
  read_records init Int63.zero 0 None

(** [read_all path] is the list of all logical records in the log at [path],
    in the order they were appended. *)
let read_all path =
  List.rev (fold path ~init:[] ~f:(fun acc data -> data :: acc))