this repo has no description
1use crate::repo::PostgresBlockStore; 2use bytes::Bytes; 3use cid::Cid; 4use jacquard_repo::error::RepoError; 5use jacquard_repo::repo::CommitData; 6use jacquard_repo::storage::BlockStore; 7use std::collections::HashSet; 8use std::sync::{Arc, Mutex}; 9 10#[derive(Clone)] 11pub struct TrackingBlockStore { 12 inner: PostgresBlockStore, 13 written_cids: Arc<Mutex<Vec<Cid>>>, 14 read_cids: Arc<Mutex<HashSet<Cid>>>, 15} 16 17impl TrackingBlockStore { 18 pub fn new(store: PostgresBlockStore) -> Self { 19 Self { 20 inner: store, 21 written_cids: Arc::new(Mutex::new(Vec::new())), 22 read_cids: Arc::new(Mutex::new(HashSet::new())), 23 } 24 } 25 26 pub fn get_written_cids(&self) -> Vec<Cid> { 27 match self.written_cids.lock() { 28 Ok(guard) => guard.clone(), 29 Err(poisoned) => poisoned.into_inner().clone(), 30 } 31 } 32 33 pub fn get_read_cids(&self) -> Vec<Cid> { 34 match self.read_cids.lock() { 35 Ok(guard) => guard.iter().cloned().collect(), 36 Err(poisoned) => poisoned.into_inner().iter().cloned().collect(), 37 } 38 } 39 40 pub fn get_all_relevant_cids(&self) -> Vec<Cid> { 41 let written = self.get_written_cids(); 42 let read = self.get_read_cids(); 43 let mut all: HashSet<Cid> = written.into_iter().collect(); 44 all.extend(read); 45 all.into_iter().collect() 46 } 47} 48 49impl BlockStore for TrackingBlockStore { 50 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> { 51 let result = self.inner.get(cid).await?; 52 if result.is_some() { 53 match self.read_cids.lock() { 54 Ok(mut guard) => { 55 guard.insert(*cid); 56 } 57 Err(poisoned) => { 58 poisoned.into_inner().insert(*cid); 59 } 60 } 61 } 62 Ok(result) 63 } 64 65 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> { 66 let cid = self.inner.put(data).await?; 67 match self.written_cids.lock() { 68 Ok(mut guard) => guard.push(cid), 69 Err(poisoned) => poisoned.into_inner().push(cid), 70 } 71 Ok(cid) 72 } 73 74 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> { 75 self.inner.has(cid).await 76 } 77 78 async fn put_many( 79 &self, 80 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send, 81 ) -> Result<(), RepoError> { 82 let blocks: Vec<_> = blocks.into_iter().collect(); 83 let cids: Vec<Cid> = blocks.iter().map(|(cid, _)| *cid).collect(); 84 self.inner.put_many(blocks).await?; 85 match self.written_cids.lock() { 86 Ok(mut guard) => guard.extend(cids), 87 Err(poisoned) => poisoned.into_inner().extend(cids), 88 } 89 Ok(()) 90 } 91 92 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> { 93 let results = self.inner.get_many(cids).await?; 94 for (cid, result) in cids.iter().zip(results.iter()) { 95 if result.is_some() { 96 match self.read_cids.lock() { 97 Ok(mut guard) => { 98 guard.insert(*cid); 99 } 100 Err(poisoned) => { 101 poisoned.into_inner().insert(*cid); 102 } 103 } 104 } 105 } 106 Ok(results) 107 } 108 109 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> { 110 self.put_many(commit.blocks).await?; 111 Ok(()) 112 } 113}