this repo has no description
1use jacquard_repo::storage::BlockStore;
2use jacquard_repo::error::RepoError;
3use jacquard_repo::repo::CommitData;
4use cid::Cid;
5use sqlx::{PgPool, Row};
6use bytes::Bytes;
7use sha2::{Sha256, Digest};
8use multihash::Multihash;
9
10#[derive(Clone)]
11pub struct PostgresBlockStore {
12 pool: PgPool,
13}
14
15impl PostgresBlockStore {
16 pub fn new(pool: PgPool) -> Self {
17 Self { pool }
18 }
19}
20
21impl BlockStore for PostgresBlockStore {
22 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> {
23 let cid_bytes = cid.to_bytes();
24 let row = sqlx::query("SELECT data FROM blocks WHERE cid = $1")
25 .bind(cid_bytes)
26 .fetch_optional(&self.pool)
27 .await
28 .map_err(|e| RepoError::storage(e))?;
29
30 match row {
31 Some(row) => {
32 let data: Vec<u8> = row.get("data");
33 Ok(Some(Bytes::from(data)))
34 },
35 None => Ok(None),
36 }
37 }
38
39 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> {
40 let mut hasher = Sha256::new();
41 hasher.update(data);
42 let hash = hasher.finalize();
43 let multihash = Multihash::wrap(0x12, &hash).unwrap();
44 let cid = Cid::new_v1(0x71, multihash);
45 let cid_bytes = cid.to_bytes();
46
47 sqlx::query("INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING")
48 .bind(cid_bytes)
49 .bind(data)
50 .execute(&self.pool)
51 .await
52 .map_err(|e| RepoError::storage(e))?;
53
54 Ok(cid)
55 }
56
57 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> {
58 let cid_bytes = cid.to_bytes();
59 let row = sqlx::query("SELECT 1 FROM blocks WHERE cid = $1")
60 .bind(cid_bytes)
61 .fetch_optional(&self.pool)
62 .await
63 .map_err(|e| RepoError::storage(e))?;
64
65 Ok(row.is_some())
66 }
67
68 async fn put_many(&self, blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send) -> Result<(), RepoError> {
69 let blocks: Vec<_> = blocks.into_iter().collect();
70 for (cid, data) in blocks {
71 let cid_bytes = cid.to_bytes();
72 sqlx::query("INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING")
73 .bind(cid_bytes)
74 .bind(data.as_ref())
75 .execute(&self.pool)
76 .await
77 .map_err(|e| RepoError::storage(e))?;
78 }
79 Ok(())
80 }
81
82 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> {
83 let mut results = Vec::new();
84 for cid in cids {
85 results.push(self.get(cid).await?);
86 }
87 Ok(results)
88 }
89
90 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> {
91 self.put_many(commit.blocks).await?;
92 Ok(())
93 }
94}