this repo has no description
1use bytes::Bytes;
2use cid::Cid;
3use jacquard_repo::error::RepoError;
4use jacquard_repo::repo::CommitData;
5use jacquard_repo::storage::BlockStore;
6use multihash::Multihash;
7use sha2::{Digest, Sha256};
8use sqlx::PgPool;
9
10pub mod tracking;
11
12#[derive(Clone)]
13pub struct PostgresBlockStore {
14 pool: PgPool,
15}
16
17impl PostgresBlockStore {
18 pub fn new(pool: PgPool) -> Self {
19 Self { pool }
20 }
21}
22
23impl BlockStore for PostgresBlockStore {
24 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> {
25 crate::metrics::record_block_operation("get");
26 let cid_bytes = cid.to_bytes();
27 let row = sqlx::query!("SELECT data FROM blocks WHERE cid = $1", &cid_bytes)
28 .fetch_optional(&self.pool)
29 .await
30 .map_err(|e| RepoError::storage(e))?;
31
32 match row {
33 Some(row) => Ok(Some(Bytes::from(row.data))),
34 None => Ok(None),
35 }
36 }
37
38 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> {
39 crate::metrics::record_block_operation("put");
40 let mut hasher = Sha256::new();
41 hasher.update(data);
42 let hash = hasher.finalize();
43 let multihash = Multihash::wrap(0x12, &hash)
44 .map_err(|e| RepoError::storage(std::io::Error::new(std::io::ErrorKind::InvalidData, format!("Failed to wrap multihash: {:?}", e))))?;
45 let cid = Cid::new_v1(0x71, multihash);
46 let cid_bytes = cid.to_bytes();
47
48 sqlx::query!("INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING", &cid_bytes, data)
49 .execute(&self.pool)
50 .await
51 .map_err(|e| RepoError::storage(e))?;
52
53 Ok(cid)
54 }
55
56 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> {
57 crate::metrics::record_block_operation("has");
58 let cid_bytes = cid.to_bytes();
59 let row = sqlx::query!("SELECT 1 as one FROM blocks WHERE cid = $1", &cid_bytes)
60 .fetch_optional(&self.pool)
61 .await
62 .map_err(|e| RepoError::storage(e))?;
63
64 Ok(row.is_some())
65 }
66
67 async fn put_many(
68 &self,
69 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send,
70 ) -> Result<(), RepoError> {
71 let blocks: Vec<_> = blocks.into_iter().collect();
72 if blocks.is_empty() {
73 return Ok(());
74 }
75
76 crate::metrics::record_block_operation("put_many");
77 let cids: Vec<Vec<u8>> = blocks.iter().map(|(cid, _)| cid.to_bytes()).collect();
78 let data: Vec<&[u8]> = blocks.iter().map(|(_, d)| d.as_ref()).collect();
79
80 sqlx::query!(
81 r#"
82 INSERT INTO blocks (cid, data)
83 SELECT * FROM UNNEST($1::bytea[], $2::bytea[])
84 ON CONFLICT (cid) DO NOTHING
85 "#,
86 &cids,
87 &data as &[&[u8]]
88 )
89 .execute(&self.pool)
90 .await
91 .map_err(|e| RepoError::storage(e))?;
92
93 Ok(())
94 }
95
96 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> {
97 if cids.is_empty() {
98 return Ok(Vec::new());
99 }
100
101 crate::metrics::record_block_operation("get_many");
102 let cid_bytes: Vec<Vec<u8>> = cids.iter().map(|c| c.to_bytes()).collect();
103
104 let rows = sqlx::query!(
105 "SELECT cid, data FROM blocks WHERE cid = ANY($1)",
106 &cid_bytes
107 )
108 .fetch_all(&self.pool)
109 .await
110 .map_err(|e| RepoError::storage(e))?;
111
112 let found: std::collections::HashMap<Vec<u8>, Bytes> = rows
113 .into_iter()
114 .map(|row| (row.cid, Bytes::from(row.data)))
115 .collect();
116
117 let results = cid_bytes
118 .iter()
119 .map(|cid| found.get(cid).cloned())
120 .collect();
121
122 Ok(results)
123 }
124
125 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> {
126 self.put_many(commit.blocks).await?;
127 Ok(())
128 }
129}