// This repo has no description.
1use bytes::Bytes; 2use cid::Cid; 3use jacquard_repo::error::RepoError; 4use jacquard_repo::repo::CommitData; 5use jacquard_repo::storage::BlockStore; 6use multihash::Multihash; 7use sha2::{Digest, Sha256}; 8use sqlx::{PgPool, Row}; 9 10#[derive(Clone)] 11pub struct PostgresBlockStore { 12 pool: PgPool, 13} 14 15impl PostgresBlockStore { 16 pub fn new(pool: PgPool) -> Self { 17 Self { pool } 18 } 19} 20 21impl BlockStore for PostgresBlockStore { 22 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> { 23 let cid_bytes = cid.to_bytes(); 24 let row = sqlx::query("SELECT data FROM blocks WHERE cid = $1") 25 .bind(cid_bytes) 26 .fetch_optional(&self.pool) 27 .await 28 .map_err(|e| RepoError::storage(e))?; 29 30 match row { 31 Some(row) => { 32 let data: Vec<u8> = row.get("data"); 33 Ok(Some(Bytes::from(data))) 34 } 35 None => Ok(None), 36 } 37 } 38 39 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> { 40 let mut hasher = Sha256::new(); 41 hasher.update(data); 42 let hash = hasher.finalize(); 43 let multihash = Multihash::wrap(0x12, &hash).unwrap(); 44 let cid = Cid::new_v1(0x71, multihash); 45 let cid_bytes = cid.to_bytes(); 46 47 sqlx::query("INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING") 48 .bind(cid_bytes) 49 .bind(data) 50 .execute(&self.pool) 51 .await 52 .map_err(|e| RepoError::storage(e))?; 53 54 Ok(cid) 55 } 56 57 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> { 58 let cid_bytes = cid.to_bytes(); 59 let row = sqlx::query("SELECT 1 FROM blocks WHERE cid = $1") 60 .bind(cid_bytes) 61 .fetch_optional(&self.pool) 62 .await 63 .map_err(|e| RepoError::storage(e))?; 64 65 Ok(row.is_some()) 66 } 67 68 async fn put_many( 69 &self, 70 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send, 71 ) -> Result<(), RepoError> { 72 let blocks: Vec<_> = blocks.into_iter().collect(); 73 for (cid, data) in blocks { 74 let cid_bytes = cid.to_bytes(); 75 sqlx::query( 76 "INSERT INTO blocks (cid, data) VALUES ($1, $2) 
ON CONFLICT (cid) DO NOTHING", 77 ) 78 .bind(cid_bytes) 79 .bind(data.as_ref()) 80 .execute(&self.pool) 81 .await 82 .map_err(|e| RepoError::storage(e))?; 83 } 84 Ok(()) 85 } 86 87 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> { 88 let mut results = Vec::new(); 89 for cid in cids { 90 results.push(self.get(cid).await?); 91 } 92 Ok(results) 93 } 94 95 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> { 96 self.put_many(commit.blocks).await?; 97 Ok(()) 98 } 99}