search for standard sites
pub-search.waow.tech
search
zig
blog
atproto
1const std = @import("std");
2const mem = std.mem;
3const json = std.json;
4const posix = std.posix;
5const Allocator = mem.Allocator;
6const websocket = @import("websocket");
7const zat = @import("zat");
8const indexer = @import("indexer.zig");
9
/// Collection NSID of Leaflet document records; matched in `processMessage` to route to `processDocument`.
const DOCUMENT_COLLECTION: []const u8 = "pub.leaflet.document";
/// Collection NSID of Leaflet publication records; matched in `processMessage` to route to `processPublication`.
const PUBLICATION_COLLECTION: []const u8 = "pub.leaflet.publication";
12
/// Hostname of the TAP websocket service: the value of `$TAP_HOST` when it is
/// set, otherwise the hosted `leaflet-search-tap.fly.dev` instance.
fn getTapHost() []const u8 {
    if (posix.getenv("TAP_HOST")) |configured| return configured;
    return "leaflet-search-tap.fly.dev";
}
16
/// TCP port of the TAP service, taken from `$TAP_PORT`. Falls back to 443 when
/// the variable is unset or cannot be parsed as a base-10 `u16`.
fn getTapPort() u16 {
    const fallback: u16 = 443;
    const raw = posix.getenv("TAP_PORT") orelse return fallback;
    return std.fmt.parseInt(u16, raw, 10) catch fallback;
}
21
/// Whether to connect with `wss://`. TLS is used exactly when the configured
/// TAP port is 443; any other port means a plain `ws://` connection.
fn useTls() bool {
    const https_port: u16 = 443;
    return https_port == getTapPort();
}
25
/// Run the TAP ingestion loop forever, reconnecting whenever a session ends.
///
/// Every reconnect waits `backoff_secs` seconds. The delay doubles after each
/// attempt, up to `max_backoff_secs`. It drops back to 1s only after a session
/// that stayed up for at least `healthy_session_secs`, whether that session
/// ended with a clean close or with an error. As a result:
///   * a server that accepts the handshake and then closes straight away is
///     backed off like any other failure instead of being redialed in a
///     tight loop, and
///   * a long, healthy session that eventually errors does not inherit a large
///     backoff left over from earlier startup failures.
pub fn consumer(allocator: Allocator) void {
    const max_backoff_secs: u64 = 30;
    const healthy_session_secs: i64 = 30;
    var backoff_secs: u64 = 1;

    while (true) {
        // Wall-clock seconds; only a coarse "did this session last a while" check.
        const started_at = std.time.timestamp();
        const result = connect(allocator);
        if (std.time.timestamp() - started_at >= healthy_session_secs) {
            backoff_secs = 1;
        }

        if (result) |_| {
            std.debug.print("tap connection closed, reconnecting in {}s...\n", .{backoff_secs});
        } else |err| {
            std.debug.print("tap error: {}, reconnecting in {}s...\n", .{ err, backoff_secs });
        }

        posix.nanosleep(backoff_secs, 0);
        backoff_secs = @min(backoff_secs * 2, max_backoff_secs);
    }
}
44
/// Callback object handed to `websocket.Client.readLoop` for one TAP connection.
///
/// Each inbound frame increments `msg_count` and is forwarded to
/// `processMessage`. A progress line is printed for the 1st, 101st, 201st, ...
/// frame. Processing errors are printed and dropped, so `serverMessage`
/// itself never returns an error.
const Handler = struct {
    /// Allocator forwarded to `processMessage` for per-frame work.
    allocator: Allocator,
    /// Number of frames received on this connection so far.
    msg_count: usize = 0,

    pub fn serverMessage(self: *Handler, data: []const u8) !void {
        self.msg_count += 1;
        const count = self.msg_count;
        if (count % 100 == 1) {
            std.debug.print("tap: received {} messages\n", .{count});
        }

        processMessage(self.allocator, data) catch |process_err| {
            std.debug.print("message processing error: {}\n", .{process_err});
        };
    }

    /// Close hook for the websocket client; only logs.
    pub fn close(self: *Handler) void {
        _ = self;
        std.debug.print("tap connection closed\n", .{});
    }
};
63
/// Open one websocket session to the TAP `/channel` endpoint and feed every
/// frame to a `Handler` until the session ends.
///
/// Returns normally when `readLoop` returns without error. Otherwise it returns
/// the error from client init, the upgrade handshake, or the read loop; each of
/// these is also printed. `consumer` uses the outcome to decide how to retry.
fn connect(allocator: Allocator) !void {
    const host = getTapHost();
    const port = getTapPort();
    const tls = useTls();
    const path = "/channel";

    std.debug.print("connecting to {s}://{s}:{d}{s}\n", .{ if (tls) "wss" else "ws", host, port, path });

    var client = websocket.Client.init(allocator, .{
        .host = host,
        .port = port,
        .tls = tls,
        .max_size = 1024 * 1024, // 1MB
    }) catch |err| {
        std.debug.print("websocket client init failed: {}\n", .{err});
        return err;
    };
    defer client.deinit();

    // Host must mirror the URI authority (RFC 9110 §7.2): include the port
    // unless it is the scheme default. The value is heap-formatted, so a long
    // TAP_HOST can no longer overflow a fixed buffer. On overflow, the old code
    // fell back to sending the bare hostname (no "Host: " prefix, no CRLF) as
    // the extra header block.
    const default_port: u16 = if (tls) 443 else 80;
    const host_header = if (port == default_port)
        try std.fmt.allocPrint(allocator, "Host: {s}\r\n", .{host})
    else
        try std.fmt.allocPrint(allocator, "Host: {s}:{d}\r\n", .{ host, port });
    defer allocator.free(host_header);

    client.handshake(path, .{ .headers = host_header }) catch |err| {
        std.debug.print("websocket handshake failed: {}\n", .{err});
        return err;
    };

    std.debug.print("tap connected!\n", .{});

    var handler = Handler{ .allocator = allocator };
    client.readLoop(&handler) catch |err| {
        std.debug.print("websocket read loop error: {}\n", .{err});
        return err;
    };
}
99
/// Envelope of a `"record"` TAP frame, read from the frame's `record` object
/// with `zat.json.extractAt`. The field names are assumed to match the JSON
/// keys extractAt looks up, so keep them in sync with the TAP wire format.
const TapRecord = struct {
    /// Collection NSID of the record (compared against the Leaflet collections).
    collection: []const u8,
    /// Commit operation; processMessage handles create, update and delete.
    action: zat.CommitAction,
    /// Repository DID, validated with `zat.Did.parse` before use.
    did: []const u8,
    /// Record key, used as the last segment of the AT-URI.
    rkey: []const u8,
};
107
/// Subset of a Leaflet document record that gets indexed. Only `title` is
/// required; every other field defaults to null when absent. Field names are
/// assumed to match the record's JSON keys as read by `zat.json.extractAt`.
const LeafletDocument = struct {
    /// Document title.
    title: []const u8,
    /// Reference to the owning publication, if any.
    publication: ?[]const u8 = null,
    /// Preferred timestamp for the indexed document.
    publishedAt: ?[]const u8 = null,
    /// Fallback timestamp, used when `publishedAt` is missing.
    createdAt: ?[]const u8 = null,
    /// Optional summary; it becomes the start of the indexed text.
    description: ?[]const u8 = null,
};
116
/// Subset of a Leaflet publication record that gets indexed. Only `name` is
/// required. Field names are assumed to match the record's JSON keys as read
/// by `zat.json.extractAt`.
const LeafletPublication = struct {
    /// Display name of the publication.
    name: []const u8,
    /// Optional free-text description.
    description: ?[]const u8 = null,
    /// Optional base path; logged as "none" when absent.
    base_path: ?[]const u8 = null,
};
123
/// Handle one raw TAP websocket frame.
///
/// Only frames whose top-level `"type"` is `"record"` are acted on. Frames
/// that are not valid JSON, lack the record envelope, or carry an invalid DID
/// are ignored. For `create`/`update`, the nested `record.record` object is
/// indexed when the collection is a Leaflet document or publication. For
/// `delete`, the matching entry is removed by AT-URI.
///
/// All scratch memory for the frame lives in a per-frame arena released on
/// return. That covers the JSON tree, the structs produced by
/// `zat.json.extractAt` (which takes an allocator but returns a plain struct
/// with no deinit) and the AT-URI. Only `error.OutOfMemory` from parsing or
/// URI formatting is returned; errors from the document and publication
/// handlers are printed and swallowed.
fn processMessage(allocator: Allocator, payload: []const u8) !void {
    var arena_state = std.heap.ArenaAllocator.init(allocator);
    defer arena_state.deinit();
    const arena = arena_state.allocator();

    // Malformed JSON is dropped, but running out of memory is reported.
    const root = json.parseFromSliceLeaky(json.Value, arena, payload, .{}) catch |err| switch (err) {
        error.OutOfMemory => return error.OutOfMemory,
        else => return,
    };

    // Only "record" frames carry repository events.
    const msg_type = zat.json.getString(root, "type") orelse return;
    if (!mem.eql(u8, msg_type, "record")) return;

    // Extract the record envelope into the arena, so whatever it allocates is freed with it.
    const rec = zat.json.extractAt(TapRecord, arena, root, .{"record"}) catch return;

    // validate DID
    const did = zat.Did.parse(rec.did) orelse return;

    // AT-URI identifying the record; owned by the arena.
    const uri = try std.fmt.allocPrint(arena, "at://{s}/{s}/{s}", .{ did.raw, rec.collection, rec.rkey });

    switch (rec.action) {
        .create, .update => {
            const record_obj = zat.json.getObject(root, "record.record") orelse return;

            if (mem.eql(u8, rec.collection, DOCUMENT_COLLECTION)) {
                processDocument(arena, uri, did.raw, rec.rkey, record_obj) catch |err| {
                    std.debug.print("document processing error: {}\n", .{err});
                };
            } else if (mem.eql(u8, rec.collection, PUBLICATION_COLLECTION)) {
                processPublication(arena, uri, did.raw, rec.rkey, record_obj) catch |err| {
                    std.debug.print("publication processing error: {}\n", .{err});
                };
            }
        },
        .delete => {
            if (mem.eql(u8, rec.collection, DOCUMENT_COLLECTION)) {
                indexer.deleteDocument(uri);
                std.debug.print("deleted document: {s}\n", .{uri});
            } else if (mem.eql(u8, rec.collection, PUBLICATION_COLLECTION)) {
                indexer.deletePublication(uri);
                std.debug.print("deleted publication: {s}\n", .{uri});
            }
        },
    }
}
167
/// Index one Leaflet document record.
///
/// The searchable text is the document's `description` (if non-empty)
/// followed by the plaintext of every object entry in `pages`, joined by
/// single spaces. Only string entries of the optional `tags` array are kept.
/// `publishedAt` is preferred over `createdAt` as the stored timestamp.
/// Records that `zat.json.extractAt` cannot decode, or that produce no text
/// at all, are skipped without error.
fn processDocument(allocator: Allocator, uri: []const u8, did: []const u8, rkey: []const u8, record: json.ObjectMap) !void {
    const as_value = json.Value{ .object = record };
    const doc = zat.json.extractAt(LeafletDocument, allocator, as_value, .{}) catch return;

    var tags: std.ArrayList([]const u8) = .{};
    defer tags.deinit(allocator);
    if (zat.json.getArray(as_value, "tags")) |tag_values| {
        for (tag_values) |tag| {
            switch (tag) {
                .string => |tag_text| try tags.append(allocator, tag_text),
                else => {},
            }
        }
    }

    var text: std.ArrayList(u8) = .{};
    defer text.deinit(allocator);

    const description = doc.description orelse "";
    if (description.len != 0) try text.appendSlice(allocator, description);

    if (zat.json.getArray(as_value, "pages")) |pages| {
        for (pages) |page| {
            switch (page) {
                .object => |page_obj| try extractPlaintextFromPage(allocator, &text, page_obj),
                else => {},
            }
        }
    }

    // Nothing searchable: do not index an empty document.
    if (text.items.len == 0) return;

    const timestamp = doc.publishedAt orelse doc.createdAt;
    try indexer.insertDocument(uri, did, rkey, doc.title, text.items, timestamp, doc.publication, tags.items);
    std.debug.print("indexed document: {s} ({} chars, {} tags)\n", .{ uri, text.items.len, tags.items.len });
}
209
/// Append the text of every block on one page to `buf`.
///
/// Pages may be linear documents or canvases. Only a page with a `blocks`
/// array contributes text. Each array entry is expected to be a wrapper
/// object whose `block` field holds the actual block object. Entries of any
/// other shape are skipped.
fn extractPlaintextFromPage(allocator: Allocator, buf: *std.ArrayList(u8), page: json.ObjectMap) !void {
    const blocks = switch (page.get("blocks") orelse return) {
        .array => |arr| arr.items,
        else => return,
    };

    for (blocks) |wrapper| {
        const wrapper_obj = switch (wrapper) {
            .object => |obj| obj,
            else => continue,
        };
        const inner = wrapper_obj.get("block") orelse continue;
        switch (inner) {
            .object => |block_obj| try extractTextFromBlock(allocator, buf, block_obj),
            else => {},
        }
    }
}
226
/// Append the plaintext of a single Leaflet block to `buf`, separated by one
/// space from any text already in `buf`.
///
/// How each block type contributes:
///   * `text`, `header`, `blockquote` and `code` blocks contribute their
///     `plaintext` field.
///   * `button` contributes its `text` field.
///   * `unorderedList` recurses into each entry of its `children` array.
/// Blocks without a string `$type`, blocks of any other type, and blocks whose
/// text field is missing or not a string add nothing.
fn extractTextFromBlock(allocator: Allocator, buf: *std.ArrayList(u8), block: json.ObjectMap) Allocator.Error!void {
    const kind = switch (block.get("$type") orelse return) {
        .string => |type_name| type_name,
        else => return,
    };

    const plaintext_types = [_][]const u8{
        "pub.leaflet.blocks.text",
        "pub.leaflet.blocks.header",
        "pub.leaflet.blocks.blockquote",
        "pub.leaflet.blocks.code",
    };
    for (plaintext_types) |candidate| {
        if (mem.eql(u8, kind, candidate)) return appendStringField(allocator, buf, block, "plaintext");
    }

    if (mem.eql(u8, kind, "pub.leaflet.blocks.button")) {
        return appendStringField(allocator, buf, block, "text");
    }

    if (mem.eql(u8, kind, "pub.leaflet.blocks.unorderedList")) {
        const children = block.get("children") orelse return;
        if (children != .array) return;
        for (children.array.items) |child| {
            try extractListItemText(allocator, buf, child);
        }
    }
}

/// Append `block[field]` to `buf` when that field exists and is a JSON string.
/// A single space is added first if `buf` already holds text.
fn appendStringField(allocator: Allocator, buf: *std.ArrayList(u8), block: json.ObjectMap, field: []const u8) Allocator.Error!void {
    const value = block.get(field) orelse return;
    if (value != .string) return;
    if (buf.items.len > 0) try buf.appendSlice(allocator, " ");
    try buf.appendSlice(allocator, value.string);
}
270
/// Append the text of one `unorderedList` item and all of its nested items,
/// depth-first. The item's own `content` block is emitted before its
/// `children`. Non-object items and fields of the wrong JSON shape are ignored.
fn extractListItemText(allocator: Allocator, buf: *std.ArrayList(u8), item: json.Value) Allocator.Error!void {
    const fields = switch (item) {
        .object => |obj| obj,
        else => return,
    };

    // The item's own block comes first...
    if (fields.get("content")) |content| {
        if (content == .object) try extractTextFromBlock(allocator, buf, content.object);
    }

    // ...then each nested item, recursively.
    const nested = fields.get("children") orelse return;
    if (nested != .array) return;
    for (nested.array.items) |child| {
        try extractListItemText(allocator, buf, child);
    }
}
290
/// Index one Leaflet publication record by its name, description and base
/// path. Records that `zat.json.extractAt` cannot decode into
/// `LeafletPublication` are skipped without error.
fn processPublication(allocator: Allocator, uri: []const u8, did: []const u8, rkey: []const u8, record: json.ObjectMap) !void {
    const publication = zat.json.extractAt(LeafletPublication, allocator, json.Value{ .object = record }, .{}) catch return;
    const base_path = publication.base_path;

    try indexer.insertPublication(uri, did, rkey, publication.name, publication.description, base_path);
    std.debug.print("indexed publication: {s} (base_path: {s})\n", .{ uri, base_path orelse "none" });
}