this repo has no description
1use jacquard_repo::storage::BlockStore; 2use jacquard_repo::error::RepoError; 3use jacquard_repo::repo::CommitData; 4use cid::Cid; 5use sqlx::{PgPool, Row}; 6use bytes::Bytes; 7use sha2::{Sha256, Digest}; 8use multihash::Multihash; 9 10#[derive(Clone)] 11pub struct PostgresBlockStore { 12 pool: PgPool, 13} 14 15impl PostgresBlockStore { 16 pub fn new(pool: PgPool) -> Self { 17 Self { pool } 18 } 19} 20 21impl BlockStore for PostgresBlockStore { 22 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> { 23 let cid_bytes = cid.to_bytes(); 24 let row = sqlx::query("SELECT data FROM blocks WHERE cid = $1") 25 .bind(cid_bytes) 26 .fetch_optional(&self.pool) 27 .await 28 .map_err(|e| RepoError::storage(e))?; 29 30 match row { 31 Some(row) => { 32 let data: Vec<u8> = row.get("data"); 33 Ok(Some(Bytes::from(data))) 34 }, 35 None => Ok(None), 36 } 37 } 38 39 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> { 40 let mut hasher = Sha256::new(); 41 hasher.update(data); 42 let hash = hasher.finalize(); 43 let multihash = Multihash::wrap(0x12, &hash).unwrap(); 44 let cid = Cid::new_v1(0x71, multihash); 45 let cid_bytes = cid.to_bytes(); 46 47 sqlx::query("INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING") 48 .bind(cid_bytes) 49 .bind(data) 50 .execute(&self.pool) 51 .await 52 .map_err(|e| RepoError::storage(e))?; 53 54 Ok(cid) 55 } 56 57 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> { 58 let cid_bytes = cid.to_bytes(); 59 let row = sqlx::query("SELECT 1 FROM blocks WHERE cid = $1") 60 .bind(cid_bytes) 61 .fetch_optional(&self.pool) 62 .await 63 .map_err(|e| RepoError::storage(e))?; 64 65 Ok(row.is_some()) 66 } 67 68 async fn put_many(&self, blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send) -> Result<(), RepoError> { 69 let blocks: Vec<_> = blocks.into_iter().collect(); 70 for (cid, data) in blocks { 71 let cid_bytes = cid.to_bytes(); 72 sqlx::query("INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING") 73 .bind(cid_bytes) 74 .bind(data.as_ref()) 75 .execute(&self.pool) 76 .await 77 .map_err(|e| RepoError::storage(e))?; 78 } 79 Ok(()) 80 } 81 82 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> { 83 let mut results = Vec::new(); 84 for cid in cids { 85 results.push(self.get(cid).await?); 86 } 87 Ok(results) 88 } 89 90 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> { 91 self.put_many(commit.blocks).await?; 92 Ok(()) 93 } 94}