SpaceOS block storage layout
at main 534 lines 18 kB view raw
1(*--------------------------------------------------------------------------- 2 Copyright (c) 2025 Thomas Gazagnaire. All rights reserved. 3 SPDX-License-Identifier: ISC 4 ---------------------------------------------------------------------------*) 5 6open Space_wire 7 8(** {1 Error Helpers} *) 9 10let err_read sector e = 11 Error (Fmt.str "read error at sector %Ld: %a" sector Block.pp_error e) 12 13let err_write sector e = 14 Error (Fmt.str "write error at sector %Ld: %a" sector Block.pp_write_error e) 15 16let err_short_read ~got ~need = 17 Error (Fmt.str "short read: got %d, need %d" got need) 18 19let err_bad_magic ~expected ~got = 20 Error (Fmt.str "bad magic: expected 0x%08x, got 0x%08x" expected got) 21 22let err_dp_range offset max = 23 Error (Fmt.str "DP offset %d out of range (0..%d)" offset max) 24 25(** {1 Layout Constants} *) 26 27let superblock_block = 0L 28let param_start = 1L 29let param_blocks = 16L 30let event_start = 17L 31let event_blocks = 16L 32let default_dp_start = 33 33 34(** {1 Storage Handle} *) 35 36type t = { blk : Block.t; sb : Superblock.t } 37 38let block t = t.blk 39let superblock t = t.sb 40 41(** {1 Helpers} *) 42 43let sector_size blk = (Block.info blk).sector_size 44 45let read_bytes blk sector len = 46 match Block.read blk sector with 47 | Error e -> err_read sector e 48 | Ok data -> 49 if String.length data < len then 50 err_short_read ~got:(String.length data) ~need:len 51 else Ok data 52 53let write_bytes blk sector data = 54 let ss = sector_size blk in 55 let padded = 56 if String.length data >= ss then data 57 else 58 (* Read existing sector content first to preserve trailing bytes *) 59 let base = 60 match Block.read blk sector with 61 | Ok s -> Bytes.of_string s 62 | Error _ -> Bytes.make ss '\x00' 63 in 64 Bytes.blit_string data 0 base 0 (String.length data); 65 Bytes.unsafe_to_string base 66 in 67 match Block.write blk sector padded with 68 | Ok () -> Ok () 69 | Error e -> err_write sector e 70 71(** {1 Lifecycle} *) 72 73let v blk ~tenant_id ~uuid ~epoch = 74 let info = Block.info blk in 75 let total_blocks = Int64.to_int info.sectors in 76 let dp_start = default_dp_start in 77 let dp_size = max 0 (total_blocks - dp_start) in 78 let sb = 79 Superblock.v ~tenant_id ~total_blocks ~dp_start ~dp_size ~epoch ~uuid 80 in 81 (* Write superblock *) 82 let sb_size = Wire.Codec.wire_size Superblock.codec in 83 let buf = Bytes.make sb_size '\x00' in 84 Wire.Codec.encode Superblock.codec sb buf 0; 85 match write_bytes blk superblock_block (Bytes.unsafe_to_string buf) with 86 | Error _ as e -> e 87 | Ok () -> ( 88 (* Zero the param area *) 89 let ss = sector_size blk in 90 let zeros = String.make ss '\x00' in 91 let rec zero_range start count = 92 if count <= 0L then Ok () 93 else 94 match Block.write blk start zeros with 95 | Error e -> err_write start e 96 | Ok () -> zero_range (Int64.add start 1L) (Int64.sub count 1L) 97 in 98 match zero_range param_start param_blocks with 99 | Error _ as e -> e 100 | Ok () -> ( 101 (* Zero the event area *) 102 match zero_range event_start event_blocks with 103 | Error _ as e -> e 104 | Ok () -> ( 105 (* Zero the DP catalog block *) 106 match Block.write blk (Int64.of_int dp_start) zeros with 107 | Error e -> err_write (Int64.of_int dp_start) e 108 | Ok () -> Ok { blk; sb }))) 109 110let open_ blk = 111 let sb_size = Wire.Codec.wire_size Superblock.codec in 112 match read_bytes blk superblock_block sb_size with 113 | Error _ as e -> e 114 | Ok data -> 115 let buf = Bytes.of_string data in 116 let sb = Wire.Codec.decode Superblock.codec buf 0 in 117 if not (Superblock.check_magic sb) then 118 err_bad_magic ~expected:Superblock.magic ~got:sb.magic 119 else if not (Superblock.check_crc sb) then Error "superblock CRC mismatch" 120 else Ok { blk; sb } 121 122(** {1 Parameter Store} 123 124 Parameters are stored as 252-byte entries packed into blocks 1-16. Multiple 125 entries can fit per sector. *) 126 127let param_entry_size = Wire.Codec.wire_size Param_entry.codec 128 129let max_params t = 130 let ss = sector_size t.blk in 131 Int64.to_int param_blocks * ss / param_entry_size 132 133let read_param t ~slot = 134 let ss = sector_size t.blk in 135 let byte_offset = slot * param_entry_size in 136 let sector = Int64.add param_start (Int64.of_int (byte_offset / ss)) in 137 let offset_in_sector = byte_offset mod ss in 138 match read_bytes t.blk sector param_entry_size with 139 | Error _ as e -> e 140 | Ok data -> 141 if offset_in_sector + param_entry_size > String.length data then 142 (* Entry spans sector boundary — read two sectors *) 143 match Block.read_many t.blk sector 2 with 144 | Error e -> err_read sector e 145 | Ok data2 -> 146 let buf = Bytes.of_string data2 in 147 Ok (Wire.Codec.decode Param_entry.codec buf offset_in_sector) 148 else 149 let buf = Bytes.of_string data in 150 Ok (Wire.Codec.decode Param_entry.codec buf offset_in_sector) 151 152let write_param t ~slot entry = 153 let ss = sector_size t.blk in 154 let byte_offset = slot * param_entry_size in 155 let sector = Int64.add param_start (Int64.of_int (byte_offset / ss)) in 156 let offset_in_sector = byte_offset mod ss in 157 (* Read existing sector(s) *) 158 let spans = offset_in_sector + param_entry_size > ss in 159 let n_sectors = if spans then 2 else 1 in 160 match Block.read_many t.blk sector n_sectors with 161 | Error e -> err_read sector e 162 | Ok data -> ( 163 let buf = Bytes.of_string data in 164 Wire.Codec.encode Param_entry.codec entry buf offset_in_sector; 165 (* Write back sector(s) *) 166 let s1 = Bytes.sub_string buf 0 ss in 167 match Block.write t.blk sector s1 with 168 | Error e -> err_write sector e 169 | Ok () -> 170 if spans then 171 let s2 = Bytes.sub_string buf ss ss in 172 match Block.write t.blk (Int64.add sector 1L) s2 with 173 | Error e -> err_write (Int64.add sector 1L) e 174 | Ok () -> Ok () 175 else Ok ()) 176 177(** {2 Param Lookup} 178 179 Linear scan of all slots, collecting entries with valid CRCs. At SpaceOS 180 scale (~32 entries with 512B sectors, ~260 with 4096B), linear scan is the 181 right choice — simple, auditable, no hidden state. *) 182 183let scan_params t = 184 let max = max_params t in 185 let rec loop i acc = 186 if i >= max then Ok (List.rev acc) 187 else 188 match read_param t ~slot:i with 189 | Error _ as e -> e 190 | Ok entry -> 191 if Param_entry.check_crc entry then loop (i + 1) ((i, entry) :: acc) 192 else loop (i + 1) acc 193 in 194 loop 0 [] 195 196let param t ~id = 197 match scan_params t with 198 | Error _ as e -> e 199 | Ok entries -> 200 let best = 201 List.fold_left 202 (fun acc (_, (e : Param_entry.t)) -> 203 if e.param_id <> id then acc 204 else 205 match acc with 206 | None -> Some e 207 | Some prev -> 208 if e.generation > prev.generation then Some e else acc) 209 None entries 210 in 211 Ok best 212 213let put_param t ~id value = 214 let max = max_params t in 215 (* Find current highest generation for this id *) 216 let rec find_gen i best_gen = 217 if i >= max then Ok best_gen 218 else 219 match read_param t ~slot:i with 220 | Error _ as e -> e 221 | Ok entry -> 222 if Param_entry.check_crc entry && entry.param_id = id then 223 find_gen (i + 1) (Stdlib.max best_gen entry.generation) 224 else find_gen (i + 1) best_gen 225 in 226 match find_gen 0 0 with 227 | Error _ as e -> e 228 | Ok gen -> ( 229 (* Find first free slot (invalid CRC = empty) *) 230 let rec find_free i = 231 if i >= max then Error "parameter store full" 232 else 233 match read_param t ~slot:i with 234 | Error _ as e -> e 235 | Ok entry -> 236 if not (Param_entry.check_crc entry) then Ok i 237 else find_free (i + 1) 238 in 239 match find_free 0 with 240 | Error _ as e -> e 241 | Ok slot -> 242 let entry = Param_entry.v ~param_id:id ~generation:(gen + 1) value in 243 write_param t ~slot entry) 244 245let latest_params t = 246 match scan_params t with 247 | Error _ as e -> e 248 | Ok entries -> 249 let tbl = Hashtbl.create 16 in 250 List.iter 251 (fun (_, (e : Param_entry.t)) -> 252 match Hashtbl.find_opt tbl e.param_id with 253 | None -> Hashtbl.replace tbl e.param_id e 254 | Some prev -> 255 if e.generation > prev.Param_entry.generation then 256 Hashtbl.replace tbl e.param_id e) 257 entries; 258 Ok (Hashtbl.fold (fun _ v acc -> v :: acc) tbl []) 259 260(** {1 Event Log} 261 262 The event log is a ring buffer in blocks 17-32. An 8-byte write pointer 263 (big-endian uint64) at the start of block 17 tracks the next record index. 264 Records follow contiguously after the pointer. *) 265 266let event_record_size = Wire.Codec.wire_size Event_log.codec 267let event_pointer_size = 8 268 269let read_event_pointer t = 270 match read_bytes t.blk event_start event_pointer_size with 271 | Error _ as e -> e 272 | Ok data -> 273 let buf = Bytes.of_string data in 274 Ok (Wire.UInt32.get_be buf 0) 275 276let write_event_pointer t idx = 277 match Block.read t.blk event_start with 278 | Error e -> err_read event_start e 279 | Ok data -> ( 280 let buf = Bytes.of_string data in 281 Wire.UInt32.set_be buf 0 idx; 282 match Block.write t.blk event_start (Bytes.unsafe_to_string buf) with 283 | Error e -> err_write event_start e 284 | Ok () -> Ok ()) 285 286let event_byte_offset index = event_pointer_size + (index * event_record_size) 287 288let max_events t = 289 let ss = sector_size t.blk in 290 let total_bytes = Int64.to_int event_blocks * ss in 291 (total_bytes - event_pointer_size) / event_record_size 292 293let read_event t ~index = 294 let ss = sector_size t.blk in 295 let byte_offset = event_byte_offset index in 296 let sector = Int64.add event_start (Int64.of_int (byte_offset / ss)) in 297 let offset_in_sector = byte_offset mod ss in 298 let spans = offset_in_sector + event_record_size > ss in 299 let n_sectors = if spans then 2 else 1 in 300 match Block.read_many t.blk sector n_sectors with 301 | Error e -> err_read sector e 302 | Ok data -> 303 let buf = Bytes.of_string data in 304 Ok (Wire.Codec.decode Event_log.codec buf offset_in_sector) 305 306let write_event t entry = 307 match read_event_pointer t with 308 | Error _ as e -> e 309 | Ok ptr -> ( 310 let max_ev = max_events t in 311 let index = ptr mod max_ev in 312 let ss = sector_size t.blk in 313 let byte_offset = event_byte_offset index in 314 let sector = Int64.add event_start (Int64.of_int (byte_offset / ss)) in 315 let offset_in_sector = byte_offset mod ss in 316 let spans = offset_in_sector + event_record_size > ss in 317 let n_sectors = if spans then 2 else 1 in 318 match Block.read_many t.blk sector n_sectors with 319 | Error e -> err_read sector e 320 | Ok data -> ( 321 let buf = Bytes.of_string data in 322 Wire.Codec.encode Event_log.codec entry buf offset_in_sector; 323 let s1 = Bytes.sub_string buf 0 ss in 324 match Block.write t.blk sector s1 with 325 | Error e -> err_write sector e 326 | Ok () -> ( 327 let write_s2 = 328 if spans then 329 let s2 = Bytes.sub_string buf ss ss in 330 Block.write t.blk (Int64.add sector 1L) s2 331 else Ok () 332 in 333 match write_s2 with 334 | Error e -> err_write (Int64.add sector 1L) e 335 | Ok () -> write_event_pointer t (ptr + 1)))) 336 337(** {2 Event Helpers} *) 338 339let event_count t = read_event_pointer t 340 341let recent_events t ~count = 342 match read_event_pointer t with 343 | Error _ as e -> e 344 | Ok ptr -> 345 if ptr = 0 then Ok [] 346 else 347 let max_ev = max_events t in 348 let total = min count ptr in 349 let start = ptr - total in 350 let rec loop i acc = 351 if i >= total then Ok (List.rev acc) 352 else 353 let index = (start + i) mod max_ev in 354 match read_event t ~index with 355 | Error _ as e -> e 356 | Ok ev -> loop (i + 1) (ev :: acc) 357 in 358 loop 0 [] 359 360(** {1 Data Product Area} *) 361 362let dp_start t = t.sb.dp_start 363let dp_size t = t.sb.dp_size 364let dp_payload_size = Wire.Codec.wire_size Dp_payload.codec 365 366let read_dp t ~offset = 367 if offset < 0 || offset >= t.sb.dp_size then 368 err_dp_range offset (t.sb.dp_size - 1) 369 else 370 let sector = Int64.of_int (t.sb.dp_start + offset) in 371 match read_bytes t.blk sector dp_payload_size with 372 | Error _ as e -> e 373 | Ok data -> 374 let buf = Bytes.of_string data in 375 Ok (Wire.Codec.decode Dp_payload.codec buf 0) 376 377let write_dp t ~offset payload = 378 if offset < 0 || offset >= t.sb.dp_size then 379 err_dp_range offset (t.sb.dp_size - 1) 380 else 381 let sector = Int64.of_int (t.sb.dp_start + offset) in 382 let buf = Bytes.make dp_payload_size '\x00' in 383 Wire.Codec.encode Dp_payload.codec payload buf 0; 384 write_bytes t.blk sector (Bytes.unsafe_to_string buf) 385 386(** {2 DP Catalog} 387 388 DP block 0 is reserved as a catalog: 4-byte count (uint32 BE) followed by 389 packed {!Dp_payload.t} entries. Data blocks start at DP offset 1. *) 390 391let dp_catalog_header_size = 4 392let dp_data_offset = 1 393 394let crc32c data = 395 Optint.to_unsigned_int 396 (Checkseum.Crc32c.digest_string data 0 (String.length data) 397 Checkseum.Crc32c.default) 398 399let dp_catalog t = 400 let ss = sector_size t.blk in 401 let sector = Int64.of_int t.sb.dp_start in 402 match read_bytes t.blk sector ss with 403 | Error _ as e -> e 404 | Ok data -> 405 let buf = Bytes.of_string data in 406 let count = Wire.UInt32.get_be buf 0 in 407 let rec loop i acc = 408 if i >= count then Ok (List.rev acc) 409 else 410 let off = dp_catalog_header_size + (i * dp_payload_size) in 411 if off + dp_payload_size > ss then Ok (List.rev acc) 412 else 413 let entry = Wire.Codec.decode Dp_payload.codec buf off in 414 loop (i + 1) (entry :: acc) 415 in 416 loop 0 [] 417 418let dp t ~name = 419 match dp_catalog t with 420 | Error _ as e -> e 421 | Ok entries -> 422 Ok (List.find_opt (fun dp -> Dp_payload.name_string dp = name) entries) 423 424let write_sectors blk ~start ~ss data = 425 let data_len = String.length data in 426 let block_count = (data_len + ss - 1) / ss in 427 let rec go i = 428 if i >= block_count then Ok () 429 else 430 let sector = Int64.of_int (start + i) in 431 let chunk_start = i * ss in 432 let chunk_len = min ss (data_len - chunk_start) in 433 let chunk = 434 if chunk_len = ss then String.sub data chunk_start ss 435 else 436 let b = Bytes.make ss '\x00' in 437 Bytes.blit_string data chunk_start b 0 chunk_len; 438 Bytes.unsafe_to_string b 439 in 440 match Block.write blk sector chunk with 441 | Error e -> err_write sector e 442 | Ok () -> go (i + 1) 443 in 444 go 0 445 446let write_dp_data t ~dp_class ~priority ~name data = 447 let ss = sector_size t.blk in 448 let data_len = String.length data in 449 let block_count = (data_len + ss - 1) / ss in 450 let crc = crc32c data in 451 match dp_catalog t with 452 | Error _ as e -> e 453 | Ok entries -> ( 454 let next_offset = 455 List.fold_left 456 (fun acc (dp : Dp_payload.t) -> 457 Stdlib.max acc (dp.block_offset + dp.block_count)) 458 dp_data_offset entries 459 in 460 if next_offset + block_count > t.sb.dp_size then Error "DP area full" 461 else 462 match 463 write_sectors t.blk ~start:(t.sb.dp_start + next_offset) ~ss data 464 with 465 | Error _ as e -> e 466 | Ok () -> ( 467 let dp = 468 Dp_payload.v ~block_offset:next_offset ~block_count ~dp_class 469 ~priority ~name ~crc32:crc 470 in 471 (* Append to catalog *) 472 let count = List.length entries in 473 let sector = Int64.of_int t.sb.dp_start in 474 match Block.read t.blk sector with 475 | Error e -> err_read sector e 476 | Ok cat_data -> 477 let buf = Bytes.of_string cat_data in 478 let off = dp_catalog_header_size + (count * dp_payload_size) in 479 if off + dp_payload_size > ss then Error "DP catalog full" 480 else ( 481 Wire.Codec.encode Dp_payload.codec dp buf off; 482 Wire.UInt32.set_be buf 0 (count + 1); 483 match 484 Block.write t.blk sector (Bytes.unsafe_to_string buf) 485 with 486 | Error e -> err_write sector e 487 | Ok () -> Ok dp))) 488 489let read_dp_data t (dp : Dp_payload.t) = 490 let rec read_blocks i acc = 491 if i >= dp.block_count then Ok (String.concat "" (List.rev acc)) 492 else 493 let sector = Int64.of_int (t.sb.dp_start + dp.block_offset + i) in 494 match Block.read t.blk sector with 495 | Error e -> err_read sector e 496 | Ok data -> read_blocks (i + 1) (data :: acc) 497 in 498 read_blocks 0 [] 499 500(** {1 Pretty Printing} *) 501 502let pp ppf t = 503 let sb = t.sb in 504 Fmt.pf ppf 505 "@[<v>SpaceOS Block Storage@,\ 506 @[<v 2>Superblock:@,\ 507 magic = 0x%08x%s@,\ 508 version = %d@,\ 509 tenant_id = %d@,\ 510 blocks = %d@,\ 511 dp_start = %d@,\ 512 dp_size = %d@,\ 513 epoch = %Ld@,\ 514 uuid = %a@,\ 515 crc32 = 0x%08x%s@]@,\ 516 @[<v 2>Layout:@,\ 517 params = blocks %Ld-%Ld (%d entries)@,\ 518 events = blocks %Ld-%Ld (%d records)@,\ 519 dp = blocks %d-%d@]@]" 520 sb.magic 521 (if Superblock.check_magic sb then " (OK)" else " (BAD)") 522 sb.format_version sb.tenant_id sb.total_blocks sb.dp_start sb.dp_size 523 sb.epoch 524 (Fmt.seq ~sep:Fmt.nop (fun ppf c -> Fmt.pf ppf "%02x" (Char.code c))) 525 (String.to_seq sb.uuid) sb.crc32 526 (if Superblock.check_crc sb then " (OK)" else " (BAD)") 527 param_start 528 (Int64.sub (Int64.add param_start param_blocks) 1L) 529 (Int64.to_int param_blocks * (Block.info t.blk).sector_size 530 / param_entry_size) 531 event_start 532 (Int64.sub (Int64.add event_start event_blocks) 1L) 533 (max_events t) sb.dp_start 534 (sb.dp_start + sb.dp_size - 1)