Forked from atscan.net/plcbundle-rs
(a high-performance implementation of plcbundle, written in Rust).
1//! C-compatible FFI surface exposing BundleManager and related types for integration from other languages
2use crate::constants;
3use crate::manager::*;
4use crate::operations::*;
5use sonic_rs::JsonValueTrait;
6use std::ffi::{CStr, CString};
7use std::os::raw::c_char;
8use std::path::PathBuf;
9use std::sync::Arc;
10
11// ============================================================================
12// C-compatible types
13// ============================================================================
14
15#[repr(C)]
16pub struct CBundleManager {
17 manager: Arc<BundleManager>,
18}
19
/// Options accepted by `bundle_manager_load_bundle`.
#[repr(C)]
pub struct CLoadOptions {
    /// Requested hash verification.
    /// NOTE(review): not forwarded to `LoadOptions` by `bundle_manager_load_bundle`.
    pub verify_hash: bool,
    /// Forwarded as `LoadOptions::decompress`.
    pub decompress: bool,
    /// Forwarded as `LoadOptions::cache`.
    pub cache: bool,
}

/// Outcome written by `bundle_manager_load_bundle`.
#[repr(C)]
pub struct CLoadResult {
    /// `true` when the bundle was loaded.
    pub success: bool,
    /// Number of operations in the loaded bundle (0 on failure).
    pub operation_count: u32,
    /// Size of the bundle in bytes (currently not computed; reported as 0).
    pub size_bytes: u64,
    /// Error text on failure, otherwise null. Free with `bundle_manager_free_string`.
    pub error_msg: *mut c_char,
}

/// One PLC operation in C form.
///
/// Every non-null string is a NUL-terminated allocation owned by the caller
/// and released through `bundle_manager_free_operation(s)`. `Copy` duplicates
/// only the pointers, so freeing two copies of the same value double-frees.
#[repr(C)]
#[derive(Copy, Clone)]
pub struct COperation {
    /// DID the operation belongs to.
    pub did: *mut c_char,
    /// `type` field of the operation object, or `"unknown"`.
    pub op_type: *mut c_char,
    /// Content identifier; null when the operation has none.
    pub cid: *mut c_char,
    /// Whether the operation is nullified.
    pub nullified: bool,
    /// Creation timestamp string.
    pub created_at: *mut c_char,
    /// Operation body serialized as JSON.
    pub json: *mut c_char,
}

/// Addresses a single operation: bundle number plus index inside that bundle.
#[repr(C)]
pub struct COperationRequest {
    pub bundle_num: u32,
    pub operation_idx: usize,
}

/// Outcome written by `bundle_manager_verify_bundle`.
#[repr(C)]
pub struct CVerifyResult {
    /// Overall verification verdict.
    pub valid: bool,
    /// NOTE(review): currently mirrors `valid` rather than a dedicated hash check.
    pub hash_match: bool,
    /// NOTE(review): currently `true` on every successful verification call.
    pub compression_ok: bool,
    /// Error text on failure, otherwise null. Free with `bundle_manager_free_string`.
    pub error_msg: *mut c_char,
}

/// Metadata written by `bundle_manager_get_bundle_info`.
///
/// The three string fields are caller-owned; free each with
/// `bundle_manager_free_string`.
#[repr(C)]
pub struct CBundleInfo {
    pub bundle_number: u32,
    pub operation_count: u32,
    pub did_count: u32,
    pub compressed_size: u64,
    pub uncompressed_size: u64,
    pub start_time: *mut c_char,
    pub end_time: *mut c_char,
    pub hash: *mut c_char,
}

/// Manager counters written by `bundle_manager_get_stats`.
#[repr(C)]
pub struct CManagerStats {
    pub cache_hits: u64,
    pub cache_misses: u64,
    /// NOTE(review): `bundle_manager_get_stats` fills this with the
    /// bundles-loaded counter, not a byte count.
    pub bytes_read: u64,
    pub operations_processed: u64,
}

/// DID index summary written by `bundle_manager_get_did_index_stats`.
#[repr(C)]
pub struct CDIDIndexStats {
    pub total_dids: usize,
    pub total_operations: usize,
    /// Index size in bytes (currently not tracked; reported as 0).
    pub index_size_bytes: usize,
}

/// Summary written by `bundle_manager_rebuild_did_index`.
#[repr(C)]
pub struct CRebuildStats {
    pub bundles_processed: u32,
    pub operations_indexed: u64,
    /// Number of unique DIDs indexed (0 when not tracked).
    pub unique_dids: usize,
    /// Rebuild duration in milliseconds (0 when not measured).
    pub duration_ms: u64,
}

/// Parameters for `bundle_manager_export`.
#[repr(C)]
pub struct CExportSpec {
    /// First bundle of the range (the only bundle when no valid end is given).
    pub bundle_start: u32,
    /// Inclusive range end; used only when `> 0` and `>= bundle_start`.
    pub bundle_end: u32,
    /// Export every bundle listed in the index, ignoring the range.
    pub export_all: bool,
    /// Output format: 0 = jsonl, 1 = json, 2 = csv.
    pub format: u8,
    /// Maximum number of records to emit; 0 = no limit.
    pub count_limit: u64,
    /// Optional lower timestamp bound; NULL = no filter.
    pub after_timestamp: *const c_char,
    /// Optional exact DID to keep; NULL = no filter.
    pub did_filter: *const c_char,
    /// Optional operation type to keep; NULL = no filter.
    pub op_type_filter: *const c_char,
}

/// Totals written by `bundle_manager_export`.
#[repr(C)]
pub struct CExportStats {
    /// Records emitted.
    pub records_written: u64,
    /// Bytes emitted, including the trailing newline of each record.
    pub bytes_written: u64,
}
112
113// ============================================================================
114// BundleManager lifecycle
115// ============================================================================
116
117/// # Safety
118///
119/// The caller must ensure `bundle_dir` is a valid, NUL-terminated C string
120/// pointer. The returned pointer is owned by the caller and must be freed
121/// with `bundle_manager_free` when no longer needed. Passing a null or
122/// invalid pointer is undefined behavior.
123#[unsafe(no_mangle)]
124pub unsafe extern "C" fn bundle_manager_new(bundle_dir: *const c_char) -> *mut CBundleManager {
125 let bundle_dir = unsafe {
126 if bundle_dir.is_null() {
127 return std::ptr::null_mut();
128 }
129 match CStr::from_ptr(bundle_dir).to_str() {
130 Ok(s) => PathBuf::from(s),
131 Err(_) => return std::ptr::null_mut(),
132 }
133 };
134
135 match BundleManager::new(bundle_dir, ()) {
136 Ok(manager) => Box::into_raw(Box::new(CBundleManager {
137 manager: Arc::new(manager),
138 })),
139 Err(_) => std::ptr::null_mut(),
140 }
141}
142
143/// # Safety
144///
145/// The caller must ensure `manager` is a pointer previously returned by
146/// `bundle_manager_new` and not already freed. Passing invalid or dangling
147/// pointers is undefined behavior.
148#[unsafe(no_mangle)]
149pub unsafe extern "C" fn bundle_manager_free(manager: *mut CBundleManager) {
150 if !manager.is_null() {
151 unsafe {
152 let _ = Box::from_raw(manager);
153 }
154 }
155}
156
157// ============================================================================
158// Smart Loading
159// ============================================================================
160
161/// # Safety
162///
163/// The `manager` pointer must be valid. `options` may be NULL to use defaults.
164/// `out_result` must be a valid writable pointer to a `CLoadResult` struct.
165/// Strings passed to this API must be NUL-terminated. Violating these
166/// requirements is undefined behavior.
167#[unsafe(no_mangle)]
168pub unsafe extern "C" fn bundle_manager_load_bundle(
169 manager: *const CBundleManager,
170 bundle_num: u32,
171 options: *const CLoadOptions,
172 out_result: *mut CLoadResult,
173) -> i32 {
174 if manager.is_null() || out_result.is_null() {
175 return -1;
176 }
177
178 let manager = unsafe { &*manager };
179
180 let load_opts = if options.is_null() {
181 LoadOptions::default()
182 } else {
183 let opts = unsafe { &*options };
184 LoadOptions {
185 cache: opts.cache,
186 decompress: opts.decompress,
187 filter: None,
188 limit: None,
189 }
190 };
191
192 match manager.manager.load_bundle(bundle_num, load_opts) {
193 Ok(result) => {
194 unsafe {
195 (*out_result).success = true;
196 (*out_result).operation_count = result.operations.len() as u32;
197 (*out_result).size_bytes = 0; // TODO: calculate actual size
198 (*out_result).error_msg = std::ptr::null_mut();
199 }
200 0
201 }
202 Err(e) => {
203 let err_msg = CString::new(e.to_string()).unwrap();
204 unsafe {
205 (*out_result).success = false;
206 (*out_result).operation_count = 0;
207 (*out_result).size_bytes = 0;
208 (*out_result).error_msg = err_msg.into_raw();
209 }
210 -1
211 }
212 }
213}
214
215// ============================================================================
216// Batch Operations
217// ============================================================================
218
219/// # Safety
220///
221/// The `manager` pointer must be valid. `requests` must point to `count`
222/// valid `COperationRequest` items. `out_operations` and `out_count` must
223/// be valid writable pointers. Returned `COperation` arrays must be freed
224/// by the caller using `bundle_manager_free_operations` if applicable.
225#[unsafe(no_mangle)]
226pub unsafe extern "C" fn bundle_manager_get_operations_batch(
227 manager: *const CBundleManager,
228 requests: *const COperationRequest,
229 count: usize,
230 out_operations: *mut *mut COperation,
231 out_count: *mut usize,
232) -> i32 {
233 if manager.is_null() || requests.is_null() || out_operations.is_null() {
234 return -1;
235 }
236
237 let manager = unsafe { &*manager };
238 let requests_slice = unsafe { std::slice::from_raw_parts(requests, count) };
239
240 let mut op_requests = Vec::new();
241 for req in requests_slice {
242 op_requests.push(OperationRequest {
243 bundle: req.bundle_num,
244 index: Some(req.operation_idx),
245 filter: None,
246 });
247 }
248
249 match manager.manager.get_operations_batch(op_requests) {
250 Ok(operations) => {
251 let mut c_ops = Vec::new();
252 for op in operations {
253 c_ops.push(operation_to_c(op));
254 }
255
256 unsafe {
257 *out_count = c_ops.len();
258 *out_operations = c_ops.as_mut_ptr();
259 }
260 std::mem::forget(c_ops);
261 0
262 }
263 Err(_) => -1,
264 }
265}
266
267// ============================================================================
268// DID Operations
269// ============================================================================
270
271/// # Safety
272///
273/// The `manager` pointer must be valid. `did` must be a valid NUL-terminated
274/// C string. `out_operations` and `out_count` must be valid writable pointers
275/// to receive results. Returned memory ownership rules are documented in the
276/// API and must be followed by the caller.
277#[unsafe(no_mangle)]
278pub unsafe extern "C" fn bundle_manager_get_did_operations(
279 manager: *const CBundleManager,
280 did: *const c_char,
281 out_operations: *mut *mut COperation,
282 out_count: *mut usize,
283) -> i32 {
284 if manager.is_null() || did.is_null() || out_operations.is_null() {
285 return -1;
286 }
287
288 let manager = unsafe { &*manager };
289 let did = unsafe {
290 match CStr::from_ptr(did).to_str() {
291 Ok(s) => s,
292 Err(_) => return -1,
293 }
294 };
295
296 match manager.manager.get_did_operations(did, false, false) {
297 Ok(result) => {
298 let mut c_ops = Vec::new();
299 for op in result.operations {
300 c_ops.push(operation_to_c(op));
301 }
302
303 unsafe {
304 *out_count = c_ops.len();
305 *out_operations = c_ops.as_mut_ptr();
306 }
307 std::mem::forget(c_ops);
308 0
309 }
310 Err(_) => -1,
311 }
312}
313
314/// # Safety
315///
316/// `manager` must be valid. `dids` must point to `did_count` valid
317/// NUL-terminated C string pointers. `callback` will be called from this
318/// function and must be a valid function pointer. The caller must ensure the
319/// callback and its environment remain valid for the duration of the call.
320#[unsafe(no_mangle)]
321pub unsafe extern "C" fn bundle_manager_batch_resolve_dids(
322 manager: *const CBundleManager,
323 dids: *const *const c_char,
324 did_count: usize,
325 callback: extern "C" fn(*const c_char, *const COperation, usize),
326) -> i32 {
327 if manager.is_null() || dids.is_null() {
328 return -1;
329 }
330
331 let manager = unsafe { &*manager };
332 let dids_slice = unsafe { std::slice::from_raw_parts(dids, did_count) };
333
334 let mut did_strings = Vec::new();
335 for did_ptr in dids_slice {
336 let did = unsafe {
337 match CStr::from_ptr(*did_ptr).to_str() {
338 Ok(s) => s.to_string(),
339 Err(_) => return -1,
340 }
341 };
342 did_strings.push(did);
343 }
344
345 match manager.manager.batch_resolve_dids(did_strings) {
346 Ok(results) => {
347 for (did, operations) in results {
348 let c_did = CString::new(did).unwrap();
349 let c_ops: Vec<COperation> = operations.into_iter().map(operation_to_c).collect();
350
351 callback(c_did.as_ptr(), c_ops.as_ptr(), c_ops.len());
352
353 for op in c_ops {
354 free_c_operation(op);
355 }
356 }
357 0
358 }
359 Err(_) => -1,
360 }
361}
362
363// ============================================================================
364// Query
365// ============================================================================
366
367/// # Safety
368///
369/// `manager` must be valid. `query_str` must be a valid NUL-terminated C
370/// string. `out_operations` and `out_count` must be valid writable pointers.
371/// The caller is responsible for freeing any returned memory according to the
372/// API's ownership rules.
373#[unsafe(no_mangle)]
374pub unsafe extern "C" fn bundle_manager_query(
375 manager: *const CBundleManager,
376 query_str: *const c_char,
377 bundle_start: u32,
378 bundle_end: u32,
379 out_operations: *mut *mut COperation,
380 out_count: *mut usize,
381) -> i32 {
382 if manager.is_null() || query_str.is_null() || out_operations.is_null() || out_count.is_null() {
383 return -1;
384 }
385
386 let manager = unsafe { &*manager };
387 let query = unsafe {
388 match CStr::from_ptr(query_str).to_str() {
389 Ok(s) => s.to_string(),
390 Err(_) => return -1,
391 }
392 };
393
394 // Determine bundle range
395 let (start, end) = if bundle_start == 0 && bundle_end == 0 {
396 // Query all bundles if not specified (like Rust CLI)
397 (1, manager.manager.get_last_bundle())
398 } else if bundle_end == 0 {
399 (bundle_start, bundle_start)
400 } else {
401 (bundle_start, bundle_end)
402 };
403
404 // Simple query: search through operations for matching fields
405 let mut c_ops = Vec::new();
406 for bundle_num in start..=end {
407 let result = manager
408 .manager
409 .load_bundle(bundle_num, LoadOptions::default());
410 if let Ok(load_result) = result {
411 for op in load_result.operations {
412 // Simple string matching in operation JSON
413 let op_json = op.operation.to_string();
414 if op_json.contains(&query) || op.did.contains(&query) {
415 c_ops.push(operation_to_c(op));
416 }
417 }
418 }
419 }
420
421 unsafe {
422 *out_count = c_ops.len();
423 *out_operations = c_ops.as_mut_ptr();
424 }
425 std::mem::forget(c_ops);
426 0
427}
428
429// ============================================================================
430// Verification
431// ============================================================================
432
433/// # Safety
434///
435/// `manager` must be a valid pointer. `out_result` must be a valid writable
436/// pointer to `CVerifyResult`. Passing invalid or null pointers is undefined
437/// behavior.
438#[unsafe(no_mangle)]
439pub unsafe extern "C" fn bundle_manager_verify_bundle(
440 manager: *const CBundleManager,
441 bundle_num: u32,
442 check_hash: bool,
443 _check_chain: bool,
444 out_result: *mut CVerifyResult,
445) -> i32 {
446 if manager.is_null() || out_result.is_null() {
447 return -1;
448 }
449
450 let manager = unsafe { &*manager };
451
452 let spec = VerifySpec {
453 check_hash,
454 check_content_hash: check_hash,
455 check_operations: true,
456 fast: false,
457 };
458
459 match manager.manager.verify_bundle(bundle_num, spec) {
460 Ok(result) => {
461 unsafe {
462 (*out_result).valid = result.valid;
463 (*out_result).hash_match = result.valid; // TODO: return actual hash match result
464 (*out_result).compression_ok = true; // TODO: return actual compression result
465 (*out_result).error_msg = std::ptr::null_mut();
466 }
467 0
468 }
469 Err(e) => {
470 let err_msg = CString::new(e.to_string()).unwrap();
471 unsafe {
472 (*out_result).valid = false;
473 (*out_result).hash_match = false;
474 (*out_result).compression_ok = false;
475 (*out_result).error_msg = err_msg.into_raw();
476 }
477 -1
478 }
479 }
480}
481
482/// # Safety
483///
484/// `manager` must be a valid pointer. Calling this function with invalid
485/// pointers is undefined behavior.
486#[unsafe(no_mangle)]
487pub unsafe extern "C" fn bundle_manager_verify_chain(
488 manager: *const CBundleManager,
489 start_bundle: u32,
490 end_bundle: u32,
491) -> i32 {
492 if manager.is_null() {
493 return -1;
494 }
495
496 let manager = unsafe { &*manager };
497
498 let spec = ChainVerifySpec {
499 start_bundle,
500 end_bundle: Some(end_bundle),
501 check_parent_links: true,
502 };
503
504 match manager.manager.verify_chain(spec) {
505 Ok(result) => {
506 if result.valid {
507 0
508 } else {
509 -1
510 }
511 }
512 Err(_) => -1,
513 }
514}
515
516// ============================================================================
517// Info
518// ============================================================================
519
520/// # Safety
521///
522/// `manager` must be valid. `out_info` must be a valid writable pointer to
523/// `CBundleInfo`. Any returned string pointers must be freed by the caller as
524/// documented by the API.
525#[unsafe(no_mangle)]
526pub unsafe extern "C" fn bundle_manager_get_bundle_info(
527 manager: *const CBundleManager,
528 bundle_num: u32,
529 include_operations: bool,
530 _include_dids: bool,
531 out_info: *mut CBundleInfo,
532) -> i32 {
533 if manager.is_null() || out_info.is_null() {
534 return -1;
535 }
536
537 let manager = unsafe { &*manager };
538
539 let flags = InfoFlags {
540 include_operations,
541 include_size_info: true,
542 };
543
544 match manager.manager.get_bundle_info(bundle_num, flags) {
545 Ok(info) => {
546 unsafe {
547 (*out_info).bundle_number = info.metadata.bundle_number;
548 (*out_info).operation_count = info.metadata.operation_count;
549 (*out_info).did_count = info.metadata.did_count;
550 (*out_info).compressed_size = info.metadata.compressed_size;
551 (*out_info).uncompressed_size = info.metadata.uncompressed_size;
552 (*out_info).start_time = CString::new(info.metadata.start_time).unwrap().into_raw();
553 (*out_info).end_time = CString::new(info.metadata.end_time).unwrap().into_raw();
554 (*out_info).hash = CString::new(info.metadata.hash).unwrap().into_raw();
555 }
556 0
557 }
558 Err(_) => -1,
559 }
560}
561
562// ============================================================================
563// Cache Management
564// ============================================================================
565
566/// # Safety
567///
568/// `manager` must be valid. `bundle_nums` must point to `count` valid u32
569/// values. Passing invalid pointers is undefined behavior.
570#[unsafe(no_mangle)]
571pub unsafe extern "C" fn bundle_manager_prefetch_bundles(
572 manager: *const CBundleManager,
573 bundle_nums: *const u32,
574 count: usize,
575) -> i32 {
576 if manager.is_null() || bundle_nums.is_null() {
577 return -1;
578 }
579
580 let manager = unsafe { &*manager };
581 let bundles = unsafe { std::slice::from_raw_parts(bundle_nums, count) };
582
583 match manager.manager.prefetch_bundles(bundles.to_vec()) {
584 Ok(_) => 0,
585 Err(_) => -1,
586 }
587}
588
589/// # Safety
590///
591/// `manager` must be valid. Passing invalid pointers is undefined behavior.
592#[unsafe(no_mangle)]
593pub unsafe extern "C" fn bundle_manager_warm_up(
594 manager: *const CBundleManager,
595 strategy: u8,
596 start_bundle: u32,
597 end_bundle: u32,
598) -> i32 {
599 if manager.is_null() {
600 return -1;
601 }
602
603 let manager = unsafe { &*manager };
604
605 let warm_strategy = match strategy {
606 0 => WarmUpStrategy::Recent(10), // Default count for recent
607 1 => WarmUpStrategy::All,
608 2 => WarmUpStrategy::Range(start_bundle, end_bundle),
609 _ => return -1,
610 };
611
612 let spec = WarmUpSpec {
613 strategy: warm_strategy,
614 };
615
616 match manager.manager.warm_up(spec) {
617 Ok(_) => 0,
618 Err(_) => -1,
619 }
620}
621
622/// # Safety
623///
624/// `manager` must be a valid pointer previously returned by
625/// `bundle_manager_new`.
626#[unsafe(no_mangle)]
627pub unsafe extern "C" fn bundle_manager_clear_caches(manager: *const CBundleManager) -> i32 {
628 if manager.is_null() {
629 return -1;
630 }
631
632 let manager = unsafe { &*manager };
633 manager.manager.clear_caches();
634 0
635}
636
637// ============================================================================
638// DID Index
639// ============================================================================
640
641/// # Safety
642///
643/// `manager` must be valid. `out_stats` must be a valid writable pointer if
644/// provided. The caller must ensure `progress_callback` is a valid function
645/// pointer if passed.
646#[unsafe(no_mangle)]
647pub unsafe extern "C" fn bundle_manager_rebuild_did_index(
648 manager: *const CBundleManager,
649 progress_callback: Option<extern "C" fn(u32, u32, u64, u64)>,
650 out_stats: *mut CRebuildStats,
651) -> i32 {
652 if manager.is_null() {
653 return -1;
654 }
655
656 let manager = unsafe { &*manager };
657
658 let callback = progress_callback.map(|cb| {
659 Box::new(
660 move |current: u32, total: u32, bytes_processed: u64, total_bytes: u64| {
661 cb(current, total, bytes_processed, total_bytes);
662 },
663 ) as Box<dyn Fn(u32, u32, u64, u64) + Send + Sync>
664 });
665
666 // Use default flush interval for FFI
667 match manager
668 .manager
669 .build_did_index(constants::DID_INDEX_FLUSH_INTERVAL, callback, None, None)
670 {
671 Ok(stats) => {
672 if !out_stats.is_null() {
673 unsafe {
674 (*out_stats).bundles_processed = stats.bundles_processed;
675 (*out_stats).operations_indexed = stats.operations_indexed;
676 (*out_stats).unique_dids = 0; // TODO: track unique DIDs
677 (*out_stats).duration_ms = 0; // TODO: track duration
678 }
679 }
680 0
681 }
682 Err(_) => -1,
683 }
684}
685
686/// # Safety
687///
688/// `manager` must be valid. `out_stats` must be a valid writable pointer.
689#[unsafe(no_mangle)]
690pub unsafe extern "C" fn bundle_manager_get_did_index_stats(
691 manager: *const CBundleManager,
692 out_stats: *mut CDIDIndexStats,
693) -> i32 {
694 if manager.is_null() || out_stats.is_null() {
695 return -1;
696 }
697
698 let manager = unsafe { &*manager };
699
700 let stats = manager.manager.get_did_index_stats();
701 unsafe {
702 (*out_stats).total_dids = stats
703 .get("total_dids")
704 .and_then(|v| v.as_i64())
705 .unwrap_or(0) as usize;
706 (*out_stats).total_operations = stats
707 .get("total_entries")
708 .and_then(|v| v.as_i64())
709 .unwrap_or(0) as usize;
710 (*out_stats).index_size_bytes = 0; // TODO: track actual index size
711 }
712 0
713}
714
715// ============================================================================
716// Observability
717// ============================================================================
718
719/// # Safety
720///
721/// `manager` must be valid. `out_stats` must be a valid writable pointer.
722#[unsafe(no_mangle)]
723pub unsafe extern "C" fn bundle_manager_get_stats(
724 manager: *const CBundleManager,
725 out_stats: *mut CManagerStats,
726) -> i32 {
727 if manager.is_null() || out_stats.is_null() {
728 return -1;
729 }
730
731 let manager = unsafe { &*manager };
732
733 let stats = manager.manager.get_stats();
734 unsafe {
735 (*out_stats).cache_hits = stats.cache_hits;
736 (*out_stats).cache_misses = stats.cache_misses;
737 (*out_stats).bytes_read = stats.bundles_loaded; // TODO: track actual bytes read
738 (*out_stats).operations_processed = stats.operations_read;
739 }
740 0
741}
742
743// ============================================================================
744// Helper functions
745// ============================================================================
746
747fn operation_to_c(op: Operation) -> COperation {
748 // Extract operation type from the operation object
749 let op_type = op
750 .operation
751 .get("type")
752 .and_then(|v| v.as_str())
753 .unwrap_or("unknown")
754 .to_string();
755
756 COperation {
757 did: CString::new(op.did).unwrap().into_raw(),
758 op_type: CString::new(op_type).unwrap().into_raw(),
759 cid: op
760 .cid
761 .map(|s| CString::new(s).unwrap().into_raw())
762 .unwrap_or(std::ptr::null_mut()),
763 nullified: op.nullified,
764 created_at: CString::new(op.created_at).unwrap().into_raw(),
765 json: CString::new(op.operation.to_string()).unwrap().into_raw(),
766 }
767}
768
769fn free_c_operation(op: COperation) {
770 unsafe {
771 if !op.did.is_null() {
772 let _ = CString::from_raw(op.did);
773 }
774 if !op.op_type.is_null() {
775 let _ = CString::from_raw(op.op_type);
776 }
777 if !op.cid.is_null() {
778 let _ = CString::from_raw(op.cid);
779 }
780 if !op.created_at.is_null() {
781 let _ = CString::from_raw(op.created_at);
782 }
783 if !op.json.is_null() {
784 let _ = CString::from_raw(op.json);
785 }
786 }
787}
788
/// Frees a string allocated by this API (for example `CLoadResult::error_msg`
/// or the `CBundleInfo` string fields). A null pointer is ignored.
///
/// # Safety
///
/// `s` must be null or a string returned by this API that has not been freed
/// yet. Any other pointer is undefined behavior.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn bundle_manager_free_string(s: *mut c_char) {
    if s.is_null() {
        return;
    }
    // SAFETY: per the contract, `s` came from `CString::into_raw` and is freed once.
    drop(unsafe { CString::from_raw(s) });
}
801
802/// # Safety
803///
804/// `op` must be a pointer previously returned by this API and safe to free.
805#[unsafe(no_mangle)]
806pub unsafe extern "C" fn bundle_manager_free_operation(op: *mut COperation) {
807 if !op.is_null() {
808 unsafe {
809 let op_val = *op;
810 free_c_operation(op_val);
811 }
812 }
813}
814
815/// # Safety
816///
817/// `ops` must point to an array of `count` `COperation` previously returned
818/// by this API and safe to free.
819#[unsafe(no_mangle)]
820pub unsafe extern "C" fn bundle_manager_free_operations(ops: *mut COperation, count: usize) {
821 if !ops.is_null() {
822 unsafe {
823 let ops_slice = std::slice::from_raw_parts(ops, count);
824 for op in ops_slice {
825 free_c_operation(*op);
826 }
827 let _ = Vec::from_raw_parts(ops, count, count);
828 }
829 }
830}
831
832// ============================================================================
833// Export
834// ============================================================================
835
/// Sink for `bundle_manager_export` output.
///
/// Receives a chunk of `len` bytes starting at `data` (not NUL-terminated),
/// together with the caller's opaque `user_data`. Return 0 to keep receiving
/// data; any non-zero value asks the export to stop.
pub type ExportCallback = extern "C" fn(
    data: *const c_char,
    len: usize,
    user_data: *mut std::ffi::c_void,
) -> i32;
840
841/// # Safety
842///
843/// `manager` must be valid. `spec` must point to a valid `CExportSpec` and
844/// `callback` must be a valid function pointer. `out_stats` must be a valid
845/// writable pointer if provided.
846#[unsafe(no_mangle)]
847pub unsafe extern "C" fn bundle_manager_export(
848 manager: *const CBundleManager,
849 spec: *const CExportSpec,
850 callback: ExportCallback,
851 user_data: *mut std::ffi::c_void,
852 out_stats: *mut CExportStats,
853) -> i32 {
854 if manager.is_null() || spec.is_null() || callback as usize == 0 {
855 return -1;
856 }
857
858 let manager = unsafe { &*manager };
859 let spec = unsafe { &*spec };
860
861 use crate::index::Index;
862 use std::fs::File;
863 use std::io::BufRead;
864
865 // Get directory path
866 let dir_path = manager.manager.directory().clone();
867 // Load index
868 let index = match Index::load(&dir_path) {
869 Ok(idx) => idx,
870 Err(_) => return -1,
871 };
872
873 // Determine bundle range
874 let bundle_numbers: Vec<u32> = if spec.export_all {
875 (1..=index.last_bundle).collect()
876 } else if spec.bundle_end > 0 && spec.bundle_end >= spec.bundle_start {
877 (spec.bundle_start..=spec.bundle_end).collect()
878 } else {
879 vec![spec.bundle_start]
880 };
881
882 // Filter bundles by timestamp if specified
883 let bundle_numbers: Vec<u32> = if !spec.after_timestamp.is_null() {
884 let after_ts = unsafe {
885 match CStr::from_ptr(spec.after_timestamp).to_str() {
886 Ok(s) => s.to_string(),
887 Err(_) => return -1,
888 }
889 };
890 bundle_numbers
891 .into_iter()
892 .filter(|num| match index.get_bundle(*num) {
893 Some(meta) => meta.end_time >= after_ts,
894 None => true,
895 })
896 .collect()
897 } else {
898 bundle_numbers
899 };
900
901 let mut exported_count = 0u64;
902 let mut bytes_written = 0u64;
903 let mut output_buffer = Vec::with_capacity(1024 * 1024);
904
905 // Determine if we need parsing
906 let needs_parsing = !spec.after_timestamp.is_null()
907 || !spec.did_filter.is_null()
908 || !spec.op_type_filter.is_null()
909 || spec.format == 1
910 || spec.format == 2; // JSON or CSV
911
912 // Process bundles
913 for bundle_num in bundle_numbers {
914 if spec.count_limit > 0 && exported_count >= spec.count_limit {
915 break;
916 }
917
918 let bundle_path = constants::bundle_path(&dir_path, bundle_num);
919 if !bundle_path.exists() {
920 continue;
921 }
922
923 let file = match File::open(&bundle_path) {
924 Ok(f) => f,
925 Err(_) => continue,
926 };
927
928 let decoder = match zstd::Decoder::new(file) {
929 Ok(d) => d,
930 Err(_) => continue,
931 };
932
933 let reader = std::io::BufReader::with_capacity(1024 * 1024, decoder);
934
935 if !needs_parsing {
936 // Fast path: no parsing
937 for line in reader.lines() {
938 if spec.count_limit > 0 && exported_count >= spec.count_limit {
939 break;
940 }
941
942 let line = match line {
943 Ok(l) => l,
944 Err(_) => continue,
945 };
946
947 if line.is_empty() {
948 continue;
949 }
950
951 output_buffer.extend_from_slice(line.as_bytes());
952 output_buffer.push(b'\n');
953 exported_count += 1;
954 bytes_written += line.len() as u64 + 1;
955
956 // Flush buffer when large
957 if output_buffer.len() >= 1024 * 1024 {
958 let result = callback(
959 output_buffer.as_ptr() as *const c_char,
960 output_buffer.len(),
961 user_data,
962 );
963 if result != 0 {
964 break;
965 }
966 output_buffer.clear();
967 }
968 }
969 } else {
970 // Slow path: parse and filter
971 use sonic_rs::JsonValueTrait;
972
973 let after_ts = if !spec.after_timestamp.is_null() {
974 Some(unsafe {
975 CStr::from_ptr(spec.after_timestamp)
976 .to_str()
977 .unwrap()
978 .to_string()
979 })
980 } else {
981 None
982 };
983
984 let did_filter = if !spec.did_filter.is_null() {
985 Some(unsafe {
986 CStr::from_ptr(spec.did_filter)
987 .to_str()
988 .unwrap()
989 .to_string()
990 })
991 } else {
992 None
993 };
994
995 let op_type_filter = if !spec.op_type_filter.is_null() {
996 Some(unsafe {
997 CStr::from_ptr(spec.op_type_filter)
998 .to_str()
999 .unwrap()
1000 .to_string()
1001 })
1002 } else {
1003 None
1004 };
1005
1006 for line in reader.lines() {
1007 if spec.count_limit > 0 && exported_count >= spec.count_limit {
1008 break;
1009 }
1010
1011 let line = match line {
1012 Ok(l) => l,
1013 Err(_) => continue,
1014 };
1015
1016 if line.is_empty() {
1017 continue;
1018 }
1019
1020 // Parse JSON
1021 let data: sonic_rs::Value = match sonic_rs::from_str(&line) {
1022 Ok(d) => d,
1023 Err(_) => continue,
1024 };
1025
1026 // Apply filters
1027 if let Some(ref after_ts) = after_ts {
1028 if let Some(created_at) =
1029 data.get("createdAt").or_else(|| data.get("created_at"))
1030 {
1031 if let Some(ts_str) = created_at.as_str() {
1032 if ts_str < after_ts.as_str() {
1033 continue;
1034 }
1035 } else {
1036 continue;
1037 }
1038 } else {
1039 continue;
1040 }
1041 }
1042
1043 if let Some(ref did_filter) = did_filter {
1044 if let Some(did_val) = data.get("did") {
1045 if let Some(did_str) = did_val.as_str() {
1046 if did_str != did_filter {
1047 continue;
1048 }
1049 } else {
1050 continue;
1051 }
1052 } else {
1053 continue;
1054 }
1055 }
1056
1057 if let Some(ref op_type_filter) = op_type_filter {
1058 if let Some(op_val) = data.get("operation") {
1059 let matches = if op_val.is_str() {
1060 op_val.as_str().unwrap() == op_type_filter
1061 } else if op_val.is_object() {
1062 if let Some(typ_val) = op_val.get("type") {
1063 typ_val.is_str() && typ_val.as_str().unwrap() == op_type_filter
1064 } else {
1065 false
1066 }
1067 } else {
1068 false
1069 };
1070 if !matches {
1071 continue;
1072 }
1073 } else {
1074 continue;
1075 }
1076 }
1077
1078 // Format
1079 let formatted = match spec.format {
1080 0 => line, // JSONL - use original
1081 1 => sonic_rs::to_string_pretty(&data).unwrap_or_default(), // JSON
1082 2 => {
1083 let did = data.get("did").and_then(|v| v.as_str()).unwrap_or("");
1084 let op = data
1085 .get("operation")
1086 .map(|v| sonic_rs::to_string(v).unwrap_or_default())
1087 .unwrap_or_default();
1088 let created_at = data
1089 .get("createdAt")
1090 .or_else(|| data.get("created_at"))
1091 .and_then(|v| v.as_str())
1092 .unwrap_or("");
1093 let nullified = data
1094 .get("nullified")
1095 .and_then(|v| v.as_bool())
1096 .unwrap_or(false);
1097 format!("{},{},{},{}", did, op, created_at, nullified)
1098 }
1099 _ => line,
1100 };
1101
1102 output_buffer.extend_from_slice(formatted.as_bytes());
1103 output_buffer.push(b'\n');
1104 exported_count += 1;
1105 bytes_written += formatted.len() as u64 + 1;
1106
1107 // Flush buffer when large
1108 if output_buffer.len() >= 1024 * 1024 {
1109 let result = callback(
1110 output_buffer.as_ptr() as *const c_char,
1111 output_buffer.len(),
1112 user_data,
1113 );
1114 if result != 0 {
1115 break;
1116 }
1117 output_buffer.clear();
1118 }
1119 }
1120 }
1121 }
1122
1123 // Flush remaining buffer
1124 if !output_buffer.is_empty() {
1125 callback(
1126 output_buffer.as_ptr() as *const c_char,
1127 output_buffer.len(),
1128 user_data,
1129 );
1130 }
1131
1132 if !out_stats.is_null() {
1133 unsafe {
1134 (*out_stats).records_written = exported_count;
1135 (*out_stats).bytes_written = bytes_written;
1136 }
1137 }
1138
1139 0
1140}