High-performance implementation of plcbundle, written in Rust.
Branch: `main` · 1140 lines · 35 kB · View raw
1//! C-compatible FFI surface exposing BundleManager and related types for integration from other languages 2use crate::constants; 3use crate::manager::*; 4use crate::operations::*; 5use sonic_rs::JsonValueTrait; 6use std::ffi::{CStr, CString}; 7use std::os::raw::c_char; 8use std::path::PathBuf; 9use std::sync::Arc; 10 11// ============================================================================ 12// C-compatible types 13// ============================================================================ 14 15#[repr(C)] 16pub struct CBundleManager { 17 manager: Arc<BundleManager>, 18} 19 20#[repr(C)] 21pub struct CLoadOptions { 22 pub verify_hash: bool, 23 pub decompress: bool, 24 pub cache: bool, 25} 26 27#[repr(C)] 28pub struct CLoadResult { 29 pub success: bool, 30 pub operation_count: u32, 31 pub size_bytes: u64, 32 pub error_msg: *mut c_char, 33} 34 35#[repr(C)] 36#[derive(Clone, Copy)] 37pub struct COperation { 38 pub did: *mut c_char, 39 pub op_type: *mut c_char, 40 pub cid: *mut c_char, 41 pub nullified: bool, 42 pub created_at: *mut c_char, 43 pub json: *mut c_char, 44} 45 46#[repr(C)] 47pub struct COperationRequest { 48 pub bundle_num: u32, 49 pub operation_idx: usize, 50} 51 52#[repr(C)] 53pub struct CVerifyResult { 54 pub valid: bool, 55 pub hash_match: bool, 56 pub compression_ok: bool, 57 pub error_msg: *mut c_char, 58} 59 60#[repr(C)] 61pub struct CBundleInfo { 62 pub bundle_number: u32, 63 pub operation_count: u32, 64 pub did_count: u32, 65 pub compressed_size: u64, 66 pub uncompressed_size: u64, 67 pub start_time: *mut c_char, 68 pub end_time: *mut c_char, 69 pub hash: *mut c_char, 70} 71 72#[repr(C)] 73pub struct CManagerStats { 74 pub cache_hits: u64, 75 pub cache_misses: u64, 76 pub bytes_read: u64, 77 pub operations_processed: u64, 78} 79 80#[repr(C)] 81pub struct CDIDIndexStats { 82 pub total_dids: usize, 83 pub total_operations: usize, 84 pub index_size_bytes: usize, 85} 86 87#[repr(C)] 88pub struct CRebuildStats { 89 pub bundles_processed: 
u32, 90 pub operations_indexed: u64, 91 pub unique_dids: usize, 92 pub duration_ms: u64, 93} 94 95#[repr(C)] 96pub struct CExportSpec { 97 pub bundle_start: u32, 98 pub bundle_end: u32, 99 pub export_all: bool, 100 pub format: u8, // 0=jsonl, 1=json, 2=csv 101 pub count_limit: u64, // 0 = no limit 102 pub after_timestamp: *const c_char, // NULL = no filter 103 pub did_filter: *const c_char, // NULL = no filter 104 pub op_type_filter: *const c_char, // NULL = no filter 105} 106 107#[repr(C)] 108pub struct CExportStats { 109 pub records_written: u64, 110 pub bytes_written: u64, 111} 112 113// ============================================================================ 114// BundleManager lifecycle 115// ============================================================================ 116 117/// # Safety 118/// 119/// The caller must ensure `bundle_dir` is a valid, NUL-terminated C string 120/// pointer. The returned pointer is owned by the caller and must be freed 121/// with `bundle_manager_free` when no longer needed. Passing a null or 122/// invalid pointer is undefined behavior. 123#[unsafe(no_mangle)] 124pub unsafe extern "C" fn bundle_manager_new(bundle_dir: *const c_char) -> *mut CBundleManager { 125 let bundle_dir = unsafe { 126 if bundle_dir.is_null() { 127 return std::ptr::null_mut(); 128 } 129 match CStr::from_ptr(bundle_dir).to_str() { 130 Ok(s) => PathBuf::from(s), 131 Err(_) => return std::ptr::null_mut(), 132 } 133 }; 134 135 match BundleManager::new(bundle_dir, ()) { 136 Ok(manager) => Box::into_raw(Box::new(CBundleManager { 137 manager: Arc::new(manager), 138 })), 139 Err(_) => std::ptr::null_mut(), 140 } 141} 142 143/// # Safety 144/// 145/// The caller must ensure `manager` is a pointer previously returned by 146/// `bundle_manager_new` and not already freed. Passing invalid or dangling 147/// pointers is undefined behavior. 
148#[unsafe(no_mangle)] 149pub unsafe extern "C" fn bundle_manager_free(manager: *mut CBundleManager) { 150 if !manager.is_null() { 151 unsafe { 152 let _ = Box::from_raw(manager); 153 } 154 } 155} 156 157// ============================================================================ 158// Smart Loading 159// ============================================================================ 160 161/// # Safety 162/// 163/// The `manager` pointer must be valid. `options` may be NULL to use defaults. 164/// `out_result` must be a valid writable pointer to a `CLoadResult` struct. 165/// Strings passed to this API must be NUL-terminated. Violating these 166/// requirements is undefined behavior. 167#[unsafe(no_mangle)] 168pub unsafe extern "C" fn bundle_manager_load_bundle( 169 manager: *const CBundleManager, 170 bundle_num: u32, 171 options: *const CLoadOptions, 172 out_result: *mut CLoadResult, 173) -> i32 { 174 if manager.is_null() || out_result.is_null() { 175 return -1; 176 } 177 178 let manager = unsafe { &*manager }; 179 180 let load_opts = if options.is_null() { 181 LoadOptions::default() 182 } else { 183 let opts = unsafe { &*options }; 184 LoadOptions { 185 cache: opts.cache, 186 decompress: opts.decompress, 187 filter: None, 188 limit: None, 189 } 190 }; 191 192 match manager.manager.load_bundle(bundle_num, load_opts) { 193 Ok(result) => { 194 unsafe { 195 (*out_result).success = true; 196 (*out_result).operation_count = result.operations.len() as u32; 197 (*out_result).size_bytes = 0; // TODO: calculate actual size 198 (*out_result).error_msg = std::ptr::null_mut(); 199 } 200 0 201 } 202 Err(e) => { 203 let err_msg = CString::new(e.to_string()).unwrap(); 204 unsafe { 205 (*out_result).success = false; 206 (*out_result).operation_count = 0; 207 (*out_result).size_bytes = 0; 208 (*out_result).error_msg = err_msg.into_raw(); 209 } 210 -1 211 } 212 } 213} 214 215// ============================================================================ 216// Batch Operations 
217// ============================================================================ 218 219/// # Safety 220/// 221/// The `manager` pointer must be valid. `requests` must point to `count` 222/// valid `COperationRequest` items. `out_operations` and `out_count` must 223/// be valid writable pointers. Returned `COperation` arrays must be freed 224/// by the caller using `bundle_manager_free_operations` if applicable. 225#[unsafe(no_mangle)] 226pub unsafe extern "C" fn bundle_manager_get_operations_batch( 227 manager: *const CBundleManager, 228 requests: *const COperationRequest, 229 count: usize, 230 out_operations: *mut *mut COperation, 231 out_count: *mut usize, 232) -> i32 { 233 if manager.is_null() || requests.is_null() || out_operations.is_null() { 234 return -1; 235 } 236 237 let manager = unsafe { &*manager }; 238 let requests_slice = unsafe { std::slice::from_raw_parts(requests, count) }; 239 240 let mut op_requests = Vec::new(); 241 for req in requests_slice { 242 op_requests.push(OperationRequest { 243 bundle: req.bundle_num, 244 index: Some(req.operation_idx), 245 filter: None, 246 }); 247 } 248 249 match manager.manager.get_operations_batch(op_requests) { 250 Ok(operations) => { 251 let mut c_ops = Vec::new(); 252 for op in operations { 253 c_ops.push(operation_to_c(op)); 254 } 255 256 unsafe { 257 *out_count = c_ops.len(); 258 *out_operations = c_ops.as_mut_ptr(); 259 } 260 std::mem::forget(c_ops); 261 0 262 } 263 Err(_) => -1, 264 } 265} 266 267// ============================================================================ 268// DID Operations 269// ============================================================================ 270 271/// # Safety 272/// 273/// The `manager` pointer must be valid. `did` must be a valid NUL-terminated 274/// C string. `out_operations` and `out_count` must be valid writable pointers 275/// to receive results. Returned memory ownership rules are documented in the 276/// API and must be followed by the caller. 
277#[unsafe(no_mangle)] 278pub unsafe extern "C" fn bundle_manager_get_did_operations( 279 manager: *const CBundleManager, 280 did: *const c_char, 281 out_operations: *mut *mut COperation, 282 out_count: *mut usize, 283) -> i32 { 284 if manager.is_null() || did.is_null() || out_operations.is_null() { 285 return -1; 286 } 287 288 let manager = unsafe { &*manager }; 289 let did = unsafe { 290 match CStr::from_ptr(did).to_str() { 291 Ok(s) => s, 292 Err(_) => return -1, 293 } 294 }; 295 296 match manager.manager.get_did_operations(did, false, false) { 297 Ok(result) => { 298 let mut c_ops = Vec::new(); 299 for op in result.operations { 300 c_ops.push(operation_to_c(op)); 301 } 302 303 unsafe { 304 *out_count = c_ops.len(); 305 *out_operations = c_ops.as_mut_ptr(); 306 } 307 std::mem::forget(c_ops); 308 0 309 } 310 Err(_) => -1, 311 } 312} 313 314/// # Safety 315/// 316/// `manager` must be valid. `dids` must point to `did_count` valid 317/// NUL-terminated C string pointers. `callback` will be called from this 318/// function and must be a valid function pointer. The caller must ensure the 319/// callback and its environment remain valid for the duration of the call. 
320#[unsafe(no_mangle)] 321pub unsafe extern "C" fn bundle_manager_batch_resolve_dids( 322 manager: *const CBundleManager, 323 dids: *const *const c_char, 324 did_count: usize, 325 callback: extern "C" fn(*const c_char, *const COperation, usize), 326) -> i32 { 327 if manager.is_null() || dids.is_null() { 328 return -1; 329 } 330 331 let manager = unsafe { &*manager }; 332 let dids_slice = unsafe { std::slice::from_raw_parts(dids, did_count) }; 333 334 let mut did_strings = Vec::new(); 335 for did_ptr in dids_slice { 336 let did = unsafe { 337 match CStr::from_ptr(*did_ptr).to_str() { 338 Ok(s) => s.to_string(), 339 Err(_) => return -1, 340 } 341 }; 342 did_strings.push(did); 343 } 344 345 match manager.manager.batch_resolve_dids(did_strings) { 346 Ok(results) => { 347 for (did, operations) in results { 348 let c_did = CString::new(did).unwrap(); 349 let c_ops: Vec<COperation> = operations.into_iter().map(operation_to_c).collect(); 350 351 callback(c_did.as_ptr(), c_ops.as_ptr(), c_ops.len()); 352 353 for op in c_ops { 354 free_c_operation(op); 355 } 356 } 357 0 358 } 359 Err(_) => -1, 360 } 361} 362 363// ============================================================================ 364// Query 365// ============================================================================ 366 367/// # Safety 368/// 369/// `manager` must be valid. `query_str` must be a valid NUL-terminated C 370/// string. `out_operations` and `out_count` must be valid writable pointers. 371/// The caller is responsible for freeing any returned memory according to the 372/// API's ownership rules. 
373#[unsafe(no_mangle)] 374pub unsafe extern "C" fn bundle_manager_query( 375 manager: *const CBundleManager, 376 query_str: *const c_char, 377 bundle_start: u32, 378 bundle_end: u32, 379 out_operations: *mut *mut COperation, 380 out_count: *mut usize, 381) -> i32 { 382 if manager.is_null() || query_str.is_null() || out_operations.is_null() || out_count.is_null() { 383 return -1; 384 } 385 386 let manager = unsafe { &*manager }; 387 let query = unsafe { 388 match CStr::from_ptr(query_str).to_str() { 389 Ok(s) => s.to_string(), 390 Err(_) => return -1, 391 } 392 }; 393 394 // Determine bundle range 395 let (start, end) = if bundle_start == 0 && bundle_end == 0 { 396 // Query all bundles if not specified (like Rust CLI) 397 (1, manager.manager.get_last_bundle()) 398 } else if bundle_end == 0 { 399 (bundle_start, bundle_start) 400 } else { 401 (bundle_start, bundle_end) 402 }; 403 404 // Simple query: search through operations for matching fields 405 let mut c_ops = Vec::new(); 406 for bundle_num in start..=end { 407 let result = manager 408 .manager 409 .load_bundle(bundle_num, LoadOptions::default()); 410 if let Ok(load_result) = result { 411 for op in load_result.operations { 412 // Simple string matching in operation JSON 413 let op_json = op.operation.to_string(); 414 if op_json.contains(&query) || op.did.contains(&query) { 415 c_ops.push(operation_to_c(op)); 416 } 417 } 418 } 419 } 420 421 unsafe { 422 *out_count = c_ops.len(); 423 *out_operations = c_ops.as_mut_ptr(); 424 } 425 std::mem::forget(c_ops); 426 0 427} 428 429// ============================================================================ 430// Verification 431// ============================================================================ 432 433/// # Safety 434/// 435/// `manager` must be a valid pointer. `out_result` must be a valid writable 436/// pointer to `CVerifyResult`. Passing invalid or null pointers is undefined 437/// behavior. 
438#[unsafe(no_mangle)] 439pub unsafe extern "C" fn bundle_manager_verify_bundle( 440 manager: *const CBundleManager, 441 bundle_num: u32, 442 check_hash: bool, 443 _check_chain: bool, 444 out_result: *mut CVerifyResult, 445) -> i32 { 446 if manager.is_null() || out_result.is_null() { 447 return -1; 448 } 449 450 let manager = unsafe { &*manager }; 451 452 let spec = VerifySpec { 453 check_hash, 454 check_content_hash: check_hash, 455 check_operations: true, 456 fast: false, 457 }; 458 459 match manager.manager.verify_bundle(bundle_num, spec) { 460 Ok(result) => { 461 unsafe { 462 (*out_result).valid = result.valid; 463 (*out_result).hash_match = result.valid; // TODO: return actual hash match result 464 (*out_result).compression_ok = true; // TODO: return actual compression result 465 (*out_result).error_msg = std::ptr::null_mut(); 466 } 467 0 468 } 469 Err(e) => { 470 let err_msg = CString::new(e.to_string()).unwrap(); 471 unsafe { 472 (*out_result).valid = false; 473 (*out_result).hash_match = false; 474 (*out_result).compression_ok = false; 475 (*out_result).error_msg = err_msg.into_raw(); 476 } 477 -1 478 } 479 } 480} 481 482/// # Safety 483/// 484/// `manager` must be a valid pointer. Calling this function with invalid 485/// pointers is undefined behavior. 
486#[unsafe(no_mangle)] 487pub unsafe extern "C" fn bundle_manager_verify_chain( 488 manager: *const CBundleManager, 489 start_bundle: u32, 490 end_bundle: u32, 491) -> i32 { 492 if manager.is_null() { 493 return -1; 494 } 495 496 let manager = unsafe { &*manager }; 497 498 let spec = ChainVerifySpec { 499 start_bundle, 500 end_bundle: Some(end_bundle), 501 check_parent_links: true, 502 }; 503 504 match manager.manager.verify_chain(spec) { 505 Ok(result) => { 506 if result.valid { 507 0 508 } else { 509 -1 510 } 511 } 512 Err(_) => -1, 513 } 514} 515 516// ============================================================================ 517// Info 518// ============================================================================ 519 520/// # Safety 521/// 522/// `manager` must be valid. `out_info` must be a valid writable pointer to 523/// `CBundleInfo`. Any returned string pointers must be freed by the caller as 524/// documented by the API. 525#[unsafe(no_mangle)] 526pub unsafe extern "C" fn bundle_manager_get_bundle_info( 527 manager: *const CBundleManager, 528 bundle_num: u32, 529 include_operations: bool, 530 _include_dids: bool, 531 out_info: *mut CBundleInfo, 532) -> i32 { 533 if manager.is_null() || out_info.is_null() { 534 return -1; 535 } 536 537 let manager = unsafe { &*manager }; 538 539 let flags = InfoFlags { 540 include_operations, 541 include_size_info: true, 542 }; 543 544 match manager.manager.get_bundle_info(bundle_num, flags) { 545 Ok(info) => { 546 unsafe { 547 (*out_info).bundle_number = info.metadata.bundle_number; 548 (*out_info).operation_count = info.metadata.operation_count; 549 (*out_info).did_count = info.metadata.did_count; 550 (*out_info).compressed_size = info.metadata.compressed_size; 551 (*out_info).uncompressed_size = info.metadata.uncompressed_size; 552 (*out_info).start_time = CString::new(info.metadata.start_time).unwrap().into_raw(); 553 (*out_info).end_time = CString::new(info.metadata.end_time).unwrap().into_raw(); 554 
(*out_info).hash = CString::new(info.metadata.hash).unwrap().into_raw(); 555 } 556 0 557 } 558 Err(_) => -1, 559 } 560} 561 562// ============================================================================ 563// Cache Management 564// ============================================================================ 565 566/// # Safety 567/// 568/// `manager` must be valid. `bundle_nums` must point to `count` valid u32 569/// values. Passing invalid pointers is undefined behavior. 570#[unsafe(no_mangle)] 571pub unsafe extern "C" fn bundle_manager_prefetch_bundles( 572 manager: *const CBundleManager, 573 bundle_nums: *const u32, 574 count: usize, 575) -> i32 { 576 if manager.is_null() || bundle_nums.is_null() { 577 return -1; 578 } 579 580 let manager = unsafe { &*manager }; 581 let bundles = unsafe { std::slice::from_raw_parts(bundle_nums, count) }; 582 583 match manager.manager.prefetch_bundles(bundles.to_vec()) { 584 Ok(_) => 0, 585 Err(_) => -1, 586 } 587} 588 589/// # Safety 590/// 591/// `manager` must be valid. Passing invalid pointers is undefined behavior. 592#[unsafe(no_mangle)] 593pub unsafe extern "C" fn bundle_manager_warm_up( 594 manager: *const CBundleManager, 595 strategy: u8, 596 start_bundle: u32, 597 end_bundle: u32, 598) -> i32 { 599 if manager.is_null() { 600 return -1; 601 } 602 603 let manager = unsafe { &*manager }; 604 605 let warm_strategy = match strategy { 606 0 => WarmUpStrategy::Recent(10), // Default count for recent 607 1 => WarmUpStrategy::All, 608 2 => WarmUpStrategy::Range(start_bundle, end_bundle), 609 _ => return -1, 610 }; 611 612 let spec = WarmUpSpec { 613 strategy: warm_strategy, 614 }; 615 616 match manager.manager.warm_up(spec) { 617 Ok(_) => 0, 618 Err(_) => -1, 619 } 620} 621 622/// # Safety 623/// 624/// `manager` must be a valid pointer previously returned by 625/// `bundle_manager_new`. 
626#[unsafe(no_mangle)] 627pub unsafe extern "C" fn bundle_manager_clear_caches(manager: *const CBundleManager) -> i32 { 628 if manager.is_null() { 629 return -1; 630 } 631 632 let manager = unsafe { &*manager }; 633 manager.manager.clear_caches(); 634 0 635} 636 637// ============================================================================ 638// DID Index 639// ============================================================================ 640 641/// # Safety 642/// 643/// `manager` must be valid. `out_stats` must be a valid writable pointer if 644/// provided. The caller must ensure `progress_callback` is a valid function 645/// pointer if passed. 646#[unsafe(no_mangle)] 647pub unsafe extern "C" fn bundle_manager_rebuild_did_index( 648 manager: *const CBundleManager, 649 progress_callback: Option<extern "C" fn(u32, u32, u64, u64)>, 650 out_stats: *mut CRebuildStats, 651) -> i32 { 652 if manager.is_null() { 653 return -1; 654 } 655 656 let manager = unsafe { &*manager }; 657 658 let callback = progress_callback.map(|cb| { 659 Box::new( 660 move |current: u32, total: u32, bytes_processed: u64, total_bytes: u64| { 661 cb(current, total, bytes_processed, total_bytes); 662 }, 663 ) as Box<dyn Fn(u32, u32, u64, u64) + Send + Sync> 664 }); 665 666 // Use default flush interval for FFI 667 match manager 668 .manager 669 .build_did_index(constants::DID_INDEX_FLUSH_INTERVAL, callback, None, None) 670 { 671 Ok(stats) => { 672 if !out_stats.is_null() { 673 unsafe { 674 (*out_stats).bundles_processed = stats.bundles_processed; 675 (*out_stats).operations_indexed = stats.operations_indexed; 676 (*out_stats).unique_dids = 0; // TODO: track unique DIDs 677 (*out_stats).duration_ms = 0; // TODO: track duration 678 } 679 } 680 0 681 } 682 Err(_) => -1, 683 } 684} 685 686/// # Safety 687/// 688/// `manager` must be valid. `out_stats` must be a valid writable pointer. 
689#[unsafe(no_mangle)] 690pub unsafe extern "C" fn bundle_manager_get_did_index_stats( 691 manager: *const CBundleManager, 692 out_stats: *mut CDIDIndexStats, 693) -> i32 { 694 if manager.is_null() || out_stats.is_null() { 695 return -1; 696 } 697 698 let manager = unsafe { &*manager }; 699 700 let stats = manager.manager.get_did_index_stats(); 701 unsafe { 702 (*out_stats).total_dids = stats 703 .get("total_dids") 704 .and_then(|v| v.as_i64()) 705 .unwrap_or(0) as usize; 706 (*out_stats).total_operations = stats 707 .get("total_entries") 708 .and_then(|v| v.as_i64()) 709 .unwrap_or(0) as usize; 710 (*out_stats).index_size_bytes = 0; // TODO: track actual index size 711 } 712 0 713} 714 715// ============================================================================ 716// Observability 717// ============================================================================ 718 719/// # Safety 720/// 721/// `manager` must be valid. `out_stats` must be a valid writable pointer. 722#[unsafe(no_mangle)] 723pub unsafe extern "C" fn bundle_manager_get_stats( 724 manager: *const CBundleManager, 725 out_stats: *mut CManagerStats, 726) -> i32 { 727 if manager.is_null() || out_stats.is_null() { 728 return -1; 729 } 730 731 let manager = unsafe { &*manager }; 732 733 let stats = manager.manager.get_stats(); 734 unsafe { 735 (*out_stats).cache_hits = stats.cache_hits; 736 (*out_stats).cache_misses = stats.cache_misses; 737 (*out_stats).bytes_read = stats.bundles_loaded; // TODO: track actual bytes read 738 (*out_stats).operations_processed = stats.operations_read; 739 } 740 0 741} 742 743// ============================================================================ 744// Helper functions 745// ============================================================================ 746 747fn operation_to_c(op: Operation) -> COperation { 748 // Extract operation type from the operation object 749 let op_type = op 750 .operation 751 .get("type") 752 .and_then(|v| v.as_str()) 753 
.unwrap_or("unknown") 754 .to_string(); 755 756 COperation { 757 did: CString::new(op.did).unwrap().into_raw(), 758 op_type: CString::new(op_type).unwrap().into_raw(), 759 cid: op 760 .cid 761 .map(|s| CString::new(s).unwrap().into_raw()) 762 .unwrap_or(std::ptr::null_mut()), 763 nullified: op.nullified, 764 created_at: CString::new(op.created_at).unwrap().into_raw(), 765 json: CString::new(op.operation.to_string()).unwrap().into_raw(), 766 } 767} 768 769fn free_c_operation(op: COperation) { 770 unsafe { 771 if !op.did.is_null() { 772 let _ = CString::from_raw(op.did); 773 } 774 if !op.op_type.is_null() { 775 let _ = CString::from_raw(op.op_type); 776 } 777 if !op.cid.is_null() { 778 let _ = CString::from_raw(op.cid); 779 } 780 if !op.created_at.is_null() { 781 let _ = CString::from_raw(op.created_at); 782 } 783 if !op.json.is_null() { 784 let _ = CString::from_raw(op.json); 785 } 786 } 787} 788 789/// # Safety 790/// 791/// `s` must be a pointer previously returned by this API that is safe to 792/// free. Passing invalid pointers is undefined behavior. 793#[unsafe(no_mangle)] 794pub unsafe extern "C" fn bundle_manager_free_string(s: *mut c_char) { 795 if !s.is_null() { 796 unsafe { 797 let _ = CString::from_raw(s); 798 } 799 } 800} 801 802/// # Safety 803/// 804/// `op` must be a pointer previously returned by this API and safe to free. 805#[unsafe(no_mangle)] 806pub unsafe extern "C" fn bundle_manager_free_operation(op: *mut COperation) { 807 if !op.is_null() { 808 unsafe { 809 let op_val = *op; 810 free_c_operation(op_val); 811 } 812 } 813} 814 815/// # Safety 816/// 817/// `ops` must point to an array of `count` `COperation` previously returned 818/// by this API and safe to free. 
819#[unsafe(no_mangle)] 820pub unsafe extern "C" fn bundle_manager_free_operations(ops: *mut COperation, count: usize) { 821 if !ops.is_null() { 822 unsafe { 823 let ops_slice = std::slice::from_raw_parts(ops, count); 824 for op in ops_slice { 825 free_c_operation(*op); 826 } 827 let _ = Vec::from_raw_parts(ops, count, count); 828 } 829 } 830} 831 832// ============================================================================ 833// Export 834// ============================================================================ 835 836/// Callback type for streaming export 837/// Returns 0 to continue, non-zero to stop 838pub type ExportCallback = 839 extern "C" fn(data: *const c_char, len: usize, user_data: *mut std::ffi::c_void) -> i32; 840 841/// # Safety 842/// 843/// `manager` must be valid. `spec` must point to a valid `CExportSpec` and 844/// `callback` must be a valid function pointer. `out_stats` must be a valid 845/// writable pointer if provided. 846#[unsafe(no_mangle)] 847pub unsafe extern "C" fn bundle_manager_export( 848 manager: *const CBundleManager, 849 spec: *const CExportSpec, 850 callback: ExportCallback, 851 user_data: *mut std::ffi::c_void, 852 out_stats: *mut CExportStats, 853) -> i32 { 854 if manager.is_null() || spec.is_null() || callback as usize == 0 { 855 return -1; 856 } 857 858 let manager = unsafe { &*manager }; 859 let spec = unsafe { &*spec }; 860 861 use crate::index::Index; 862 use std::fs::File; 863 use std::io::BufRead; 864 865 // Get directory path 866 let dir_path = manager.manager.directory().clone(); 867 // Load index 868 let index = match Index::load(&dir_path) { 869 Ok(idx) => idx, 870 Err(_) => return -1, 871 }; 872 873 // Determine bundle range 874 let bundle_numbers: Vec<u32> = if spec.export_all { 875 (1..=index.last_bundle).collect() 876 } else if spec.bundle_end > 0 && spec.bundle_end >= spec.bundle_start { 877 (spec.bundle_start..=spec.bundle_end).collect() 878 } else { 879 vec![spec.bundle_start] 880 }; 881 882 // 
Filter bundles by timestamp if specified 883 let bundle_numbers: Vec<u32> = if !spec.after_timestamp.is_null() { 884 let after_ts = unsafe { 885 match CStr::from_ptr(spec.after_timestamp).to_str() { 886 Ok(s) => s.to_string(), 887 Err(_) => return -1, 888 } 889 }; 890 bundle_numbers 891 .into_iter() 892 .filter(|num| match index.get_bundle(*num) { 893 Some(meta) => meta.end_time >= after_ts, 894 None => true, 895 }) 896 .collect() 897 } else { 898 bundle_numbers 899 }; 900 901 let mut exported_count = 0u64; 902 let mut bytes_written = 0u64; 903 let mut output_buffer = Vec::with_capacity(1024 * 1024); 904 905 // Determine if we need parsing 906 let needs_parsing = !spec.after_timestamp.is_null() 907 || !spec.did_filter.is_null() 908 || !spec.op_type_filter.is_null() 909 || spec.format == 1 910 || spec.format == 2; // JSON or CSV 911 912 // Process bundles 913 for bundle_num in bundle_numbers { 914 if spec.count_limit > 0 && exported_count >= spec.count_limit { 915 break; 916 } 917 918 let bundle_path = constants::bundle_path(&dir_path, bundle_num); 919 if !bundle_path.exists() { 920 continue; 921 } 922 923 let file = match File::open(&bundle_path) { 924 Ok(f) => f, 925 Err(_) => continue, 926 }; 927 928 let decoder = match zstd::Decoder::new(file) { 929 Ok(d) => d, 930 Err(_) => continue, 931 }; 932 933 let reader = std::io::BufReader::with_capacity(1024 * 1024, decoder); 934 935 if !needs_parsing { 936 // Fast path: no parsing 937 for line in reader.lines() { 938 if spec.count_limit > 0 && exported_count >= spec.count_limit { 939 break; 940 } 941 942 let line = match line { 943 Ok(l) => l, 944 Err(_) => continue, 945 }; 946 947 if line.is_empty() { 948 continue; 949 } 950 951 output_buffer.extend_from_slice(line.as_bytes()); 952 output_buffer.push(b'\n'); 953 exported_count += 1; 954 bytes_written += line.len() as u64 + 1; 955 956 // Flush buffer when large 957 if output_buffer.len() >= 1024 * 1024 { 958 let result = callback( 959 output_buffer.as_ptr() as *const 
c_char, 960 output_buffer.len(), 961 user_data, 962 ); 963 if result != 0 { 964 break; 965 } 966 output_buffer.clear(); 967 } 968 } 969 } else { 970 // Slow path: parse and filter 971 use sonic_rs::JsonValueTrait; 972 973 let after_ts = if !spec.after_timestamp.is_null() { 974 Some(unsafe { 975 CStr::from_ptr(spec.after_timestamp) 976 .to_str() 977 .unwrap() 978 .to_string() 979 }) 980 } else { 981 None 982 }; 983 984 let did_filter = if !spec.did_filter.is_null() { 985 Some(unsafe { 986 CStr::from_ptr(spec.did_filter) 987 .to_str() 988 .unwrap() 989 .to_string() 990 }) 991 } else { 992 None 993 }; 994 995 let op_type_filter = if !spec.op_type_filter.is_null() { 996 Some(unsafe { 997 CStr::from_ptr(spec.op_type_filter) 998 .to_str() 999 .unwrap() 1000 .to_string() 1001 }) 1002 } else { 1003 None 1004 }; 1005 1006 for line in reader.lines() { 1007 if spec.count_limit > 0 && exported_count >= spec.count_limit { 1008 break; 1009 } 1010 1011 let line = match line { 1012 Ok(l) => l, 1013 Err(_) => continue, 1014 }; 1015 1016 if line.is_empty() { 1017 continue; 1018 } 1019 1020 // Parse JSON 1021 let data: sonic_rs::Value = match sonic_rs::from_str(&line) { 1022 Ok(d) => d, 1023 Err(_) => continue, 1024 }; 1025 1026 // Apply filters 1027 if let Some(ref after_ts) = after_ts { 1028 if let Some(created_at) = 1029 data.get("createdAt").or_else(|| data.get("created_at")) 1030 { 1031 if let Some(ts_str) = created_at.as_str() { 1032 if ts_str < after_ts.as_str() { 1033 continue; 1034 } 1035 } else { 1036 continue; 1037 } 1038 } else { 1039 continue; 1040 } 1041 } 1042 1043 if let Some(ref did_filter) = did_filter { 1044 if let Some(did_val) = data.get("did") { 1045 if let Some(did_str) = did_val.as_str() { 1046 if did_str != did_filter { 1047 continue; 1048 } 1049 } else { 1050 continue; 1051 } 1052 } else { 1053 continue; 1054 } 1055 } 1056 1057 if let Some(ref op_type_filter) = op_type_filter { 1058 if let Some(op_val) = data.get("operation") { 1059 let matches = if 
op_val.is_str() { 1060 op_val.as_str().unwrap() == op_type_filter 1061 } else if op_val.is_object() { 1062 if let Some(typ_val) = op_val.get("type") { 1063 typ_val.is_str() && typ_val.as_str().unwrap() == op_type_filter 1064 } else { 1065 false 1066 } 1067 } else { 1068 false 1069 }; 1070 if !matches { 1071 continue; 1072 } 1073 } else { 1074 continue; 1075 } 1076 } 1077 1078 // Format 1079 let formatted = match spec.format { 1080 0 => line, // JSONL - use original 1081 1 => sonic_rs::to_string_pretty(&data).unwrap_or_default(), // JSON 1082 2 => { 1083 let did = data.get("did").and_then(|v| v.as_str()).unwrap_or(""); 1084 let op = data 1085 .get("operation") 1086 .map(|v| sonic_rs::to_string(v).unwrap_or_default()) 1087 .unwrap_or_default(); 1088 let created_at = data 1089 .get("createdAt") 1090 .or_else(|| data.get("created_at")) 1091 .and_then(|v| v.as_str()) 1092 .unwrap_or(""); 1093 let nullified = data 1094 .get("nullified") 1095 .and_then(|v| v.as_bool()) 1096 .unwrap_or(false); 1097 format!("{},{},{},{}", did, op, created_at, nullified) 1098 } 1099 _ => line, 1100 }; 1101 1102 output_buffer.extend_from_slice(formatted.as_bytes()); 1103 output_buffer.push(b'\n'); 1104 exported_count += 1; 1105 bytes_written += formatted.len() as u64 + 1; 1106 1107 // Flush buffer when large 1108 if output_buffer.len() >= 1024 * 1024 { 1109 let result = callback( 1110 output_buffer.as_ptr() as *const c_char, 1111 output_buffer.len(), 1112 user_data, 1113 ); 1114 if result != 0 { 1115 break; 1116 } 1117 output_buffer.clear(); 1118 } 1119 } 1120 } 1121 } 1122 1123 // Flush remaining buffer 1124 if !output_buffer.is_empty() { 1125 callback( 1126 output_buffer.as_ptr() as *const c_char, 1127 output_buffer.len(), 1128 user_data, 1129 ); 1130 } 1131 1132 if !out_stats.is_null() { 1133 unsafe { 1134 (*out_stats).records_written = exported_count; 1135 (*out_stats).bytes_written = bytes_written; 1136 } 1137 } 1138 1139 0 1140}