SpaceOS block storage layout
1(*---------------------------------------------------------------------------
2 Copyright (c) 2025 Thomas Gazagnaire. All rights reserved.
3 SPDX-License-Identifier: ISC
4 ---------------------------------------------------------------------------*)
5
6open Space_wire
7
8(** {1 Error Helpers} *)
9
10let err_read sector e =
11 Error (Fmt.str "read error at sector %Ld: %a" sector Block.pp_error e)
12
13let err_write sector e =
14 Error (Fmt.str "write error at sector %Ld: %a" sector Block.pp_write_error e)
15
16let err_short_read ~got ~need =
17 Error (Fmt.str "short read: got %d, need %d" got need)
18
19let err_bad_magic ~expected ~got =
20 Error (Fmt.str "bad magic: expected 0x%08x, got 0x%08x" expected got)
21
22let err_dp_range offset max =
23 Error (Fmt.str "DP offset %d out of range (0..%d)" offset max)
24
25(** {1 Layout Constants} *)
26
27let superblock_block = 0L
28let param_start = 1L
29let param_blocks = 16L
30let event_start = 17L
31let event_blocks = 16L
32let default_dp_start = 33
33
34(** {1 Storage Handle} *)
35
36type t = { blk : Block.t; sb : Superblock.t }
37
38let block t = t.blk
39let superblock t = t.sb
40
41(** {1 Helpers} *)
42
43let sector_size blk = (Block.info blk).sector_size
44
45let read_bytes blk sector len =
46 match Block.read blk sector with
47 | Error e -> err_read sector e
48 | Ok data ->
49 if String.length data < len then
50 err_short_read ~got:(String.length data) ~need:len
51 else Ok data
52
53let write_bytes blk sector data =
54 let ss = sector_size blk in
55 let padded =
56 if String.length data >= ss then data
57 else
58 (* Read existing sector content first to preserve trailing bytes *)
59 let base =
60 match Block.read blk sector with
61 | Ok s -> Bytes.of_string s
62 | Error _ -> Bytes.make ss '\x00'
63 in
64 Bytes.blit_string data 0 base 0 (String.length data);
65 Bytes.unsafe_to_string base
66 in
67 match Block.write blk sector padded with
68 | Ok () -> Ok ()
69 | Error e -> err_write sector e
70
71(** {1 Lifecycle} *)
72
73let v blk ~tenant_id ~uuid ~epoch =
74 let info = Block.info blk in
75 let total_blocks = Int64.to_int info.sectors in
76 let dp_start = default_dp_start in
77 let dp_size = max 0 (total_blocks - dp_start) in
78 let sb =
79 Superblock.v ~tenant_id ~total_blocks ~dp_start ~dp_size ~epoch ~uuid
80 in
81 (* Write superblock *)
82 let sb_size = Wire.Codec.wire_size Superblock.codec in
83 let buf = Bytes.make sb_size '\x00' in
84 Wire.Codec.encode Superblock.codec sb buf 0;
85 match write_bytes blk superblock_block (Bytes.unsafe_to_string buf) with
86 | Error _ as e -> e
87 | Ok () -> (
88 (* Zero the param area *)
89 let ss = sector_size blk in
90 let zeros = String.make ss '\x00' in
91 let rec zero_range start count =
92 if count <= 0L then Ok ()
93 else
94 match Block.write blk start zeros with
95 | Error e -> err_write start e
96 | Ok () -> zero_range (Int64.add start 1L) (Int64.sub count 1L)
97 in
98 match zero_range param_start param_blocks with
99 | Error _ as e -> e
100 | Ok () -> (
101 (* Zero the event area *)
102 match zero_range event_start event_blocks with
103 | Error _ as e -> e
104 | Ok () -> (
105 (* Zero the DP catalog block *)
106 match Block.write blk (Int64.of_int dp_start) zeros with
107 | Error e -> err_write (Int64.of_int dp_start) e
108 | Ok () -> Ok { blk; sb })))
109
110let open_ blk =
111 let sb_size = Wire.Codec.wire_size Superblock.codec in
112 match read_bytes blk superblock_block sb_size with
113 | Error _ as e -> e
114 | Ok data ->
115 let buf = Bytes.of_string data in
116 let sb = Wire.Codec.decode Superblock.codec buf 0 in
117 if not (Superblock.check_magic sb) then
118 err_bad_magic ~expected:Superblock.magic ~got:sb.magic
119 else if not (Superblock.check_crc sb) then Error "superblock CRC mismatch"
120 else Ok { blk; sb }
121
122(** {1 Parameter Store}
123
124 Parameters are stored as 252-byte entries packed into blocks 1-16. Multiple
125 entries can fit per sector. *)
126
127let param_entry_size = Wire.Codec.wire_size Param_entry.codec
128
129let max_params t =
130 let ss = sector_size t.blk in
131 Int64.to_int param_blocks * ss / param_entry_size
132
133let read_param t ~slot =
134 let ss = sector_size t.blk in
135 let byte_offset = slot * param_entry_size in
136 let sector = Int64.add param_start (Int64.of_int (byte_offset / ss)) in
137 let offset_in_sector = byte_offset mod ss in
138 match read_bytes t.blk sector param_entry_size with
139 | Error _ as e -> e
140 | Ok data ->
141 if offset_in_sector + param_entry_size > String.length data then
142 (* Entry spans sector boundary — read two sectors *)
143 match Block.read_many t.blk sector 2 with
144 | Error e -> err_read sector e
145 | Ok data2 ->
146 let buf = Bytes.of_string data2 in
147 Ok (Wire.Codec.decode Param_entry.codec buf offset_in_sector)
148 else
149 let buf = Bytes.of_string data in
150 Ok (Wire.Codec.decode Param_entry.codec buf offset_in_sector)
151
152let write_param t ~slot entry =
153 let ss = sector_size t.blk in
154 let byte_offset = slot * param_entry_size in
155 let sector = Int64.add param_start (Int64.of_int (byte_offset / ss)) in
156 let offset_in_sector = byte_offset mod ss in
157 (* Read existing sector(s) *)
158 let spans = offset_in_sector + param_entry_size > ss in
159 let n_sectors = if spans then 2 else 1 in
160 match Block.read_many t.blk sector n_sectors with
161 | Error e -> err_read sector e
162 | Ok data -> (
163 let buf = Bytes.of_string data in
164 Wire.Codec.encode Param_entry.codec entry buf offset_in_sector;
165 (* Write back sector(s) *)
166 let s1 = Bytes.sub_string buf 0 ss in
167 match Block.write t.blk sector s1 with
168 | Error e -> err_write sector e
169 | Ok () ->
170 if spans then
171 let s2 = Bytes.sub_string buf ss ss in
172 match Block.write t.blk (Int64.add sector 1L) s2 with
173 | Error e -> err_write (Int64.add sector 1L) e
174 | Ok () -> Ok ()
175 else Ok ())
176
177(** {2 Param Lookup}
178
179 Linear scan of all slots, collecting entries with valid CRCs. At SpaceOS
180 scale (~32 entries with 512B sectors, ~260 with 4096B), linear scan is the
181 right choice — simple, auditable, no hidden state. *)
182
183let scan_params t =
184 let max = max_params t in
185 let rec loop i acc =
186 if i >= max then Ok (List.rev acc)
187 else
188 match read_param t ~slot:i with
189 | Error _ as e -> e
190 | Ok entry ->
191 if Param_entry.check_crc entry then loop (i + 1) ((i, entry) :: acc)
192 else loop (i + 1) acc
193 in
194 loop 0 []
195
196let param t ~id =
197 match scan_params t with
198 | Error _ as e -> e
199 | Ok entries ->
200 let best =
201 List.fold_left
202 (fun acc (_, (e : Param_entry.t)) ->
203 if e.param_id <> id then acc
204 else
205 match acc with
206 | None -> Some e
207 | Some prev ->
208 if e.generation > prev.generation then Some e else acc)
209 None entries
210 in
211 Ok best
212
213let put_param t ~id value =
214 let max = max_params t in
215 (* Find current highest generation for this id *)
216 let rec find_gen i best_gen =
217 if i >= max then Ok best_gen
218 else
219 match read_param t ~slot:i with
220 | Error _ as e -> e
221 | Ok entry ->
222 if Param_entry.check_crc entry && entry.param_id = id then
223 find_gen (i + 1) (Stdlib.max best_gen entry.generation)
224 else find_gen (i + 1) best_gen
225 in
226 match find_gen 0 0 with
227 | Error _ as e -> e
228 | Ok gen -> (
229 (* Find first free slot (invalid CRC = empty) *)
230 let rec find_free i =
231 if i >= max then Error "parameter store full"
232 else
233 match read_param t ~slot:i with
234 | Error _ as e -> e
235 | Ok entry ->
236 if not (Param_entry.check_crc entry) then Ok i
237 else find_free (i + 1)
238 in
239 match find_free 0 with
240 | Error _ as e -> e
241 | Ok slot ->
242 let entry = Param_entry.v ~param_id:id ~generation:(gen + 1) value in
243 write_param t ~slot entry)
244
245let latest_params t =
246 match scan_params t with
247 | Error _ as e -> e
248 | Ok entries ->
249 let tbl = Hashtbl.create 16 in
250 List.iter
251 (fun (_, (e : Param_entry.t)) ->
252 match Hashtbl.find_opt tbl e.param_id with
253 | None -> Hashtbl.replace tbl e.param_id e
254 | Some prev ->
255 if e.generation > prev.Param_entry.generation then
256 Hashtbl.replace tbl e.param_id e)
257 entries;
258 Ok (Hashtbl.fold (fun _ v acc -> v :: acc) tbl [])
259
260(** {1 Event Log}
261
262 The event log is a ring buffer in blocks 17-32. An 8-byte write pointer
263 (big-endian uint64) at the start of block 17 tracks the next record index.
264 Records follow contiguously after the pointer. *)
265
266let event_record_size = Wire.Codec.wire_size Event_log.codec
267let event_pointer_size = 8
268
269let read_event_pointer t =
270 match read_bytes t.blk event_start event_pointer_size with
271 | Error _ as e -> e
272 | Ok data ->
273 let buf = Bytes.of_string data in
274 Ok (Wire.UInt32.get_be buf 0)
275
276let write_event_pointer t idx =
277 match Block.read t.blk event_start with
278 | Error e -> err_read event_start e
279 | Ok data -> (
280 let buf = Bytes.of_string data in
281 Wire.UInt32.set_be buf 0 idx;
282 match Block.write t.blk event_start (Bytes.unsafe_to_string buf) with
283 | Error e -> err_write event_start e
284 | Ok () -> Ok ())
285
286let event_byte_offset index = event_pointer_size + (index * event_record_size)
287
288let max_events t =
289 let ss = sector_size t.blk in
290 let total_bytes = Int64.to_int event_blocks * ss in
291 (total_bytes - event_pointer_size) / event_record_size
292
293let read_event t ~index =
294 let ss = sector_size t.blk in
295 let byte_offset = event_byte_offset index in
296 let sector = Int64.add event_start (Int64.of_int (byte_offset / ss)) in
297 let offset_in_sector = byte_offset mod ss in
298 let spans = offset_in_sector + event_record_size > ss in
299 let n_sectors = if spans then 2 else 1 in
300 match Block.read_many t.blk sector n_sectors with
301 | Error e -> err_read sector e
302 | Ok data ->
303 let buf = Bytes.of_string data in
304 Ok (Wire.Codec.decode Event_log.codec buf offset_in_sector)
305
306let write_event t entry =
307 match read_event_pointer t with
308 | Error _ as e -> e
309 | Ok ptr -> (
310 let max_ev = max_events t in
311 let index = ptr mod max_ev in
312 let ss = sector_size t.blk in
313 let byte_offset = event_byte_offset index in
314 let sector = Int64.add event_start (Int64.of_int (byte_offset / ss)) in
315 let offset_in_sector = byte_offset mod ss in
316 let spans = offset_in_sector + event_record_size > ss in
317 let n_sectors = if spans then 2 else 1 in
318 match Block.read_many t.blk sector n_sectors with
319 | Error e -> err_read sector e
320 | Ok data -> (
321 let buf = Bytes.of_string data in
322 Wire.Codec.encode Event_log.codec entry buf offset_in_sector;
323 let s1 = Bytes.sub_string buf 0 ss in
324 match Block.write t.blk sector s1 with
325 | Error e -> err_write sector e
326 | Ok () -> (
327 let write_s2 =
328 if spans then
329 let s2 = Bytes.sub_string buf ss ss in
330 Block.write t.blk (Int64.add sector 1L) s2
331 else Ok ()
332 in
333 match write_s2 with
334 | Error e -> err_write (Int64.add sector 1L) e
335 | Ok () -> write_event_pointer t (ptr + 1))))
336
337(** {2 Event Helpers} *)
338
339let event_count t = read_event_pointer t
340
341let recent_events t ~count =
342 match read_event_pointer t with
343 | Error _ as e -> e
344 | Ok ptr ->
345 if ptr = 0 then Ok []
346 else
347 let max_ev = max_events t in
348 let total = min count ptr in
349 let start = ptr - total in
350 let rec loop i acc =
351 if i >= total then Ok (List.rev acc)
352 else
353 let index = (start + i) mod max_ev in
354 match read_event t ~index with
355 | Error _ as e -> e
356 | Ok ev -> loop (i + 1) (ev :: acc)
357 in
358 loop 0 []
359
360(** {1 Data Product Area} *)
361
362let dp_start t = t.sb.dp_start
363let dp_size t = t.sb.dp_size
364let dp_payload_size = Wire.Codec.wire_size Dp_payload.codec
365
366let read_dp t ~offset =
367 if offset < 0 || offset >= t.sb.dp_size then
368 err_dp_range offset (t.sb.dp_size - 1)
369 else
370 let sector = Int64.of_int (t.sb.dp_start + offset) in
371 match read_bytes t.blk sector dp_payload_size with
372 | Error _ as e -> e
373 | Ok data ->
374 let buf = Bytes.of_string data in
375 Ok (Wire.Codec.decode Dp_payload.codec buf 0)
376
377let write_dp t ~offset payload =
378 if offset < 0 || offset >= t.sb.dp_size then
379 err_dp_range offset (t.sb.dp_size - 1)
380 else
381 let sector = Int64.of_int (t.sb.dp_start + offset) in
382 let buf = Bytes.make dp_payload_size '\x00' in
383 Wire.Codec.encode Dp_payload.codec payload buf 0;
384 write_bytes t.blk sector (Bytes.unsafe_to_string buf)
385
386(** {2 DP Catalog}
387
388 DP block 0 is reserved as a catalog: 4-byte count (uint32 BE) followed by
389 packed {!Dp_payload.t} entries. Data blocks start at DP offset 1. *)
390
391let dp_catalog_header_size = 4
392let dp_data_offset = 1
393
394let crc32c data =
395 Optint.to_unsigned_int
396 (Checkseum.Crc32c.digest_string data 0 (String.length data)
397 Checkseum.Crc32c.default)
398
399let dp_catalog t =
400 let ss = sector_size t.blk in
401 let sector = Int64.of_int t.sb.dp_start in
402 match read_bytes t.blk sector ss with
403 | Error _ as e -> e
404 | Ok data ->
405 let buf = Bytes.of_string data in
406 let count = Wire.UInt32.get_be buf 0 in
407 let rec loop i acc =
408 if i >= count then Ok (List.rev acc)
409 else
410 let off = dp_catalog_header_size + (i * dp_payload_size) in
411 if off + dp_payload_size > ss then Ok (List.rev acc)
412 else
413 let entry = Wire.Codec.decode Dp_payload.codec buf off in
414 loop (i + 1) (entry :: acc)
415 in
416 loop 0 []
417
418let dp t ~name =
419 match dp_catalog t with
420 | Error _ as e -> e
421 | Ok entries ->
422 Ok (List.find_opt (fun dp -> Dp_payload.name_string dp = name) entries)
423
424let write_sectors blk ~start ~ss data =
425 let data_len = String.length data in
426 let block_count = (data_len + ss - 1) / ss in
427 let rec go i =
428 if i >= block_count then Ok ()
429 else
430 let sector = Int64.of_int (start + i) in
431 let chunk_start = i * ss in
432 let chunk_len = min ss (data_len - chunk_start) in
433 let chunk =
434 if chunk_len = ss then String.sub data chunk_start ss
435 else
436 let b = Bytes.make ss '\x00' in
437 Bytes.blit_string data chunk_start b 0 chunk_len;
438 Bytes.unsafe_to_string b
439 in
440 match Block.write blk sector chunk with
441 | Error e -> err_write sector e
442 | Ok () -> go (i + 1)
443 in
444 go 0
445
446let write_dp_data t ~dp_class ~priority ~name data =
447 let ss = sector_size t.blk in
448 let data_len = String.length data in
449 let block_count = (data_len + ss - 1) / ss in
450 let crc = crc32c data in
451 match dp_catalog t with
452 | Error _ as e -> e
453 | Ok entries -> (
454 let next_offset =
455 List.fold_left
456 (fun acc (dp : Dp_payload.t) ->
457 Stdlib.max acc (dp.block_offset + dp.block_count))
458 dp_data_offset entries
459 in
460 if next_offset + block_count > t.sb.dp_size then Error "DP area full"
461 else
462 match
463 write_sectors t.blk ~start:(t.sb.dp_start + next_offset) ~ss data
464 with
465 | Error _ as e -> e
466 | Ok () -> (
467 let dp =
468 Dp_payload.v ~block_offset:next_offset ~block_count ~dp_class
469 ~priority ~name ~crc32:crc
470 in
471 (* Append to catalog *)
472 let count = List.length entries in
473 let sector = Int64.of_int t.sb.dp_start in
474 match Block.read t.blk sector with
475 | Error e -> err_read sector e
476 | Ok cat_data ->
477 let buf = Bytes.of_string cat_data in
478 let off = dp_catalog_header_size + (count * dp_payload_size) in
479 if off + dp_payload_size > ss then Error "DP catalog full"
480 else (
481 Wire.Codec.encode Dp_payload.codec dp buf off;
482 Wire.UInt32.set_be buf 0 (count + 1);
483 match
484 Block.write t.blk sector (Bytes.unsafe_to_string buf)
485 with
486 | Error e -> err_write sector e
487 | Ok () -> Ok dp)))
488
489let read_dp_data t (dp : Dp_payload.t) =
490 let rec read_blocks i acc =
491 if i >= dp.block_count then Ok (String.concat "" (List.rev acc))
492 else
493 let sector = Int64.of_int (t.sb.dp_start + dp.block_offset + i) in
494 match Block.read t.blk sector with
495 | Error e -> err_read sector e
496 | Ok data -> read_blocks (i + 1) (data :: acc)
497 in
498 read_blocks 0 []
499
500(** {1 Pretty Printing} *)
501
502let pp ppf t =
503 let sb = t.sb in
504 Fmt.pf ppf
505 "@[<v>SpaceOS Block Storage@,\
506 @[<v 2>Superblock:@,\
507 magic = 0x%08x%s@,\
508 version = %d@,\
509 tenant_id = %d@,\
510 blocks = %d@,\
511 dp_start = %d@,\
512 dp_size = %d@,\
513 epoch = %Ld@,\
514 uuid = %a@,\
515 crc32 = 0x%08x%s@]@,\
516 @[<v 2>Layout:@,\
517 params = blocks %Ld-%Ld (%d entries)@,\
518 events = blocks %Ld-%Ld (%d records)@,\
519 dp = blocks %d-%d@]@]"
520 sb.magic
521 (if Superblock.check_magic sb then " (OK)" else " (BAD)")
522 sb.format_version sb.tenant_id sb.total_blocks sb.dp_start sb.dp_size
523 sb.epoch
524 (Fmt.seq ~sep:Fmt.nop (fun ppf c -> Fmt.pf ppf "%02x" (Char.code c)))
525 (String.to_seq sb.uuid) sb.crc32
526 (if Superblock.check_crc sb then " (OK)" else " (BAD)")
527 param_start
528 (Int64.sub (Int64.add param_start param_blocks) 1L)
529 (Int64.to_int param_blocks * (Block.info t.blk).sector_size
530 / param_entry_size)
531 event_start
532 (Int64.sub (Int64.add event_start event_blocks) 1L)
533 (max_events t) sb.dp_start
534 (sb.dp_start + sb.dp_size - 1)