this repo has no description
1use bytes::Bytes;
2use cid::Cid;
3use jacquard_repo::error::RepoError;
4use jacquard_repo::repo::CommitData;
5use jacquard_repo::storage::BlockStore;
6use multihash::Multihash;
7use sha2::{Digest, Sha256};
8use sqlx::PgPool;
9
10#[derive(Clone)]
11pub struct PostgresBlockStore {
12 pool: PgPool,
13}
14
15impl PostgresBlockStore {
16 pub fn new(pool: PgPool) -> Self {
17 Self { pool }
18 }
19}
20
21impl BlockStore for PostgresBlockStore {
22 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> {
23 let cid_bytes = cid.to_bytes();
24 let row = sqlx::query!("SELECT data FROM blocks WHERE cid = $1", &cid_bytes)
25 .fetch_optional(&self.pool)
26 .await
27 .map_err(|e| RepoError::storage(e))?;
28
29 match row {
30 Some(row) => Ok(Some(Bytes::from(row.data))),
31 None => Ok(None),
32 }
33 }
34
35 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> {
36 let mut hasher = Sha256::new();
37 hasher.update(data);
38 let hash = hasher.finalize();
39 let multihash = Multihash::wrap(0x12, &hash).unwrap();
40 let cid = Cid::new_v1(0x71, multihash);
41 let cid_bytes = cid.to_bytes();
42
43 sqlx::query!("INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING", &cid_bytes, data)
44 .execute(&self.pool)
45 .await
46 .map_err(|e| RepoError::storage(e))?;
47
48 Ok(cid)
49 }
50
51 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> {
52 let cid_bytes = cid.to_bytes();
53 let row = sqlx::query!("SELECT 1 as one FROM blocks WHERE cid = $1", &cid_bytes)
54 .fetch_optional(&self.pool)
55 .await
56 .map_err(|e| RepoError::storage(e))?;
57
58 Ok(row.is_some())
59 }
60
61 async fn put_many(
62 &self,
63 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send,
64 ) -> Result<(), RepoError> {
65 let blocks: Vec<_> = blocks.into_iter().collect();
66 for (cid, data) in blocks {
67 let cid_bytes = cid.to_bytes();
68 let data_ref = data.as_ref();
69 sqlx::query!(
70 "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING",
71 &cid_bytes,
72 data_ref
73 )
74 .execute(&self.pool)
75 .await
76 .map_err(|e| RepoError::storage(e))?;
77 }
78 Ok(())
79 }
80
81 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> {
82 let mut results = Vec::new();
83 for cid in cids {
84 results.push(self.get(cid).await?);
85 }
86 Ok(results)
87 }
88
89 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> {
90 self.put_many(commit.blocks).await?;
91 Ok(())
92 }
93}