search for standard sites pub-search.waow.tech
search zig blog atproto
at multi-platform-schema 297 lines 10 kB view raw
// TAP websocket consumer: connects to the TAP host, reads record events and
// forwards Leaflet documents and publications to the indexer.

const std = @import("std");
const mem = std.mem;
const json = std.json;
const posix = std.posix;
const Allocator = mem.Allocator;
const websocket = @import("websocket");
const zat = @import("zat");
const indexer = @import("indexer.zig");

const DOCUMENT_COLLECTION = "pub.leaflet.document";
const PUBLICATION_COLLECTION = "pub.leaflet.publication";

/// Returns the TAP host from the `TAP_HOST` environment variable, or the
/// built-in default when the variable is not set.
fn getTapHost() []const u8 {
    return posix.getenv("TAP_HOST") orelse "leaflet-search-tap.fly.dev";
}

/// Returns the TAP port from the `TAP_PORT` environment variable.
/// Falls back to 443 when the variable is unset or is not a valid `u16`.
fn getTapPort() u16 {
    const port_str = posix.getenv("TAP_PORT") orelse "443";
    return std.fmt.parseInt(u16, port_str, 10) catch 443;
}

/// TLS is used exactly when the configured port is 443.
fn useTls() bool {
    return getTapPort() == 443;
}

/// Runs the consumer forever: connects, reads until the connection ends, then
/// reconnects. A connection that ended without error is retried immediately;
/// a failed one is retried after an exponentially growing delay capped at
/// `max_backoff` seconds. Never returns.
pub fn consumer(allocator: Allocator) void {
    var backoff: u64 = 1;
    const max_backoff: u64 = 30;

    while (true) {
        if (connect(allocator)) |_| {
            // connection succeeded then closed - reset backoff
            backoff = 1;
            std.debug.print("tap connection closed, reconnecting immediately...\n", .{});
        } else |err| {
            // connection failed - backoff
            std.debug.print("tap error: {}, reconnecting in {}s...\n", .{ err, backoff });
            posix.nanosleep(backoff, 0);
            backoff = @min(backoff * 2, max_backoff);
        }
    }
}

/// Websocket read-loop handler: counts incoming messages and hands each
/// payload to `processMessage`.
const Handler = struct {
    allocator: Allocator,
    msg_count: usize = 0,

    /// Called for every server message. Processing errors are logged and
    /// swallowed so that one bad message does not end the read loop.
    pub fn serverMessage(self: *Handler, data: []const u8) !void {
        self.msg_count += 1;
        // progress log on the 1st, 101st, 201st, ... message
        if (self.msg_count % 100 == 1) {
            std.debug.print("tap: received {} messages\n", .{self.msg_count});
        }
        processMessage(self.allocator, data) catch |err| {
            std.debug.print("message processing error: {}\n", .{err});
        };
    }

    /// Called when the connection is closed; only logs.
    pub fn close(_: *Handler) void {
        std.debug.print("tap connection closed\n", .{});
    }
};

/// Opens one websocket connection to the TAP host and blocks in the read
/// loop until it ends.
///
/// Returns an error when the host name does not fit the Host header buffer,
/// when client init or the handshake fails, or when the read loop fails.
fn connect(allocator: Allocator) !void {
    const host = getTapHost();
    const port = getTapPort();
    const tls = useTls();
    const path = "/channel";

    std.debug.print("connecting to {s}://{s}:{d}{s}\n", .{ if (tls) "wss" else "ws", host, port, path });

    // Build the Host header before opening the connection. An over-long host
    // name is rejected here; falling back to the bare host name would send a
    // malformed header line.
    var host_header_buf: [256]u8 = undefined;
    const host_header = std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{host}) catch {
        std.debug.print("tap host name too long for Host header ({d} bytes)\n", .{host.len});
        return error.HostNameTooLong;
    };

    var client = websocket.Client.init(allocator, .{
        .host = host,
        .port = port,
        .tls = tls,
        .max_size = 1024 * 1024, // 1MB
    }) catch |err| {
        std.debug.print("websocket client init failed: {}\n", .{err});
        return err;
    };
    defer client.deinit();

    client.handshake(path, .{ .headers = host_header }) catch |err| {
        std.debug.print("websocket handshake failed: {}\n", .{err});
        return err;
    };

    std.debug.print("tap connected!\n", .{});

    var handler = Handler{ .allocator = allocator };
    client.readLoop(&handler) catch |err| {
        std.debug.print("websocket read loop error: {}\n", .{err});
        return err;
    };
}

/// TAP record envelope - extracted via zat.json.extractAt
const TapRecord = struct {
    collection: []const u8,
    action: zat.CommitAction,
    did: []const u8,
    rkey: []const u8,
};

/// Leaflet document fields
const LeafletDocument = struct {
    title: []const u8,
    publication: ?[]const u8 = null,
    publishedAt: ?[]const u8 = null,
    createdAt: ?[]const u8 = null,
    description: ?[]const u8 = null,
};

/// Leaflet publication fields
const LeafletPublication = struct {
    name: []const u8,
    description: ?[]const u8 = null,
    base_path: ?[]const u8 = null,
};

/// Parses one TAP message and dispatches it to the indexer.
///
/// Messages that are not valid JSON, are not of type "record", lack a usable
/// envelope, or carry an invalid DID are skipped without error. Records in
/// collections other than the document and publication collections are
/// ignored. Errors from document/publication processing are logged, not
/// returned. Running out of memory while parsing or while building the
/// AT-URI is returned to the caller.
fn processMessage(allocator: Allocator, payload: []const u8) !void {
    const parsed = json.parseFromSlice(json.Value, allocator, payload, .{}) catch |err| switch (err) {
        // resource exhaustion is not a malformed message - report it
        error.OutOfMemory => return err,
        // anything else means the payload is not usable JSON - skip it
        else => return,
    };
    defer parsed.deinit();

    // check message type
    const msg_type = zat.json.getString(parsed.value, "type") orelse return;
    if (!mem.eql(u8, msg_type, "record")) return;

    // extract record envelope
    // NOTE(review): extractAt receives `allocator`; whether its result owns
    // memory that must be released is not visible here - confirm against zat.
    const rec = zat.json.extractAt(TapRecord, allocator, parsed.value, .{"record"}) catch return;

    // validate DID
    const did = zat.Did.parse(rec.did) orelse return;

    // build AT-URI string
    const uri = try std.fmt.allocPrint(allocator, "at://{s}/{s}/{s}", .{ did.raw, rec.collection, rec.rkey });
    defer allocator.free(uri);

    const is_document = mem.eql(u8, rec.collection, DOCUMENT_COLLECTION);
    const is_publication = mem.eql(u8, rec.collection, PUBLICATION_COLLECTION);

    switch (rec.action) {
        .create, .update => {
            const record_obj = zat.json.getObject(parsed.value, "record.record") orelse return;

            if (is_document) {
                processDocument(allocator, uri, did.raw, rec.rkey, record_obj) catch |err| {
                    std.debug.print("document processing error: {}\n", .{err});
                };
            } else if (is_publication) {
                processPublication(allocator, uri, did.raw, rec.rkey, record_obj) catch |err| {
                    std.debug.print("publication processing error: {}\n", .{err});
                };
            }
        },
        .delete => {
            if (is_document) {
                indexer.deleteDocument(uri);
                std.debug.print("deleted document: {s}\n", .{uri});
            } else if (is_publication) {
                indexer.deletePublication(uri);
                std.debug.print("deleted publication: {s}\n", .{uri});
            }
        },
    }
}

/// Indexes one Leaflet document record.
///
/// The indexed content is the description (if non-empty) followed by the
/// plaintext collected from the record's pages. The timestamp passed to the
/// indexer is `publishedAt`, falling back to `createdAt`. Records that do not
/// match `LeafletDocument`, or that yield no content at all, are skipped
/// without error.
fn processDocument(allocator: Allocator, uri: []const u8, did: []const u8, rkey: []const u8, record: json.ObjectMap) !void {
    const record_val: json.Value = .{ .object = record };

    // extract known fields via struct
    const doc = zat.json.extractAt(LeafletDocument, allocator, record_val, .{}) catch return;
    const created_at = doc.publishedAt orelse doc.createdAt;

    // extract tags array; non-string entries are ignored
    var tags_list: std.ArrayList([]const u8) = .empty;
    defer tags_list.deinit(allocator);
    if (zat.json.getArray(record_val, "tags")) |tags| {
        for (tags) |tag_item| {
            if (tag_item == .string) {
                try tags_list.append(allocator, tag_item.string);
            }
        }
    }

    // extract plaintext from pages
    var content_buf: std.ArrayList(u8) = .empty;
    defer content_buf.deinit(allocator);

    if (doc.description) |desc| {
        if (desc.len > 0) {
            try content_buf.appendSlice(allocator, desc);
        }
    }

    if (zat.json.getArray(record_val, "pages")) |pages| {
        for (pages) |page| {
            if (page == .object) {
                try extractPlaintextFromPage(allocator, &content_buf, page.object);
            }
        }
    }

    // nothing to index
    if (content_buf.items.len == 0) return;

    try indexer.insertDocument(uri, did, rkey, doc.title, content_buf.items, created_at, doc.publication, tags_list.items);
    std.debug.print("indexed document: {s} ({} chars, {} tags)\n", .{ uri, content_buf.items.len, tags_list.items.len });
}

/// Appends the text of every block on `page` to `buf`.
/// Pages without a `blocks` array contribute nothing.
fn extractPlaintextFromPage(allocator: Allocator, buf: *std.ArrayList(u8), page: json.ObjectMap) !void {
    // pages can be linearDocument or canvas
    // linearDocument has blocks array
    const blocks_val = page.get("blocks") orelse return;
    if (blocks_val != .array) return;

    for (blocks_val.array.items) |block_wrapper| {
        if (block_wrapper != .object) continue;

        // block wrapper has "block" field with actual content
        const block_val = block_wrapper.object.get("block") orelse continue;
        if (block_val != .object) continue;

        try extractTextFromBlock(allocator, buf, block_val.object);
    }
}

/// Appends `text` to `buf`, preceded by a single space when `buf` already
/// holds content, so that text from consecutive blocks stays separated.
fn appendSeparated(allocator: Allocator, buf: *std.ArrayList(u8), text: []const u8) Allocator.Error!void {
    if (buf.items.len > 0) {
        try buf.appendSlice(allocator, " ");
    }
    try buf.appendSlice(allocator, text);
}

/// Appends the searchable text of one block to `buf`, dispatching on the
/// block's `$type`. Blocks without a string `$type`, and block types not
/// listed below, contribute nothing.
fn extractTextFromBlock(allocator: Allocator, buf: *std.ArrayList(u8), block: json.ObjectMap) Allocator.Error!void {
    const type_val = block.get("$type") orelse return;
    if (type_val != .string) return;

    const block_type = type_val.string;

    // blocks with plaintext field: text, header, blockquote, code
    if (mem.eql(u8, block_type, "pub.leaflet.blocks.text") or
        mem.eql(u8, block_type, "pub.leaflet.blocks.header") or
        mem.eql(u8, block_type, "pub.leaflet.blocks.blockquote") or
        mem.eql(u8, block_type, "pub.leaflet.blocks.code"))
    {
        if (block.get("plaintext")) |plaintext_val| {
            if (plaintext_val == .string) {
                try appendSeparated(allocator, buf, plaintext_val.string);
            }
        }
    }
    // button has text field
    else if (mem.eql(u8, block_type, "pub.leaflet.blocks.button")) {
        if (block.get("text")) |text_val| {
            if (text_val == .string) {
                try appendSeparated(allocator, buf, text_val.string);
            }
        }
    }
    // unorderedList has children array with nested content
    else if (mem.eql(u8, block_type, "pub.leaflet.blocks.unorderedList")) {
        if (block.get("children")) |children_val| {
            if (children_val == .array) {
                for (children_val.array.items) |child| {
                    try extractListItemText(allocator, buf, child);
                }
            }
        }
    }
}

/// Appends the text of one list item to `buf`: first its `content` block,
/// then each entry of its `children` array, recursively. Non-object items
/// are ignored.
fn extractListItemText(allocator: Allocator, buf: *std.ArrayList(u8), item: json.Value) Allocator.Error!void {
    if (item != .object) return;

    // list item has content field which is a block
    if (item.object.get("content")) |content_val| {
        if (content_val == .object) {
            try extractTextFromBlock(allocator, buf, content_val.object);
        }
    }

    // nested children
    if (item.object.get("children")) |children_val| {
        if (children_val == .array) {
            for (children_val.array.items) |child| {
                try extractListItemText(allocator, buf, child);
            }
        }
    }
}

/// Indexes one Leaflet publication record. Records that do not match
/// `LeafletPublication` are skipped without error.
fn processPublication(allocator: Allocator, uri: []const u8, did: []const u8, rkey: []const u8, record: json.ObjectMap) !void {
    const record_val: json.Value = .{ .object = record };
    const pub_data = zat.json.extractAt(LeafletPublication, allocator, record_val, .{}) catch return;

    try indexer.insertPublication(uri, did, rkey, pub_data.name, pub_data.description, pub_data.base_path);
    std.debug.print("indexed publication: {s} (base_path: {s})\n", .{ uri, pub_data.base_path orelse "none" });
}