this repo has no description
1use bytes::Bytes;
2use cid::Cid;
3use jacquard_repo::error::RepoError;
4use jacquard_repo::repo::CommitData;
5use jacquard_repo::storage::BlockStore;
6use multihash::Multihash;
7use sha2::{Digest, Sha256};
8use sqlx::{PgPool, Row};
9
10#[derive(Clone)]
11pub struct PostgresBlockStore {
12 pool: PgPool,
13}
14
15impl PostgresBlockStore {
16 pub fn new(pool: PgPool) -> Self {
17 Self { pool }
18 }
19}
20
21impl BlockStore for PostgresBlockStore {
22 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> {
23 let cid_bytes = cid.to_bytes();
24 let row = sqlx::query("SELECT data FROM blocks WHERE cid = $1")
25 .bind(cid_bytes)
26 .fetch_optional(&self.pool)
27 .await
28 .map_err(|e| RepoError::storage(e))?;
29
30 match row {
31 Some(row) => {
32 let data: Vec<u8> = row.get("data");
33 Ok(Some(Bytes::from(data)))
34 }
35 None => Ok(None),
36 }
37 }
38
39 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> {
40 let mut hasher = Sha256::new();
41 hasher.update(data);
42 let hash = hasher.finalize();
43 let multihash = Multihash::wrap(0x12, &hash).unwrap();
44 let cid = Cid::new_v1(0x71, multihash);
45 let cid_bytes = cid.to_bytes();
46
47 sqlx::query("INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING")
48 .bind(cid_bytes)
49 .bind(data)
50 .execute(&self.pool)
51 .await
52 .map_err(|e| RepoError::storage(e))?;
53
54 Ok(cid)
55 }
56
57 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> {
58 let cid_bytes = cid.to_bytes();
59 let row = sqlx::query("SELECT 1 FROM blocks WHERE cid = $1")
60 .bind(cid_bytes)
61 .fetch_optional(&self.pool)
62 .await
63 .map_err(|e| RepoError::storage(e))?;
64
65 Ok(row.is_some())
66 }
67
68 async fn put_many(
69 &self,
70 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send,
71 ) -> Result<(), RepoError> {
72 let blocks: Vec<_> = blocks.into_iter().collect();
73 for (cid, data) in blocks {
74 let cid_bytes = cid.to_bytes();
75 sqlx::query(
76 "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING",
77 )
78 .bind(cid_bytes)
79 .bind(data.as_ref())
80 .execute(&self.pool)
81 .await
82 .map_err(|e| RepoError::storage(e))?;
83 }
84 Ok(())
85 }
86
87 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> {
88 let mut results = Vec::new();
89 for cid in cids {
90 results.push(self.get(cid).await?);
91 }
92 Ok(results)
93 }
94
95 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> {
96 self.put_many(commit.blocks).await?;
97 Ok(())
98 }
99}