search for standard sites pub-search.waow.tech
search zig blog atproto

fix: non-blocking full sync — don't hold mutex during turso fetches

full sync was holding the local db mutex for the entire 3+ minute
duration, blocking all search queries. now fetches each batch from
turso without the lock, then briefly locks to write to local sqlite.

search can use the (progressively filling) local db during sync
instead of being blocked for minutes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+64 -51
backend/src/db/sync.zig
··· 10 10 const BATCH_SIZE = 500; 11 11 12 12 /// Full sync: fetch all data from Turso and populate local SQLite 13 + /// Uses brief locks per batch so search queries aren't blocked during sync. 13 14 pub fn fullSync(turso: *Client, local: *LocalDb) !void { 14 15 std.debug.print("sync: starting full sync...\n", .{}); 15 16 16 - local.setReady(false); 17 - 18 17 const conn = local.getConn() orelse return error.LocalNotOpen; 19 18 20 - local.lock(); 21 - defer local.unlock(); 19 + // clear existing data (brief lock) 20 + { 21 + local.lock(); 22 + defer local.unlock(); 23 + conn.exec("DELETE FROM documents_fts", .{}) catch {}; 24 + conn.exec("DELETE FROM documents", .{}) catch {}; 25 + conn.exec("DELETE FROM publications_fts", .{}) catch {}; 26 + conn.exec("DELETE FROM publications", .{}) catch {}; 27 + conn.exec("DELETE FROM document_tags", .{}) catch {}; 28 + } 22 29 23 - // start transaction for bulk insert 24 - conn.exec("BEGIN IMMEDIATE", .{}) catch |err| { 25 - std.debug.print("sync: failed to begin transaction: {}\n", .{err}); 26 - return err; 27 - }; 28 - errdefer conn.exec("ROLLBACK", .{}) catch {}; 29 - 30 - // clear existing data 31 - conn.exec("DELETE FROM documents_fts", .{}) catch {}; 32 - conn.exec("DELETE FROM documents", .{}) catch {}; 33 - conn.exec("DELETE FROM publications_fts", .{}) catch {}; 34 - conn.exec("DELETE FROM publications", .{}) catch {}; 35 - conn.exec("DELETE FROM document_tags", .{}) catch {}; 30 + // mark ready so search can fall through to Turso while we sync 31 + local.setReady(true); 36 32 37 - // sync documents in batches 33 + // sync documents in batches — fetch from Turso unlocked, write to local with brief lock 38 34 var doc_count: usize = 0; 39 35 var offset: usize = 0; 40 36 while (true) { 41 37 var offset_buf: [16]u8 = undefined; 42 38 const offset_str = std.fmt.bufPrint(&offset_buf, "{d}", .{offset}) catch break; 43 39 40 + // fetch from Turso (no lock held — search can use local DB) 44 41 var result = turso.query( 45 42 
\\SELECT uri, did, rkey, title, content, created_at, publication_uri, 46 43 \\ platform, source_collection, path, base_path, has_publication, indexed_at ··· 55 52 56 53 if (result.rows.len == 0) break; 57 54 58 - for (result.rows) |row| { 59 - insertDocumentLocal(conn, row) catch |err| { 60 - std.debug.print("sync: insert doc failed: {}\n", .{err}); 61 - }; 62 - doc_count += 1; 55 + // write batch to local (brief lock) 56 + { 57 + local.lock(); 58 + defer local.unlock(); 59 + conn.exec("BEGIN", .{}) catch {}; 60 + for (result.rows) |row| { 61 + insertDocumentLocal(conn, row) catch |err| { 62 + std.debug.print("sync: insert doc failed: {}\n", .{err}); 63 + }; 64 + doc_count += 1; 65 + } 66 + conn.exec("COMMIT", .{}) catch {}; 63 67 } 64 68 65 69 offset += result.rows.len; ··· 68 72 } 69 73 } 70 74 71 - // sync publications 75 + // sync publications (fetch unlocked, write with brief lock) 72 76 var pub_count: usize = 0; 73 77 { 74 78 var pub_result = turso.query( ··· 76 80 &.{}, 77 81 ) catch |err| { 78 82 std.debug.print("sync: turso publications query failed: {}\n", .{err}); 79 - conn.exec("COMMIT", .{}) catch {}; 80 - local.setReady(true); 81 83 return; 82 84 }; 83 85 defer pub_result.deinit(); 84 86 87 + local.lock(); 88 + defer local.unlock(); 89 + conn.exec("BEGIN", .{}) catch {}; 85 90 for (pub_result.rows) |row| { 86 91 insertPublicationLocal(conn, row) catch |err| { 87 92 std.debug.print("sync: insert pub failed: {}\n", .{err}); 88 93 }; 89 94 pub_count += 1; 90 95 } 96 + conn.exec("COMMIT", .{}) catch {}; 91 97 } 92 98 93 99 // sync tags ··· 98 104 &.{}, 99 105 ) catch |err| { 100 106 std.debug.print("sync: turso tags query failed: {}\n", .{err}); 101 - conn.exec("COMMIT", .{}) catch {}; 102 - local.setReady(true); 103 107 return; 104 108 }; 105 109 defer tags_result.deinit(); 106 110 111 + local.lock(); 112 + defer local.unlock(); 113 + conn.exec("BEGIN", .{}) catch {}; 107 114 for (tags_result.rows) |row| { 108 115 conn.exec( 109 116 "INSERT OR IGNORE 
INTO document_tags (document_uri, tag) VALUES (?, ?)", ··· 111 118 ) catch {}; 112 119 tag_count += 1; 113 120 } 121 + conn.exec("COMMIT", .{}) catch {}; 114 122 } 115 123 116 124 // sync popular searches 117 125 var popular_count: usize = 0; 118 126 { 119 - conn.exec("DELETE FROM popular_searches", .{}) catch {}; 120 - 121 127 var popular_result = turso.query( 122 128 "SELECT query, count FROM popular_searches", 123 129 &.{}, 124 130 ) catch |err| { 125 131 std.debug.print("sync: turso popular_searches query failed: {}\n", .{err}); 126 - conn.exec("COMMIT", .{}) catch {}; 127 - local.setReady(true); 128 132 return; 129 133 }; 130 134 defer popular_result.deinit(); 131 135 136 + local.lock(); 137 + defer local.unlock(); 138 + conn.exec("DELETE FROM popular_searches", .{}) catch {}; 139 + conn.exec("BEGIN", .{}) catch {}; 132 140 for (popular_result.rows) |row| { 133 141 conn.exec( 134 142 "INSERT OR REPLACE INTO popular_searches (query, count) VALUES (?, ?)", ··· 136 144 ) catch {}; 137 145 popular_count += 1; 138 146 } 147 + conn.exec("COMMIT", .{}) catch {}; 139 148 } 140 149 141 150 // sync similarity cache 142 151 var cache_count: usize = 0; 143 152 { 144 - conn.exec("DELETE FROM similarity_cache", .{}) catch {}; 145 - 146 153 if (turso.query( 147 154 "SELECT source_uri, results, doc_count, computed_at FROM similarity_cache", 148 155 &.{}, ··· 150 157 var res = res_val; 151 158 defer res.deinit(); 152 159 160 + local.lock(); 161 + defer local.unlock(); 162 + conn.exec("DELETE FROM similarity_cache", .{}) catch {}; 163 + conn.exec("BEGIN", .{}) catch {}; 153 164 for (res.rows) |row| { 154 165 conn.exec( 155 166 "INSERT OR REPLACE INTO similarity_cache (source_uri, results, doc_count, computed_at) VALUES (?, ?, ?, ?)", ··· 157 168 ) catch {}; 158 169 cache_count += 1; 159 170 } 171 + conn.exec("COMMIT", .{}) catch {}; 160 172 } else |err| { 161 173 std.debug.print("sync: turso similarity_cache query failed: {}\n", .{err}); 162 - // continue anyway - cache isn't 
critical 163 174 } 164 175 } 165 176 166 - // record sync time 167 - var ts_buf: [20]u8 = undefined; 168 - const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0"; 169 - conn.exec( 170 - "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)", 171 - .{ts_str}, 172 - ) catch {}; 173 - 174 - conn.exec("COMMIT", .{}) catch |err| { 175 - std.debug.print("sync: commit failed: {}\n", .{err}); 176 - return err; 177 - }; 177 + // record sync time (brief lock) 178 + { 179 + local.lock(); 180 + defer local.unlock(); 181 + var ts_buf: [20]u8 = undefined; 182 + const ts_str = std.fmt.bufPrint(&ts_buf, "{d}", .{std.time.timestamp()}) catch "0"; 183 + conn.exec( 184 + "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync', ?)", 185 + .{ts_str}, 186 + ) catch {}; 187 + } 178 188 179 189 // checkpoint WAL to prevent unbounded growth 180 - conn.exec("PRAGMA wal_checkpoint(TRUNCATE)", .{}) catch |err| { 181 - std.debug.print("sync: wal checkpoint failed: {}\n", .{err}); 182 - }; 190 + { 191 + local.lock(); 192 + defer local.unlock(); 193 + conn.exec("PRAGMA wal_checkpoint(PASSIVE)", .{}) catch |err| { 194 + std.debug.print("sync: wal checkpoint failed: {}\n", .{err}); 195 + }; 196 + } 183 197 184 - local.setReady(true); 185 198 std.debug.print("sync: full sync complete - {d} docs, {d} pubs, {d} tags, {d} popular, {d} cached\n", .{ doc_count, pub_count, tag_count, popular_count, cache_count }); 186 199 } 187 200