this repo has no description
1use crate::repo::PostgresBlockStore;
2use bytes::Bytes;
3use cid::Cid;
4use jacquard_repo::error::RepoError;
5use jacquard_repo::repo::CommitData;
6use jacquard_repo::storage::BlockStore;
7use std::sync::{Arc, Mutex};
8
9#[derive(Clone)]
10pub struct TrackingBlockStore {
11 inner: PostgresBlockStore,
12 written_cids: Arc<Mutex<Vec<Cid>>>,
13}
14
15impl TrackingBlockStore {
16 pub fn new(store: PostgresBlockStore) -> Self {
17 Self {
18 inner: store,
19 written_cids: Arc::new(Mutex::new(Vec::new())),
20 }
21 }
22
23 pub fn get_written_cids(&self) -> Vec<Cid> {
24 match self.written_cids.lock() {
25 Ok(guard) => guard.clone(),
26 Err(poisoned) => poisoned.into_inner().clone(),
27 }
28 }
29}
30
31impl BlockStore for TrackingBlockStore {
32 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> {
33 self.inner.get(cid).await
34 }
35
36 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> {
37 let cid = self.inner.put(data).await?;
38 match self.written_cids.lock() {
39 Ok(mut guard) => guard.push(cid.clone()),
40 Err(poisoned) => poisoned.into_inner().push(cid.clone()),
41 }
42 Ok(cid)
43 }
44
45 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> {
46 self.inner.has(cid).await
47 }
48
49 async fn put_many(
50 &self,
51 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send,
52 ) -> Result<(), RepoError> {
53 let blocks: Vec<_> = blocks.into_iter().collect();
54 let cids: Vec<Cid> = blocks.iter().map(|(cid, _)| cid.clone()).collect();
55 self.inner.put_many(blocks).await?;
56 match self.written_cids.lock() {
57 Ok(mut guard) => guard.extend(cids),
58 Err(poisoned) => poisoned.into_inner().extend(cids),
59 }
60 Ok(())
61 }
62
63 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> {
64 self.inner.get_many(cids).await
65 }
66
67 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> {
68 self.put_many(commit.blocks).await?;
69 Ok(())
70 }
71}