this repo has no description
1use bytes::Bytes;
2use cid::Cid;
3use jacquard_repo::error::RepoError;
4use jacquard_repo::repo::CommitData;
5use jacquard_repo::storage::BlockStore;
6use multihash::Multihash;
7use sha2::{Digest, Sha256};
8use sqlx::PgPool;
9
10pub mod tracking;
11
12#[derive(Clone)]
13pub struct PostgresBlockStore {
14 pool: PgPool,
15}
16
17impl PostgresBlockStore {
18 pub fn new(pool: PgPool) -> Self {
19 Self { pool }
20 }
21}
22
23impl BlockStore for PostgresBlockStore {
24 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> {
25 crate::metrics::record_block_operation("get");
26 let cid_bytes = cid.to_bytes();
27 let row = sqlx::query!("SELECT data FROM blocks WHERE cid = $1", &cid_bytes)
28 .fetch_optional(&self.pool)
29 .await
30 .map_err(|e| RepoError::storage(e))?;
31 match row {
32 Some(row) => Ok(Some(Bytes::from(row.data))),
33 None => Ok(None),
34 }
35 }
36
37 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> {
38 crate::metrics::record_block_operation("put");
39 let mut hasher = Sha256::new();
40 hasher.update(data);
41 let hash = hasher.finalize();
42 let multihash = Multihash::wrap(0x12, &hash)
43 .map_err(|e| RepoError::storage(std::io::Error::new(std::io::ErrorKind::InvalidData, format!("Failed to wrap multihash: {:?}", e))))?;
44 let cid = Cid::new_v1(0x71, multihash);
45 let cid_bytes = cid.to_bytes();
46 sqlx::query!("INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING", &cid_bytes, data)
47 .execute(&self.pool)
48 .await
49 .map_err(|e| RepoError::storage(e))?;
50 Ok(cid)
51 }
52
53 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> {
54 crate::metrics::record_block_operation("has");
55 let cid_bytes = cid.to_bytes();
56 let row = sqlx::query!("SELECT 1 as one FROM blocks WHERE cid = $1", &cid_bytes)
57 .fetch_optional(&self.pool)
58 .await
59 .map_err(|e| RepoError::storage(e))?;
60 Ok(row.is_some())
61 }
62
63 async fn put_many(
64 &self,
65 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send,
66 ) -> Result<(), RepoError> {
67 let blocks: Vec<_> = blocks.into_iter().collect();
68 if blocks.is_empty() {
69 return Ok(());
70 }
71 crate::metrics::record_block_operation("put_many");
72 let cids: Vec<Vec<u8>> = blocks.iter().map(|(cid, _)| cid.to_bytes()).collect();
73 let data: Vec<&[u8]> = blocks.iter().map(|(_, d)| d.as_ref()).collect();
74 sqlx::query!(
75 r#"
76 INSERT INTO blocks (cid, data)
77 SELECT * FROM UNNEST($1::bytea[], $2::bytea[])
78 ON CONFLICT (cid) DO NOTHING
79 "#,
80 &cids,
81 &data as &[&[u8]]
82 )
83 .execute(&self.pool)
84 .await
85 .map_err(|e| RepoError::storage(e))?;
86 Ok(())
87 }
88
89 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> {
90 if cids.is_empty() {
91 return Ok(Vec::new());
92 }
93 crate::metrics::record_block_operation("get_many");
94 let cid_bytes: Vec<Vec<u8>> = cids.iter().map(|c| c.to_bytes()).collect();
95 let rows = sqlx::query!(
96 "SELECT cid, data FROM blocks WHERE cid = ANY($1)",
97 &cid_bytes
98 )
99 .fetch_all(&self.pool)
100 .await
101 .map_err(|e| RepoError::storage(e))?;
102 let found: std::collections::HashMap<Vec<u8>, Bytes> = rows
103 .into_iter()
104 .map(|row| (row.cid, Bytes::from(row.data)))
105 .collect();
106 let results = cid_bytes
107 .iter()
108 .map(|cid| found.get(cid).cloned())
109 .collect();
110 Ok(results)
111 }
112
113 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> {
114 self.put_many(commit.blocks).await?;
115 Ok(())
116 }
117}