Write-ahead log for Eio with fsync batching: appends are not fsynced individually; call `sync` to make a batch of appends durable.
1(** Write-Ahead Log implementation.
2
3 Based on LevelDB's log format:
4 https://github.com/google/leveldb/blob/main/doc/log_format.md *)
5
(* Short alias for the 63-bit integers used as file offsets.
   Optint (and so Int63) comes transitively from eio -> optint. *)
module Int63 = Optint.Int63
8
(* [err_invalid_record_type n] is the error returned when a header's type
   byte [n] does not name a known record type. *)
let err_invalid_record_type n =
  let msg = Printf.sprintf "invalid record type: %d" n in
  Error (`Msg msg)
11
(* Kind of a physical record: a whole logical record ([Full]) or one
   fragment of a logical record split across blocks ([First], [Middle]...,
   [Last]). *)
type record_type = Full | First | Middle | Last

(* Tag written into the header's type byte for each record kind. *)
let record_type_to_int typ =
  match typ with
  | Full -> 1
  | First -> 2
  | Middle -> 3
  | Last -> 4
20
(* Inverse of [record_type_to_int]: maps a header type byte back to its
   record kind, or returns [err_invalid_record_type] for unknown tags. *)
let record_type_of_int code =
  let table = [ (1, Full); (2, First); (3, Middle); (4, Last) ] in
  match List.assoc_opt code table with
  | Some typ -> Ok typ
  | None -> err_invalid_record_type code
27
(* Layout constants. The log is a sequence of fixed-size blocks; each
   physical record is a fixed header followed by its payload. *)
let block_size = 32_768 (* 32 KiB *)

(* 4-byte CRC32C + 2-byte payload length + 1-byte record type *)
let header_size = 4 + 2 + 1

(* Largest payload that fits in a single block after its header. *)
let max_record_size = block_size - header_size
32
(* [crc32c data] is the CRC-32C checksum of the whole of [data], computed
   with checkseum from its default seed and returned as a non-negative
   native int (checkseum yields an [Optint.t]). *)
let crc32c data =
  Checkseum.Crc32c.default
  |> Checkseum.Crc32c.digest_string data 0 (String.length data)
  |> Optint.to_unsigned_int
40
(** [encode_record typ data] encodes one physical record: a [header_size]
    byte header followed by [data]. The header holds:
    - a 4-byte CRC32C over the type byte followed by the payload,
      written with [Wire.UInt32.set_le];
    - the payload length as a little-endian uint16;
    - the type byte.

    An empty payload is valid. When exactly [header_size] bytes remain in a
    block, [append] emits a header-only fragment there, and LevelDB does the
    same. Rejecting empty payloads here made such appends raise.

    @raise Invalid_argument if [data] is longer than [max_record_size]. *)
let encode_record typ data =
  let data_len = String.length data in
  if data_len > max_record_size then
    Fmt.invalid_arg "record too large: %d > %d" data_len max_record_size;
  let type_byte = record_type_to_int typ in
  (* The checksum covers the type byte and the payload, not the length. *)
  let crc = crc32c (String.make 1 (Char.chr type_byte) ^ data) in
  let buf = Bytes.create (header_size + data_len) in
  Wire.UInt32.set_le buf 0 crc;
  Bytes.set_uint16_le buf 4 data_len;
  Bytes.set buf 6 (Char.chr type_byte);
  Bytes.blit_string data 0 buf header_size data_len;
  Bytes.to_string buf
57
(** [decode_record s] parses the physical record at the start of [s].
    Bytes of [s] past [header_size + payload length] are ignored.

    Returns [Ok (typ, payload)], or [Error (`Msg _)] if [s] is shorter than
    a header, shorter than the length its header declares, carries an
    unknown type byte, or fails the CRC32C check. *)
let decode_record bytes =
  let len = String.length bytes in
  if len < header_size then Error (`Msg "record too short")
  else
    (* Copy only the header, not the whole record, for the byte readers. *)
    let header = Bytes.of_string (String.sub bytes 0 header_size) in
    let stored_crc = Wire.UInt32.get_le header 0 in
    let data_len = Bytes.get_uint16_le header 4 in
    let type_byte = Char.code (Bytes.get header 6) in
    if header_size + data_len > len then Error (`Msg "incomplete record")
    else
      match record_type_of_int type_byte with
      (* Re-wrap the error: the [Ok] payload type differs from ours, so the
         original result cannot be returned as-is. *)
      | Error e -> Error e
      | Ok typ ->
          let data = String.sub bytes header_size data_len in
          let computed_crc = crc32c (String.make 1 (Char.chr type_byte) ^ data) in
          if not (Int.equal stored_crc computed_crc) then
            Error (`Msg "CRC mismatch")
          else Ok (typ, data)
77
(* WAL handle: the open log file plus the position of the next write. *)
type t = {
  mutable file : Eio.File.rw_ty Eio.Resource.t; (* File that [append] writes to *)
  mutable block_offset : int;
      (* Offset of the next write within its [block_size] block; [v] and
         [append] keep it in [0, block_size). *)
  mutable file_offset : Optint.Int63.t; (* Absolute file offset of the next write *)
}
84
(* Debug printer showing the handle's current write position. *)
let pp ppf { block_offset; file_offset; _ } =
  let file_offset = Int63.to_string file_offset in
  Format.fprintf ppf "wal(block_offset=%d,file_offset=%s)" block_offset
    file_offset
88
(** [v ~sw path] opens the log at [path] for appending. The file is created
    with mode 0o644 if it does not exist, and attached to [sw].

    Existing contents are preserved. The previous code opened with
    [`Or_truncate], which wiped the log on every open. The handle's offsets
    are derived from the current file size, so new records follow the
    existing ones.

    NOTE(review): if the file ends in a partially written record (e.g. after
    a crash), new records are appended after that torn tail. *)
let v ~sw path =
  let file = Eio.Path.open_out ~sw ~create:(`If_missing 0o644) path in
  (* Size of the handle we just opened, not a second lookup of [path]. *)
  let file_size = Eio.File.size file in
  let block_offset = Int63.(to_int (rem file_size (of_int block_size))) in
  {
    file = (file :> Eio.File.rw_ty Eio.Resource.t);
    block_offset;
    file_offset = file_size;
  }
103
(** [append t data] writes [data] to the log as one logical record, at
    [t]'s current offset, without syncing.

    The record is split into fragments so that no physical record crosses a
    [block_size] boundary. A block tail too small for a header is
    zero-filled first. Each padding run and fragment is written with its own
    [pwrite_all], and [t]'s offsets advance after each write.

    @raise Invalid_argument if [data] is empty. *)
let append t data =
  let total = String.length data in
  if total = 0 then invalid_arg "empty record";
  let pos = ref 0 in
  let first = ref true in
  let advance_file n =
    t.file_offset <- Int63.add t.file_offset (Int63.of_int n)
  in
  while !pos < total do
    let room = block_size - t.block_offset in
    if room < header_size then begin
      (* Not enough space for a header: pad the block and start the next. *)
      Eio.File.pwrite_all t.file ~file_offset:t.file_offset
        [ Cstruct.of_string (String.make room '\x00') ];
      advance_file room;
      t.block_offset <- 0
    end
    else begin
      let capacity = room - header_size in
      let left = total - !pos in
      let take = min left capacity in
      let typ =
        match (!first, left <= capacity) with
        | true, true -> Full
        | true, false -> First
        | false, true -> Last
        | false, false -> Middle
      in
      let record = encode_record typ (String.sub data !pos take) in
      Eio.File.pwrite_all t.file ~file_offset:t.file_offset
        [ Cstruct.of_string record ];
      let written = String.length record in
      advance_file written;
      let next = t.block_offset + written in
      t.block_offset <- (if next >= block_size then 0 else next);
      pos := !pos + take;
      first := false
    end
  done
146
(* Flush everything written to the handle's file to disk via [Eio.File.sync]. *)
let sync { file; _ } = Eio.File.sync file
149
(** [close t] syncs [t] and then closes its file.

    The file is closed even when the sync fails; the sync's exception is then
    re-raised with its original backtrace. Previously a failing sync skipped
    the close. Exceptions from the close itself propagate unchanged. *)
let close t =
  match sync t with
  | () -> Eio.Resource.close t.file
  | exception exn ->
      let bt = Printexc.get_raw_backtrace () in
      Eio.Resource.close t.file;
      Printexc.raise_with_backtrace exn bt
154
(** [read_record_from file ~file_offset ~block_offset] reads the physical
    record starting at [file_offset]. [block_offset] is that position's
    offset within its block.

    Returns:
    - [`Skip n] when the rest of the block ([n] bytes) cannot hold this
      record: it is too small for a header, or the header's length would
      cross the block boundary;
    - [`Eof] when the file ends before a complete header or record. A
      truncated trailing record is reported as [`Eof] instead of decoding a
      partially filled buffer;
    - [`Corrupted n] when the [n]-byte record fails [decode_record];
    - [`Valid (typ, data, n)] for a good record occupying [n] bytes. *)
let read_record_from file ~file_offset ~block_offset =
  let block_remaining = block_size - block_offset in
  if block_remaining < header_size then `Skip block_remaining
  else
    let header = Cstruct.create header_size in
    match Eio.File.pread_exact file ~file_offset [ header ] with
    | exception End_of_file -> `Eof
    | () -> (
        let record_len = header_size + Cstruct.LE.get_uint16 header 4 in
        if block_offset + record_len > block_size then `Skip block_remaining
        else
          let record = Cstruct.create record_len in
          match Eio.File.pread_exact file ~file_offset [ record ] with
          | exception End_of_file -> `Eof
          | () -> (
              match decode_record (Cstruct.to_string record) with
              | Error _ -> `Corrupted record_len
              | Ok (typ, data) -> `Valid (typ, data, record_len)))
182
(** [fold path ~init ~f] reads the log at [path] from the start and folds
    [f] over every complete logical record, in file order.

    Fragmented records are reassembled from their [First], [Middle]... and
    [Last] fragments. A record that fails to decode is skipped and discards
    any partially assembled record.

    [Middle]/[Last] fragments with no preceding [First] (for example because
    the [First] fragment was corrupted) are dropped. Previously they were
    emitted as if they were a complete record.

    Reading stops at end of file or at a truncated trailing record. *)
let fold path ~init ~f =
  Eio.Path.with_open_in path @@ fun file ->
  let file_size = Eio.File.size file in
  let ( +! ) off n = Int63.add off (Int63.of_int n) in
  (* In-block offset after consuming [n] bytes at [block_offset]. *)
  let advance block_offset n =
    let next = block_offset + n in
    if next >= block_size then 0 else next
  in
  (* [pending] is [Some frags] (newest first) while a fragmented record is
     being reassembled, and [None] otherwise. *)
  let rec read_records acc file_offset block_offset pending =
    if Int63.compare file_offset file_size >= 0 then acc
    else
      match read_record_from file ~file_offset ~block_offset with
      | `Eof -> acc
      | `Skip skip -> read_records acc (file_offset +! skip) 0 pending
      | `Corrupted record_len ->
          read_records acc
            (file_offset +! record_len)
            (advance block_offset record_len)
            None
      | `Valid (typ, data, record_len) -> (
          let next_offset = file_offset +! record_len in
          let next_block_offset = advance block_offset record_len in
          match (typ, pending) with
          | Full, _ ->
              read_records (f acc data) next_offset next_block_offset None
          | First, _ ->
              read_records acc next_offset next_block_offset (Some [ data ])
          | Middle, Some frags ->
              read_records acc next_offset next_block_offset
                (Some (data :: frags))
          | Last, Some frags ->
              let full_data = String.concat "" (List.rev (data :: frags)) in
              read_records (f acc full_data) next_offset next_block_offset None
          | (Middle | Last), None ->
              (* The start of this record was lost: drop the orphan. *)
              read_records acc next_offset next_block_offset None)
  in
  read_records init Int63.zero 0 None
217
(* All logical records in the log at [path], in file order. *)
let read_all path =
  fold path ~init:[] ~f:(fun records record -> record :: records) |> List.rev