The open source OpenXR runtime
at main 343 lines 12 kB view raw
#!/usr/bin/env python3
# Copyright 2020-2023, Collabora, Ltd.
# SPDX-License-Identifier: BSL-1.0
"""Generate code from a JSON file describing the IPC protocol."""

import json
import re


def write_cpp_header_guard_start(f):
    """Write the starting C in C++ header guard."""
    f.write('''
#ifdef __cplusplus
extern "C" {
#endif
''')


def write_cpp_header_guard_end(f):
    """Write the ending C in C++ header guard."""
    f.write('''
#ifdef __cplusplus
}
#endif
''')


def write_with_wrapped_args(f, start, args, indent):
    """Write something like a declaration or call.

    Emits a newline, ``indent`` and ``start`` (which is expected to end
    with the opening parenthesis), then the items of ``args`` one per
    line, and finally the closing parenthesis.
    """
    f.write("\n" + indent)
    f.write(start)
    # Continuation lines are padded so that every argument lines up with
    # the first one, directly after ``start``.
    delim_pad = ",\n" + indent + (" " * len(start))
    f.write(delim_pad.join(args))
    f.write(")")


def write_decl(f, return_type, function_name, args, indent=""):
    """Write a function declaration/definition with wrapped arguments.

    The return type is written on its own line; the function name and
    wrapped argument list follow on the next line.
    """
    f.write("\n" + indent)
    f.write(return_type)
    write_with_wrapped_args(f,
                            "{}(".format(function_name),
                            args,
                            indent)


def write_invocation(f, return_val, function_name, args, indent=""):
    """Write a function call with saved return value and wrapped arguments."""
    write_with_wrapped_args(f,
                            "{} = {}(".format(return_val, function_name),
                            args,
                            indent)


def write_result_handler(f, result, cleanup="", indent=""):
    """Write a check of an xrt_result_t value and early out.

    ``result`` is the C expression compared against XRT_SUCCESS and then
    returned. If ``cleanup`` is non-empty it is written inside the branch
    before the return statement.
    """
    f.write("\n" + indent)
    f.write("if (%s != XRT_SUCCESS) {" % result)
    f.write("\n" + indent + "\t")

    if cleanup:
        f.write(cleanup)
        f.write("\n" + indent + "\t")

    f.write("return {};".format(result))
    f.write("\n" + indent + "}\n")


def write_msg_struct(f, call, ident):
    """Write the C initializer of the ``_msg`` struct for ``call``.

    Uses ``struct ipc_<name>_msg`` when the call needs its own message
    struct and ``struct ipc_command_msg`` otherwise. Aggregate input
    arguments are dereferenced when copied into the struct, matching the
    pointer form produced by ``Arg.get_func_argument_in``. ``ident`` is
    the prefix written in front of every emitted line.
    """
    # NOTE(review): the leading whitespace inside the field literals below
    # is reproduced as it appears in the reviewed text, where runs of
    # spaces may have been collapsed - confirm against the repository.
    # Message struct
    if call.needs_msg_struct:
        f.write(ident + "struct ipc_" + call.name + "_msg _msg = {\n")
    else:
        f.write(ident + "struct ipc_command_msg _msg = {\n")

    f.write(ident + " .cmd = " + str(call.id) + ",\n")
    for arg in call.in_args:
        if arg.is_aggregate:
            f.write(ident + " ." + arg.name + " = *" + arg.name + ",\n")
        else:
            f.write(ident + " ." + arg.name + " = " + arg.name + ",\n")
    if call.in_handles:
        f.write(ident + " ." + call.in_handles.count_arg_name + " = " +
                call.in_handles.count_arg_name + ",\n")
    f.write(ident + "};\n")


def write_reply_struct(f, call, ident):
    """Write the C declaration of the ``_reply`` struct for ``call``.

    Calls with output arguments get ``struct ipc_<name>_reply``; all
    others get a zero-initialized ``struct ipc_result_reply``. Calls with
    input handles additionally get a zero-initialized ``_sync`` reply.
    """
    # Reply struct
    if call.out_args:
        f.write(ident + "struct ipc_" + call.name + "_reply _reply;\n")
    else:
        f.write(ident + "struct ipc_result_reply _reply = {0};\n")
    if call.in_handles:
        f.write(ident + "struct ipc_result_reply _sync = {0};\n")


def write_msg_send(f, ret, indent):
    """Write the ``ipc_send`` invocation that sends ``_msg``.

    The result of the generated call is assigned to the C variable named
    by ``ret``.
    """
    # Prepare initial sending
    func = 'ipc_send'
    args = ['&ipc_c->imc', '&_msg', 'sizeof(_msg)']

    f.write("\n" + indent + "// Send our request")
    write_invocation(f, ret, func, args, indent=indent)
    f.write(';')


class Arg:
    """An IPC call argument."""

    # Keep all these synchronized with the definitions in the JSON Schema.
    SCALAR_TYPES = {"uint32_t",
                    "int64_t",
                    "uint64_t",
                    "bool",
                    "float"}
    AGGREGATE_RE = re.compile(r"((const )?struct|union) (xrt|ipc)_[a-z_]+")
    ENUM_RE = re.compile(r"enum xrt_[a-z_]+")

    @classmethod
    def parse_array(cls, a):
        """Turn an array of data into an array of Arg objects."""
        return [cls(elm) for elm in a]

    def get_func_argument_in(self):
        """Get the type and name of this argument as an input parameter.

        Aggregates are passed as pointer-to-const, everything else by value.
        """
        if self.is_aggregate:
            return "const " + self.typename + " *" + self.name
        return self.typename + " " + self.name

    def get_func_argument_out(self):
        """Get the type and name of this argument as an output parameter."""
        return self.typename + " *out_" + self.name

    def get_struct_field(self):
        """Get the type and name of this argument as a struct field."""
        return self.typename + " " + self.name

    def dump(self):
        """Dump human-readable output to standard out."""
        print("\t\t" + self.typename + ": " + self.name)

    def __init__(self, data):
        """Construct an argument.

        ``data`` must provide the keys ``name`` and ``type``. Raises
        RuntimeError when the type is neither a known scalar nor matched
        by the aggregate or enum pattern.
        """
        self.name = data['name']
        self.typename = data['type']
        self.is_standard_scalar = False
        self.is_aggregate = False
        self.is_enum = False
        # Patterns are anchored at the start only (``match``), so a type
        # name only has to begin with a recognized form.
        if self.typename in self.SCALAR_TYPES:
            self.is_standard_scalar = True
        elif self.AGGREGATE_RE.match(self.typename):
            self.is_aggregate = True
        elif self.ENUM_RE.match(self.typename):
            self.is_enum = True
        else:
            raise RuntimeError("Could not process type name: " + self.typename)


class HandleType:
    """A native handle type requiring special treatment."""

    # Keep this synchronized with the definition in the JSON Schema.
    HANDLE_RE = re.compile(r"xrt_([a-z_]+)_handle_t")

    def __init__(self, config_dict):
        """Construct from dict, originating in JSON.

        Raises RuntimeError when the ``type`` entry is missing or does
        not match HANDLE_RE.
        """
        match = self.HANDLE_RE.match(config_dict.get("type", ""))
        if not match:
            raise RuntimeError(
                "Could not match handle regex to type of in/out handle! "
                + str(config_dict))
        self.typename = match.group(0)
        self.stem = match.group(1)
        self.argstem = 'handles'

    def __str__(self):
        """Convert to string by returning the type name."""
        return self.typename

    @property
    def arg_name(self):
        """Get the argument name."""
        return self.argstem

    @property
    def count_arg_name(self):
        """Get the name of the count argument."""
        # Drop the last character of the stem ("handles" -> "handle").
        return self.argstem[:-1] + "_count"

    @property
    def count_arg_type(self):
        """Get the type of the count argument."""
        return "uint32_t"

    @property
    def arg_names(self):
        """Get the argument names for the client proxy."""
        return (self.arg_name,
                self.count_arg_name)

    @property
    def arg_decls(self):
        """Get the argument declarations for the client proxy.

        Returned as a tuple so the result can be iterated more than once.
        """
        types = (self.typename + ' *',
                 self.count_arg_type + ' ')
        return tuple(x + y for x, y in zip(types, self.arg_names))

    @property
    def const_arg_decls(self):
        """Get the const argument declarations for the client proxy."""
        return tuple("const {}".format(x) for x in self.arg_decls)

    @property
    def handler_arg_names(self):
        """Get the argument names for the server handler."""
        return ('max_' + self.count_arg_name,
                'out_' + self.arg_name,
                'out_' + self.count_arg_name)

    @property
    def handler_arg_decls(self):
        """Get the argument declarations for the server handler."""
        types = (self.count_arg_type + ' ',
                 self.typename + ' *',
                 self.count_arg_type + ' *')
        return tuple(x + y for x, y in zip(types, self.handler_arg_names))


class Call:
    """A single IPC call."""

    def dump(self):
        """Dump human-readable output to standard out."""
        print("Call " + self.name)
        if self.in_args:
            print("\tIn:")
            for arg in self.in_args:
                arg.dump()
        if self.out_args:
            print("\tOut:")
            for arg in self.out_args:
                arg.dump()

    def write_send_decl(self, f):
        """Write declaration of ipc_send_CALLNAME_locked."""
        args = ["struct ipc_connection *ipc_c"]
        args.extend(arg.get_func_argument_in() for arg in self.in_args)
        write_decl(f, 'xrt_result_t', 'ipc_send_' + self.name + "_locked", args)

    def write_receive_decl(self, f):
        """Write declaration of ipc_receive_CALLNAME_locked."""
        args = ["struct ipc_connection *ipc_c"]
        args.extend(arg.get_func_argument_out() for arg in self.out_args)
        write_decl(f, 'xrt_result_t', 'ipc_receive_' + self.name + "_locked", args)

    def write_call_decl(self, f):
        """Write declaration of ipc_call_CALLNAME."""
        args = ["struct ipc_connection *ipc_c"]
        args.extend(arg.get_func_argument_in() for arg in self.in_args)
        if self.in_handles:
            args.extend(self.in_handles.const_arg_decls)
        args.extend(arg.get_func_argument_out() for arg in self.out_args)
        if self.out_handles:
            args.extend(self.out_handles.arg_decls)
        write_decl(f, 'xrt_result_t', 'ipc_call_' + self.name, args)

    def write_handler_decl(self, f):
        """Write declaration of ipc_handle_CALLNAME."""
        args = ["volatile struct ipc_client_state *ics"]

        # Always get in arguments.
        args.extend(arg.get_func_argument_in() for arg in self.in_args)

        # Handle sending reply in the function itself.
        if not self.varlen:
            args.extend(arg.get_func_argument_out() for arg in self.out_args)

        if self.out_handles:
            args.extend(self.out_handles.handler_arg_decls)
        if self.in_handles:
            args.extend(self.in_handles.const_arg_decls)

        write_decl(f, 'xrt_result_t', 'ipc_handle_' + self.name, args)

    @property
    def needs_msg_struct(self):
        """Decide whether this call needs a msg struct.

        True when the call has input arguments or input handles.
        """
        return bool(self.in_args or self.in_handles)

    def __init__(self, name, data):
        """Construct a call from call name and call data dictionary.

        Recognized keys are ``id``, ``in``, ``out``, ``in_handles``,
        ``out_handles`` and ``varlen``. Raises RuntimeError on any other
        key, or when a varlen call also declares handles.
        """
        self.id = None
        self.name = name
        self.in_args = []
        self.out_args = []
        self.in_handles = None
        self.out_handles = None
        self.varlen = False
        for key, val in data.items():
            if key == 'id':
                self.id = val
            elif key == 'in':
                self.in_args = Arg.parse_array(val)
            elif key == 'out':
                self.out_args = Arg.parse_array(val)
            elif key == 'out_handles':
                self.out_handles = HandleType(val)
            elif key == 'in_handles':
                self.in_handles = HandleType(val)
            elif key == 'varlen':
                self.varlen = val
            else:
                # Name the offending key and call so the JSON can be fixed.
                raise RuntimeError(
                    "Unrecognized key '{}' in call '{}'".format(key, name))
        # Without an explicit id, derive one from the call name.
        if not self.id:
            self.id = "IPC_" + name.upper()
        if self.varlen and (self.in_handles or self.out_handles):
            raise RuntimeError("Can not have handles with varlen functions")


class Proto:
    """An IPC protocol containing one or more calls."""

    @classmethod
    def parse(cls, data):
        """Parse a dictionary defining a protocol into Call objects."""
        return cls(data)

    @classmethod
    def load_and_parse(cls, file):
        """Load a JSON file and parse it into Call objects."""
        # Read as UTF-8 explicitly instead of relying on the locale default.
        with open(file, encoding="utf-8") as infile:
            return cls.parse(json.load(infile))

    def dump(self):
        """Dump human-readable output to standard out."""
        for call in self.calls:
            call.dump()

    def __init__(self, data):
        """Construct a protocol from a dictionary of calls.

        Top-level keys starting with ``$`` are skipped rather than
        treated as calls.
        """
        self.calls = [Call(name, call) for name, call
                      in data.items()
                      if not name.startswith("$")]