bsky feeds about music
music-atmosphere-feed.plyr.fm/
bsky
feed
zig
1const std = @import("std");
2const fs = std.fs;
3const json = std.json;
4const Atomic = std.atomic.Value;
5const Thread = std.Thread;
6const Mutex = Thread.Mutex;
7
8const STATS_PATH = "/data/stats.json";
9
10pub const Platform = enum { soundcloud, bandcamp, spotify, plyr, apple };
11
12// top posters tracking
13pub const MAX_POSTERS = 100;
14
15pub const PosterEntry = struct {
16 did: [64]u8 = undefined, // DIDs are typically 32 chars but allow headroom
17 did_len: u8 = 0,
18 count: u32 = 0,
19
20 pub fn getDid(self: *const PosterEntry) []const u8 {
21 return self.did[0..self.did_len];
22 }
23
24 pub fn setDid(self: *PosterEntry, did: []const u8) void {
25 const len = @min(did.len, 64);
26 @memcpy(self.did[0..len], did[0..len]);
27 self.did_len = @intCast(len);
28 }
29};
30
31pub const TopPosters = struct {
32 entries: [MAX_POSTERS]PosterEntry = [_]PosterEntry{.{}} ** MAX_POSTERS,
33 count: usize = 0,
34 mutex: Mutex = .{},
35
36 pub fn record(self: *TopPosters, did: []const u8) void {
37 self.mutex.lock();
38 defer self.mutex.unlock();
39
40 // look for existing entry
41 for (self.entries[0..self.count]) |*entry| {
42 if (std.mem.eql(u8, entry.getDid(), did)) {
43 entry.count += 1;
44 return;
45 }
46 }
47
48 // add new entry if space
49 if (self.count < MAX_POSTERS) {
50 self.entries[self.count].setDid(did);
51 self.entries[self.count].count = 1;
52 self.count += 1;
53 }
54 }
55
56 pub fn getTop(self: *TopPosters, comptime N: usize) [N]PosterEntry {
57 self.mutex.lock();
58 defer self.mutex.unlock();
59
60 // copy and sort
61 var sorted: [MAX_POSTERS]PosterEntry = self.entries;
62 const slice = sorted[0..self.count];
63
64 std.mem.sort(PosterEntry, slice, {}, struct {
65 fn cmp(_: void, a: PosterEntry, b: PosterEntry) bool {
66 return a.count > b.count;
67 }
68 }.cmp);
69
70 var result: [N]PosterEntry = [_]PosterEntry{.{}} ** N;
71 const copy_len = @min(N, self.count);
72 @memcpy(result[0..copy_len], slice[0..copy_len]);
73 return result;
74 }
75
76 pub fn toJson(self: *TopPosters, alloc: std.mem.Allocator) ![]const u8 {
77 self.mutex.lock();
78 defer self.mutex.unlock();
79
80 var buf = std.ArrayList(u8).init(alloc);
81 const w = buf.writer();
82 try w.writeAll("{");
83 var first = true;
84 for (self.entries[0..self.count]) |entry| {
85 if (entry.count == 0) continue;
86 if (!first) try w.writeAll(",");
87 first = false;
88 try w.print("\"{s}\":{d}", .{ entry.getDid(), entry.count });
89 }
90 try w.writeAll("}");
91 return buf.toOwnedSlice();
92 }
93
94 pub fn loadFromJson(self: *TopPosters, data: []const u8) void {
95 self.mutex.lock();
96 defer self.mutex.unlock();
97
98 const parsed = json.parseFromSlice(json.Value, std.heap.page_allocator, data, .{}) catch return;
99 defer parsed.deinit();
100
101 if (parsed.value != .object) return;
102 const obj = parsed.value.object;
103
104 self.count = 0;
105 var iter = obj.iterator();
106 while (iter.next()) |kv| {
107 if (self.count >= MAX_POSTERS) break;
108 if (kv.value_ptr.* != .integer) continue;
109 self.entries[self.count].setDid(kv.key_ptr.*);
110 self.entries[self.count].count = @intCast(@max(0, kv.value_ptr.integer));
111 self.count += 1;
112 }
113 }
114};
115
116pub const Stats = struct {
117 started_at: i64,
118 prior_uptime: u64 = 0,
119 messages: Atomic(u64),
120 matches: Atomic(u64),
121 last_event_time_us: Atomic(i64),
122 last_event_received_at: Atomic(i64),
123 last_match_time: Atomic(i64),
124 connected_at: Atomic(i64),
125 last_post_time_ms: Atomic(i64), // actual post creation time from TID
126
127 // platform counters
128 soundcloud: Atomic(u64),
129 bandcamp: Atomic(u64),
130 spotify: Atomic(u64),
131 plyr: Atomic(u64),
132 apple: Atomic(u64),
133
134 // match type counters
135 quote_matches: Atomic(u64), // posts quoting music posts
136 multi_platform: Atomic(u64), // posts with 2+ platforms
137
138 // for lag trend tracking
139 prev_lag_ms: Atomic(i64),
140
141 // top posters
142 top_posters: TopPosters = .{},
143 posters_since: Atomic(i64) = Atomic(i64).init(0), // when tracking started
144
145 pub fn init() Stats {
146 var self = Stats{
147 .started_at = std.time.timestamp(),
148 .messages = Atomic(u64).init(0),
149 .matches = Atomic(u64).init(0),
150 .last_event_time_us = Atomic(i64).init(0),
151 .last_event_received_at = Atomic(i64).init(0),
152 .last_match_time = Atomic(i64).init(0),
153 .connected_at = Atomic(i64).init(0),
154 .last_post_time_ms = Atomic(i64).init(0),
155 .soundcloud = Atomic(u64).init(0),
156 .bandcamp = Atomic(u64).init(0),
157 .spotify = Atomic(u64).init(0),
158 .plyr = Atomic(u64).init(0),
159 .apple = Atomic(u64).init(0),
160 .quote_matches = Atomic(u64).init(0),
161 .multi_platform = Atomic(u64).init(0),
162 .prev_lag_ms = Atomic(i64).init(0),
163 };
164 self.load();
165 return self;
166 }
167
168 fn load(self: *Stats) void {
169 const file = fs.openFileAbsolute(STATS_PATH, .{}) catch return;
170 defer file.close();
171
172 var buf: [16384]u8 = undefined;
173 const len = file.readAll(&buf) catch return;
174 if (len == 0) return;
175
176 const parsed = json.parseFromSlice(json.Value, std.heap.page_allocator, buf[0..len], .{}) catch return;
177 defer parsed.deinit();
178
179 const root = parsed.value.object;
180
181 if (root.get("messages")) |v| if (v == .integer) {
182 self.messages.store(@intCast(@max(0, v.integer)), .monotonic);
183 };
184 if (root.get("cumulative_uptime")) |v| if (v == .integer) {
185 self.prior_uptime = @intCast(@max(0, v.integer));
186 };
187 if (root.get("matches")) |v| if (v == .integer) {
188 self.matches.store(@intCast(@max(0, v.integer)), .monotonic);
189 };
190 if (root.get("soundcloud")) |v| if (v == .integer) {
191 self.soundcloud.store(@intCast(@max(0, v.integer)), .monotonic);
192 };
193 if (root.get("bandcamp")) |v| if (v == .integer) {
194 self.bandcamp.store(@intCast(@max(0, v.integer)), .monotonic);
195 };
196 if (root.get("spotify")) |v| if (v == .integer) {
197 self.spotify.store(@intCast(@max(0, v.integer)), .monotonic);
198 };
199 if (root.get("plyr")) |v| if (v == .integer) {
200 self.plyr.store(@intCast(@max(0, v.integer)), .monotonic);
201 };
202 if (root.get("apple")) |v| if (v == .integer) {
203 self.apple.store(@intCast(@max(0, v.integer)), .monotonic);
204 };
205 if (root.get("quote_matches")) |v| if (v == .integer) {
206 self.quote_matches.store(@intCast(@max(0, v.integer)), .monotonic);
207 };
208 if (root.get("multi_platform")) |v| if (v == .integer) {
209 self.multi_platform.store(@intCast(@max(0, v.integer)), .monotonic);
210 };
211
212 // load top posters
213 if (root.get("top_posters")) |v| if (v == .object) {
214 var iter = v.object.iterator();
215 while (iter.next()) |kv| {
216 if (self.top_posters.count >= MAX_POSTERS) break;
217 if (kv.value_ptr.* != .integer) continue;
218 self.top_posters.entries[self.top_posters.count].setDid(kv.key_ptr.*);
219 self.top_posters.entries[self.top_posters.count].count = @intCast(@max(0, kv.value_ptr.integer));
220 self.top_posters.count += 1;
221 }
222 };
223 if (root.get("posters_since")) |v| if (v == .integer) {
224 self.posters_since.store(v.integer, .monotonic);
225 };
226
227 std.debug.print("loaded stats from {s}\n", .{STATS_PATH});
228 }
229
230 pub fn save(self: *Stats) void {
231 const file = fs.createFileAbsolute(STATS_PATH, .{}) catch return;
232 defer file.close();
233
234 const now = std.time.timestamp();
235 const session_uptime: u64 = @intCast(@max(0, now - self.started_at));
236 const total_uptime = self.prior_uptime + session_uptime;
237
238 // build top_posters json
239 var posters_buf: [8192]u8 = undefined;
240 var posters_len: usize = 0;
241 {
242 self.top_posters.mutex.lock();
243 defer self.top_posters.mutex.unlock();
244
245 posters_buf[0] = '{';
246 posters_len = 1;
247 var first = true;
248 for (self.top_posters.entries[0..self.top_posters.count]) |entry| {
249 if (entry.count == 0) continue;
250 const prefix: []const u8 = if (first) "\"" else ",\"";
251 first = false;
252 const written = std.fmt.bufPrint(posters_buf[posters_len..], "{s}{s}\":{d}", .{
253 prefix,
254 entry.getDid(),
255 entry.count,
256 }) catch break;
257 posters_len += written.len;
258 }
259 posters_buf[posters_len] = '}';
260 posters_len += 1;
261 }
262
263 var buf: [16384]u8 = undefined;
264 const data = std.fmt.bufPrint(&buf,
265 \\{{"messages":{},"matches":{},"cumulative_uptime":{},"soundcloud":{},"bandcamp":{},"spotify":{},"plyr":{},"apple":{},"quote_matches":{},"multi_platform":{},"posters_since":{},"top_posters":{s}}}
266 , .{
267 self.messages.load(.monotonic),
268 self.matches.load(.monotonic),
269 total_uptime,
270 self.soundcloud.load(.monotonic),
271 self.bandcamp.load(.monotonic),
272 self.spotify.load(.monotonic),
273 self.plyr.load(.monotonic),
274 self.apple.load(.monotonic),
275 self.quote_matches.load(.monotonic),
276 self.multi_platform.load(.monotonic),
277 self.posters_since.load(.monotonic),
278 posters_buf[0..posters_len],
279 }) catch return;
280
281 file.writeAll(data) catch return;
282 }
283
284 pub fn totalUptime(self: *Stats) i64 {
285 const now = std.time.timestamp();
286 const session: i64 = now - self.started_at;
287 return @as(i64, @intCast(self.prior_uptime)) + session;
288 }
289
290 pub fn recordMessage(self: *Stats) void {
291 _ = self.messages.fetchAdd(1, .monotonic);
292 }
293
294 pub fn getMessages(self: *const Stats) u64 {
295 return self.messages.load(.monotonic);
296 }
297
298 pub fn recordEvent(self: *Stats, time_us: i64, post_time_ms: i64) void {
299 _ = self.messages.fetchAdd(1, .monotonic);
300 self.last_event_time_us.store(time_us, .monotonic);
301 self.last_event_received_at.store(std.time.milliTimestamp(), .monotonic);
302 // track post creation time (from TID) for actual lag
303 if (post_time_ms > 0) {
304 self.last_post_time_ms.store(post_time_ms, .monotonic);
305 }
306 }
307
308 pub fn getPostLagMs(self: *Stats) i64 {
309 const post_time = self.last_post_time_ms.load(.monotonic);
310 if (post_time == 0) return 0;
311 const now_ms = std.time.milliTimestamp();
312 return now_ms - post_time;
313 }
314
315 pub fn recordMatch(self: *Stats) void {
316 _ = self.matches.fetchAdd(1, .monotonic);
317 self.last_match_time.store(std.time.milliTimestamp(), .monotonic);
318 }
319
320 pub fn recordPlatform(self: *Stats, platform: Platform) void {
321 switch (platform) {
322 .soundcloud => _ = self.soundcloud.fetchAdd(1, .monotonic),
323 .bandcamp => _ = self.bandcamp.fetchAdd(1, .monotonic),
324 .spotify => _ = self.spotify.fetchAdd(1, .monotonic),
325 .plyr => _ = self.plyr.fetchAdd(1, .monotonic),
326 .apple => _ = self.apple.fetchAdd(1, .monotonic),
327 }
328 }
329
330 pub fn getPlatformCounts(self: *const Stats) struct { soundcloud: u64, bandcamp: u64, spotify: u64, plyr: u64, apple: u64 } {
331 return .{
332 .soundcloud = self.soundcloud.load(.monotonic),
333 .bandcamp = self.bandcamp.load(.monotonic),
334 .spotify = self.spotify.load(.monotonic),
335 .plyr = self.plyr.load(.monotonic),
336 .apple = self.apple.load(.monotonic),
337 };
338 }
339
340 pub fn recordQuoteMatch(self: *Stats) void {
341 _ = self.quote_matches.fetchAdd(1, .monotonic);
342 }
343
344 pub fn recordMultiPlatform(self: *Stats) void {
345 _ = self.multi_platform.fetchAdd(1, .monotonic);
346 }
347
348 pub fn recordPoster(self: *Stats, did: []const u8) void {
349 // set tracking start time on first poster
350 if (self.posters_since.load(.monotonic) == 0) {
351 self.posters_since.store(std.time.timestamp(), .monotonic);
352 }
353 self.top_posters.record(did);
354 }
355
356 pub fn getPostersSince(self: *Stats) i64 {
357 return self.posters_since.load(.monotonic);
358 }
359
360 pub fn getTopPosters(self: *Stats, comptime N: usize) [N]PosterEntry {
361 return self.top_posters.getTop(N);
362 }
363
364 pub fn getUniquePosterCount(self: *Stats) usize {
365 self.top_posters.mutex.lock();
366 defer self.top_posters.mutex.unlock();
367 return self.top_posters.count;
368 }
369
370 /// returns concentration of top N posters as percentage of tracked poster posts
371 pub fn getTopConcentration(self: *Stats, comptime N: usize) f64 {
372 self.top_posters.mutex.lock();
373 defer self.top_posters.mutex.unlock();
374
375 // sum all tracked poster posts
376 var total_tracked: u64 = 0;
377 for (self.top_posters.entries[0..self.top_posters.count]) |entry| {
378 total_tracked += entry.count;
379 }
380 if (total_tracked == 0) return 0;
381
382 // sum top N
383 var sorted: [MAX_POSTERS]PosterEntry = self.top_posters.entries;
384 const slice = sorted[0..self.top_posters.count];
385 std.mem.sort(PosterEntry, slice, {}, struct {
386 fn cmp(_: void, a: PosterEntry, b: PosterEntry) bool {
387 return a.count > b.count;
388 }
389 }.cmp);
390
391 var top_sum: u64 = 0;
392 const limit = @min(N, self.top_posters.count);
393 for (slice[0..limit]) |entry| {
394 top_sum += entry.count;
395 }
396
397 return @as(f64, @floatFromInt(top_sum)) / @as(f64, @floatFromInt(total_tracked)) * 100.0;
398 }
399
400 pub fn getQuoteMatches(self: *const Stats) u64 {
401 return self.quote_matches.load(.monotonic);
402 }
403
404 pub fn getMultiPlatform(self: *const Stats) u64 {
405 return self.multi_platform.load(.monotonic);
406 }
407
408 pub fn getMatchRate(self: *Stats) f64 {
409 const uptime_sec = self.totalUptime();
410 if (uptime_sec <= 0) return 0;
411 const uptime_hours: f64 = @as(f64, @floatFromInt(uptime_sec)) / 3600.0;
412 const matches: f64 = @floatFromInt(self.matches.load(.monotonic));
413 return matches / uptime_hours;
414 }
415
416 pub fn recordConnected(self: *Stats) void {
417 self.connected_at.store(std.time.milliTimestamp(), .monotonic);
418 }
419
420 pub fn getLagMs(self: *Stats) i64 {
421 const event_time_us = self.last_event_time_us.load(.monotonic);
422 if (event_time_us == 0) return 0;
423 const event_time_ms = @divTrunc(event_time_us, 1000);
424 const now_ms = std.time.milliTimestamp();
425 return now_ms - event_time_ms;
426 }
427
428 pub fn getMatches(self: *const Stats) u64 {
429 return self.matches.load(.monotonic);
430 }
431
432 pub fn getLastEventReceivedAt(self: *const Stats) i64 {
433 return self.last_event_received_at.load(.monotonic);
434 }
435
436 pub fn getLastMatchTime(self: *const Stats) i64 {
437 return self.last_match_time.load(.monotonic);
438 }
439
440 pub fn getConnectedAt(self: *const Stats) i64 {
441 return self.connected_at.load(.monotonic);
442 }
443
444 pub fn getStatus(self: *Stats) []const u8 {
445 const lag = self.getPostLagMs();
446 if (lag > 60000) return "catching_up";
447 return "live";
448 }
449};
450
451var global_stats: Stats = undefined;
452var initialized: bool = false;
453
454pub fn init() void {
455 global_stats = Stats.init();
456 initialized = true;
457 _ = Thread.spawn(.{}, saveTicker, .{}) catch {};
458}
459
460pub fn get() *Stats {
461 if (!initialized) init();
462 return &global_stats;
463}
464
465fn saveTicker() void {
466 while (true) {
467 Thread.sleep(60 * std.time.ns_per_s);
468 global_stats.save();
469 }
470}
471
472// -----------------------------------------------------------------------------
473// tests
474// -----------------------------------------------------------------------------
475
476test "Stats.recordMessage increments counter" {
477 var s = Stats{
478 .started_at = std.time.timestamp(),
479 .messages = Atomic(u64).init(0),
480 .matches = Atomic(u64).init(0),
481 .last_event_time_us = Atomic(i64).init(0),
482 .last_event_received_at = Atomic(i64).init(0),
483 .last_match_time = Atomic(i64).init(0),
484 .connected_at = Atomic(i64).init(0),
485 .last_post_time_ms = Atomic(i64).init(0),
486 .soundcloud = Atomic(u64).init(0),
487 .bandcamp = Atomic(u64).init(0),
488 .spotify = Atomic(u64).init(0),
489 .plyr = Atomic(u64).init(0),
490 .apple = Atomic(u64).init(0),
491 .quote_matches = Atomic(u64).init(0),
492 .multi_platform = Atomic(u64).init(0),
493 .prev_lag_ms = Atomic(i64).init(0),
494 };
495
496 try std.testing.expectEqual(0, s.getMessages());
497 s.recordMessage();
498 try std.testing.expectEqual(1, s.getMessages());
499 s.recordMessage();
500 try std.testing.expectEqual(2, s.getMessages());
501}
502
503test "Stats.recordPlatform increments platform counters" {
504 var s = Stats{
505 .started_at = std.time.timestamp(),
506 .messages = Atomic(u64).init(0),
507 .matches = Atomic(u64).init(0),
508 .last_event_time_us = Atomic(i64).init(0),
509 .last_event_received_at = Atomic(i64).init(0),
510 .last_match_time = Atomic(i64).init(0),
511 .connected_at = Atomic(i64).init(0),
512 .last_post_time_ms = Atomic(i64).init(0),
513 .soundcloud = Atomic(u64).init(0),
514 .bandcamp = Atomic(u64).init(0),
515 .spotify = Atomic(u64).init(0),
516 .plyr = Atomic(u64).init(0),
517 .apple = Atomic(u64).init(0),
518 .quote_matches = Atomic(u64).init(0),
519 .multi_platform = Atomic(u64).init(0),
520 .prev_lag_ms = Atomic(i64).init(0),
521 };
522
523 s.recordPlatform(.soundcloud);
524 s.recordPlatform(.soundcloud);
525 s.recordPlatform(.bandcamp);
526 s.recordPlatform(.apple);
527
528 const counts = s.getPlatformCounts();
529 try std.testing.expectEqual(2, counts.soundcloud);
530 try std.testing.expectEqual(1, counts.bandcamp);
531 try std.testing.expectEqual(0, counts.spotify);
532 try std.testing.expectEqual(0, counts.plyr);
533 try std.testing.expectEqual(1, counts.apple);
534}
535
536test "Stats.getStatus returns live or catching_up" {
537 var s = Stats{
538 .started_at = std.time.timestamp(),
539 .messages = Atomic(u64).init(0),
540 .matches = Atomic(u64).init(0),
541 .last_event_time_us = Atomic(i64).init(0),
542 .last_event_received_at = Atomic(i64).init(0),
543 .last_match_time = Atomic(i64).init(0),
544 .connected_at = Atomic(i64).init(0),
545 .last_post_time_ms = Atomic(i64).init(0),
546 .soundcloud = Atomic(u64).init(0),
547 .bandcamp = Atomic(u64).init(0),
548 .spotify = Atomic(u64).init(0),
549 .plyr = Atomic(u64).init(0),
550 .apple = Atomic(u64).init(0),
551 .quote_matches = Atomic(u64).init(0),
552 .multi_platform = Atomic(u64).init(0),
553 .prev_lag_ms = Atomic(i64).init(0),
554 };
555
556 // no post time set = live
557 try std.testing.expectEqualStrings("live", s.getStatus());
558
559 // recent post = live
560 s.last_post_time_ms.store(std.time.milliTimestamp(), .monotonic);
561 try std.testing.expectEqualStrings("live", s.getStatus());
562
563 // old post = catching_up
564 s.last_post_time_ms.store(std.time.milliTimestamp() - 120000, .monotonic);
565 try std.testing.expectEqualStrings("catching_up", s.getStatus());
566}