The open source OpenXR runtime
1#!/usr/bin/env python3
2# Copyright 2020-2023, Collabora, Ltd.
3# SPDX-License-Identifier: BSL-1.0
4"""Generate code from a JSON file describing the IPC protocol."""
5
6import json
7import re
8
def write_cpp_header_guard_start(f):
    """Emit the opening half of the C-linkage guard used by C++ consumers.

    Writes a leading newline, the ``#ifdef __cplusplus`` / ``extern "C" {`` /
    ``#endif`` lines, and a trailing newline to the file-like object ``f``.
    """
    guard_lines = (
        "",
        "#ifdef __cplusplus",
        'extern "C" {',
        "#endif",
        "",
    )
    # The empty first and last entries produce the leading and trailing newline.
    f.write("\n".join(guard_lines))
16
def write_cpp_header_guard_end(f):
    """Emit the closing half of the C-linkage guard used by C++ consumers.

    Writes a leading newline, the ``#ifdef __cplusplus`` / ``}`` / ``#endif``
    lines, and a trailing newline to the file-like object ``f``.
    """
    guard_lines = (
        "",
        "#ifdef __cplusplus",
        "}",
        "#endif",
        "",
    )
    # The empty first and last entries produce the leading and trailing newline.
    f.write("\n".join(guard_lines))
24
def write_with_wrapped_args(f, start, args, indent):
    """Write ``start`` followed by ``args``, one argument per line, then ")".

    ``f`` is a writable file-like object, ``start`` is the text up to and
    including the opening parenthesis, ``args`` is an iterable of argument
    strings and ``indent`` is the leading whitespace for the first line.
    """
    # Continuation lines are padded by the width of ``start`` so that every
    # argument lines up underneath the first one.
    continuation = indent + " " * len(start)
    separator = ",\n" + continuation
    pieces = ["\n", indent, start, separator.join(args), ")"]
    f.write("".join(pieces))
33
34
def write_decl(f, return_type, function_name, args, indent=""):
    """Write a function declaration/definition head with wrapped arguments.

    The return type is written on its own line; the function name and its
    argument list follow on the next line via write_with_wrapped_args.
    Nothing is written after the closing parenthesis.
    """
    f.write("\n" + indent + return_type)
    opener = function_name + "("
    write_with_wrapped_args(f, opener, args, indent)
43
44
def write_invocation(f, return_val, function_name, args, indent=""):
    """Write ``return_val = function_name(args...)`` with wrapped arguments.

    No trailing semicolon is written; the caller is expected to add it.
    """
    assignment = " = ".join((return_val, function_name)) + "("
    write_with_wrapped_args(f, assignment, args, indent)
51
52
def write_result_handler(f, result, cleanup="", indent=""):
    """Write a C ``if`` that returns early when ``result`` is not XRT_SUCCESS.

    ``result`` is the name of the C variable to test and return. If
    ``cleanup`` is non-empty it is written on its own line inside the block,
    just before the ``return`` statement.
    """
    outer = "\n" + indent
    inner = outer + "\t"

    chunks = [outer, "if ({} != XRT_SUCCESS) {{".format(result), inner]
    if cleanup:
        chunks.extend((cleanup, inner))
    chunks.append("return {};".format(result))
    chunks.append(outer + "}\n")

    f.write("".join(chunks))
65
66
def write_msg_struct(f, call, ident):
    """Write the C initializer of the ``_msg`` variable for ``call``.

    A call-specific ``struct ipc_<name>_msg`` is used when
    ``call.needs_msg_struct`` is truthy, otherwise the generic
    ``struct ipc_command_msg``. Every line is prefixed with ``ident``.
    """
    if call.needs_msg_struct:
        struct_name = "ipc_" + call.name + "_msg"
    else:
        struct_name = "ipc_command_msg"

    field_prefix = ident + " ."
    lines = [
        ident + "struct " + struct_name + " _msg = {\n",
        field_prefix + "cmd = " + str(call.id) + ",\n",
    ]

    for arg in call.in_args:
        # Aggregates are passed by pointer, so they are dereferenced here.
        deref = "*" if arg.is_aggregate else ""
        lines.append(field_prefix + arg.name + " = " + deref + arg.name + ",\n")

    if call.in_handles:
        count_name = call.in_handles.count_arg_name
        lines.append(field_prefix + count_name + " = " + count_name + ",\n")

    lines.append(ident + "};\n")
    f.write("".join(lines))
84
85
def write_reply_struct(f, call, ident):
    """Write the C declaration of the ``_reply`` variable for ``call``.

    Calls with output arguments get an uninitialized
    ``struct ipc_<name>_reply``; all others get a zero-initialized
    ``struct ipc_result_reply``. When the call has input handles, a
    zero-initialized ``_sync`` reply is declared as well.
    """
    if call.out_args:
        declaration = "struct ipc_" + call.name + "_reply _reply;\n"
    else:
        declaration = "struct ipc_result_reply _reply = {0};\n"
    f.write(ident + declaration)

    if not call.in_handles:
        return
    f.write(ident + "struct ipc_result_reply _sync = {0};\n")
94
95
def write_msg_send(f, ret, indent):
    """Write the C statement that sends ``_msg`` and stores the result in ``ret``.

    Emits a "// Send our request" comment line followed by an ``ipc_send``
    invocation on ``&ipc_c->imc`` and the terminating semicolon.
    """
    f.write("\n" + indent + "// Send our request")

    send_args = ['&ipc_c->imc', '&_msg', 'sizeof(_msg)']
    write_invocation(f, ret, 'ipc_send', send_args, indent=indent)

    # write_invocation stops after the closing parenthesis.
    f.write(';')
104
105
class Arg:
    """An IPC call argument.

    Built from a dict with ``name`` and ``type`` entries. The type name is
    classified as a standard scalar, an aggregate (struct/union) or an enum;
    any other type name is rejected with a RuntimeError.
    """

    # Keep all these synchronized with the definitions in the JSON Schema.
    SCALAR_TYPES = {"uint32_t",
                    "int64_t",
                    "uint64_t",
                    "bool",
                    "float"}
    # NOTE: both patterns are applied with match(), so they are anchored at
    # the start of the type name only.
    AGGREGATE_RE = re.compile(r"((const )?struct|union) (xrt|ipc)_[a-z_]+")
    ENUM_RE = re.compile(r"enum xrt_[a-z_]+")

    @classmethod
    def parse_array(cls, a):
        """Turn an array of data into an array of Arg objects."""
        return [cls(elm) for elm in a]

    def get_func_argument_in(self):
        """Get the type and name of this argument as an input parameter.

        Aggregates are passed as pointer-to-const; everything else is passed
        by value.
        """
        if self.is_aggregate:
            # AGGREGATE_RE accepts type names that already start with
            # "const ", so only add the qualifier when it is missing;
            # otherwise the declaration would read "const const struct ...".
            qualifier = "" if self.typename.startswith("const ") else "const "
            return qualifier + self.typename + " *" + self.name
        return self.typename + " " + self.name

    def get_func_argument_out(self):
        """Get the type and name of this argument as an output parameter."""
        return self.typename + " *out_" + self.name

    def get_struct_field(self):
        """Get the type and name of this argument as a struct field."""
        return self.typename + " " + self.name

    def dump(self):
        """Dump human-readable output to standard out."""
        print("\t\t" + self.typename + ": " + self.name)

    def __init__(self, data):
        """Construct an argument from a dict with 'name' and 'type' keys.

        Raises KeyError if either key is missing and RuntimeError if the
        type is not a known scalar, aggregate or enum type.
        """
        self.name = data['name']
        self.typename = data['type']
        self.is_standard_scalar = False
        self.is_aggregate = False
        self.is_enum = False
        if self.typename in self.SCALAR_TYPES:
            self.is_standard_scalar = True
        elif self.AGGREGATE_RE.match(self.typename):
            self.is_aggregate = True
        elif self.ENUM_RE.match(self.typename):
            self.is_enum = True
        else:
            raise RuntimeError("Could not process type name: " + self.typename)
157
158
class HandleType:
    """A native handle type that IPC calls have to treat specially.

    Built from a dict whose "type" entry matches HANDLE_RE; exposes the
    argument names and C declarations used for handle arrays and their count.
    """

    # Keep this synchronized with the definition in the JSON Schema.
    HANDLE_RE = re.compile(r"xrt_([a-z_]+)_handle_t")

    def __init__(self, config_dict):
        """Build from a dict (originating in JSON) that has a "type" entry.

        Raises RuntimeError when the entry is missing or does not match
        HANDLE_RE.
        """
        found = self.HANDLE_RE.match(config_dict.get("type", ""))
        if found is None:
            raise RuntimeError(
                "Could not match handle regex to type of in/out handle! "
                + str(config_dict))
        # Group 0 is the matched type name, group 1 the part between
        # "xrt_" and "_handle_t".
        self.typename, self.stem = found.group(0, 1)
        self.argstem = 'handles'

    def __str__(self):
        """Return the type name as the string form."""
        return self.typename

    @property
    def arg_name(self):
        """Name of the handle array argument."""
        return self.argstem

    @property
    def count_arg_name(self):
        """Name of the count argument: argstem minus its last character, plus "_count"."""
        singular = self.argstem[:-1]
        return "{}_count".format(singular)

    @property
    def count_arg_type(self):
        """C type of the count argument."""
        return "uint32_t"

    @property
    def arg_names(self):
        """Argument names for the client proxy, as (array, count)."""
        return (self.arg_name, self.count_arg_name)

    @property
    def arg_decls(self):
        """Argument declarations for the client proxy, as a generator."""
        prefixes = (self.typename + ' *',
                    self.count_arg_type + ' ')
        return ("{}{}".format(prefix, name)
                for prefix, name in zip(prefixes, self.arg_names))

    @property
    def const_arg_decls(self):
        """The client proxy declarations, each prefixed with "const "."""
        return ("const " + decl for decl in self.arg_decls)

    @property
    def handler_arg_names(self):
        """Argument names for the server handler, as (max count, array, count)."""
        count_name = self.count_arg_name
        return ('max_' + count_name,
                'out_' + self.arg_name,
                'out_' + count_name)

    @property
    def handler_arg_decls(self):
        """Argument declarations for the server handler, as a generator."""
        count_type = self.count_arg_type
        prefixes = (count_type + ' ',
                    self.typename + ' *',
                    count_type + ' *')
        return ("{}{}".format(prefix, name)
                for prefix, name in zip(prefixes, self.handler_arg_names))
227
228
class Call:
    """A single IPC call.

    Holds the call name, its id, input/output arguments (Arg objects),
    optional input/output handle descriptions (HandleType objects) and the
    ``varlen`` flag, and writes the C declarations derived from them.
    """

    def dump(self):
        """Dump human-readable output to standard out."""
        print("Call " + self.name)
        if self.in_args:
            print("\tIn:")
            for arg in self.in_args:
                arg.dump()
        if self.out_args:
            print("\tOut:")
            for arg in self.out_args:
                arg.dump()

    def write_send_decl(self, f):
        """Write declaration of ipc_send_CALLNAME_locked."""
        args = ["struct ipc_connection *ipc_c"]
        args.extend(arg.get_func_argument_in() for arg in self.in_args)
        write_decl(f, 'xrt_result_t', 'ipc_send_' + self.name + "_locked", args)

    def write_receive_decl(self, f):
        """Write declaration of ipc_receive_CALLNAME_locked."""
        args = ["struct ipc_connection *ipc_c"]
        args.extend(arg.get_func_argument_out() for arg in self.out_args)
        write_decl(f, 'xrt_result_t', 'ipc_receive_' + self.name + "_locked", args)

    def write_call_decl(self, f):
        """Write declaration of ipc_call_CALLNAME.

        Argument order: connection, in args, in handles, out args,
        out handles.
        """
        args = ["struct ipc_connection *ipc_c"]
        args.extend(arg.get_func_argument_in() for arg in self.in_args)
        if self.in_handles:
            args.extend(self.in_handles.const_arg_decls)
        args.extend(arg.get_func_argument_out() for arg in self.out_args)
        if self.out_handles:
            args.extend(self.out_handles.arg_decls)
        write_decl(f, 'xrt_result_t', 'ipc_call_' + self.name, args)

    def write_handler_decl(self, f):
        """Write declaration of ipc_handle_CALLNAME.

        Argument order: client state, in args, out args (omitted for varlen
        calls), out handles, in handles.
        """
        args = ["volatile struct ipc_client_state *ics"]

        # Always get in arguments.
        args.extend(arg.get_func_argument_in() for arg in self.in_args)

        # Handle sending reply in the function itself.
        if not self.varlen:
            args.extend(arg.get_func_argument_out() for arg in self.out_args)

        if self.out_handles:
            args.extend(self.out_handles.handler_arg_decls)
        if self.in_handles:
            args.extend(self.in_handles.const_arg_decls)

        write_decl(f, 'xrt_result_t', 'ipc_handle_' + self.name, args)

    @property
    def needs_msg_struct(self):
        """Decide whether this call needs a msg struct.

        True when the call has input arguments or input handles.
        """
        # Return a real bool rather than leaking the list/HandleType object.
        return bool(self.in_args or self.in_handles)

    def __init__(self, name, data):
        """Construct a call from call name and call data dictionary.

        Recognized keys are 'id', 'in', 'out', 'in_handles', 'out_handles'
        and 'varlen'. When no (truthy) id is given, "IPC_" plus the
        upper-cased call name is used.

        Raises RuntimeError on an unrecognized key, or when a varlen call
        also declares handles.
        """
        self.id = None
        self.name = name
        self.in_args = []
        self.out_args = []
        self.in_handles = None
        self.out_handles = None
        self.varlen = False
        for key, val in data.items():
            if key == 'id':
                self.id = val
            elif key == 'in':
                self.in_args = Arg.parse_array(val)
            elif key == 'out':
                self.out_args = Arg.parse_array(val)
            elif key == 'out_handles':
                self.out_handles = HandleType(val)
            elif key == 'in_handles':
                self.in_handles = HandleType(val)
            elif key == 'varlen':
                self.varlen = val
            else:
                # Name the offending key and call so the JSON can be fixed.
                raise RuntimeError(
                    "Unrecognized key '{}' in call '{}'".format(key, name))
        if not self.id:
            self.id = "IPC_" + name.upper()
        if self.varlen and (self.in_handles or self.out_handles):
            # RuntimeError (an Exception subclass) matches the other errors
            # raised while parsing the protocol description.
            raise RuntimeError("Can not have handles with varlen functions")
318
319
class Proto:
    """An IPC protocol containing one or more calls."""

    @classmethod
    def parse(cls, data):
        """Parse a dictionary defining a protocol into Call objects."""
        return cls(data)

    @classmethod
    def load_and_parse(cls, file):
        """Load a JSON file and parse it into Call objects.

        ``file`` is a path (or anything else ``open`` accepts).
        """
        # Read as UTF-8 explicitly instead of relying on the platform's
        # default text encoding.
        with open(file, encoding="utf-8") as infile:
            return cls.parse(json.load(infile))

    def dump(self):
        """Dump human-readable output to standard out."""
        for call in self.calls:
            call.dump()

    def __init__(self, data):
        """Construct a protocol from a dictionary of calls.

        Keys starting with "$" are skipped; every other key becomes the name
        of a Call built from its value.
        """
        self.calls = [Call(name, call) for name, call
                      in data.items()
                      if not name.startswith("$")]