Python bindings to oxyroot. Makes reading .root files blazing fast ๐Ÿš€

Add concat_trees function with rayon

+200 -59
+5
Cargo.toml
··· 17 17 arrow = "53.0.0" 18 18 polars = "0.50.0" 19 19 pyo3-polars = "0.23.1" 20 + rayon = "1.10.0" 21 + glob = "0.3.1" 22 + once_cell = "1.19.0" 23 + num_cpus = "1.16.0" 24 + polars-core = "0.50.0"
+20
README.md
··· 58 58 ) 59 59 ``` 60 60 61 + ## Combining Multiple Files 62 + 63 + You can efficiently read and concatenate a TTree from multiple ROOT files into a single Polars DataFrame using `concat_trees`. This function processes files in parallel to maximize performance. 64 + 65 + ```python 66 + import oxyroot 67 + 68 + # Combine trees from multiple files using a wildcard 69 + df = oxyroot.concat_trees(paths=["ntuples*.root"], tree_name="mu_mc") 70 + 71 + print(df) 72 + 73 + # You can also provide a list of specific files 74 + # df = oxyroot.concat_trees(paths=["file1.root", "file2.root"], tree_name="my_tree") 75 + 76 + # Control the number of threads used for parallel processing 77 + # By default, it uses half the available CPU cores. 78 + oxyroot.set_num_threads(4) 79 + ``` 80 + 61 81 ## Performance 62 82 63 83 `oxyroot` is intended to be fast. Here is a simple benchmark comparing the time taken to read all branches of a TTree with `uproot` and `oxyroot`.
+31
python/oxyroot/__init__.pyi
··· 1 1 from typing import Iterator, List, Optional 2 2 import numpy as np 3 + import polars as pl 3 4 4 5 class RootFile: 5 6 path: str ··· 13 14 def branches(self) -> List[str]: ... 14 15 def __getitem__(self, name: str) -> Branch: ... 15 16 def __iter__(self) -> Iterator[Branch]: ... 17 + def arrays(self, columns:Optional[List[str]] = None, ignore_columns: Optional[List[str]] = None) -> pl.DataFrame ... 16 18 def to_parquet(self, output_file: str, overwrite: bool = False, compression: str = "snappy", columns: Optional[List[str]] = None) -> None: ... 17 19 18 20 class Branch: ··· 40 42 A RootFile object. 41 43 """ 42 44 ... 45 + 46 + def concat_trees( 47 + paths: List[str], 48 + tree_name: str, 49 + columns: Optional[List[str]] = None, 50 + ignore_columns: Optional[List[str]] = None, 51 + ) -> pl.DataFrame: 52 + """ 53 + Reads multiple ROOT files, concatenates the specified tree, and returns a single Polars DataFrame. 54 + 55 + Args: 56 + paths: A list of paths to the ROOT files. Wildcards are supported. 57 + tree_name: The name of the tree to read from each file. 58 + columns: An optional list of column names to include. If None, all columns are included. 59 + ignore_columns: An optional list of column names to exclude. 60 + 61 + Returns: 62 + A single Polars DataFrame containing the concatenated data. 63 + """ 64 + ... 65 + 66 + def set_num_threads(num_threads: int) -> None: 67 + """ 68 + Sets the number of threads to use for parallel operations. 69 + 70 + Args: 71 + num_threads: The number of threads to use. 72 + """ 73 + ...
+144 -59
src/lib.rs
··· 11 11 }; 12 12 use arrow::datatypes::{DataType, Field, Schema}; 13 13 use arrow::record_batch::RecordBatch; 14 + use once_cell::sync::Lazy; 15 + use parking_lot::Mutex; 14 16 use parquet::arrow::ArrowWriter; 15 17 use parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel}; 16 18 use parquet::file::properties::WriterProperties; 17 19 use polars::prelude::*; 20 + use polars_core::utils::concat_df; 18 21 use pyo3_polars::PyDataFrame; 22 + use rayon::prelude::*; 23 + 24 + static POOL: Lazy<Mutex<rayon::ThreadPool>> = Lazy::new(|| { 25 + let num_threads = std::cmp::max(1, num_cpus::get() / 2); 26 + let pool = rayon::ThreadPoolBuilder::new() 27 + .num_threads(num_threads) 28 + .build() 29 + .unwrap(); 30 + Mutex::new(pool) 31 + }); 32 + 33 + #[pyfunction] 34 + fn set_num_threads(num_threads: usize) -> PyResult<()> { 35 + let pool = rayon::ThreadPoolBuilder::new() 36 + .num_threads(num_threads) 37 + .build() 38 + .map_err(|e| PyValueError::new_err(e.to_string()))?; 39 + *POOL.lock() = pool; 40 + Ok(()) 41 + } 19 42 20 43 #[pyclass(name = "RootFile")] 21 44 struct PyRootFile { ··· 41 64 name: String, 42 65 } 43 66 67 + fn tree_to_dataframe( 68 + tree: &::oxyroot::ReaderTree, 69 + columns: Option<Vec<String>>, 70 + ignore_columns: Option<Vec<String>>, 71 + ) -> PyResult<DataFrame> { 72 + let mut branches_to_save = if let Some(columns) = columns { 73 + columns 74 + } else { 75 + tree.branches().map(|b| b.name().to_string()).collect() 76 + }; 77 + 78 + if let Some(ignore_columns) = ignore_columns { 79 + branches_to_save.retain(|c| !ignore_columns.contains(c)); 80 + } 81 + 82 + let mut series_vec = Vec::new(); 83 + 84 + for branch_name in branches_to_save { 85 + let branch = match tree.branch(&branch_name) { 86 + Some(branch) => branch, 87 + None => { 88 + println!("Branch '{}' not found, skipping", branch_name); 89 + continue; 90 + } 91 + }; 92 + 93 + let series = match branch.item_type_name().as_str() { 94 + "float" => { 95 + let data = branch.as_iter::<f32>().unwrap().collect::<Vec<_>>(); 96 + Series::new((&branch_name).into(), data) 97 + } 98 + "double" => { 99 + let data = branch.as_iter::<f64>().unwrap().collect::<Vec<_>>(); 100 + Series::new((&branch_name).into(), data) 101 + } 102 + "int32_t" => { 103 + let data = branch.as_iter::<i32>().unwrap().collect::<Vec<_>>(); 104 + Series::new((&branch_name).into(), data) 105 + } 106 + "int64_t" => { 107 + let data = branch.as_iter::<i64>().unwrap().collect::<Vec<_>>(); 108 + Series::new((&branch_name).into(), data) 109 + } 110 + "uint32_t" => { 111 + let data = branch.as_iter::<u32>().unwrap().collect::<Vec<_>>(); 112 + Series::new((&branch_name).into(), data) 113 + } 114 + "uint64_t" => { 115 + let data = branch.as_iter::<u64>().unwrap().collect::<Vec<_>>(); 116 + Series::new((&branch_name).into(), data) 117 + } 118 + "string" => { 119 + let data = branch.as_iter::<String>().unwrap().collect::<Vec<_>>(); 120 + Series::new((&branch_name).into(), data) 121 + } 122 + other => { 123 + println!("Unsupported branch type: {}, skipping", other); 124 + continue; 125 + } 126 + }; 127 + series_vec.push(series); 128 + } 129 + 130 + DataFrame::new(series_vec.into_iter().map(|s| s.into()).collect()) 131 + .map_err(|e| PyValueError::new_err(e.to_string())) 132 + } 133 + 44 134 #[pymethods] 45 135 impl PyRootFile { 46 136 #[new] ··· 96 186 ) 97 187 } 98 188 99 - #[pyo3(signature = (columns = None))] 100 - fn arrays(&self, columns: Option<Vec<String>>) -> PyResult<PyDataFrame> { 189 + #[pyo3(signature = (columns = None, ignore_columns = None))] 190 + fn arrays( 191 + &self, 192 + columns: Option<Vec<String>>, 193 + ignore_columns: Option<Vec<String>>, 194 + ) -> PyResult<PyDataFrame> { 101 195 let mut file = 102 196 RootFile::open(&self.path).map_err(|e| PyValueError::new_err(e.to_string()))?; 103 197 let tree = file 104 198 .get_tree(&self.name) 105 199 .map_err(|e| PyValueError::new_err(e.to_string()))?; 106 - 107 - let branches_to_save = if let Some(columns) = columns { 108 - columns 109 - } else { 110 - tree.branches().map(|b| b.name().to_string()).collect() 111 - }; 112 - 113 - let mut series_vec = Vec::new(); 114 - 115 - for branch_name in branches_to_save { 116 - let branch = match tree.branch(&branch_name) { 117 - Some(branch) => branch, 118 - None => { 119 - println!("Branch '{}' not found, skipping", branch_name); 120 - continue; 121 - } 122 - }; 123 - 124 - let series = match branch.item_type_name().as_str() { 125 - "float" => { 126 - let data = branch.as_iter::<f32>().unwrap().collect::<Vec<_>>(); 127 - Series::new((&branch_name).into(), data) 128 - } 129 - "double" => { 130 - let data = branch.as_iter::<f64>().unwrap().collect::<Vec<_>>(); 131 - Series::new((&branch_name).into(), data) 132 - } 133 - "int32_t" => { 134 - let data = branch.as_iter::<i32>().unwrap().collect::<Vec<_>>(); 135 - Series::new((&branch_name).into(), data) 136 - } 137 - "int64_t" => { 138 - let data = branch.as_iter::<i64>().unwrap().collect::<Vec<_>>(); 139 - Series::new((&branch_name).into(), data) 140 - } 141 - "uint32_t" => { 142 - let data = branch.as_iter::<u32>().unwrap().collect::<Vec<_>>(); 143 - Series::new((&branch_name).into(), data) 144 - } 145 - "uint64_t" => { 146 - let data = branch.as_iter::<u64>().unwrap().collect::<Vec<_>>(); 147 - Series::new((&branch_name).into(), data) 148 - } 149 - "string" => { 150 - let data = branch.as_iter::<String>().unwrap().collect::<Vec<_>>(); 151 - Series::new((&branch_name).into(), data) 152 - } 153 - other => { 154 - println!("Unsupported branch type: {}, skipping", other); 155 - continue; 156 - } 157 - }; 158 - series_vec.push(series); 159 - } 160 - 161 - let df = DataFrame::new(series_vec.into_iter().map(|s| s.into()).collect()) 162 - .map_err(|e| PyValueError::new_err(e.to_string()))?; 200 + let df = tree_to_dataframe(&tree, columns, ignore_columns)?; 163 201 Ok(PyDataFrame(df)) 164 202 } 165 203 ··· 390 428 Ok(env!("CARGO_PKG_VERSION").to_string()) 391 429 } 392 430 431 + #[pyfunction] 432 + #[pyo3(signature = (paths, tree_name, columns = None, ignore_columns = None))] 433 + fn concat_trees( 434 + paths: Vec<String>, 435 + tree_name: String, 436 + columns: Option<Vec<String>>, 437 + ignore_columns: Option<Vec<String>>, 438 + ) -> PyResult<PyDataFrame> { 439 + let mut all_paths = Vec::new(); 440 + for path in paths { 441 + for entry in glob::glob(&path).map_err(|e| PyValueError::new_err(e.to_string()))? { 442 + match entry { 443 + Ok(path) => { 444 + all_paths.push(path.to_str().unwrap().to_string()); 445 + } 446 + Err(e) => return Err(PyValueError::new_err(e.to_string())), 447 + } 448 + } 449 + } 450 + 451 + let pool = POOL.lock(); 452 + let dfs: Vec<DataFrame> = pool.install(|| { 453 + all_paths 454 + .par_iter() 455 + .map(|path| { 456 + let mut file = 457 + RootFile::open(path).map_err(|e| PyValueError::new_err(e.to_string()))?; 458 + let tree = file 459 + .get_tree(&tree_name) 460 + .map_err(|e| PyValueError::new_err(e.to_string()))?; 461 + tree_to_dataframe(&tree, columns.clone(), ignore_columns.clone()) 462 + }) 463 + .filter_map(Result::ok) 464 + .collect() 465 + }); 466 + 467 + if dfs.is_empty() { 468 + return Ok(PyDataFrame(DataFrame::default())); 469 + } 470 + 471 + let combined_df = concat_df(&dfs).map_err(|e| PyValueError::new_err(e.to_string()))?; 472 + 473 + Ok(PyDataFrame(combined_df)) 474 + } 475 + 393 476 /// A Python module to read root files, implemented in Rust. 394 477 #[pymodule] 395 478 fn oxyroot(m: &Bound<'_, PyModule>) -> PyResult<()> { 396 479 m.add_function(wrap_pyfunction!(version, m)?)?; 397 480 m.add_function(wrap_pyfunction!(open, m)?)?; 481 + m.add_function(wrap_pyfunction!(concat_trees, m)?)?; 482 + m.add_function(wrap_pyfunction!(set_num_threads, m)?)?; 398 483 m.add_class::<PyRootFile>()?; 399 484 m.add_class::<PyTree>()?; 400 485 m.add_class::<PyBranch>()?;