this repo has no description
1use bytes::Bytes; 2use cid::Cid; 3use jacquard_repo::error::RepoError; 4use jacquard_repo::repo::CommitData; 5use jacquard_repo::storage::BlockStore; 6use multihash::Multihash; 7use sha2::{Digest, Sha256}; 8use sqlx::PgPool; 9 10pub mod tracking; 11 12#[derive(Clone)] 13pub struct PostgresBlockStore { 14 pool: PgPool, 15} 16 17impl PostgresBlockStore { 18 pub fn new(pool: PgPool) -> Self { 19 Self { pool } 20 } 21} 22 23impl BlockStore for PostgresBlockStore { 24 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> { 25 let cid_bytes = cid.to_bytes(); 26 let row = sqlx::query!("SELECT data FROM blocks WHERE cid = $1", &cid_bytes) 27 .fetch_optional(&self.pool) 28 .await 29 .map_err(|e| RepoError::storage(e))?; 30 31 match row { 32 Some(row) => Ok(Some(Bytes::from(row.data))), 33 None => Ok(None), 34 } 35 } 36 37 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> { 38 let mut hasher = Sha256::new(); 39 hasher.update(data); 40 let hash = hasher.finalize(); 41 let multihash = Multihash::wrap(0x12, &hash) 42 .map_err(|e| RepoError::storage(std::io::Error::new(std::io::ErrorKind::InvalidData, format!("Failed to wrap multihash: {:?}", e))))?; 43 let cid = Cid::new_v1(0x71, multihash); 44 let cid_bytes = cid.to_bytes(); 45 46 sqlx::query!("INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING", &cid_bytes, data) 47 .execute(&self.pool) 48 .await 49 .map_err(|e| RepoError::storage(e))?; 50 51 Ok(cid) 52 } 53 54 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> { 55 let cid_bytes = cid.to_bytes(); 56 let row = sqlx::query!("SELECT 1 as one FROM blocks WHERE cid = $1", &cid_bytes) 57 .fetch_optional(&self.pool) 58 .await 59 .map_err(|e| RepoError::storage(e))?; 60 61 Ok(row.is_some()) 62 } 63 64 async fn put_many( 65 &self, 66 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send, 67 ) -> Result<(), RepoError> { 68 let blocks: Vec<_> = blocks.into_iter().collect(); 69 for (cid, data) in blocks { 70 let cid_bytes = cid.to_bytes(); 71 let data_ref = data.as_ref(); 72 sqlx::query!( 73 "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING", 74 &cid_bytes, 75 data_ref 76 ) 77 .execute(&self.pool) 78 .await 79 .map_err(|e| RepoError::storage(e))?; 80 } 81 Ok(()) 82 } 83 84 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> { 85 let mut results = Vec::new(); 86 for cid in cids { 87 results.push(self.get(cid).await?); 88 } 89 Ok(results) 90 } 91 92 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> { 93 self.put_many(commit.blocks).await?; 94 Ok(()) 95 } 96}