this repo has no description
1use crate::repo::PostgresBlockStore;
2use bytes::Bytes;
3use cid::Cid;
4use jacquard_repo::error::RepoError;
5use jacquard_repo::repo::CommitData;
6use jacquard_repo::storage::BlockStore;
7use std::collections::HashSet;
8use std::sync::{Arc, Mutex};
9
10#[derive(Clone)]
11pub struct TrackingBlockStore {
12 inner: PostgresBlockStore,
13 written_cids: Arc<Mutex<Vec<Cid>>>,
14 read_cids: Arc<Mutex<HashSet<Cid>>>,
15}
16
17impl TrackingBlockStore {
18 pub fn new(store: PostgresBlockStore) -> Self {
19 Self {
20 inner: store,
21 written_cids: Arc::new(Mutex::new(Vec::new())),
22 read_cids: Arc::new(Mutex::new(HashSet::new())),
23 }
24 }
25
26 pub fn get_written_cids(&self) -> Vec<Cid> {
27 match self.written_cids.lock() {
28 Ok(guard) => guard.clone(),
29 Err(poisoned) => poisoned.into_inner().clone(),
30 }
31 }
32
33 pub fn get_read_cids(&self) -> Vec<Cid> {
34 match self.read_cids.lock() {
35 Ok(guard) => guard.iter().cloned().collect(),
36 Err(poisoned) => poisoned.into_inner().iter().cloned().collect(),
37 }
38 }
39
40 pub fn get_all_relevant_cids(&self) -> Vec<Cid> {
41 let written = self.get_written_cids();
42 let read = self.get_read_cids();
43 let mut all: HashSet<Cid> = written.into_iter().collect();
44 all.extend(read);
45 all.into_iter().collect()
46 }
47}
48
49impl BlockStore for TrackingBlockStore {
50 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> {
51 let result = self.inner.get(cid).await?;
52 if result.is_some() {
53 match self.read_cids.lock() {
54 Ok(mut guard) => { guard.insert(*cid); },
55 Err(poisoned) => { poisoned.into_inner().insert(*cid); },
56 }
57 }
58 Ok(result)
59 }
60
61 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> {
62 let cid = self.inner.put(data).await?;
63 match self.written_cids.lock() {
64 Ok(mut guard) => guard.push(cid.clone()),
65 Err(poisoned) => poisoned.into_inner().push(cid.clone()),
66 }
67 Ok(cid)
68 }
69
70 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> {
71 self.inner.has(cid).await
72 }
73
74 async fn put_many(
75 &self,
76 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send,
77 ) -> Result<(), RepoError> {
78 let blocks: Vec<_> = blocks.into_iter().collect();
79 let cids: Vec<Cid> = blocks.iter().map(|(cid, _)| cid.clone()).collect();
80 self.inner.put_many(blocks).await?;
81 match self.written_cids.lock() {
82 Ok(mut guard) => guard.extend(cids),
83 Err(poisoned) => poisoned.into_inner().extend(cids),
84 }
85 Ok(())
86 }
87
88 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> {
89 let results = self.inner.get_many(cids).await?;
90 for (cid, result) in cids.iter().zip(results.iter()) {
91 if result.is_some() {
92 match self.read_cids.lock() {
93 Ok(mut guard) => { guard.insert(*cid); },
94 Err(poisoned) => { poisoned.into_inner().insert(*cid); },
95 }
96 }
97 }
98 Ok(results)
99 }
100
101 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> {
102 self.put_many(commit.blocks).await?;
103 Ok(())
104 }
105}