this repo has no description
1use bytes::Bytes;
2use cid::Cid;
3use jacquard_repo::error::RepoError;
4use jacquard_repo::repo::CommitData;
5use jacquard_repo::storage::BlockStore;
6use multihash::Multihash;
7use sha2::{Digest, Sha256};
8use sqlx::PgPool;
9use std::collections::HashSet;
10use std::sync::{Arc, Mutex};
11
12#[derive(Clone)]
13pub struct PostgresBlockStore {
14 pool: PgPool,
15}
16
17impl PostgresBlockStore {
18 pub fn new(pool: PgPool) -> Self {
19 Self { pool }
20 }
21
22 pub fn pool(&self) -> &PgPool {
23 &self.pool
24 }
25}
26
27impl BlockStore for PostgresBlockStore {
28 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> {
29 let cid_bytes = cid.to_bytes();
30 let row = sqlx::query!("SELECT data FROM blocks WHERE cid = $1", &cid_bytes)
31 .fetch_optional(&self.pool)
32 .await
33 .map_err(RepoError::storage)?;
34 match row {
35 Some(row) => Ok(Some(Bytes::from(row.data))),
36 None => Ok(None),
37 }
38 }
39
40 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> {
41 let mut hasher = Sha256::new();
42 hasher.update(data);
43 let hash = hasher.finalize();
44 let multihash = Multihash::wrap(0x12, &hash).map_err(|e| {
45 RepoError::storage(std::io::Error::new(
46 std::io::ErrorKind::InvalidData,
47 format!("Failed to wrap multihash: {:?}", e),
48 ))
49 })?;
50 let cid = Cid::new_v1(0x71, multihash);
51 let cid_bytes = cid.to_bytes();
52 sqlx::query!(
53 "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING",
54 &cid_bytes,
55 data
56 )
57 .execute(&self.pool)
58 .await
59 .map_err(RepoError::storage)?;
60 Ok(cid)
61 }
62
63 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> {
64 let cid_bytes = cid.to_bytes();
65 let row = sqlx::query!("SELECT 1 as one FROM blocks WHERE cid = $1", &cid_bytes)
66 .fetch_optional(&self.pool)
67 .await
68 .map_err(RepoError::storage)?;
69 Ok(row.is_some())
70 }
71
72 async fn put_many(
73 &self,
74 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send,
75 ) -> Result<(), RepoError> {
76 let blocks: Vec<_> = blocks.into_iter().collect();
77 if blocks.is_empty() {
78 return Ok(());
79 }
80 let cids: Vec<Vec<u8>> = blocks.iter().map(|(cid, _)| cid.to_bytes()).collect();
81 let data: Vec<&[u8]> = blocks.iter().map(|(_, d)| d.as_ref()).collect();
82 sqlx::query!(
83 r#"
84 INSERT INTO blocks (cid, data)
85 SELECT * FROM UNNEST($1::bytea[], $2::bytea[])
86 ON CONFLICT (cid) DO NOTHING
87 "#,
88 &cids,
89 &data as &[&[u8]]
90 )
91 .execute(&self.pool)
92 .await
93 .map_err(RepoError::storage)?;
94 Ok(())
95 }
96
97 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> {
98 if cids.is_empty() {
99 return Ok(Vec::new());
100 }
101 let cid_bytes: Vec<Vec<u8>> = cids.iter().map(|c| c.to_bytes()).collect();
102 let rows = sqlx::query!(
103 "SELECT cid, data FROM blocks WHERE cid = ANY($1)",
104 &cid_bytes
105 )
106 .fetch_all(&self.pool)
107 .await
108 .map_err(RepoError::storage)?;
109 let found: std::collections::HashMap<Vec<u8>, Bytes> = rows
110 .into_iter()
111 .map(|row| (row.cid, Bytes::from(row.data)))
112 .collect();
113 let results = cid_bytes
114 .iter()
115 .map(|cid| found.get(cid).cloned())
116 .collect();
117 Ok(results)
118 }
119
120 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> {
121 self.put_many(commit.blocks).await?;
122 Ok(())
123 }
124}
125
126#[derive(Clone)]
127pub struct TrackingBlockStore {
128 inner: PostgresBlockStore,
129 written_cids: Arc<Mutex<Vec<Cid>>>,
130 read_cids: Arc<Mutex<HashSet<Cid>>>,
131}
132
133impl TrackingBlockStore {
134 pub fn new(store: PostgresBlockStore) -> Self {
135 Self {
136 inner: store,
137 written_cids: Arc::new(Mutex::new(Vec::new())),
138 read_cids: Arc::new(Mutex::new(HashSet::new())),
139 }
140 }
141
142 pub fn get_written_cids(&self) -> Vec<Cid> {
143 match self.written_cids.lock() {
144 Ok(guard) => guard.clone(),
145 Err(poisoned) => poisoned.into_inner().clone(),
146 }
147 }
148
149 pub fn get_read_cids(&self) -> Vec<Cid> {
150 match self.read_cids.lock() {
151 Ok(guard) => guard.iter().cloned().collect(),
152 Err(poisoned) => poisoned.into_inner().iter().cloned().collect(),
153 }
154 }
155
156 pub fn get_all_relevant_cids(&self) -> Vec<Cid> {
157 let written = self.get_written_cids();
158 let read = self.get_read_cids();
159 let mut all: HashSet<Cid> = written.into_iter().collect();
160 all.extend(read);
161 all.into_iter().collect()
162 }
163}
164
165impl BlockStore for TrackingBlockStore {
166 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> {
167 let result = self.inner.get(cid).await?;
168 if result.is_some() {
169 match self.read_cids.lock() {
170 Ok(mut guard) => {
171 guard.insert(*cid);
172 }
173 Err(poisoned) => {
174 poisoned.into_inner().insert(*cid);
175 }
176 }
177 }
178 Ok(result)
179 }
180
181 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> {
182 let cid = self.inner.put(data).await?;
183 match self.written_cids.lock() {
184 Ok(mut guard) => guard.push(cid),
185 Err(poisoned) => poisoned.into_inner().push(cid),
186 }
187 Ok(cid)
188 }
189
190 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> {
191 self.inner.has(cid).await
192 }
193
194 async fn put_many(
195 &self,
196 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send,
197 ) -> Result<(), RepoError> {
198 let blocks: Vec<_> = blocks.into_iter().collect();
199 let cids: Vec<Cid> = blocks.iter().map(|(cid, _)| *cid).collect();
200 self.inner.put_many(blocks).await?;
201 match self.written_cids.lock() {
202 Ok(mut guard) => guard.extend(cids),
203 Err(poisoned) => poisoned.into_inner().extend(cids),
204 }
205 Ok(())
206 }
207
208 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> {
209 let results = self.inner.get_many(cids).await?;
210 cids.iter()
211 .zip(results.iter())
212 .filter(|(_, result)| result.is_some())
213 .for_each(|(cid, _)| match self.read_cids.lock() {
214 Ok(mut guard) => {
215 guard.insert(*cid);
216 }
217 Err(poisoned) => {
218 poisoned.into_inner().insert(*cid);
219 }
220 });
221 Ok(results)
222 }
223
224 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> {
225 self.put_many(commit.blocks).await?;
226 Ok(())
227 }
228}