this repo has no description
1use crate::repo::PostgresBlockStore; 2use bytes::Bytes; 3use cid::Cid; 4use jacquard_repo::error::RepoError; 5use jacquard_repo::repo::CommitData; 6use jacquard_repo::storage::BlockStore; 7use std::collections::HashSet; 8use std::sync::{Arc, Mutex}; 9#[derive(Clone)] 10pub struct TrackingBlockStore { 11 inner: PostgresBlockStore, 12 written_cids: Arc<Mutex<Vec<Cid>>>, 13 read_cids: Arc<Mutex<HashSet<Cid>>>, 14} 15impl TrackingBlockStore { 16 pub fn new(store: PostgresBlockStore) -> Self { 17 Self { 18 inner: store, 19 written_cids: Arc::new(Mutex::new(Vec::new())), 20 read_cids: Arc::new(Mutex::new(HashSet::new())), 21 } 22 } 23 pub fn get_written_cids(&self) -> Vec<Cid> { 24 match self.written_cids.lock() { 25 Ok(guard) => guard.clone(), 26 Err(poisoned) => poisoned.into_inner().clone(), 27 } 28 } 29 pub fn get_read_cids(&self) -> Vec<Cid> { 30 match self.read_cids.lock() { 31 Ok(guard) => guard.iter().cloned().collect(), 32 Err(poisoned) => poisoned.into_inner().iter().cloned().collect(), 33 } 34 } 35 pub fn get_all_relevant_cids(&self) -> Vec<Cid> { 36 let written = self.get_written_cids(); 37 let read = self.get_read_cids(); 38 let mut all: HashSet<Cid> = written.into_iter().collect(); 39 all.extend(read); 40 all.into_iter().collect() 41 } 42} 43impl BlockStore for TrackingBlockStore { 44 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> { 45 let result = self.inner.get(cid).await?; 46 if result.is_some() { 47 match self.read_cids.lock() { 48 Ok(mut guard) => { guard.insert(*cid); }, 49 Err(poisoned) => { poisoned.into_inner().insert(*cid); }, 50 } 51 } 52 Ok(result) 53 } 54 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> { 55 let cid = self.inner.put(data).await?; 56 match self.written_cids.lock() { 57 Ok(mut guard) => guard.push(cid.clone()), 58 Err(poisoned) => poisoned.into_inner().push(cid.clone()), 59 } 60 Ok(cid) 61 } 62 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> { 63 self.inner.has(cid).await 64 } 65 async fn put_many( 66 &self, 67 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send, 68 ) -> Result<(), RepoError> { 69 let blocks: Vec<_> = blocks.into_iter().collect(); 70 let cids: Vec<Cid> = blocks.iter().map(|(cid, _)| cid.clone()).collect(); 71 self.inner.put_many(blocks).await?; 72 match self.written_cids.lock() { 73 Ok(mut guard) => guard.extend(cids), 74 Err(poisoned) => poisoned.into_inner().extend(cids), 75 } 76 Ok(()) 77 } 78 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> { 79 let results = self.inner.get_many(cids).await?; 80 for (cid, result) in cids.iter().zip(results.iter()) { 81 if result.is_some() { 82 match self.read_cids.lock() { 83 Ok(mut guard) => { guard.insert(*cid); }, 84 Err(poisoned) => { poisoned.into_inner().insert(*cid); }, 85 } 86 } 87 } 88 Ok(results) 89 } 90 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> { 91 self.put_many(commit.blocks).await?; 92 Ok(()) 93 } 94}