bsky feeds about music music-atmosphere-feed.plyr.fm/
bsky feed zig
at main 566 lines 20 kB view raw
1const std = @import("std"); 2const fs = std.fs; 3const json = std.json; 4const Atomic = std.atomic.Value; 5const Thread = std.Thread; 6const Mutex = Thread.Mutex; 7 8const STATS_PATH = "/data/stats.json"; 9 10pub const Platform = enum { soundcloud, bandcamp, spotify, plyr, apple }; 11 12// top posters tracking 13pub const MAX_POSTERS = 100; 14 15pub const PosterEntry = struct { 16 did: [64]u8 = undefined, // DIDs are typically 32 chars but allow headroom 17 did_len: u8 = 0, 18 count: u32 = 0, 19 20 pub fn getDid(self: *const PosterEntry) []const u8 { 21 return self.did[0..self.did_len]; 22 } 23 24 pub fn setDid(self: *PosterEntry, did: []const u8) void { 25 const len = @min(did.len, 64); 26 @memcpy(self.did[0..len], did[0..len]); 27 self.did_len = @intCast(len); 28 } 29}; 30 31pub const TopPosters = struct { 32 entries: [MAX_POSTERS]PosterEntry = [_]PosterEntry{.{}} ** MAX_POSTERS, 33 count: usize = 0, 34 mutex: Mutex = .{}, 35 36 pub fn record(self: *TopPosters, did: []const u8) void { 37 self.mutex.lock(); 38 defer self.mutex.unlock(); 39 40 // look for existing entry 41 for (self.entries[0..self.count]) |*entry| { 42 if (std.mem.eql(u8, entry.getDid(), did)) { 43 entry.count += 1; 44 return; 45 } 46 } 47 48 // add new entry if space 49 if (self.count < MAX_POSTERS) { 50 self.entries[self.count].setDid(did); 51 self.entries[self.count].count = 1; 52 self.count += 1; 53 } 54 } 55 56 pub fn getTop(self: *TopPosters, comptime N: usize) [N]PosterEntry { 57 self.mutex.lock(); 58 defer self.mutex.unlock(); 59 60 // copy and sort 61 var sorted: [MAX_POSTERS]PosterEntry = self.entries; 62 const slice = sorted[0..self.count]; 63 64 std.mem.sort(PosterEntry, slice, {}, struct { 65 fn cmp(_: void, a: PosterEntry, b: PosterEntry) bool { 66 return a.count > b.count; 67 } 68 }.cmp); 69 70 var result: [N]PosterEntry = [_]PosterEntry{.{}} ** N; 71 const copy_len = @min(N, self.count); 72 @memcpy(result[0..copy_len], slice[0..copy_len]); 73 return result; 74 } 75 76 pub fn toJson(self: *TopPosters, alloc: std.mem.Allocator) ![]const u8 { 77 self.mutex.lock(); 78 defer self.mutex.unlock(); 79 80 var buf = std.ArrayList(u8).init(alloc); 81 const w = buf.writer(); 82 try w.writeAll("{"); 83 var first = true; 84 for (self.entries[0..self.count]) |entry| { 85 if (entry.count == 0) continue; 86 if (!first) try w.writeAll(","); 87 first = false; 88 try w.print("\"{s}\":{d}", .{ entry.getDid(), entry.count }); 89 } 90 try w.writeAll("}"); 91 return buf.toOwnedSlice(); 92 } 93 94 pub fn loadFromJson(self: *TopPosters, data: []const u8) void { 95 self.mutex.lock(); 96 defer self.mutex.unlock(); 97 98 const parsed = json.parseFromSlice(json.Value, std.heap.page_allocator, data, .{}) catch return; 99 defer parsed.deinit(); 100 101 if (parsed.value != .object) return; 102 const obj = parsed.value.object; 103 104 self.count = 0; 105 var iter = obj.iterator(); 106 while (iter.next()) |kv| { 107 if (self.count >= MAX_POSTERS) break; 108 if (kv.value_ptr.* != .integer) continue; 109 self.entries[self.count].setDid(kv.key_ptr.*); 110 self.entries[self.count].count = @intCast(@max(0, kv.value_ptr.integer)); 111 self.count += 1; 112 } 113 } 114}; 115 116pub const Stats = struct { 117 started_at: i64, 118 prior_uptime: u64 = 0, 119 messages: Atomic(u64), 120 matches: Atomic(u64), 121 last_event_time_us: Atomic(i64), 122 last_event_received_at: Atomic(i64), 123 last_match_time: Atomic(i64), 124 connected_at: Atomic(i64), 125 last_post_time_ms: Atomic(i64), // actual post creation time from TID 126 127 // platform counters 128 soundcloud: Atomic(u64), 129 bandcamp: Atomic(u64), 130 spotify: Atomic(u64), 131 plyr: Atomic(u64), 132 apple: Atomic(u64), 133 134 // match type counters 135 quote_matches: Atomic(u64), // posts quoting music posts 136 multi_platform: Atomic(u64), // posts with 2+ platforms 137 138 // for lag trend tracking 139 prev_lag_ms: Atomic(i64), 140 141 // top posters 142 top_posters: TopPosters = .{}, 143 posters_since: Atomic(i64) = Atomic(i64).init(0), // when tracking started 144 145 pub fn init() Stats { 146 var self = Stats{ 147 .started_at = std.time.timestamp(), 148 .messages = Atomic(u64).init(0), 149 .matches = Atomic(u64).init(0), 150 .last_event_time_us = Atomic(i64).init(0), 151 .last_event_received_at = Atomic(i64).init(0), 152 .last_match_time = Atomic(i64).init(0), 153 .connected_at = Atomic(i64).init(0), 154 .last_post_time_ms = Atomic(i64).init(0), 155 .soundcloud = Atomic(u64).init(0), 156 .bandcamp = Atomic(u64).init(0), 157 .spotify = Atomic(u64).init(0), 158 .plyr = Atomic(u64).init(0), 159 .apple = Atomic(u64).init(0), 160 .quote_matches = Atomic(u64).init(0), 161 .multi_platform = Atomic(u64).init(0), 162 .prev_lag_ms = Atomic(i64).init(0), 163 }; 164 self.load(); 165 return self; 166 } 167 168 fn load(self: *Stats) void { 169 const file = fs.openFileAbsolute(STATS_PATH, .{}) catch return; 170 defer file.close(); 171 172 var buf: [16384]u8 = undefined; 173 const len = file.readAll(&buf) catch return; 174 if (len == 0) return; 175 176 const parsed = json.parseFromSlice(json.Value, std.heap.page_allocator, buf[0..len], .{}) catch return; 177 defer parsed.deinit(); 178 179 const root = parsed.value.object; 180 181 if (root.get("messages")) |v| if (v == .integer) { 182 self.messages.store(@intCast(@max(0, v.integer)), .monotonic); 183 }; 184 if (root.get("cumulative_uptime")) |v| if (v == .integer) { 185 self.prior_uptime = @intCast(@max(0, v.integer)); 186 }; 187 if (root.get("matches")) |v| if (v == .integer) { 188 self.matches.store(@intCast(@max(0, v.integer)), .monotonic); 189 }; 190 if (root.get("soundcloud")) |v| if (v == .integer) { 191 self.soundcloud.store(@intCast(@max(0, v.integer)), .monotonic); 192 }; 193 if (root.get("bandcamp")) |v| if (v == .integer) { 194 self.bandcamp.store(@intCast(@max(0, v.integer)), .monotonic); 195 }; 196 if (root.get("spotify")) |v| if (v == .integer) { 197 self.spotify.store(@intCast(@max(0, v.integer)), .monotonic); 198 }; 199 if (root.get("plyr")) |v| if (v == .integer) { 200 self.plyr.store(@intCast(@max(0, v.integer)), .monotonic); 201 }; 202 if (root.get("apple")) |v| if (v == .integer) { 203 self.apple.store(@intCast(@max(0, v.integer)), .monotonic); 204 }; 205 if (root.get("quote_matches")) |v| if (v == .integer) { 206 self.quote_matches.store(@intCast(@max(0, v.integer)), .monotonic); 207 }; 208 if (root.get("multi_platform")) |v| if (v == .integer) { 209 self.multi_platform.store(@intCast(@max(0, v.integer)), .monotonic); 210 }; 211 212 // load top posters 213 if (root.get("top_posters")) |v| if (v == .object) { 214 var iter = v.object.iterator(); 215 while (iter.next()) |kv| { 216 if (self.top_posters.count >= MAX_POSTERS) break; 217 if (kv.value_ptr.* != .integer) continue; 218 self.top_posters.entries[self.top_posters.count].setDid(kv.key_ptr.*); 219 self.top_posters.entries[self.top_posters.count].count = @intCast(@max(0, kv.value_ptr.integer)); 220 self.top_posters.count += 1; 221 } 222 }; 223 if (root.get("posters_since")) |v| if (v == .integer) { 224 self.posters_since.store(v.integer, .monotonic); 225 }; 226 227 std.debug.print("loaded stats from {s}\n", .{STATS_PATH}); 228 } 229 230 pub fn save(self: *Stats) void { 231 const file = fs.createFileAbsolute(STATS_PATH, .{}) catch return; 232 defer file.close(); 233 234 const now = std.time.timestamp(); 235 const session_uptime: u64 = @intCast(@max(0, now - self.started_at)); 236 const total_uptime = self.prior_uptime + session_uptime; 237 238 // build top_posters json 239 var posters_buf: [8192]u8 = undefined; 240 var posters_len: usize = 0; 241 { 242 self.top_posters.mutex.lock(); 243 defer self.top_posters.mutex.unlock(); 244 245 posters_buf[0] = '{'; 246 posters_len = 1; 247 var first = true; 248 for (self.top_posters.entries[0..self.top_posters.count]) |entry| { 249 if (entry.count == 0) continue; 250 const prefix: []const u8 = if (first) "\"" else ",\""; 251 first = false; 252 const written = std.fmt.bufPrint(posters_buf[posters_len..], "{s}{s}\":{d}", .{ 253 prefix, 254 entry.getDid(), 255 entry.count, 256 }) catch break; 257 posters_len += written.len; 258 } 259 posters_buf[posters_len] = '}'; 260 posters_len += 1; 261 } 262 263 var buf: [16384]u8 = undefined; 264 const data = std.fmt.bufPrint(&buf, 265 \\{{"messages":{},"matches":{},"cumulative_uptime":{},"soundcloud":{},"bandcamp":{},"spotify":{},"plyr":{},"apple":{},"quote_matches":{},"multi_platform":{},"posters_since":{},"top_posters":{s}}} 266 , .{ 267 self.messages.load(.monotonic), 268 self.matches.load(.monotonic), 269 total_uptime, 270 self.soundcloud.load(.monotonic), 271 self.bandcamp.load(.monotonic), 272 self.spotify.load(.monotonic), 273 self.plyr.load(.monotonic), 274 self.apple.load(.monotonic), 275 self.quote_matches.load(.monotonic), 276 self.multi_platform.load(.monotonic), 277 self.posters_since.load(.monotonic), 278 posters_buf[0..posters_len], 279 }) catch return; 280 281 file.writeAll(data) catch return; 282 } 283 284 pub fn totalUptime(self: *Stats) i64 { 285 const now = std.time.timestamp(); 286 const session: i64 = now - self.started_at; 287 return @as(i64, @intCast(self.prior_uptime)) + session; 288 } 289 290 pub fn recordMessage(self: *Stats) void { 291 _ = self.messages.fetchAdd(1, .monotonic); 292 } 293 294 pub fn getMessages(self: *const Stats) u64 { 295 return self.messages.load(.monotonic); 296 } 297 298 pub fn recordEvent(self: *Stats, time_us: i64, post_time_ms: i64) void { 299 _ = self.messages.fetchAdd(1, .monotonic); 300 self.last_event_time_us.store(time_us, .monotonic); 301 self.last_event_received_at.store(std.time.milliTimestamp(), .monotonic); 302 // track post creation time (from TID) for actual lag 303 if (post_time_ms > 0) { 304 self.last_post_time_ms.store(post_time_ms, .monotonic); 305 } 306 } 307 308 pub fn getPostLagMs(self: *Stats) i64 { 309 const post_time = self.last_post_time_ms.load(.monotonic); 310 if (post_time == 0) return 0; 311 const now_ms = std.time.milliTimestamp(); 312 return now_ms - post_time; 313 } 314 315 pub fn recordMatch(self: *Stats) void { 316 _ = self.matches.fetchAdd(1, .monotonic); 317 self.last_match_time.store(std.time.milliTimestamp(), .monotonic); 318 } 319 320 pub fn recordPlatform(self: *Stats, platform: Platform) void { 321 switch (platform) { 322 .soundcloud => _ = self.soundcloud.fetchAdd(1, .monotonic), 323 .bandcamp => _ = self.bandcamp.fetchAdd(1, .monotonic), 324 .spotify => _ = self.spotify.fetchAdd(1, .monotonic), 325 .plyr => _ = self.plyr.fetchAdd(1, .monotonic), 326 .apple => _ = self.apple.fetchAdd(1, .monotonic), 327 } 328 } 329 330 pub fn getPlatformCounts(self: *const Stats) struct { soundcloud: u64, bandcamp: u64, spotify: u64, plyr: u64, apple: u64 } { 331 return .{ 332 .soundcloud = self.soundcloud.load(.monotonic), 333 .bandcamp = self.bandcamp.load(.monotonic), 334 .spotify = self.spotify.load(.monotonic), 335 .plyr = self.plyr.load(.monotonic), 336 .apple = self.apple.load(.monotonic), 337 }; 338 } 339 340 pub fn recordQuoteMatch(self: *Stats) void { 341 _ = self.quote_matches.fetchAdd(1, .monotonic); 342 } 343 344 pub fn recordMultiPlatform(self: *Stats) void { 345 _ = self.multi_platform.fetchAdd(1, .monotonic); 346 } 347 348 pub fn recordPoster(self: *Stats, did: []const u8) void { 349 // set tracking start time on first poster 350 if (self.posters_since.load(.monotonic) == 0) { 351 self.posters_since.store(std.time.timestamp(), .monotonic); 352 } 353 self.top_posters.record(did); 354 } 355 356 pub fn getPostersSince(self: *Stats) i64 { 357 return self.posters_since.load(.monotonic); 358 } 359 360 pub fn getTopPosters(self: *Stats, comptime N: usize) [N]PosterEntry { 361 return self.top_posters.getTop(N); 362 } 363 364 pub fn getUniquePosterCount(self: *Stats) usize { 365 self.top_posters.mutex.lock(); 366 defer self.top_posters.mutex.unlock(); 367 return self.top_posters.count; 368 } 369 370 /// returns concentration of top N posters as percentage of tracked poster posts 371 pub fn getTopConcentration(self: *Stats, comptime N: usize) f64 { 372 self.top_posters.mutex.lock(); 373 defer self.top_posters.mutex.unlock(); 374 375 // sum all tracked poster posts 376 var total_tracked: u64 = 0; 377 for (self.top_posters.entries[0..self.top_posters.count]) |entry| { 378 total_tracked += entry.count; 379 } 380 if (total_tracked == 0) return 0; 381 382 // sum top N 383 var sorted: [MAX_POSTERS]PosterEntry = self.top_posters.entries; 384 const slice = sorted[0..self.top_posters.count]; 385 std.mem.sort(PosterEntry, slice, {}, struct { 386 fn cmp(_: void, a: PosterEntry, b: PosterEntry) bool { 387 return a.count > b.count; 388 } 389 }.cmp); 390 391 var top_sum: u64 = 0; 392 const limit = @min(N, self.top_posters.count); 393 for (slice[0..limit]) |entry| { 394 top_sum += entry.count; 395 } 396 397 return @as(f64, @floatFromInt(top_sum)) / @as(f64, @floatFromInt(total_tracked)) * 100.0; 398 } 399 400 pub fn getQuoteMatches(self: *const Stats) u64 { 401 return self.quote_matches.load(.monotonic); 402 } 403 404 pub fn getMultiPlatform(self: *const Stats) u64 { 405 return self.multi_platform.load(.monotonic); 406 } 407 408 pub fn getMatchRate(self: *Stats) f64 { 409 const uptime_sec = self.totalUptime(); 410 if (uptime_sec <= 0) return 0; 411 const uptime_hours: f64 = @as(f64, @floatFromInt(uptime_sec)) / 3600.0; 412 const matches: f64 = @floatFromInt(self.matches.load(.monotonic)); 413 return matches / uptime_hours; 414 } 415 416 pub fn recordConnected(self: *Stats) void { 417 self.connected_at.store(std.time.milliTimestamp(), .monotonic); 418 } 419 420 pub fn getLagMs(self: *Stats) i64 { 421 const event_time_us = self.last_event_time_us.load(.monotonic); 422 if (event_time_us == 0) return 0; 423 const event_time_ms = @divTrunc(event_time_us, 1000); 424 const now_ms = std.time.milliTimestamp(); 425 return now_ms - event_time_ms; 426 } 427 428 pub fn getMatches(self: *const Stats) u64 { 429 return self.matches.load(.monotonic); 430 } 431 432 pub fn getLastEventReceivedAt(self: *const Stats) i64 { 433 return self.last_event_received_at.load(.monotonic); 434 } 435 436 pub fn getLastMatchTime(self: *const Stats) i64 { 437 return self.last_match_time.load(.monotonic); 438 } 439 440 pub fn getConnectedAt(self: *const Stats) i64 { 441 return self.connected_at.load(.monotonic); 442 } 443 444 pub fn getStatus(self: *Stats) []const u8 { 445 const lag = self.getPostLagMs(); 446 if (lag > 60000) return "catching_up"; 447 return "live"; 448 } 449}; 450 451var global_stats: Stats = undefined; 452var initialized: bool = false; 453 454pub fn init() void { 455 global_stats = Stats.init(); 456 initialized = true; 457 _ = Thread.spawn(.{}, saveTicker, .{}) catch {}; 458} 459 460pub fn get() *Stats { 461 if (!initialized) init(); 462 return &global_stats; 463} 464 465fn saveTicker() void { 466 while (true) { 467 Thread.sleep(60 * std.time.ns_per_s); 468 global_stats.save(); 469 } 470} 471 472// ----------------------------------------------------------------------------- 473// tests 474// ----------------------------------------------------------------------------- 475 476test "Stats.recordMessage increments counter" { 477 var s = Stats{ 478 .started_at = std.time.timestamp(), 479 .messages = Atomic(u64).init(0), 480 .matches = Atomic(u64).init(0), 481 .last_event_time_us = Atomic(i64).init(0), 482 .last_event_received_at = Atomic(i64).init(0), 483 .last_match_time = Atomic(i64).init(0), 484 .connected_at = Atomic(i64).init(0), 485 .last_post_time_ms = Atomic(i64).init(0), 486 .soundcloud = Atomic(u64).init(0), 487 .bandcamp = Atomic(u64).init(0), 488 .spotify = Atomic(u64).init(0), 489 .plyr = Atomic(u64).init(0), 490 .apple = Atomic(u64).init(0), 491 .quote_matches = Atomic(u64).init(0), 492 .multi_platform = Atomic(u64).init(0), 493 .prev_lag_ms = Atomic(i64).init(0), 494 }; 495 496 try std.testing.expectEqual(0, s.getMessages()); 497 s.recordMessage(); 498 try std.testing.expectEqual(1, s.getMessages()); 499 s.recordMessage(); 500 try std.testing.expectEqual(2, s.getMessages()); 501} 502 503test "Stats.recordPlatform increments platform counters" { 504 var s = Stats{ 505 .started_at = std.time.timestamp(), 506 .messages = Atomic(u64).init(0), 507 .matches = Atomic(u64).init(0), 508 .last_event_time_us = Atomic(i64).init(0), 509 .last_event_received_at = Atomic(i64).init(0), 510 .last_match_time = Atomic(i64).init(0), 511 .connected_at = Atomic(i64).init(0), 512 .last_post_time_ms = Atomic(i64).init(0), 513 .soundcloud = Atomic(u64).init(0), 514 .bandcamp = Atomic(u64).init(0), 515 .spotify = Atomic(u64).init(0), 516 .plyr = Atomic(u64).init(0), 517 .apple = Atomic(u64).init(0), 518 .quote_matches = Atomic(u64).init(0), 519 .multi_platform = Atomic(u64).init(0), 520 .prev_lag_ms = Atomic(i64).init(0), 521 }; 522 523 s.recordPlatform(.soundcloud); 524 s.recordPlatform(.soundcloud); 525 s.recordPlatform(.bandcamp); 526 s.recordPlatform(.apple); 527 528 const counts = s.getPlatformCounts(); 529 try std.testing.expectEqual(2, counts.soundcloud); 530 try std.testing.expectEqual(1, counts.bandcamp); 531 try std.testing.expectEqual(0, counts.spotify); 532 try std.testing.expectEqual(0, counts.plyr); 533 try std.testing.expectEqual(1, counts.apple); 534} 535 536test "Stats.getStatus returns live or catching_up" { 537 var s = Stats{ 538 .started_at = std.time.timestamp(), 539 .messages = Atomic(u64).init(0), 540 .matches = Atomic(u64).init(0), 541 .last_event_time_us = Atomic(i64).init(0), 542 .last_event_received_at = Atomic(i64).init(0), 543 .last_match_time = Atomic(i64).init(0), 544 .connected_at = Atomic(i64).init(0), 545 .last_post_time_ms = Atomic(i64).init(0), 546 .soundcloud = Atomic(u64).init(0), 547 .bandcamp = Atomic(u64).init(0), 548 .spotify = Atomic(u64).init(0), 549 .plyr = Atomic(u64).init(0), 550 .apple = Atomic(u64).init(0), 551 .quote_matches = Atomic(u64).init(0), 552 .multi_platform = Atomic(u64).init(0), 553 .prev_lag_ms = Atomic(i64).init(0), 554 }; 555 556 // no post time set = live 557 try std.testing.expectEqualStrings("live", s.getStatus()); 558 559 // recent post = live 560 s.last_post_time_ms.store(std.time.milliTimestamp(), .monotonic); 561 try std.testing.expectEqualStrings("live", s.getStatus()); 562 563 // old post = catching_up 564 s.last_post_time_ms.store(std.time.milliTimestamp() - 120000, .monotonic); 565 try std.testing.expectEqualStrings("catching_up", s.getStatus()); 566}