this repo has no description
1use bytes::Bytes;
2use cid::Cid;
3use jacquard_repo::error::RepoError;
4use jacquard_repo::repo::CommitData;
5use jacquard_repo::storage::BlockStore;
6use multihash::Multihash;
7use sha2::{Digest, Sha256};
8use sqlx::PgPool;
9
10pub mod tracking;
11
12#[derive(Clone)]
13pub struct PostgresBlockStore {
14 pool: PgPool,
15}
16
17impl PostgresBlockStore {
18 pub fn new(pool: PgPool) -> Self {
19 Self { pool }
20 }
21}
22
23impl BlockStore for PostgresBlockStore {
24 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> {
25 let cid_bytes = cid.to_bytes();
26 let row = sqlx::query!("SELECT data FROM blocks WHERE cid = $1", &cid_bytes)
27 .fetch_optional(&self.pool)
28 .await
29 .map_err(|e| RepoError::storage(e))?;
30
31 match row {
32 Some(row) => Ok(Some(Bytes::from(row.data))),
33 None => Ok(None),
34 }
35 }
36
37 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> {
38 let mut hasher = Sha256::new();
39 hasher.update(data);
40 let hash = hasher.finalize();
41 let multihash = Multihash::wrap(0x12, &hash).unwrap();
42 let cid = Cid::new_v1(0x71, multihash);
43 let cid_bytes = cid.to_bytes();
44
45 sqlx::query!("INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING", &cid_bytes, data)
46 .execute(&self.pool)
47 .await
48 .map_err(|e| RepoError::storage(e))?;
49
50 Ok(cid)
51 }
52
53 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> {
54 let cid_bytes = cid.to_bytes();
55 let row = sqlx::query!("SELECT 1 as one FROM blocks WHERE cid = $1", &cid_bytes)
56 .fetch_optional(&self.pool)
57 .await
58 .map_err(|e| RepoError::storage(e))?;
59
60 Ok(row.is_some())
61 }
62
63 async fn put_many(
64 &self,
65 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send,
66 ) -> Result<(), RepoError> {
67 let blocks: Vec<_> = blocks.into_iter().collect();
68 for (cid, data) in blocks {
69 let cid_bytes = cid.to_bytes();
70 let data_ref = data.as_ref();
71 sqlx::query!(
72 "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING",
73 &cid_bytes,
74 data_ref
75 )
76 .execute(&self.pool)
77 .await
78 .map_err(|e| RepoError::storage(e))?;
79 }
80 Ok(())
81 }
82
83 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> {
84 let mut results = Vec::new();
85 for cid in cids {
86 results.push(self.get(cid).await?);
87 }
88 Ok(results)
89 }
90
91 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> {
92 self.put_many(commit.blocks).await?;
93 Ok(())
94 }
95}