this repo has no description
1use bytes::Bytes;
2use cid::Cid;
3use jacquard_repo::error::RepoError;
4use jacquard_repo::repo::CommitData;
5use jacquard_repo::storage::BlockStore;
6use multihash::Multihash;
7use sha2::{Digest, Sha256};
8use sqlx::PgPool;
9
10pub mod tracking;
11
12#[derive(Clone)]
13pub struct PostgresBlockStore {
14 pool: PgPool,
15}
16
17impl PostgresBlockStore {
18 pub fn new(pool: PgPool) -> Self {
19 Self { pool }
20 }
21}
22
23impl BlockStore for PostgresBlockStore {
24 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> {
25 crate::metrics::record_block_operation("get");
26 let cid_bytes = cid.to_bytes();
27 let row = sqlx::query!("SELECT data FROM blocks WHERE cid = $1", &cid_bytes)
28 .fetch_optional(&self.pool)
29 .await
30 .map_err(RepoError::storage)?;
31 match row {
32 Some(row) => Ok(Some(Bytes::from(row.data))),
33 None => Ok(None),
34 }
35 }
36
37 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> {
38 crate::metrics::record_block_operation("put");
39 let mut hasher = Sha256::new();
40 hasher.update(data);
41 let hash = hasher.finalize();
42 let multihash = Multihash::wrap(0x12, &hash).map_err(|e| {
43 RepoError::storage(std::io::Error::new(
44 std::io::ErrorKind::InvalidData,
45 format!("Failed to wrap multihash: {:?}", e),
46 ))
47 })?;
48 let cid = Cid::new_v1(0x71, multihash);
49 let cid_bytes = cid.to_bytes();
50 sqlx::query!(
51 "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING",
52 &cid_bytes,
53 data
54 )
55 .execute(&self.pool)
56 .await
57 .map_err(RepoError::storage)?;
58 Ok(cid)
59 }
60
61 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> {
62 crate::metrics::record_block_operation("has");
63 let cid_bytes = cid.to_bytes();
64 let row = sqlx::query!("SELECT 1 as one FROM blocks WHERE cid = $1", &cid_bytes)
65 .fetch_optional(&self.pool)
66 .await
67 .map_err(RepoError::storage)?;
68 Ok(row.is_some())
69 }
70
71 async fn put_many(
72 &self,
73 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send,
74 ) -> Result<(), RepoError> {
75 let blocks: Vec<_> = blocks.into_iter().collect();
76 if blocks.is_empty() {
77 return Ok(());
78 }
79 crate::metrics::record_block_operation("put_many");
80 let cids: Vec<Vec<u8>> = blocks.iter().map(|(cid, _)| cid.to_bytes()).collect();
81 let data: Vec<&[u8]> = blocks.iter().map(|(_, d)| d.as_ref()).collect();
82 sqlx::query!(
83 r#"
84 INSERT INTO blocks (cid, data)
85 SELECT * FROM UNNEST($1::bytea[], $2::bytea[])
86 ON CONFLICT (cid) DO NOTHING
87 "#,
88 &cids,
89 &data as &[&[u8]]
90 )
91 .execute(&self.pool)
92 .await
93 .map_err(RepoError::storage)?;
94 Ok(())
95 }
96
97 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> {
98 if cids.is_empty() {
99 return Ok(Vec::new());
100 }
101 crate::metrics::record_block_operation("get_many");
102 let cid_bytes: Vec<Vec<u8>> = cids.iter().map(|c| c.to_bytes()).collect();
103 let rows = sqlx::query!(
104 "SELECT cid, data FROM blocks WHERE cid = ANY($1)",
105 &cid_bytes
106 )
107 .fetch_all(&self.pool)
108 .await
109 .map_err(RepoError::storage)?;
110 let found: std::collections::HashMap<Vec<u8>, Bytes> = rows
111 .into_iter()
112 .map(|row| (row.cid, Bytes::from(row.data)))
113 .collect();
114 let results = cid_bytes
115 .iter()
116 .map(|cid| found.get(cid).cloned())
117 .collect();
118 Ok(results)
119 }
120
121 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> {
122 self.put_many(commit.blocks).await?;
123 Ok(())
124 }
125}