(*---------------------------------------------------------------------------
   Copyright (c) 2025 Thomas Gazagnaire. All rights reserved.
   SPDX-License-Identifier: ISC
  ---------------------------------------------------------------------------*)

open Space_wire

(** {1 Error Helpers} *)

(** [err_read sector e] is an [Error] describing a failed read of [sector]. *)
let err_read sector e =
  let msg = Fmt.str "read error at sector %Ld: %a" sector Block.pp_error e in
  Error msg

(** [err_write sector e] is an [Error] describing a failed write of
    [sector]. *)
let err_write sector e =
  let msg =
    Fmt.str "write error at sector %Ld: %a" sector Block.pp_write_error e
  in
  Error msg

(** [err_short_read ~got ~need] is an [Error] reporting that only [got] bytes
    were available where [need] were required. *)
let err_short_read ~got ~need =
  let msg = Fmt.str "short read: got %d, need %d" got need in
  Error msg

(** [err_bad_magic ~expected ~got] is an [Error] reporting a magic-number
    mismatch. *)
let err_bad_magic ~expected ~got =
  let msg = Fmt.str "bad magic: expected 0x%08x, got 0x%08x" expected got in
  Error msg

(** [err_dp_range offset last] is an [Error] reporting that the DP [offset]
    lies outside [0..last]. *)
let err_dp_range offset last =
  let msg = Fmt.str "DP offset %d out of range (0..%d)" offset last in
  Error msg

(** {1 Layout Constants} *)

(* Each area starts right after the previous one: superblock at block 0,
   parameters at 1..16, events at 17..32, data products from 33. *)
let superblock_block = 0L
let param_start = Int64.succ superblock_block
let param_blocks = 16L
let event_start = Int64.add param_start param_blocks
let event_blocks = 16L
let default_dp_start = Int64.to_int (Int64.add event_start event_blocks)

(** {1 Storage Handle} *)

type t = { blk : Block.t; sb : Superblock.t }

(** [block t] is the block device backing [t]. *)
let block { blk; _ } = blk

(** [superblock t] is the superblock held by [t]. *)
let superblock { sb; _ } = sb

(** {1 Helpers} *)

(** [sector_size blk] is the sector size reported by [Block.info blk]. *)
let sector_size blk =
  let info = Block.info blk in
  info.sector_size

(** [read_bytes blk sector len] reads [sector] and returns the data exactly as
    the device returned it, provided it holds at least [len] bytes; otherwise
    a short-read error. *)
let read_bytes blk sector len =
  match Block.read blk sector with
  | Error e -> err_read sector e
  | Ok data ->
      let got = String.length data in
      if got >= len then Ok data else err_short_read ~got ~need:len

(** [write_bytes blk sector data] writes [data] to [sector]. Data shorter than
    one sector is overlaid on the current sector content so that trailing
    bytes survive; if that content cannot be read, a zero-filled sector is
    used as the base instead. Data of at least one sector is passed to the
    device unchanged. *)
let write_bytes blk sector data =
  let ss = sector_size blk in
  let len = String.length data in
  let payload =
    if len < ss then (
      let base =
        match Block.read blk sector with
        | Ok existing -> Bytes.of_string existing
        | Error _ -> Bytes.make ss '\x00'
      in
      Bytes.blit_string data 0 base 0 len;
      Bytes.unsafe_to_string base)
    else data
  in
  match Block.write blk sector payload with
  | Ok () -> Ok ()
  | Error e -> err_write sector e

(** {1 Lifecycle} *)

(** [v blk ~tenant_id ~uuid ~epoch] formats [blk]: it writes a fresh
    superblock to block 0, then zeroes the parameter area, the event area and
    the first data-product block (the catalog), in that order. The
    data-product area starts at [default_dp_start] and covers the remaining
    sectors of the device (or is empty when the device is too small). Returns
    the new handle, or the first write error met. *)
let v blk ~tenant_id ~uuid ~epoch =
  let ( let* ) = Result.bind in
  let info = Block.info blk in
  let total_blocks = Int64.to_int info.sectors in
  let dp_start = default_dp_start in
  let dp_size = max 0 (total_blocks - dp_start) in
  let sb =
    Superblock.v ~tenant_id ~total_blocks ~dp_start ~dp_size ~epoch ~uuid
  in
  let encoded = Bytes.make (Wire.Codec.wire_size Superblock.codec) '\x00' in
  Wire.Codec.encode Superblock.codec sb encoded 0;
  let* () =
    write_bytes blk superblock_block (Bytes.unsafe_to_string encoded)
  in
  let zeros = String.make (sector_size blk) '\x00' in
  let zero_sector sector =
    match Block.write blk sector zeros with
    | Ok () -> Ok ()
    | Error e -> err_write sector e
  in
  let rec zero_range sector remaining =
    if remaining <= 0L then Ok ()
    else
      let* () = zero_sector sector in
      zero_range (Int64.succ sector) (Int64.pred remaining)
  in
  let* () = zero_range param_start param_blocks in
  let* () = zero_range event_start event_blocks in
  let* () = zero_sector (Int64.of_int dp_start) in
  Ok { blk; sb }

(** [open_ blk] reads and decodes the superblock stored in block 0 and returns
    a handle when both its magic number and its CRC check out. *)
let open_ blk =
  let sb_size = Wire.Codec.wire_size Superblock.codec in
  Result.bind (read_bytes blk superblock_block sb_size) (fun raw ->
      let sb = Wire.Codec.decode Superblock.codec (Bytes.of_string raw) 0 in
      if Superblock.check_magic sb then
        if Superblock.check_crc sb then Ok { blk; sb }
        else Error "superblock CRC mismatch"
      else err_bad_magic ~expected:Superblock.magic ~got:sb.magic)

(** {1 Parameter Store}

    Parameters are stored as 252-byte entries packed into blocks 1-16.
    Multiple entries can fit per sector.
*)

(** Encoded size, in bytes, of one parameter entry. *)
let param_entry_size = Wire.Codec.wire_size Param_entry.codec

(** [max_params t] is the number of entry slots that fit in the parameter
    area for the sector size of [t]. *)
let max_params t =
  let ss = sector_size t.blk in
  Int64.to_int param_blocks * ss / param_entry_size

(* [check_param_slot t ~slot] is [Ok ()] when [slot] designates a slot inside
   the parameter area. Without this guard an out-of-range slot would address
   sectors of the event log that follows the parameter area. *)
let check_param_slot t ~slot =
  let count = max_params t in
  if slot < 0 || slot >= count then
    Error (Fmt.str "param slot %d out of range (0..%d)" slot (count - 1))
  else Ok ()

(** [read_param t ~slot] decodes the entry stored in [slot]. The entry is
    returned whether or not its CRC is valid; callers check it themselves.

    Returns an error when [slot] is outside [0 .. max_params t - 1], when the
    device read fails, or when the device returns fewer bytes than the entry
    needs. An entry that straddles a sector boundary is fetched with a single
    two-sector read. *)
let read_param t ~slot =
  match check_param_slot t ~slot with
  | Error _ as e -> e
  | Ok () -> (
      let ss = sector_size t.blk in
      let byte_offset = slot * param_entry_size in
      let sector = Int64.add param_start (Int64.of_int (byte_offset / ss)) in
      let offset_in_sector = byte_offset mod ss in
      let spans = offset_in_sector + param_entry_size > ss in
      let n_sectors = if spans then 2 else 1 in
      match Block.read_many t.blk sector n_sectors with
      | Error e -> err_read sector e
      | Ok data ->
          let need = offset_in_sector + param_entry_size in
          let got = String.length data in
          if got < need then err_short_read ~got ~need
          else
            let buf = Bytes.of_string data in
            Ok (Wire.Codec.decode Param_entry.codec buf offset_in_sector))

(** [write_param t ~slot entry] encodes [entry] into [slot] with a
    read-modify-write of the one or two sectors that hold the slot, so that
    neighbouring entries are preserved.

    Returns an error when [slot] is outside [0 .. max_params t - 1], when a
    device read or write fails, or when the device returns fewer bytes than
    the sectors being rewritten. *)
let write_param t ~slot entry =
  match check_param_slot t ~slot with
  | Error _ as e -> e
  | Ok () -> (
      let ss = sector_size t.blk in
      let byte_offset = slot * param_entry_size in
      let sector = Int64.add param_start (Int64.of_int (byte_offset / ss)) in
      let offset_in_sector = byte_offset mod ss in
      let spans = offset_in_sector + param_entry_size > ss in
      let n_sectors = if spans then 2 else 1 in
      match Block.read_many t.blk sector n_sectors with
      | Error e -> err_read sector e
      | Ok data ->
          let need = n_sectors * ss in
          let got = String.length data in
          if got < need then err_short_read ~got ~need
          else (
            let buf = Bytes.of_string data in
            Wire.Codec.encode Param_entry.codec entry buf offset_in_sector;
            (* Write the sector(s) back one at a time. *)
            match Block.write t.blk sector (Bytes.sub_string buf 0 ss) with
            | Error e -> err_write sector e
            | Ok () ->
                if spans then (
                  let next = Int64.add sector 1L in
                  match Block.write t.blk next (Bytes.sub_string buf ss ss) with
                  | Error e -> err_write next e
                  | Ok () -> Ok ())
                else Ok ()))

(** {2 Param Lookup}

    Linear scan of all slots, collecting entries with valid CRCs. At SpaceOS
    scale (~32 entries with 512B sectors, ~260 with 4096B), linear scan is the
    right choice — simple, auditable, no hidden state.
*)

(** [scan_params t] reads every slot of the parameter area and returns, in
    slot order, the [(slot, entry)] pairs whose CRC is valid. Stops at the
    first read error. *)
let scan_params t =
  let max = max_params t in
  let rec loop i acc =
    if i >= max then Ok (List.rev acc)
    else
      match read_param t ~slot:i with
      | Error _ as e -> e
      | Ok entry ->
          if Param_entry.check_crc entry then loop (i + 1) ((i, entry) :: acc)
          else loop (i + 1) acc
  in
  loop 0 []

(** [param t ~id] is the valid entry with identifier [id] that has the highest
    generation, or [None] when no valid entry carries [id]. When several
    entries share the highest generation, the one in the lowest slot wins. *)
let param t ~id =
  match scan_params t with
  | Error _ as e -> e
  | Ok entries ->
      let best =
        List.fold_left
          (fun acc (_, (e : Param_entry.t)) ->
            if e.param_id <> id then acc
            else
              match acc with
              | None -> Some e
              | Some prev ->
                  if e.generation > prev.generation then Some e else acc)
          None entries
      in
      Ok best

(** [put_param t ~id value] appends a new entry for [id] whose generation is
    one more than the highest valid generation currently stored for [id]
    (or [1] when there is none). The entry goes into the first free slot, a
    slot being free when its CRC is invalid.

    The store is walked once; the original two walks (one for the generation,
    one for the free slot) read every sector twice for the same answer.

    Returns [Error "parameter store full"] when no slot is free, or the first
    read/write error met. *)
let put_param t ~id value =
  let n_slots = max_params t in
  let rec scan i best_gen free =
    if i >= n_slots then Ok (best_gen, free)
    else
      match read_param t ~slot:i with
      | Error _ as e -> e
      | Ok (entry : Param_entry.t) ->
          if not (Param_entry.check_crc entry) then
            (* Remember only the first free slot. *)
            let free =
              match free with
              | None -> Some i
              | Some _ -> free
            in
            scan (i + 1) best_gen free
          else if entry.param_id = id then
            scan (i + 1) (Stdlib.max best_gen entry.generation) free
          else scan (i + 1) best_gen free
  in
  match scan 0 0 None with
  | Error _ as e -> e
  | Ok (_, None) -> Error "parameter store full"
  | Ok (gen, Some slot) ->
      let entry = Param_entry.v ~param_id:id ~generation:(gen + 1) value in
      write_param t ~slot entry

(** [latest_params t] is, for every identifier that has at least one valid
    entry, the entry with the highest generation. The order of the returned
    list is unspecified. *)
let latest_params t =
  match scan_params t with
  | Error _ as e -> e
  | Ok entries ->
      let tbl = Hashtbl.create 16 in
      List.iter
        (fun (_, (e : Param_entry.t)) ->
          match Hashtbl.find_opt tbl e.param_id with
          | None -> Hashtbl.replace tbl e.param_id e
          | Some prev ->
              if e.generation > prev.Param_entry.generation then
                Hashtbl.replace tbl e.param_id e)
        entries;
      Ok (Hashtbl.fold (fun _ v acc -> v :: acc) tbl [])

(** {1 Event Log}

    The event log is a ring buffer in blocks 17-32.
An 8-byte slot at the start of block 17 holds the write pointer, which
    tracks the next record index. The code below reads and writes the pointer
    as a big-endian uint32 in the first 4 bytes of that slot. Records follow
    contiguously after the 8-byte slot. *)

(** Encoded size, in bytes, of one event record. *)
let event_record_size = Wire.Codec.wire_size Event_log.codec

(** Bytes reserved for the write pointer at the start of the event area. *)
let event_pointer_size = 8

(** [read_event_pointer t] is the write pointer stored at the start of the
    event area. *)
let read_event_pointer t =
  match read_bytes t.blk event_start event_pointer_size with
  | Error _ as e -> e
  | Ok data ->
      let buf = Bytes.of_string data in
      Ok (Wire.UInt32.get_be buf 0)

(** [write_event_pointer t idx] stores [idx] as the write pointer, rewriting
    the first event sector so that the records it also holds are kept. *)
let write_event_pointer t idx =
  match Block.read t.blk event_start with
  | Error e -> err_read event_start e
  | Ok data -> (
      let buf = Bytes.of_string data in
      Wire.UInt32.set_be buf 0 idx;
      match Block.write t.blk event_start (Bytes.unsafe_to_string buf) with
      | Error e -> err_write event_start e
      | Ok () -> Ok ())

(** [event_byte_offset index] is the byte offset of record [index] from the
    start of the event area. *)
let event_byte_offset index = event_pointer_size + (index * event_record_size)

(** [max_events t] is the number of records the ring can hold. *)
let max_events t =
  let ss = sector_size t.blk in
  let total_bytes = Int64.to_int event_blocks * ss in
  (total_bytes - event_pointer_size) / event_record_size

(* [event_location t index] is [(sector, offset_in_sector, spans)] for ring
   slot [index]: the first sector holding the record, the record offset in
   that sector, and whether the record continues into the next sector. *)
let event_location t index =
  let ss = sector_size t.blk in
  let byte_offset = event_byte_offset index in
  let sector = Int64.add event_start (Int64.of_int (byte_offset / ss)) in
  let offset_in_sector = byte_offset mod ss in
  let spans = offset_in_sector + event_record_size > ss in
  (sector, offset_in_sector, spans)

(** [read_event t ~index] decodes the record stored in ring slot [index].

    Returns an error when [index] is outside [0 .. max_events t - 1] (such an
    index would address sectors beyond the event area) or when the device
    read fails. *)
let read_event t ~index =
  let max_ev = max_events t in
  if index < 0 || index >= max_ev then
    Error (Fmt.str "event index %d out of range (0..%d)" index (max_ev - 1))
  else
    let sector, offset_in_sector, spans = event_location t index in
    let n_sectors = if spans then 2 else 1 in
    match Block.read_many t.blk sector n_sectors with
    | Error e -> err_read sector e
    | Ok data ->
        let buf = Bytes.of_string data in
        Ok (Wire.Codec.decode Event_log.codec buf offset_in_sector)

(** [write_event t entry] appends [entry] at ring slot
    [pointer mod max_events t] with a read-modify-write of the sector(s)
    holding the slot, then stores [pointer + 1] as the new write pointer.

    Returns an error, instead of raising [Division_by_zero], when the ring
    cannot hold a single record; otherwise the first read/write error met. *)
let write_event t entry =
  match read_event_pointer t with
  | Error _ as e -> e
  | Ok ptr -> (
      let max_ev = max_events t in
      if max_ev <= 0 then Error "event log has no capacity"
      else
        let ss = sector_size t.blk in
        let sector, offset_in_sector, spans =
          event_location t (ptr mod max_ev)
        in
        let n_sectors = if spans then 2 else 1 in
        match Block.read_many t.blk sector n_sectors with
        | Error e -> err_read sector e
        | Ok data -> (
            let buf = Bytes.of_string data in
            Wire.Codec.encode Event_log.codec entry buf offset_in_sector;
            let s1 = Bytes.sub_string buf 0 ss in
            match Block.write t.blk sector s1 with
            | Error e -> err_write sector e
            | Ok () -> (
                let next = Int64.add sector 1L in
                let write_s2 =
                  if spans then
                    Block.write t.blk next (Bytes.sub_string buf ss ss)
                  else Ok ()
                in
                match write_s2 with
                | Error e -> err_write next e
                | Ok () -> write_event_pointer t (ptr + 1))))

(** {2 Event Helpers} *)

(** [event_count t] is the current write pointer. [write_event] increments it
    on every append, so it is not capped at the ring capacity. *)
let event_count t = read_event_pointer t

(** [recent_events t ~count] is the last [count] records, oldest first.

    At most [max_events t] records are returned: once the ring has wrapped,
    older records have been overwritten, so asking for more would only read
    the same slots again and return newer records in place of the lost ones.
    Fewer records are returned when fewer have been written; a [count] of
    zero or less yields the empty list. *)
let recent_events t ~count =
  match read_event_pointer t with
  | Error _ as e -> e
  | Ok ptr ->
      let max_ev = max_events t in
      let total = min count (min ptr max_ev) in
      if total <= 0 then Ok []
      else
        let start = ptr - total in
        let rec loop i acc =
          if i >= total then Ok (List.rev acc)
          else
            let index = (start + i) mod max_ev in
            match read_event t ~index with
            | Error _ as e -> e
            | Ok ev -> loop (i + 1) (ev :: acc)
        in
        loop 0 []

(** {1 Data Product Area} *)

(** [dp_start t] is the first block of the data-product area, as recorded in
    the superblock. *)
let dp_start t = t.sb.dp_start

(** [dp_size t] is the size, in blocks, of the data-product area, as recorded
    in the superblock. *)
let dp_size t = t.sb.dp_size

(** Encoded size, in bytes, of one data-product descriptor. *)
let dp_payload_size = Wire.Codec.wire_size Dp_payload.codec

(** [read_dp t ~offset] decodes the descriptor stored at the start of DP block
    [offset]. [offset] must lie in [0 .. dp_size t - 1]. *)
let read_dp t ~offset =
  if offset < 0 || offset >= t.sb.dp_size then
    err_dp_range offset (t.sb.dp_size - 1)
  else
    let sector = Int64.of_int (t.sb.dp_start + offset) in
    match read_bytes t.blk sector dp_payload_size with
    | Error _ as e -> e
    | Ok data ->
        let buf = Bytes.of_string data in
        Ok (Wire.Codec.decode Dp_payload.codec buf 0)

(** [write_dp t ~offset payload] encodes [payload] at the start of DP block
    [offset]. [offset] must lie in [0 .. dp_size t - 1]. *)
let write_dp t ~offset payload =
  if offset < 0 || offset >= t.sb.dp_size then
    err_dp_range offset (t.sb.dp_size - 1)
  else
    let sector = Int64.of_int (t.sb.dp_start + offset) in
    let buf = Bytes.make dp_payload_size '\x00' in
    Wire.Codec.encode Dp_payload.codec payload buf 0;
    write_bytes t.blk sector (Bytes.unsafe_to_string buf)

(** {2 DP Catalog}

    DP block 0 is reserved as a catalog: 4-byte count (uint32 BE) followed by
    packed {!Dp_payload.t} entries. Data blocks start at DP offset 1.
*)

(** Size, in bytes, of the entry count at the start of the catalog block. *)
let dp_catalog_header_size = 4

(** First DP offset usable for data; offset 0 holds the catalog. *)
let dp_data_offset = 1

(** [crc32c data] is the CRC-32C digest of [data] as an unsigned integer. *)
let crc32c data =
  Optint.to_unsigned_int
    (Checkseum.Crc32c.digest_string data 0 (String.length data)
       Checkseum.Crc32c.default)

(** [dp_catalog t] is the list of descriptors stored in the catalog block, in
    catalog order. Decoding stops early when the stored count exceeds what
    fits in one sector. *)
let dp_catalog t =
  let ss = sector_size t.blk in
  let sector = Int64.of_int t.sb.dp_start in
  match read_bytes t.blk sector ss with
  | Error _ as e -> e
  | Ok data ->
      let buf = Bytes.of_string data in
      let count = Wire.UInt32.get_be buf 0 in
      let rec loop i acc =
        if i >= count then Ok (List.rev acc)
        else
          let off = dp_catalog_header_size + (i * dp_payload_size) in
          if off + dp_payload_size > ss then Ok (List.rev acc)
          else
            let entry = Wire.Codec.decode Dp_payload.codec buf off in
            loop (i + 1) (entry :: acc)
      in
      loop 0 []

(** [dp t ~name] is the first catalog entry whose name is [name], if any. *)
let dp t ~name =
  match dp_catalog t with
  | Error _ as e -> e
  | Ok entries ->
      Ok (List.find_opt (fun dp -> Dp_payload.name_string dp = name) entries)

(** [write_sectors blk ~start ~ss data] writes [data] to consecutive sectors
    beginning at sector [start], [ss] bytes per sector. The last sector is
    padded with zero bytes. Nothing is written for empty [data]. *)
let write_sectors blk ~start ~ss data =
  let data_len = String.length data in
  let block_count = (data_len + ss - 1) / ss in
  let rec go i =
    if i >= block_count then Ok ()
    else
      let sector = Int64.of_int (start + i) in
      let chunk_start = i * ss in
      let chunk_len = min ss (data_len - chunk_start) in
      let chunk =
        if chunk_len = ss then String.sub data chunk_start ss
        else
          let b = Bytes.make ss '\x00' in
          Bytes.blit_string data chunk_start b 0 chunk_len;
          Bytes.unsafe_to_string b
      in
      match Block.write blk sector chunk with
      | Error e -> err_write sector e
      | Ok () -> go (i + 1)
  in
  go 0

(** [write_dp_data t ~dp_class ~priority ~name data] stores [data] in the
    blocks following the last catalogued data product and appends a
    descriptor (offset, block count, CRC-32C of [data]) to the catalog.

    Both capacity checks happen before any sector is written, so a full
    catalog no longer costs a complete write of the data first.

    Returns the new descriptor, [Error "DP area full"] when the data does not
    fit, [Error "DP catalog full"] when the catalog sector has no room for
    another descriptor, or the first read/write error met. *)
let write_dp_data t ~dp_class ~priority ~name data =
  let ss = sector_size t.blk in
  let data_len = String.length data in
  let block_count = (data_len + ss - 1) / ss in
  let crc = crc32c data in
  match dp_catalog t with
  | Error _ as e -> e
  | Ok entries -> (
      let next_offset =
        List.fold_left
          (fun acc (dp : Dp_payload.t) ->
            Stdlib.max acc (dp.block_offset + dp.block_count))
          dp_data_offset entries
      in
      let count = List.length entries in
      (* Position of the new descriptor inside the catalog sector. *)
      let cat_off = dp_catalog_header_size + (count * dp_payload_size) in
      if next_offset + block_count > t.sb.dp_size then Error "DP area full"
      else if cat_off + dp_payload_size > ss then Error "DP catalog full"
      else
        match
          write_sectors t.blk ~start:(t.sb.dp_start + next_offset) ~ss data
        with
        | Error _ as e -> e
        | Ok () -> (
            let dp =
              Dp_payload.v ~block_offset:next_offset ~block_count ~dp_class
                ~priority ~name ~crc32:crc
            in
            (* Append to catalog *)
            let sector = Int64.of_int t.sb.dp_start in
            match Block.read t.blk sector with
            | Error e -> err_read sector e
            | Ok cat_data -> (
                let buf = Bytes.of_string cat_data in
                Wire.Codec.encode Dp_payload.codec dp buf cat_off;
                Wire.UInt32.set_be buf 0 (count + 1);
                match Block.write t.blk sector (Bytes.unsafe_to_string buf) with
                | Error e -> err_write sector e
                | Ok () -> Ok dp)))

(** [read_dp_data t dp] is the concatenation of the [dp.block_count] sectors
    starting at DP offset [dp.block_offset]. Whole sectors are returned, so
    the result includes any zero padding that [write_sectors] added to the
    last sector. *)
let read_dp_data t (dp : Dp_payload.t) =
  let rec read_blocks i acc =
    if i >= dp.block_count then Ok (String.concat "" (List.rev acc))
    else
      let sector = Int64.of_int (t.sb.dp_start + dp.block_offset + i) in
      match Block.read t.blk sector with
      | Error e -> err_read sector e
      | Ok data -> read_blocks (i + 1) (data :: acc)
  in
  read_blocks 0 []

(** {1 Pretty Printing} *)

(** [pp ppf t] prints the superblock fields of [t], with an OK/BAD marker for
    the magic number and the CRC, followed by the block ranges and capacities
    of the parameter, event and data-product areas. *)
let pp ppf t =
  let sb = t.sb in
  Fmt.pf ppf
    "@[SpaceOS Block Storage@,\ @[Superblock:@,\ magic = 0x%08x%s@,\ version = %d@,\ tenant_id = %d@,\ blocks = %d@,\ dp_start = %d@,\ dp_size = %d@,\ epoch = %Ld@,\ uuid = %a@,\ crc32 = 0x%08x%s@]@,\ @[Layout:@,\ params = blocks %Ld-%Ld (%d entries)@,\ events = blocks %Ld-%Ld (%d records)@,\ dp = blocks %d-%d@]@]"
    sb.magic
    (if Superblock.check_magic sb then " (OK)" else " (BAD)")
    sb.format_version sb.tenant_id sb.total_blocks sb.dp_start sb.dp_size
    sb.epoch
    (Fmt.seq ~sep:Fmt.nop (fun ppf c -> Fmt.pf ppf "%02x" (Char.code c)))
    (String.to_seq sb.uuid) sb.crc32
    (if Superblock.check_crc sb then " (OK)" else " (BAD)")
    param_start
    (Int64.sub (Int64.add param_start param_blocks) 1L)
    (max_params t) event_start
    (Int64.sub (Int64.add event_start event_blocks) 1L)
    (max_events t) sb.dp_start
    (sb.dp_start + sb.dp_size - 1)