A SpaceTraders Agent

create rate-limited http client for spacetraders

altagos.dev e513358c b6f5095b

verified
+302
+302
src/st/http.zig
··· 1 + const std = @import("std"); 2 + const HTTPClient = std.http.Client; 3 + const Io = std.Io; 4 + const json = std.json; 5 + 6 + const models = @import("models.zig"); 7 + 8 + const TIME_SLEEP_FACTOR: f64 = 0.98; 9 + 10 + const log = std.log.scoped(.SpaceTraders); 11 + 12 + const Semaphore = struct { 13 + mutex: Io.Mutex = .{ .state = .unlocked }, 14 + cond: Io.Condition = .{}, 15 + /// It is OK to initialise this field to any value. 16 + permits: u64 = 0, 17 + 18 + pub fn post(sem: *Semaphore, io: Io) void { 19 + sem.mutex.lockUncancelable(io); 20 + defer sem.mutex.unlock(io); 21 + 22 + sem.permits += 1; 23 + sem.cond.signal(io); 24 + } 25 + 26 + pub fn set(sem: *Semaphore, io: Io, permits: u64) void { 27 + sem.mutex.lockUncancelable(io); 28 + defer sem.mutex.unlock(io); 29 + 30 + sem.permits = permits; 31 + sem.cond.signal(io); 32 + } 33 + 34 + pub fn wait(sem: *Semaphore, io: Io) !void { 35 + sem.mutex.lockUncancelable(io); 36 + defer sem.mutex.unlock(io); 37 + 38 + while (sem.permits == 0) 39 + try sem.cond.wait(io, &sem.mutex); 40 + 41 + sem.permits -= 1; 42 + if (sem.permits > 0) 43 + sem.cond.signal(io); 44 + } 45 + 46 + pub fn available(sem: *Semaphore, io: Io) u64 { 47 + sem.mutex.lockUncancelable(io); 48 + defer sem.mutex.unlock(io); 49 + 50 + return sem.permits; 51 + } 52 + }; 53 + 54 + pub const Limiter = struct { 55 + points: u64, 56 + duration: i64, 57 + time: ?Io.Timestamp, 58 + 59 + mutex: Io.Mutex = .{ .state = .unlocked }, 60 + semaphor: Semaphore, 61 + 62 + pub fn init(opts: struct { points: u54 = 2, duration: i64 = 1000 }) Limiter { 63 + return .{ 64 + .points = opts.points, 65 + .duration = opts.duration, 66 + .time = null, 67 + .semaphor = Semaphore{ .permits = opts.points }, 68 + }; 69 + } 70 + 71 + pub fn checkReset(l: *Limiter, io: Io) bool { 72 + l.mutex.lock(io) catch return false; 73 + defer l.mutex.unlock(io); 74 + 75 + if (l.time) |t| { 76 + const dur = t.durationTo(Io.Clock.now(.real, io) catch return false); 77 + if (dur.toSeconds() > 0) { 78 + l.semaphor.set(io, l.points); 79 + l.time = null; 80 + return true; 81 + } 82 + } 83 + 84 + return false; 85 + } 86 + 87 + pub fn aquire(l: *Limiter, io: Io) !void { 88 + try l.mutex.lock(io); 89 + defer l.mutex.unlock(io); 90 + 91 + if (l.time == null) { 92 + const now = try Io.Clock.now(.real, io); 93 + l.time = now.addDuration(.fromMilliseconds(l.duration)); 94 + } 95 + 96 + return l.semaphor.wait(io); 97 + } 98 + 99 + pub fn timeToReset(l: *Limiter, io: Io) i64 { 100 + if (l.time) |t| { 101 + return t.durationTo(Io.Clock.now(.real, io) catch return 0).raw.toMilliseconds(); 102 + } 103 + return 0; 104 + } 105 + 106 + pub fn available(l: *Limiter, io: Io) bool { 107 + return l.semaphor.available(io) > 0; 108 + } 109 + }; 110 + 111 + pub const BurstyLimiter = struct { 112 + static: Limiter, 113 + burst: Limiter, 114 + 115 + pub fn wait(bl: *BurstyLimiter, io: Io) !bool { 116 + _ = bl.static.checkReset(io); 117 + _ = bl.burst.checkReset(io); 118 + 119 + if (!bl.static.available(io)) { 120 + if (bl.burst.available(io)) { 121 + log.debug("Using Burst", .{}); 122 + try bl.burst.aquire(io); 123 + return true; 124 + } else { 125 + log.warn("No request available, waiting", .{}); 126 + } 127 + } 128 + 129 + try bl.static.aquire(io); 130 + return true; 131 + } 132 + }; 133 + 134 + pub const AuthType = enum { account, agent, none }; 135 + 136 + pub const Auth = struct { 137 + account: []const u8 = "", 138 + agent: []const u8 = "", 139 + }; 140 + 141 + pub const RequestOptions = struct { 142 + method: std.http.Method = .GET, 143 + auth: AuthType = .none, 144 + body: Body = .empty, 145 + 146 + pub const Body = union(enum) { 147 + empty: void, 148 + buffer: []u8, 149 + }; 150 + 151 + pub fn authorization(opts: *const RequestOptions, client: *const Client) HTTPClient.Request.Headers.Value { 152 + switch (opts.auth) { 153 + .account => return .{ .override = client.auth.account }, 154 + .agent => return .{ .override = client.auth.agent }, 155 + .none => return .{ .omit = {} }, 156 + } 157 + } 158 + }; 159 + 160 + pub const RequestError = error{ 161 + OutOfMemory, 162 + InvalidResponse, 163 + RateLimiterError, 164 + } || HTTPClient.RequestError || HTTPClient.Request.ReceiveHeadError || std.Uri.ParseError; 165 + 166 + pub fn RawResponse(comptime T: type) type { 167 + return Io.Future(RequestError!json.Parsed(T)); 168 + } 169 + 170 + pub fn Response(comptime T: type) type { 171 + return RawResponse(models.Wrapper(T)); 172 + } 173 + 174 + pub const Client = struct { 175 + allocator: std.mem.Allocator, 176 + io: Io, 177 + limiter: BurstyLimiter, 178 + 179 + base_url: []const u8, 180 + auth: Auth, 181 + 182 + http: HTTPClient, 183 + 184 + pub fn init( 185 + allocator: std.mem.Allocator, 186 + io: std.Io, 187 + opts: struct { 188 + base_url: []const u8 = "https://api.spacetraders.io/v2", 189 + auth: Auth = .{}, 190 + }, 191 + ) Client { 192 + return .{ 193 + .allocator = allocator, 194 + .io = io, 195 + .limiter = .{ 196 + .static = .init(.{}), 197 + .burst = .init(.{ .points = 30, .duration = 60_000 }), 198 + }, 199 + .base_url = opts.base_url, 200 + .auth = opts.auth, 201 + .http = .{ .allocator = allocator, .io = io }, 202 + }; 203 + } 204 + 205 + pub fn deinit(client: *Client) void { 206 + client.http.deinit(); 207 + } 208 + 209 + pub fn request( 210 + client: *Client, 211 + comptime T: type, 212 + comptime path: []const u8, 213 + args: anytype, 214 + opts: RequestOptions, 215 + ) !RawResponse(T) { 216 + const path_fmt = try std.fmt.allocPrint(client.allocator, path, args); 217 + defer client.allocator.free(path_fmt); 218 + 219 + const url = try std.fmt.allocPrint(client.allocator, "{s}{s}", .{ client.base_url, path_fmt }); 220 + 221 + const Wrapper = struct { 222 + fn call( 223 + cl: *Client, 224 + url_param: []const u8, 225 + opts_param: *const RequestOptions, 226 + ) RequestError!json.Parsed(T) { 227 + defer cl.allocator.free(url_param); 228 + if (cl.limiter.wait(cl.io) catch return error.RateLimiterError) 229 + return Client.requestRaw(cl, T, url_param, opts_param); 230 + return error.RateLimiterError; 231 + } 232 + }; 233 + 234 + return client.io.concurrent( 235 + Wrapper.call, 236 + .{ client, url, &opts }, 237 + ); 238 + } 239 + 240 + pub fn requestRaw( 241 + client: *Client, 242 + comptime T: type, 243 + url: []const u8, 244 + opts: *const RequestOptions, 245 + ) RequestError!json.Parsed(T) { 246 + const uri = std.Uri.parse(url) catch |err| { 247 + log.err("Error parsing url: {} - url = {s}", .{ err, url }); 248 + return err; 249 + }; 250 + 251 + var req = try client.http.request(opts.method, uri, .{ 252 + .headers = .{ 253 + .authorization = opts.authorization(client), 254 + .user_agent = .{ .override = "All your codebases are belong to us" }, 255 + }, 256 + }); 257 + defer req.deinit(); 258 + 259 + log.debug("requesting: {s}", .{uri.path.percent_encoded}); 260 + 261 + switch (opts.body) { 262 + .empty => try req.sendBodiless(), 263 + .buffer => |body| try req.sendBodyComplete(body), 264 + } 265 + 266 + var redirect_buffer: [1024]u8 = undefined; 267 + 268 + var response = try req.receiveHead(&redirect_buffer); 269 + const colour = blk: { 270 + if (std.mem.eql(u8, response.head.reason, "OK")) { 271 + break :blk "\x1b[92m"; 272 + } else { 273 + break :blk "\x1b[1m\x1b[91m"; 274 + } 275 + }; 276 + log.debug( 277 + "\x1b[2m[path = {s}]\x1b[0m received {s}{d} {s}\x1b[0m", 278 + .{ url[client.base_url.len..], colour, response.head.status, response.head.reason }, 279 + ); 280 + 281 + // var header_iter = response.head.iterateHeaders(); 282 + // while (header_iter.next()) |header| { 283 + // log.debug("{s}: {s}", .{ header.name, header.value }); 284 + // } 285 + 286 + var decompress_buffer: [std.compress.flate.max_window_len]u8 = undefined; 287 + var transfer_buffer: [64]u8 = undefined; 288 + var decompress: std.http.Decompress = undefined; 289 + 290 + const decompressed_body_reader = response.readerDecompressing(&transfer_buffer, &decompress, &decompress_buffer); 291 + 292 + var json_reader: json.Reader = .init(client.allocator, decompressed_body_reader); 293 + defer json_reader.deinit(); 294 + 295 + return json.parseFromTokenSource(T, client.allocator, &json_reader, .{ 296 + .ignore_unknown_fields = true, 297 + }) catch |err| { 298 + log.err("Error parsing response: {}", .{err}); 299 + return RequestError.InvalidResponse; 300 + }; 301 + } 302 + };