this repo has no description
1use bytes::Bytes; 2use cid::Cid; 3use jacquard_repo::error::RepoError; 4use jacquard_repo::repo::CommitData; 5use jacquard_repo::storage::BlockStore; 6use multihash::Multihash; 7use sha2::{Digest, Sha256}; 8use sqlx::PgPool; 9 10#[derive(Clone)] 11pub struct PostgresBlockStore { 12 pool: PgPool, 13} 14 15impl PostgresBlockStore { 16 pub fn new(pool: PgPool) -> Self { 17 Self { pool } 18 } 19} 20 21impl BlockStore for PostgresBlockStore { 22 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> { 23 let cid_bytes = cid.to_bytes(); 24 let row = sqlx::query!("SELECT data FROM blocks WHERE cid = $1", &cid_bytes) 25 .fetch_optional(&self.pool) 26 .await 27 .map_err(|e| RepoError::storage(e))?; 28 29 match row { 30 Some(row) => Ok(Some(Bytes::from(row.data))), 31 None => Ok(None), 32 } 33 } 34 35 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> { 36 let mut hasher = Sha256::new(); 37 hasher.update(data); 38 let hash = hasher.finalize(); 39 let multihash = Multihash::wrap(0x12, &hash).unwrap(); 40 let cid = Cid::new_v1(0x71, multihash); 41 let cid_bytes = cid.to_bytes(); 42 43 sqlx::query!("INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING", &cid_bytes, data) 44 .execute(&self.pool) 45 .await 46 .map_err(|e| RepoError::storage(e))?; 47 48 Ok(cid) 49 } 50 51 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> { 52 let cid_bytes = cid.to_bytes(); 53 let row = sqlx::query!("SELECT 1 as one FROM blocks WHERE cid = $1", &cid_bytes) 54 .fetch_optional(&self.pool) 55 .await 56 .map_err(|e| RepoError::storage(e))?; 57 58 Ok(row.is_some()) 59 } 60 61 async fn put_many( 62 &self, 63 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send, 64 ) -> Result<(), RepoError> { 65 let blocks: Vec<_> = blocks.into_iter().collect(); 66 for (cid, data) in blocks { 67 let cid_bytes = cid.to_bytes(); 68 let data_ref = data.as_ref(); 69 sqlx::query!( 70 "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING", 71 &cid_bytes, 72 data_ref 73 ) 74 .execute(&self.pool) 75 .await 76 .map_err(|e| RepoError::storage(e))?; 77 } 78 Ok(()) 79 } 80 81 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> { 82 let mut results = Vec::new(); 83 for cid in cids { 84 results.push(self.get(cid).await?); 85 } 86 Ok(results) 87 } 88 89 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> { 90 self.put_many(commit.blocks).await?; 91 Ok(()) 92 } 93}