this repo has no description
1use crate::repo::PostgresBlockStore;
2use bytes::Bytes;
3use cid::Cid;
4use jacquard_repo::error::RepoError;
5use jacquard_repo::repo::CommitData;
6use jacquard_repo::storage::BlockStore;
7use std::collections::HashSet;
8use std::sync::{Arc, Mutex};
9#[derive(Clone)]
10pub struct TrackingBlockStore {
11 inner: PostgresBlockStore,
12 written_cids: Arc<Mutex<Vec<Cid>>>,
13 read_cids: Arc<Mutex<HashSet<Cid>>>,
14}
15impl TrackingBlockStore {
16 pub fn new(store: PostgresBlockStore) -> Self {
17 Self {
18 inner: store,
19 written_cids: Arc::new(Mutex::new(Vec::new())),
20 read_cids: Arc::new(Mutex::new(HashSet::new())),
21 }
22 }
23 pub fn get_written_cids(&self) -> Vec<Cid> {
24 match self.written_cids.lock() {
25 Ok(guard) => guard.clone(),
26 Err(poisoned) => poisoned.into_inner().clone(),
27 }
28 }
29 pub fn get_read_cids(&self) -> Vec<Cid> {
30 match self.read_cids.lock() {
31 Ok(guard) => guard.iter().cloned().collect(),
32 Err(poisoned) => poisoned.into_inner().iter().cloned().collect(),
33 }
34 }
35 pub fn get_all_relevant_cids(&self) -> Vec<Cid> {
36 let written = self.get_written_cids();
37 let read = self.get_read_cids();
38 let mut all: HashSet<Cid> = written.into_iter().collect();
39 all.extend(read);
40 all.into_iter().collect()
41 }
42}
43impl BlockStore for TrackingBlockStore {
44 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> {
45 let result = self.inner.get(cid).await?;
46 if result.is_some() {
47 match self.read_cids.lock() {
48 Ok(mut guard) => { guard.insert(*cid); },
49 Err(poisoned) => { poisoned.into_inner().insert(*cid); },
50 }
51 }
52 Ok(result)
53 }
54 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> {
55 let cid = self.inner.put(data).await?;
56 match self.written_cids.lock() {
57 Ok(mut guard) => guard.push(cid.clone()),
58 Err(poisoned) => poisoned.into_inner().push(cid.clone()),
59 }
60 Ok(cid)
61 }
62 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> {
63 self.inner.has(cid).await
64 }
65 async fn put_many(
66 &self,
67 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send,
68 ) -> Result<(), RepoError> {
69 let blocks: Vec<_> = blocks.into_iter().collect();
70 let cids: Vec<Cid> = blocks.iter().map(|(cid, _)| cid.clone()).collect();
71 self.inner.put_many(blocks).await?;
72 match self.written_cids.lock() {
73 Ok(mut guard) => guard.extend(cids),
74 Err(poisoned) => poisoned.into_inner().extend(cids),
75 }
76 Ok(())
77 }
78 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> {
79 let results = self.inner.get_many(cids).await?;
80 for (cid, result) in cids.iter().zip(results.iter()) {
81 if result.is_some() {
82 match self.read_cids.lock() {
83 Ok(mut guard) => { guard.insert(*cid); },
84 Err(poisoned) => { poisoned.into_inner().insert(*cid); },
85 }
86 }
87 }
88 Ok(results)
89 }
90 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> {
91 self.put_many(commit.blocks).await?;
92 Ok(())
93 }
94}