+2
-6
lib/buffer_pool.ml
+2
-6
lib/buffer_pool.ml
···
34
34
{ tx = (fun ~xt -> Kcas_data.Queue.Xt.take_opt ~xt t.buffers) }
35
35
in
36
36
match buf_opt with
37
-
| Some buf ->
38
-
Cstruct.memset buf 0;
39
-
buf
37
+
| Some buf -> buf
40
38
| None ->
41
39
(* Should not happen if semaphore is properly synchronized,
42
40
but handle gracefully by allocating a new buffer *)
···
54
52
{ tx = (fun ~xt -> Kcas_data.Queue.Xt.take_opt ~xt t.buffers) }
55
53
in
56
54
match buf_opt with
57
-
| Some buf ->
58
-
Cstruct.memset buf 0;
59
-
Some buf
55
+
| Some buf -> Some buf
60
56
| None -> Some (Cstruct.create t.buf_size)
61
57
end
62
58
else None
+30
-16
lib/codec.ml
+30
-16
lib/codec.ml
···
537
537
| [] ->
538
538
encode_internal_msg_to_cstruct ~self_name ~self_port packet.primary ~buf
539
539
| piggyback -> (
540
-
let encode_one msg =
541
-
let temp_buf = Cstruct.create 2048 in
542
-
match
543
-
encode_internal_msg_to_cstruct ~self_name ~self_port msg ~buf:temp_buf
544
-
with
545
-
| Error _ -> None
546
-
| Ok len -> Some (Cstruct.sub temp_buf 0 len, len)
547
-
in
548
-
let primary_result = encode_one packet.primary in
549
-
let piggyback_results = List.filter_map encode_one piggyback in
550
-
match primary_result with
551
-
| None -> Error `Buffer_too_small
552
-
| Some (primary_cs, primary_len) ->
553
-
let all_msgs = primary_cs :: List.map fst piggyback_results in
554
-
let all_lens = primary_len :: List.map snd piggyback_results in
555
-
encode_compound_to_cstruct ~msgs:all_msgs ~msg_lens:all_lens ~dst:buf)
540
+
let msgs = packet.primary :: piggyback in
541
+
let num_msgs = List.length msgs in
542
+
if num_msgs > 255 then failwith "too many messages for compound"
543
+
else
544
+
let header_size = 1 + 1 + (num_msgs * 2) in
545
+
if header_size > Cstruct.length buf then Error `Buffer_too_small
546
+
else
547
+
let rec encode_msgs i msgs current_offset =
548
+
match msgs with
549
+
| [] -> Ok current_offset
550
+
| msg :: rest -> (
551
+
if current_offset >= Cstruct.length buf then
552
+
Error `Buffer_too_small
553
+
else
554
+
let slice = Cstruct.shift buf current_offset in
555
+
match
556
+
encode_internal_msg_to_cstruct ~self_name ~self_port msg
557
+
~buf:slice
558
+
with
559
+
| Error _ -> Error `Buffer_too_small
560
+
| Ok len ->
561
+
Cstruct.BE.set_uint16 buf (2 + (i * 2)) len;
562
+
encode_msgs (i + 1) rest (current_offset + len))
563
+
in
564
+
match encode_msgs 0 msgs header_size with
565
+
| Ok final_offset ->
566
+
Cstruct.set_uint8 buf 0 (message_type_to_int Compound_msg);
567
+
Cstruct.set_uint8 buf 1 num_msgs;
568
+
Ok final_offset
569
+
| Error e -> Error e)
556
570
557
571
let decode_packet (buf : Cstruct.t) : (Types.packet, Types.decode_error) result
558
572
=
+12
-2
lib/types.ml
+12
-2
lib/types.ml
···
403
403
}
404
404
| User_msg { topic; payload; origin } ->
405
405
let origin_str = node_id_to_string origin in
406
+
let topic_len = String.length topic in
407
+
let origin_len = String.length origin_str in
406
408
let encoded =
407
-
Printf.sprintf "%d:%s%d:%s%s" (String.length topic) topic
408
-
(String.length origin_str) origin_str payload
409
+
String.concat ""
410
+
[
411
+
string_of_int topic_len;
412
+
":";
413
+
topic;
414
+
string_of_int origin_len;
415
+
":";
416
+
origin_str;
417
+
payload;
418
+
]
409
419
in
410
420
Wire.User_data encoded
411
421