this repo has no description
1use bytes::Bytes;
2use cid::Cid;
3use jacquard_repo::error::RepoError;
4use jacquard_repo::repo::CommitData;
5use jacquard_repo::storage::BlockStore;
6use multihash::Multihash;
7use sha2::{Digest, Sha256};
8use sqlx::PgPool;
9pub mod tracking;
10#[derive(Clone)]
11pub struct PostgresBlockStore {
12 pool: PgPool,
13}
14impl PostgresBlockStore {
15 pub fn new(pool: PgPool) -> Self {
16 Self { pool }
17 }
18}
19impl BlockStore for PostgresBlockStore {
20 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> {
21 crate::metrics::record_block_operation("get");
22 let cid_bytes = cid.to_bytes();
23 let row = sqlx::query!("SELECT data FROM blocks WHERE cid = $1", &cid_bytes)
24 .fetch_optional(&self.pool)
25 .await
26 .map_err(|e| RepoError::storage(e))?;
27 match row {
28 Some(row) => Ok(Some(Bytes::from(row.data))),
29 None => Ok(None),
30 }
31 }
32 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> {
33 crate::metrics::record_block_operation("put");
34 let mut hasher = Sha256::new();
35 hasher.update(data);
36 let hash = hasher.finalize();
37 let multihash = Multihash::wrap(0x12, &hash)
38 .map_err(|e| RepoError::storage(std::io::Error::new(std::io::ErrorKind::InvalidData, format!("Failed to wrap multihash: {:?}", e))))?;
39 let cid = Cid::new_v1(0x71, multihash);
40 let cid_bytes = cid.to_bytes();
41 sqlx::query!("INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING", &cid_bytes, data)
42 .execute(&self.pool)
43 .await
44 .map_err(|e| RepoError::storage(e))?;
45 Ok(cid)
46 }
47 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> {
48 crate::metrics::record_block_operation("has");
49 let cid_bytes = cid.to_bytes();
50 let row = sqlx::query!("SELECT 1 as one FROM blocks WHERE cid = $1", &cid_bytes)
51 .fetch_optional(&self.pool)
52 .await
53 .map_err(|e| RepoError::storage(e))?;
54 Ok(row.is_some())
55 }
56 async fn put_many(
57 &self,
58 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send,
59 ) -> Result<(), RepoError> {
60 let blocks: Vec<_> = blocks.into_iter().collect();
61 if blocks.is_empty() {
62 return Ok(());
63 }
64 crate::metrics::record_block_operation("put_many");
65 let cids: Vec<Vec<u8>> = blocks.iter().map(|(cid, _)| cid.to_bytes()).collect();
66 let data: Vec<&[u8]> = blocks.iter().map(|(_, d)| d.as_ref()).collect();
67 sqlx::query!(
68 r#"
69 INSERT INTO blocks (cid, data)
70 SELECT * FROM UNNEST($1::bytea[], $2::bytea[])
71 ON CONFLICT (cid) DO NOTHING
72 "#,
73 &cids,
74 &data as &[&[u8]]
75 )
76 .execute(&self.pool)
77 .await
78 .map_err(|e| RepoError::storage(e))?;
79 Ok(())
80 }
81 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> {
82 if cids.is_empty() {
83 return Ok(Vec::new());
84 }
85 crate::metrics::record_block_operation("get_many");
86 let cid_bytes: Vec<Vec<u8>> = cids.iter().map(|c| c.to_bytes()).collect();
87 let rows = sqlx::query!(
88 "SELECT cid, data FROM blocks WHERE cid = ANY($1)",
89 &cid_bytes
90 )
91 .fetch_all(&self.pool)
92 .await
93 .map_err(|e| RepoError::storage(e))?;
94 let found: std::collections::HashMap<Vec<u8>, Bytes> = rows
95 .into_iter()
96 .map(|row| (row.cid, Bytes::from(row.data)))
97 .collect();
98 let results = cid_bytes
99 .iter()
100 .map(|cid| found.get(cid).cloned())
101 .collect();
102 Ok(results)
103 }
104 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> {
105 self.put_many(commit.blocks).await?;
106 Ok(())
107 }
108}