this repo has no description
1use crate::repo::PostgresBlockStore;
2use bytes::Bytes;
3use cid::Cid;
4use jacquard_repo::error::RepoError;
5use jacquard_repo::repo::CommitData;
6use jacquard_repo::storage::BlockStore;
7use std::collections::HashSet;
8use std::sync::{Arc, Mutex};
9
10#[derive(Clone)]
11pub struct TrackingBlockStore {
12 inner: PostgresBlockStore,
13 written_cids: Arc<Mutex<Vec<Cid>>>,
14 read_cids: Arc<Mutex<HashSet<Cid>>>,
15}
16
17impl TrackingBlockStore {
18 pub fn new(store: PostgresBlockStore) -> Self {
19 Self {
20 inner: store,
21 written_cids: Arc::new(Mutex::new(Vec::new())),
22 read_cids: Arc::new(Mutex::new(HashSet::new())),
23 }
24 }
25
26 pub fn get_written_cids(&self) -> Vec<Cid> {
27 match self.written_cids.lock() {
28 Ok(guard) => guard.clone(),
29 Err(poisoned) => poisoned.into_inner().clone(),
30 }
31 }
32
33 pub fn get_read_cids(&self) -> Vec<Cid> {
34 match self.read_cids.lock() {
35 Ok(guard) => guard.iter().cloned().collect(),
36 Err(poisoned) => poisoned.into_inner().iter().cloned().collect(),
37 }
38 }
39
40 pub fn get_all_relevant_cids(&self) -> Vec<Cid> {
41 let written = self.get_written_cids();
42 let read = self.get_read_cids();
43 let mut all: HashSet<Cid> = written.into_iter().collect();
44 all.extend(read);
45 all.into_iter().collect()
46 }
47}
48
49impl BlockStore for TrackingBlockStore {
50 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> {
51 let result = self.inner.get(cid).await?;
52 if result.is_some() {
53 match self.read_cids.lock() {
54 Ok(mut guard) => {
55 guard.insert(*cid);
56 }
57 Err(poisoned) => {
58 poisoned.into_inner().insert(*cid);
59 }
60 }
61 }
62 Ok(result)
63 }
64
65 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> {
66 let cid = self.inner.put(data).await?;
67 match self.written_cids.lock() {
68 Ok(mut guard) => guard.push(cid),
69 Err(poisoned) => poisoned.into_inner().push(cid),
70 }
71 Ok(cid)
72 }
73
74 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> {
75 self.inner.has(cid).await
76 }
77
78 async fn put_many(
79 &self,
80 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send,
81 ) -> Result<(), RepoError> {
82 let blocks: Vec<_> = blocks.into_iter().collect();
83 let cids: Vec<Cid> = blocks.iter().map(|(cid, _)| *cid).collect();
84 self.inner.put_many(blocks).await?;
85 match self.written_cids.lock() {
86 Ok(mut guard) => guard.extend(cids),
87 Err(poisoned) => poisoned.into_inner().extend(cids),
88 }
89 Ok(())
90 }
91
92 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> {
93 let results = self.inner.get_many(cids).await?;
94 for (cid, result) in cids.iter().zip(results.iter()) {
95 if result.is_some() {
96 match self.read_cids.lock() {
97 Ok(mut guard) => {
98 guard.insert(*cid);
99 }
100 Err(poisoned) => {
101 poisoned.into_inner().insert(*cid);
102 }
103 }
104 }
105 }
106 Ok(results)
107 }
108
109 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> {
110 self.put_many(commit.blocks).await?;
111 Ok(())
112 }
113}