logfire client for zig
Snapshot at commit c7a46726fe7ff8bb65e4e72c985aa146fbf6d404 (536 lines, 16 kB).
//! logfire-zig: zig SDK for pydantic logfire
//!
//! a lightweight observability client that sends traces, logs, and metrics
//! to logfire (or any OTLP-compatible backend) via HTTP/JSON.
//!
//! ## quick start
//!
//! ```zig
//! const logfire = @import("logfire");
//!
//! pub fn main() !void {
//!     var lf = try logfire.configure(.{
//!         .service_name = "my-service",
//!     });
//!     defer lf.shutdown();
//!
//!     logfire.info("hello from zig", .{});
//!
//!     const s = logfire.span("do_work", .{ .item_count = 42 });
//!     defer s.end();
//!     // ... do work
//! }
//! ```

const std = @import("std");

pub const Config = @import("config.zig").Config;
pub const Span = @import("span.zig").Span;
pub const Exporter = @import("exporter.zig").Exporter;
pub const Attribute = @import("attribute.zig").Attribute;

const metrics_mod = @import("metrics.zig");
pub const Counter = metrics_mod.Counter;
pub const Gauge = metrics_mod.Gauge;
pub const UpDownCounter = metrics_mod.UpDownCounter;
pub const Histogram = metrics_mod.Histogram;
pub const ExponentialHistogram = metrics_mod.ExponentialHistogram;
pub const MetricData = metrics_mod.MetricData;
pub const NumberDataPoint = metrics_mod.NumberDataPoint;
pub const HistogramDataPoint = metrics_mod.HistogramDataPoint;
pub const ExponentialHistogramDataPoint = metrics_mod.ExponentialHistogramDataPoint;
pub const InstrumentOptions = metrics_mod.InstrumentOptions;
pub const HistogramOptions = metrics_mod.HistogramOptions;
pub const AggregationTemporality = metrics_mod.AggregationTemporality;

const log_mod = @import("log.zig");
pub const Level = log_mod.Level;
pub const LogRecord = log_mod.LogRecord;

/// global logfire instance (set after configure())
var global_instance: ?*Logfire = null;
var global_mutex: std.Thread.Mutex = .{};

/// thread-local trace context - each thread gets its own trace
threadlocal var tl_trace_id: ?[16]u8 = null;
threadlocal var tl_current_span_id: ?[8]u8 = null;
threadlocal var tl_active_span_count: u32 = 0;

pub const Logfire = struct {
    allocator: std.mem.Allocator,
    config: Config,
    exporter: Exporter,

    /// pending spans/logs/metrics waiting to be exported
    pending_spans: std.ArrayList(Span.Data),
    pending_logs: std.ArrayList(LogRecord),
    pending_metrics: std.ArrayList(MetricData),
    pending_mutex: std.Thread.Mutex,

    /// data points allocated by the single-point metric helpers (freed on flush/deinit)
    allocated_data_points: std.ArrayList(*NumberDataPoint),

    /// span ID counter (global, just needs to be unique)
    span_id_counter: std.atomic.Value(u64),

    /// background flush thread (null when batching is disabled or spawn failed)
    flush_thread: ?std.Thread,
    running: std.atomic.Value(bool),

    /// which OTLP shape a single-data-point metric takes
    const MetricKind = enum { monotonic_sum, gauge };

    /// allocate and start a Logfire instance; caller releases via shutdown()
    pub fn init(allocator: std.mem.Allocator, config: Config) !*Logfire {
        const resolved = config.resolve();

        const self = try allocator.create(Logfire);
        self.* = .{
            .allocator = allocator,
            .config = resolved,
            .exporter = Exporter.init(allocator, resolved),
            .pending_spans = .{},
            .pending_logs = .{},
            .pending_metrics = .{},
            .pending_mutex = .{},
            .allocated_data_points = .{},
            .span_id_counter = std.atomic.Value(u64).init(1),
            .flush_thread = null,
            .running = std.atomic.Value(bool).init(true),
        };

        // start background flush thread (batch_timeout_ms == 0 disables batching)
        if (resolved.batch_timeout_ms > 0) {
            self.flush_thread = std.Thread.spawn(.{}, flushLoop, .{self}) catch |e| blk: {
                // degrade gracefully but loudly: without the thread, data still
                // goes out on explicit flush() and on shutdown()
                std.log.warn("logfire: failed to start flush thread: {}", .{e});
                break :blk null;
            };
        }

        return self;
    }

    /// periodically flush pending telemetry until `running` is cleared
    fn flushLoop(self: *Logfire) void {
        const interval_ns = self.config.batch_timeout_ms * std.time.ns_per_ms;
        while (self.running.load(.acquire)) {
            std.Thread.sleep(interval_ns);
            // re-check after sleeping so shutdown() isn't raced by a late flush here
            if (!self.running.load(.acquire)) break;
            self.flush() catch |e| {
                std.log.warn("logfire: background flush failed: {}", .{e});
            };
        }
    }

    /// free all owned memory; does NOT flush (see shutdown())
    pub fn deinit(self: *Logfire) void {
        // free any remaining allocated data points
        for (self.allocated_data_points.items) |dp| {
            self.allocator.destroy(dp);
        }
        // NOTE(review): log messages heap-allocated by logWithLevel are not
        // freed here if they never flushed — confirm whether the exporter
        // takes ownership of them.
        self.allocated_data_points.deinit(self.allocator);
        self.pending_spans.deinit(self.allocator);
        self.pending_logs.deinit(self.allocator);
        self.pending_metrics.deinit(self.allocator);
        self.exporter.deinit();
        self.allocator.destroy(self);
    }

    /// stop the flush thread, flush remaining data, unregister the global,
    /// and free the instance; safe to call exactly once
    pub fn shutdown(self: *Logfire) void {
        // stop flush thread
        self.running.store(false, .release);
        if (self.flush_thread) |t| {
            t.join();
        }

        // final flush
        self.flush() catch |e| {
            std.log.warn("logfire: flush failed during shutdown: {}", .{e});
        };

        global_mutex.lock();
        defer global_mutex.unlock();
        if (global_instance == self) {
            global_instance = null;
        }

        self.deinit();
    }

    /// synchronously export everything pending, then reset the buffers
    pub fn flush(self: *Logfire) !void {
        self.pending_mutex.lock();
        defer self.pending_mutex.unlock();
        // NOTE(review): the export happens while pending_mutex is held, so
        // recorders on other threads block for the duration of the HTTP call;
        // consider swapping the buffers out under the lock instead.

        if (self.pending_spans.items.len > 0 or self.pending_logs.items.len > 0 or self.pending_metrics.items.len > 0) {
            try self.exporter.sendAll(
                self.pending_spans.items,
                self.pending_logs.items,
                self.pending_metrics.items,
            );
            self.pending_spans.clearRetainingCapacity();
            self.pending_logs.clearRetainingCapacity();
            self.pending_metrics.clearRetainingCapacity();

            // free allocated data points
            for (self.allocated_data_points.items) |dp| {
                self.allocator.destroy(dp);
            }
            self.allocated_data_points.clearRetainingCapacity();
        }
    }

    /// start a span; the new span becomes this thread's current span, and a
    /// fresh trace ID is generated when no span is already active
    pub fn createSpan(self: *Logfire, name: []const u8, attributes: anytype) Span {
        // generate new trace ID if no active spans on this thread
        if (tl_active_span_count == 0) {
            tl_trace_id = generateTraceId();
        }
        tl_active_span_count += 1;

        // capture parent span ID before creating new span
        const parent_span_id = tl_current_span_id;
        const span_id = self.span_id_counter.fetchAdd(1, .monotonic);

        // encode span ID
        var span_id_bytes: [8]u8 = undefined;
        std.mem.writeInt(u64, &span_id_bytes, span_id, .big);

        // update current span ID for this thread
        tl_current_span_id = span_id_bytes;

        return Span.init(self, name, span_id_bytes, tl_trace_id, parent_span_id, attributes);
    }

    /// called when a span ends to restore parent context
    pub fn spanEnded(self: *Logfire, parent_span_id: ?[8]u8) void {
        _ = self;
        if (tl_active_span_count > 0) {
            tl_active_span_count -= 1;
        }
        // restore parent as current span
        tl_current_span_id = parent_span_id;
    }

    /// start a new trace (generates new trace ID for this thread)
    pub fn newTrace(self: *Logfire) void {
        _ = self;
        tl_trace_id = generateTraceId();
    }

    /// get the current thread's trace ID
    pub fn currentTraceId() ?[16]u8 {
        return tl_trace_id;
    }

    /// queue a log record; `message` must stay valid until the next flush
    pub fn recordLog(self: *Logfire, level: Level, message: []const u8, attributes: anytype) void {
        const record = LogRecord.init(
            tl_trace_id,
            level,
            message,
            attributes,
        );

        self.pending_mutex.lock();
        defer self.pending_mutex.unlock();
        self.pending_logs.append(self.allocator, record) catch {
            std.log.warn("logfire: failed to record log", .{});
        };
    }

    /// queue a finished span for export
    pub fn recordSpanEnd(self: *Logfire, data: Span.Data) void {
        self.pending_mutex.lock();
        defer self.pending_mutex.unlock();
        self.pending_spans.append(self.allocator, data) catch {
            std.log.warn("logfire: failed to record span", .{});
        };
    }

    /// queue a pre-built metric for export
    pub fn recordMetric(self: *Logfire, data: MetricData) void {
        self.pending_mutex.lock();
        defer self.pending_mutex.unlock();
        self.pending_metrics.append(self.allocator, data) catch {
            std.log.warn("logfire: failed to record metric", .{});
        };
    }

    /// shared implementation for the single-data-point helpers below:
    /// heap-allocate the point (so it outlives the caller's frame), track it
    /// for cleanup, and queue a metric wrapping it
    fn recordSinglePoint(self: *Logfire, name: []const u8, opts: InstrumentOptions, point: NumberDataPoint, kind: MetricKind) void {
        const dp = self.allocator.create(NumberDataPoint) catch {
            std.log.warn("logfire: failed to record metric", .{});
            return;
        };
        dp.* = point;

        self.pending_mutex.lock();
        defer self.pending_mutex.unlock();

        // track allocation for cleanup
        self.allocated_data_points.append(self.allocator, dp) catch {
            self.allocator.destroy(dp);
            std.log.warn("logfire: failed to record metric", .{});
            return;
        };

        self.pending_metrics.append(self.allocator, .{
            .name = name,
            .description = opts.description,
            .unit = opts.unit,
            .data = switch (kind) {
                .monotonic_sum => .{ .sum = .{
                    .data_points = @as(*const [1]NumberDataPoint, dp),
                    .temporality = .delta, // delta since we send individual increments
                    .is_monotonic = true,
                } },
                .gauge => .{ .gauge = .{
                    .data_points = @as(*const [1]NumberDataPoint, dp),
                } },
            },
        }) catch {
            // dp stays in allocated_data_points and is freed on the next flush
            std.log.warn("logfire: failed to record metric", .{});
        };
    }

    /// record a counter value (monotonic sum, delta temporality)
    pub fn recordCounter(self: *Logfire, name: []const u8, value: i64, opts: InstrumentOptions) void {
        const now = std.time.nanoTimestamp();
        self.recordSinglePoint(name, opts, .{
            .start_time_ns = now,
            .time_ns = now,
            .value = .{ .int = value },
        }, .monotonic_sum);
    }

    /// record a gauge value (instantaneous)
    pub fn recordGaugeInt(self: *Logfire, name: []const u8, value: i64, opts: InstrumentOptions) void {
        const now = std.time.nanoTimestamp();
        self.recordSinglePoint(name, opts, .{
            .start_time_ns = now,
            .time_ns = now,
            .value = .{ .int = value },
        }, .gauge);
    }

    /// record a gauge value (instantaneous, f64)
    pub fn recordGaugeDouble(self: *Logfire, name: []const u8, value: f64, opts: InstrumentOptions) void {
        const now = std.time.nanoTimestamp();
        self.recordSinglePoint(name, opts, .{
            .start_time_ns = now,
            .time_ns = now,
            .value = .{ .double = value },
        }, .gauge);
    }

    /// cryptographically-random 16-byte trace ID
    fn generateTraceId() [16]u8 {
        var id: [16]u8 = undefined;
        std.crypto.random.bytes(&id);
        return id;
    }
};

/// configure logfire with the given options and install it as the global instance
pub fn configure(options: Config) !*Logfire {
    const allocator = std.heap.page_allocator;
    const instance = try Logfire.init(allocator, options);

    global_mutex.lock();
    defer global_mutex.unlock();
    // NOTE(review): reconfiguring replaces the global without shutting the
    // previous instance down — callers must shutdown() the old one themselves.
    global_instance = instance;

    return instance;
}

/// get the global logfire instance
pub fn getInstance() ?*Logfire {
    global_mutex.lock();
    defer global_mutex.unlock();
    return global_instance;
}

// convenience functions that use the global instance

pub fn span(name: []const u8, attributes: anytype) Span {
    if (getInstance()) |lf| {
        return lf.createSpan(name, attributes);
    }
    return Span.noop();
}

/// start a new trace (generates new trace ID for subsequent spans)
pub fn newTrace() void {
    if (getInstance()) |lf| {
        lf.newTrace();
    }
}

pub fn trace(comptime fmt: []const u8, args: anytype) void {
    logWithLevel(.trace, fmt, args);
}

pub fn debug(comptime fmt: []const u8, args: anytype) void {
    logWithLevel(.debug, fmt, args);
}

pub fn info(comptime fmt: []const u8, args: anytype) void {
    logWithLevel(.info, fmt, args);
}

pub fn warn(comptime fmt: []const u8, args: anytype) void {
    logWithLevel(.warn, fmt, args);
}

pub fn err(comptime fmt: []const u8, args: anytype) void {
    logWithLevel(.err, fmt, args);
}

/// format the message on the heap and queue it on the global instance.
/// formatting directly with allocPrint does one allocation instead of the
/// old bufPrint+dupe pair, and never falls back to emitting the raw format
/// string (with the args dropped) when the message exceeds a fixed buffer.
fn logWithLevel(level: Level, comptime fmt: []const u8, args: anytype) void {
    const lf = getInstance() orelse return;
    const message = std.fmt.allocPrint(lf.allocator, fmt, args) catch return;
    lf.recordLog(level, message, .{});
}

// metric convenience functions

pub fn counter(name: []const u8, value: i64) void {
    if (getInstance()) |lf| {
        lf.recordCounter(name, value, .{});
    }
}

pub fn counterWithOpts(name: []const u8, value: i64, opts: InstrumentOptions) void {
    if (getInstance()) |lf| {
        lf.recordCounter(name, value, opts);
    }
}

pub fn gaugeInt(name: []const u8, value: i64) void {
    if (getInstance()) |lf| {
        lf.recordGaugeInt(name, value, .{});
    }
}

pub fn gaugeDouble(name: []const u8, value: f64) void {
    if (getInstance()) |lf| {
        lf.recordGaugeDouble(name, value, .{});
    }
}

pub fn metric(data: MetricData) void {
    if (getInstance()) |lf| {
        lf.recordMetric(data);
    }
}

// tests

test "basic configuration" {
    const lf = try configure(.{
        .service_name = "test-service",
        .send_to_logfire = .no,
    });
    defer lf.shutdown();

    try std.testing.expect(getInstance() != null);
}

test "span creation" {
    const lf = try configure(.{
        .service_name = "test-service",
        .send_to_logfire = .no,
    });
    defer lf.shutdown();

    const s = span("test.operation", .{});
    defer s.end();

    try std.testing.expect(s.active);
}

test "logging" {
    const lf = try configure(.{
        .service_name = "test-service",
        .send_to_logfire = .no,
    });
    defer lf.shutdown();

    info("test message with {d}", .{42});

    try std.testing.expectEqual(@as(usize, 1), lf.pending_logs.items.len);
}

test "metrics recording" {
    const lf = try configure(.{
        .service_name = "test-service",
        .send_to_logfire = .no,
    });
    defer lf.shutdown();

    counter("requests.total", 1);
    gaugeInt("connections.active", 42);

    try std.testing.expectEqual(@as(usize, 2), lf.pending_metrics.items.len);
}

test "parent-child span linking" {
    const lf = try configure(.{
        .service_name = "test-service",
        .send_to_logfire = .no,
    });
    defer lf.shutdown();

    // create parent span
    const parent = span("parent.operation", .{});
    const parent_span_id = parent.data.span_id;
    const parent_trace_id = parent.data.trace_id;

    // parent should have no parent
    try std.testing.expect(parent.data.parent_span_id == null);

    // create child span while parent is active
    const child = span("child.operation", .{});

    // child should have parent's span_id as parent_span_id
    try std.testing.expect(child.data.parent_span_id != null);
    try std.testing.expectEqualSlices(u8, &parent_span_id, &child.data.parent_span_id.?);

    // child should share parent's trace_id
    try std.testing.expectEqualSlices(u8, &parent_trace_id, &child.data.trace_id);

    // end spans (child first due to defer order)
    child.end();
    parent.end();

    // should have recorded 2 spans
    try std.testing.expectEqual(@as(usize, 2), lf.pending_spans.items.len);
}

// re-export tests from submodules
test {
    _ = @import("config.zig");
    _ = @import("exporter.zig");
    _ = @import("span.zig");
    _ = @import("log.zig");
    _ = @import("attribute.zig");
    _ = @import("metrics.zig");
}