Convert OpenCode transcripts to OpenTelemetry (or agent) traces
at main 555 lines 17 kB view raw
1use anyhow::Result; 2use jiff::Timestamp; 3use regex::Regex; 4use serde::Deserialize; 5use std::fs; 6use std::str::FromStr; 7 8#[derive(Debug, Clone, Deserialize)] 9pub struct OpenCodeLog { 10 pub session_id: String, 11 pub created: String, 12 pub updated: String, 13 pub messages: Vec<LogMessage>, 14} 15 16#[derive(Debug, Clone, Deserialize)] 17pub struct LogMessage { 18 #[serde(rename = "type")] 19 pub msg_type: String, 20 pub role: String, 21 pub agent: Option<String>, 22 pub model: Option<String>, 23 pub timestamp: Option<Timestamp>, 24 pub content: String, 25 pub thinking: Option<String>, 26 pub tools_used: Option<Vec<ToolUsage>>, 27 pub duration_ms: Option<u64>, 28} 29 30#[derive(Debug, Clone, Deserialize)] 31pub struct ToolUsage { 32 pub name: String, 33 pub method: String, 34 pub arguments: serde_json::Value, 35 pub result: Option<serde_json::Value>, 36 pub duration_ms: Option<u64>, 37} 38 39#[derive(Debug, Clone, PartialEq)] 40pub enum SpanType { 41 Chat, 42 Thinking, 43 ToolCall, 44} 45 46#[derive(Debug, Clone)] 47pub struct GenAiSpan { 48 pub span_type: SpanType, 49 pub session_id: String, 50 pub timestamp: Timestamp, 51 pub agent: Option<String>, 52 pub model: Option<String>, 53 pub attributes: serde_json::Map<String, serde_json::Value>, 54 pub duration_ms: Option<u64>, 55 pub parent_span_id: Option<String>, 56} 57 58impl GenAiSpan { 59 pub fn span_id(&self) -> String { 60 let span_type_i32 = match self.span_type { 61 SpanType::Chat => 0, 62 SpanType::Thinking => 1, 63 SpanType::ToolCall => 2, 64 }; 65 66 let nanos = 67 self.timestamp.as_second() * 1_000_000_000 + self.timestamp.subsec_nanosecond() as i64; 68 69 format!("{}-{}-{}", self.session_id, span_type_i32, nanos) 70 } 71} 72 73pub struct LogParser { 74 header_regex: Regex, 75 assistant_regex: Regex, 76 user_regex: Regex, 77 tool_regex: Regex, 78 tool_output_regex: Regex, 79 pub span_padding_ms: u64, 80} 81 82impl LogParser { 83 pub fn new() -> Self { 84 Self { 85 header_regex: Regex::new( 86 
r"\*\*Session ID:\*\* (.+?)\n\*\*Created:\*\* (.+?)\n\*\*Updated:\*\* (.+?)\n", 87 ) 88 .unwrap(), 89 assistant_regex: Regex::new(r"## \s*Assistant \s*\(([^)]+)\)\s*").unwrap(), 90 user_regex: Regex::new(r"## \s*User").unwrap(), 91 tool_regex: Regex::new(r"Tool:\s+(\w+)").unwrap(), 92 tool_output_regex: Regex::new(r"Tool:\s+\w+").unwrap(), 93 span_padding_ms: 300, 94 } 95 } 96 97 pub fn with_span_padding(mut self, padding_ms: u64) -> Self { 98 self.span_padding_ms = padding_ms; 99 self 100 } 101 102 pub fn parse_file(&self, path: &str) -> Result<OpenCodeLog> { 103 let content = fs::read_to_string(path)?; 104 self.parse_content(&content) 105 } 106 107 pub fn parse_content(&self, content: &str) -> Result<OpenCodeLog> { 108 let mut session_id = String::new(); 109 let mut created = String::new(); 110 let mut updated = String::new(); 111 112 if let Some(caps) = self.header_regex.captures(content) { 113 session_id = caps[1].to_string(); 114 created = caps[2].to_string(); 115 updated = caps[3].to_string(); 116 } 117 118 let mut messages = Vec::new(); 119 let lines: Vec<&str> = content.lines().collect(); 120 let mut i = 0; 121 122 while i < lines.len() { 123 let line = lines[i]; 124 125 if let Some(_caps) = self.user_regex.captures(line) { 126 let mut message = LogMessage { 127 msg_type: "user".to_string(), 128 role: "user".to_string(), 129 agent: None, 130 model: None, 131 timestamp: None, 132 content: String::new(), 133 thinking: None, 134 tools_used: None, 135 duration_ms: None, 136 }; 137 138 i += 1; 139 while i < lines.len() { 140 let line = lines[i]; 141 142 if line.starts_with("## ") { 143 break; 144 } 145 146 message.content.push_str(line); 147 message.content.push('\n'); 148 i += 1; 149 } 150 151 if !message.content.is_empty() { 152 messages.push(message); 153 } 154 155 continue; 156 } 157 158 if let Some(caps) = self.assistant_regex.captures(line) { 159 let parts: Vec<&str> = caps[1].split("·").map(|s| s.trim()).collect(); 160 let agent = parts.get(0).map(|s| 
s.to_string()); 161 let model = parts.get(1).map(|s| s.to_string()); 162 let duration_str = parts.get(2).and_then(|s| s.strip_suffix("s")); 163 let duration_ms = duration_str 164 .and_then(|s| s.parse::<f64>().ok()) 165 .map(|d| (d * 1000.0) as u64); 166 167 let mut message = LogMessage { 168 msg_type: "assistant".to_string(), 169 role: "assistant".to_string(), 170 agent, 171 model, 172 timestamp: None, 173 content: String::new(), 174 thinking: None, 175 tools_used: None, 176 duration_ms, 177 }; 178 179 i += 1; 180 let mut in_thinking = false; 181 let mut thinking_content = String::new(); 182 183 while i < lines.len() { 184 let line = lines[i]; 185 186 if line.starts_with("## ") { 187 break; 188 } 189 190 if line == "_Thinking:_" { 191 in_thinking = true; 192 i += 1; 193 continue; 194 } 195 196 if in_thinking && line == "Done!" { 197 in_thinking = false; 198 i += 1; 199 continue; 200 } 201 202 if in_thinking { 203 thinking_content.push_str(line); 204 thinking_content.push('\n'); 205 } else { 206 message.content.push_str(line); 207 message.content.push('\n'); 208 } 209 210 i += 1; 211 } 212 213 if !thinking_content.is_empty() { 214 message.thinking = Some(thinking_content.trim().to_string()); 215 } 216 217 message.content = message.content.trim().to_string(); 218 messages.push(message); 219 continue; 220 } 221 222 i += 1; 223 } 224 225 Ok(OpenCodeLog { 226 session_id, 227 created, 228 updated, 229 messages, 230 }) 231 } 232 233 pub fn parse_entries(&self, log: &OpenCodeLog) -> Vec<GenAiSpan> { 234 let updated_at = Timestamp::from_str(&log.updated).ok(); 235 self.parse_entries_with_time(log, updated_at) 236 } 237 238 fn parse_entries_with_time( 239 &self, 240 log: &OpenCodeLog, 241 updated_at: Option<Timestamp>, 242 ) -> Vec<GenAiSpan> { 243 let mut spans = Vec::new(); 244 let mut current_user_msg: Option<&LogMessage> = None; 245 246 for msg in &log.messages { 247 match msg.role.as_str() { 248 "user" => { 249 current_user_msg = Some(msg); 250 } 251 252 "assistant" => 
{ 253 if let Some(user_msg) = current_user_msg { 254 let parent_span = self.create_chat_span(Some(user_msg), msg, log); 255 let parent_span_id = parent_span.span_id(); 256 spans.push(parent_span); 257 258 if let Some(_thinking) = &msg.thinking { 259 spans.push(self.create_thinking_span(msg, log, &parent_span_id)); 260 } 261 262 if let Some(tools) = &msg.tools_used { 263 for tool in tools { 264 spans.push(self.create_tool_span(msg, tool, log, &parent_span_id)); 265 } 266 } 267 268 current_user_msg = None; 269 } 270 } 271 272 _ => {} 273 } 274 } 275 276 self.calculate_timestamps(&mut spans, updated_at); 277 spans 278 } 279 280 fn create_chat_span( 281 &self, 282 user_msg: Option<&LogMessage>, 283 assistant_msg: &LogMessage, 284 log: &OpenCodeLog, 285 ) -> GenAiSpan { 286 let mut attributes = serde_json::Map::new(); 287 288 attributes.insert( 289 "gen_ai.conversation.id".to_string(), 290 serde_json::json!(log.session_id), 291 ); 292 293 if let Some(user) = user_msg { 294 let input_messages = serde_json::json!([{ 295 "role": "user", 296 "parts": [{ 297 "type": "text", 298 "content": user.content.trim() 299 }] 300 }]); 301 attributes.insert("gen_ai.input.messages".to_string(), input_messages); 302 } 303 304 if let Some(agent) = &assistant_msg.agent { 305 attributes.insert( 306 "gen_ai.system.agent_name".to_string(), 307 serde_json::json!(agent), 308 ); 309 } 310 311 if let Some(model) = &assistant_msg.model { 312 attributes.insert("gen_ai.model.name".to_string(), serde_json::json!(model)); 313 } 314 315 let mut output_messages = Vec::new(); 316 317 if let Some(thinking) = &assistant_msg.thinking { 318 output_messages.push(serde_json::json!({ 319 "role": "assistant", 320 "parts": [{ 321 "type": "reasoning", 322 "content": thinking.trim() 323 }], 324 "finish_reason": "stop" 325 })); 326 } 327 328 let content_cleaned = assistant_msg 329 .content 330 .lines() 331 .filter(|l| !self.tool_output_regex.is_match(l)) 332 .collect::<Vec<_>>() 333 .join("\n") 334 .trim() 335 
.to_string(); 336 337 if !content_cleaned.is_empty() { 338 output_messages.push(serde_json::json!({ 339 "role": "assistant", 340 "parts": [{ 341 "type": "text", 342 "content": content_cleaned 343 }], 344 "finish_reason": "stop" 345 })); 346 } 347 348 if !output_messages.is_empty() { 349 attributes.insert( 350 "gen_ai.output.messages".to_string(), 351 serde_json::json!(output_messages), 352 ); 353 } 354 355 GenAiSpan { 356 span_type: SpanType::Chat, 357 session_id: log.session_id.clone(), 358 timestamp: assistant_msg.timestamp.unwrap_or_else(|| Timestamp::now()), 359 agent: assistant_msg.agent.clone(), 360 model: assistant_msg.model.clone(), 361 attributes, 362 duration_ms: assistant_msg.duration_ms, 363 parent_span_id: None, 364 } 365 } 366 367 fn create_thinking_span( 368 &self, 369 assistant_msg: &LogMessage, 370 log: &OpenCodeLog, 371 parent_span_id: &str, 372 ) -> GenAiSpan { 373 let mut attributes = serde_json::Map::new(); 374 375 attributes.insert( 376 "gen_ai.conversation.id".to_string(), 377 serde_json::json!(log.session_id), 378 ); 379 380 if let Some(agent) = &assistant_msg.agent { 381 attributes.insert( 382 "gen_ai.system.agent_name".to_string(), 383 serde_json::json!(agent), 384 ); 385 } 386 387 if let Some(model) = &assistant_msg.model { 388 attributes.insert("gen_ai.model.name".to_string(), serde_json::json!(model)); 389 } 390 391 if let Some(thinking) = &assistant_msg.thinking { 392 let output_messages = serde_json::json!([{ 393 "role": "assistant", 394 "parts": [{ 395 "type": "reasoning", 396 "content": thinking.trim() 397 }], 398 "finish_reason": "stop" 399 }]); 400 attributes.insert("gen_ai.output.messages".to_string(), output_messages); 401 } 402 403 GenAiSpan { 404 span_type: SpanType::Thinking, 405 session_id: log.session_id.clone(), 406 timestamp: assistant_msg.timestamp.unwrap_or_else(|| Timestamp::now()), 407 agent: assistant_msg.agent.clone(), 408 model: assistant_msg.model.clone(), 409 attributes, 410 duration_ms: None, 411 parent_span_id: 
Some(parent_span_id.to_string()), 412 } 413 } 414 415 fn create_tool_span( 416 &self, 417 assistant_msg: &LogMessage, 418 tool: &ToolUsage, 419 log: &OpenCodeLog, 420 parent_span_id: &str, 421 ) -> GenAiSpan { 422 let mut attributes = serde_json::Map::new(); 423 424 attributes.insert( 425 "gen_ai.conversation.id".to_string(), 426 serde_json::json!(log.session_id), 427 ); 428 attributes.insert( 429 "mcp.method.name".to_string(), 430 serde_json::json!("tools/call"), 431 ); 432 attributes.insert("gen_ai.tool.name".to_string(), serde_json::json!(tool.name)); 433 attributes.insert( 434 "gen_ai.operation.name".to_string(), 435 serde_json::json!("execute_tool"), 436 ); 437 attributes.insert( 438 "gen_ai.tool.call.arguments".to_string(), 439 serde_json::json!(tool.arguments), 440 ); 441 442 if let Some(agent) = &assistant_msg.agent { 443 attributes.insert( 444 "gen_ai.system.agent_name".to_string(), 445 serde_json::json!(agent), 446 ); 447 } 448 449 if let Some(model) = &assistant_msg.model { 450 attributes.insert("gen_ai.model.name".to_string(), serde_json::json!(model)); 451 } 452 453 if let Some(result) = &tool.result { 454 attributes.insert( 455 "gen_ai.tool.call.result".to_string(), 456 serde_json::json!(result), 457 ); 458 } 459 460 GenAiSpan { 461 span_type: SpanType::ToolCall, 462 session_id: log.session_id.clone(), 463 timestamp: assistant_msg.timestamp.unwrap_or_else(|| Timestamp::now()), 464 agent: assistant_msg.agent.clone(), 465 model: assistant_msg.model.clone(), 466 attributes, 467 duration_ms: tool.duration_ms, 468 parent_span_id: Some(parent_span_id.to_string()), 469 } 470 } 471 472 pub fn calculate_timestamps(&self, spans: &mut Vec<GenAiSpan>, updated_at: Option<Timestamp>) { 473 let now = updated_at.unwrap_or_else(|| Timestamp::now()); 474 let mut current_end_ms = now.as_millisecond(); 475 476 let mut chat_indices: Vec<usize> = spans 477 .iter() 478 .enumerate() 479 .filter(|(_, s)| s.span_type == SpanType::Chat) 480 .map(|(i, _)| i) 481 .collect(); 482 
483 chat_indices.sort_by(|a, b| b.cmp(a)); 484 485 for idx in chat_indices { 486 if let Some(span) = spans.get(idx) { 487 let parent_id = span.span_id(); 488 489 let child_durations: u64 = spans 490 .iter() 491 .filter(|s| s.parent_span_id.as_deref() == Some(parent_id.as_str())) 492 .map(|s| s.duration_ms.unwrap_or(0)) 493 .sum(); 494 495 let duration_ms = span 496 .duration_ms 497 .unwrap_or(child_durations) 498 .max(child_durations); 499 500 let end_ms = current_end_ms; 501 let start_ms = end_ms - duration_ms as i64; 502 503 if let Some(s) = spans.get_mut(idx) { 504 s.timestamp = Timestamp::from_millisecond(start_ms).unwrap(); 505 s.duration_ms = Some(duration_ms); 506 } 507 508 self.set_child_timestamps(spans, parent_id.as_str(), start_ms, end_ms); 509 510 current_end_ms = start_ms - self.span_padding_ms as i64; 511 } 512 } 513 } 514 515 fn set_child_timestamps( 516 &self, 517 spans: &mut [GenAiSpan], 518 parent_id: &str, 519 parent_start_ms: i64, 520 parent_end_ms: i64, 521 ) { 522 let child_indices: Vec<usize> = spans 523 .iter() 524 .enumerate() 525 .filter(|(_, s)| s.parent_span_id.as_deref() == Some(parent_id)) 526 .map(|(i, _)| i) 527 .collect(); 528 529 if child_indices.is_empty() { 530 return; 531 } 532 533 let parent_duration_ms = (parent_end_ms - parent_start_ms) as u64; 534 let child_duration_ms = parent_duration_ms / child_indices.len() as u64; 535 536 let mut current_start_ms = parent_start_ms; 537 538 for idx in child_indices { 539 if let Some(span) = spans.get_mut(idx) { 540 let duration_ms = if span.duration_ms.is_some() { 541 span.duration_ms.unwrap() 542 } else { 543 child_duration_ms 544 }; 545 let start_ms = current_start_ms; 546 let end_ms = start_ms + duration_ms as i64; 547 548 span.timestamp = Timestamp::from_millisecond(start_ms).unwrap(); 549 span.duration_ms = Some(duration_ms); 550 551 current_start_ms = end_ms; 552 } 553 } 554 } 555}