Convert OpenCode transcripts to OpenTelemetry (OTel) or agent traces.
1use anyhow::Result;
2use jiff::Timestamp;
3use regex::Regex;
4use serde::Deserialize;
5use std::fs;
6use std::str::FromStr;
7
8#[derive(Debug, Clone, Deserialize)]
9pub struct OpenCodeLog {
10 pub session_id: String,
11 pub created: String,
12 pub updated: String,
13 pub messages: Vec<LogMessage>,
14}
15
16#[derive(Debug, Clone, Deserialize)]
17pub struct LogMessage {
18 #[serde(rename = "type")]
19 pub msg_type: String,
20 pub role: String,
21 pub agent: Option<String>,
22 pub model: Option<String>,
23 pub timestamp: Option<Timestamp>,
24 pub content: String,
25 pub thinking: Option<String>,
26 pub tools_used: Option<Vec<ToolUsage>>,
27 pub duration_ms: Option<u64>,
28}
29
30#[derive(Debug, Clone, Deserialize)]
31pub struct ToolUsage {
32 pub name: String,
33 pub method: String,
34 pub arguments: serde_json::Value,
35 pub result: Option<serde_json::Value>,
36 pub duration_ms: Option<u64>,
37}
38
/// Kind of span produced from a transcript.
///
/// A fieldless enum, so it is cheap to copy and usable as a hash/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SpanType {
    /// One assistant reply (the parent span).
    Chat,
    /// The reasoning block of an assistant reply.
    Thinking,
    /// A tool invocation made by an assistant reply.
    ToolCall,
}
45
46#[derive(Debug, Clone)]
47pub struct GenAiSpan {
48 pub span_type: SpanType,
49 pub session_id: String,
50 pub timestamp: Timestamp,
51 pub agent: Option<String>,
52 pub model: Option<String>,
53 pub attributes: serde_json::Map<String, serde_json::Value>,
54 pub duration_ms: Option<u64>,
55 pub parent_span_id: Option<String>,
56}
57
58impl GenAiSpan {
59 pub fn span_id(&self) -> String {
60 let span_type_i32 = match self.span_type {
61 SpanType::Chat => 0,
62 SpanType::Thinking => 1,
63 SpanType::ToolCall => 2,
64 };
65
66 let nanos =
67 self.timestamp.as_second() * 1_000_000_000 + self.timestamp.subsec_nanosecond() as i64;
68
69 format!("{}-{}-{}", self.session_id, span_type_i32, nanos)
70 }
71}
72
73pub struct LogParser {
74 header_regex: Regex,
75 assistant_regex: Regex,
76 user_regex: Regex,
77 tool_regex: Regex,
78 tool_output_regex: Regex,
79 pub span_padding_ms: u64,
80}
81
82impl LogParser {
83 pub fn new() -> Self {
84 Self {
85 header_regex: Regex::new(
86 r"\*\*Session ID:\*\* (.+?)\n\*\*Created:\*\* (.+?)\n\*\*Updated:\*\* (.+?)\n",
87 )
88 .unwrap(),
89 assistant_regex: Regex::new(r"## \s*Assistant \s*\(([^)]+)\)\s*").unwrap(),
90 user_regex: Regex::new(r"## \s*User").unwrap(),
91 tool_regex: Regex::new(r"Tool:\s+(\w+)").unwrap(),
92 tool_output_regex: Regex::new(r"Tool:\s+\w+").unwrap(),
93 span_padding_ms: 300,
94 }
95 }
96
97 pub fn with_span_padding(mut self, padding_ms: u64) -> Self {
98 self.span_padding_ms = padding_ms;
99 self
100 }
101
102 pub fn parse_file(&self, path: &str) -> Result<OpenCodeLog> {
103 let content = fs::read_to_string(path)?;
104 self.parse_content(&content)
105 }
106
107 pub fn parse_content(&self, content: &str) -> Result<OpenCodeLog> {
108 let mut session_id = String::new();
109 let mut created = String::new();
110 let mut updated = String::new();
111
112 if let Some(caps) = self.header_regex.captures(content) {
113 session_id = caps[1].to_string();
114 created = caps[2].to_string();
115 updated = caps[3].to_string();
116 }
117
118 let mut messages = Vec::new();
119 let lines: Vec<&str> = content.lines().collect();
120 let mut i = 0;
121
122 while i < lines.len() {
123 let line = lines[i];
124
125 if let Some(_caps) = self.user_regex.captures(line) {
126 let mut message = LogMessage {
127 msg_type: "user".to_string(),
128 role: "user".to_string(),
129 agent: None,
130 model: None,
131 timestamp: None,
132 content: String::new(),
133 thinking: None,
134 tools_used: None,
135 duration_ms: None,
136 };
137
138 i += 1;
139 while i < lines.len() {
140 let line = lines[i];
141
142 if line.starts_with("## ") {
143 break;
144 }
145
146 message.content.push_str(line);
147 message.content.push('\n');
148 i += 1;
149 }
150
151 if !message.content.is_empty() {
152 messages.push(message);
153 }
154
155 continue;
156 }
157
158 if let Some(caps) = self.assistant_regex.captures(line) {
159 let parts: Vec<&str> = caps[1].split("·").map(|s| s.trim()).collect();
160 let agent = parts.get(0).map(|s| s.to_string());
161 let model = parts.get(1).map(|s| s.to_string());
162 let duration_str = parts.get(2).and_then(|s| s.strip_suffix("s"));
163 let duration_ms = duration_str
164 .and_then(|s| s.parse::<f64>().ok())
165 .map(|d| (d * 1000.0) as u64);
166
167 let mut message = LogMessage {
168 msg_type: "assistant".to_string(),
169 role: "assistant".to_string(),
170 agent,
171 model,
172 timestamp: None,
173 content: String::new(),
174 thinking: None,
175 tools_used: None,
176 duration_ms,
177 };
178
179 i += 1;
180 let mut in_thinking = false;
181 let mut thinking_content = String::new();
182
183 while i < lines.len() {
184 let line = lines[i];
185
186 if line.starts_with("## ") {
187 break;
188 }
189
190 if line == "_Thinking:_" {
191 in_thinking = true;
192 i += 1;
193 continue;
194 }
195
196 if in_thinking && line == "Done!" {
197 in_thinking = false;
198 i += 1;
199 continue;
200 }
201
202 if in_thinking {
203 thinking_content.push_str(line);
204 thinking_content.push('\n');
205 } else {
206 message.content.push_str(line);
207 message.content.push('\n');
208 }
209
210 i += 1;
211 }
212
213 if !thinking_content.is_empty() {
214 message.thinking = Some(thinking_content.trim().to_string());
215 }
216
217 message.content = message.content.trim().to_string();
218 messages.push(message);
219 continue;
220 }
221
222 i += 1;
223 }
224
225 Ok(OpenCodeLog {
226 session_id,
227 created,
228 updated,
229 messages,
230 })
231 }
232
233 pub fn parse_entries(&self, log: &OpenCodeLog) -> Vec<GenAiSpan> {
234 let updated_at = Timestamp::from_str(&log.updated).ok();
235 self.parse_entries_with_time(log, updated_at)
236 }
237
238 fn parse_entries_with_time(
239 &self,
240 log: &OpenCodeLog,
241 updated_at: Option<Timestamp>,
242 ) -> Vec<GenAiSpan> {
243 let mut spans = Vec::new();
244 let mut current_user_msg: Option<&LogMessage> = None;
245
246 for msg in &log.messages {
247 match msg.role.as_str() {
248 "user" => {
249 current_user_msg = Some(msg);
250 }
251
252 "assistant" => {
253 if let Some(user_msg) = current_user_msg {
254 let parent_span = self.create_chat_span(Some(user_msg), msg, log);
255 let parent_span_id = parent_span.span_id();
256 spans.push(parent_span);
257
258 if let Some(_thinking) = &msg.thinking {
259 spans.push(self.create_thinking_span(msg, log, &parent_span_id));
260 }
261
262 if let Some(tools) = &msg.tools_used {
263 for tool in tools {
264 spans.push(self.create_tool_span(msg, tool, log, &parent_span_id));
265 }
266 }
267
268 current_user_msg = None;
269 }
270 }
271
272 _ => {}
273 }
274 }
275
276 self.calculate_timestamps(&mut spans, updated_at);
277 spans
278 }
279
280 fn create_chat_span(
281 &self,
282 user_msg: Option<&LogMessage>,
283 assistant_msg: &LogMessage,
284 log: &OpenCodeLog,
285 ) -> GenAiSpan {
286 let mut attributes = serde_json::Map::new();
287
288 attributes.insert(
289 "gen_ai.conversation.id".to_string(),
290 serde_json::json!(log.session_id),
291 );
292
293 if let Some(user) = user_msg {
294 let input_messages = serde_json::json!([{
295 "role": "user",
296 "parts": [{
297 "type": "text",
298 "content": user.content.trim()
299 }]
300 }]);
301 attributes.insert("gen_ai.input.messages".to_string(), input_messages);
302 }
303
304 if let Some(agent) = &assistant_msg.agent {
305 attributes.insert(
306 "gen_ai.system.agent_name".to_string(),
307 serde_json::json!(agent),
308 );
309 }
310
311 if let Some(model) = &assistant_msg.model {
312 attributes.insert("gen_ai.model.name".to_string(), serde_json::json!(model));
313 }
314
315 let mut output_messages = Vec::new();
316
317 if let Some(thinking) = &assistant_msg.thinking {
318 output_messages.push(serde_json::json!({
319 "role": "assistant",
320 "parts": [{
321 "type": "reasoning",
322 "content": thinking.trim()
323 }],
324 "finish_reason": "stop"
325 }));
326 }
327
328 let content_cleaned = assistant_msg
329 .content
330 .lines()
331 .filter(|l| !self.tool_output_regex.is_match(l))
332 .collect::<Vec<_>>()
333 .join("\n")
334 .trim()
335 .to_string();
336
337 if !content_cleaned.is_empty() {
338 output_messages.push(serde_json::json!({
339 "role": "assistant",
340 "parts": [{
341 "type": "text",
342 "content": content_cleaned
343 }],
344 "finish_reason": "stop"
345 }));
346 }
347
348 if !output_messages.is_empty() {
349 attributes.insert(
350 "gen_ai.output.messages".to_string(),
351 serde_json::json!(output_messages),
352 );
353 }
354
355 GenAiSpan {
356 span_type: SpanType::Chat,
357 session_id: log.session_id.clone(),
358 timestamp: assistant_msg.timestamp.unwrap_or_else(|| Timestamp::now()),
359 agent: assistant_msg.agent.clone(),
360 model: assistant_msg.model.clone(),
361 attributes,
362 duration_ms: assistant_msg.duration_ms,
363 parent_span_id: None,
364 }
365 }
366
367 fn create_thinking_span(
368 &self,
369 assistant_msg: &LogMessage,
370 log: &OpenCodeLog,
371 parent_span_id: &str,
372 ) -> GenAiSpan {
373 let mut attributes = serde_json::Map::new();
374
375 attributes.insert(
376 "gen_ai.conversation.id".to_string(),
377 serde_json::json!(log.session_id),
378 );
379
380 if let Some(agent) = &assistant_msg.agent {
381 attributes.insert(
382 "gen_ai.system.agent_name".to_string(),
383 serde_json::json!(agent),
384 );
385 }
386
387 if let Some(model) = &assistant_msg.model {
388 attributes.insert("gen_ai.model.name".to_string(), serde_json::json!(model));
389 }
390
391 if let Some(thinking) = &assistant_msg.thinking {
392 let output_messages = serde_json::json!([{
393 "role": "assistant",
394 "parts": [{
395 "type": "reasoning",
396 "content": thinking.trim()
397 }],
398 "finish_reason": "stop"
399 }]);
400 attributes.insert("gen_ai.output.messages".to_string(), output_messages);
401 }
402
403 GenAiSpan {
404 span_type: SpanType::Thinking,
405 session_id: log.session_id.clone(),
406 timestamp: assistant_msg.timestamp.unwrap_or_else(|| Timestamp::now()),
407 agent: assistant_msg.agent.clone(),
408 model: assistant_msg.model.clone(),
409 attributes,
410 duration_ms: None,
411 parent_span_id: Some(parent_span_id.to_string()),
412 }
413 }
414
415 fn create_tool_span(
416 &self,
417 assistant_msg: &LogMessage,
418 tool: &ToolUsage,
419 log: &OpenCodeLog,
420 parent_span_id: &str,
421 ) -> GenAiSpan {
422 let mut attributes = serde_json::Map::new();
423
424 attributes.insert(
425 "gen_ai.conversation.id".to_string(),
426 serde_json::json!(log.session_id),
427 );
428 attributes.insert(
429 "mcp.method.name".to_string(),
430 serde_json::json!("tools/call"),
431 );
432 attributes.insert("gen_ai.tool.name".to_string(), serde_json::json!(tool.name));
433 attributes.insert(
434 "gen_ai.operation.name".to_string(),
435 serde_json::json!("execute_tool"),
436 );
437 attributes.insert(
438 "gen_ai.tool.call.arguments".to_string(),
439 serde_json::json!(tool.arguments),
440 );
441
442 if let Some(agent) = &assistant_msg.agent {
443 attributes.insert(
444 "gen_ai.system.agent_name".to_string(),
445 serde_json::json!(agent),
446 );
447 }
448
449 if let Some(model) = &assistant_msg.model {
450 attributes.insert("gen_ai.model.name".to_string(), serde_json::json!(model));
451 }
452
453 if let Some(result) = &tool.result {
454 attributes.insert(
455 "gen_ai.tool.call.result".to_string(),
456 serde_json::json!(result),
457 );
458 }
459
460 GenAiSpan {
461 span_type: SpanType::ToolCall,
462 session_id: log.session_id.clone(),
463 timestamp: assistant_msg.timestamp.unwrap_or_else(|| Timestamp::now()),
464 agent: assistant_msg.agent.clone(),
465 model: assistant_msg.model.clone(),
466 attributes,
467 duration_ms: tool.duration_ms,
468 parent_span_id: Some(parent_span_id.to_string()),
469 }
470 }
471
472 pub fn calculate_timestamps(&self, spans: &mut Vec<GenAiSpan>, updated_at: Option<Timestamp>) {
473 let now = updated_at.unwrap_or_else(|| Timestamp::now());
474 let mut current_end_ms = now.as_millisecond();
475
476 let mut chat_indices: Vec<usize> = spans
477 .iter()
478 .enumerate()
479 .filter(|(_, s)| s.span_type == SpanType::Chat)
480 .map(|(i, _)| i)
481 .collect();
482
483 chat_indices.sort_by(|a, b| b.cmp(a));
484
485 for idx in chat_indices {
486 if let Some(span) = spans.get(idx) {
487 let parent_id = span.span_id();
488
489 let child_durations: u64 = spans
490 .iter()
491 .filter(|s| s.parent_span_id.as_deref() == Some(parent_id.as_str()))
492 .map(|s| s.duration_ms.unwrap_or(0))
493 .sum();
494
495 let duration_ms = span
496 .duration_ms
497 .unwrap_or(child_durations)
498 .max(child_durations);
499
500 let end_ms = current_end_ms;
501 let start_ms = end_ms - duration_ms as i64;
502
503 if let Some(s) = spans.get_mut(idx) {
504 s.timestamp = Timestamp::from_millisecond(start_ms).unwrap();
505 s.duration_ms = Some(duration_ms);
506 }
507
508 self.set_child_timestamps(spans, parent_id.as_str(), start_ms, end_ms);
509
510 current_end_ms = start_ms - self.span_padding_ms as i64;
511 }
512 }
513 }
514
515 fn set_child_timestamps(
516 &self,
517 spans: &mut [GenAiSpan],
518 parent_id: &str,
519 parent_start_ms: i64,
520 parent_end_ms: i64,
521 ) {
522 let child_indices: Vec<usize> = spans
523 .iter()
524 .enumerate()
525 .filter(|(_, s)| s.parent_span_id.as_deref() == Some(parent_id))
526 .map(|(i, _)| i)
527 .collect();
528
529 if child_indices.is_empty() {
530 return;
531 }
532
533 let parent_duration_ms = (parent_end_ms - parent_start_ms) as u64;
534 let child_duration_ms = parent_duration_ms / child_indices.len() as u64;
535
536 let mut current_start_ms = parent_start_ms;
537
538 for idx in child_indices {
539 if let Some(span) = spans.get_mut(idx) {
540 let duration_ms = if span.duration_ms.is_some() {
541 span.duration_ms.unwrap()
542 } else {
543 child_duration_ms
544 };
545 let start_ms = current_start_ms;
546 let end_ms = start_ms + duration_ms as i64;
547
548 span.timestamp = Timestamp::from_millisecond(start_ms).unwrap();
549 span.duration_ms = Some(duration_ms);
550
551 current_start_ms = end_ms;
552 }
553 }
554 }
555}