this repo has no description
1use bytes::Bytes; 2use cid::Cid; 3use jacquard_repo::error::RepoError; 4use jacquard_repo::repo::CommitData; 5use jacquard_repo::storage::BlockStore; 6use multihash::Multihash; 7use sha2::{Digest, Sha256}; 8use sqlx::PgPool; 9pub mod tracking; 10#[derive(Clone)] 11pub struct PostgresBlockStore { 12 pool: PgPool, 13} 14impl PostgresBlockStore { 15 pub fn new(pool: PgPool) -> Self { 16 Self { pool } 17 } 18} 19impl BlockStore for PostgresBlockStore { 20 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> { 21 crate::metrics::record_block_operation("get"); 22 let cid_bytes = cid.to_bytes(); 23 let row = sqlx::query!("SELECT data FROM blocks WHERE cid = $1", &cid_bytes) 24 .fetch_optional(&self.pool) 25 .await 26 .map_err(|e| RepoError::storage(e))?; 27 match row { 28 Some(row) => Ok(Some(Bytes::from(row.data))), 29 None => Ok(None), 30 } 31 } 32 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> { 33 crate::metrics::record_block_operation("put"); 34 let mut hasher = Sha256::new(); 35 hasher.update(data); 36 let hash = hasher.finalize(); 37 let multihash = Multihash::wrap(0x12, &hash) 38 .map_err(|e| RepoError::storage(std::io::Error::new(std::io::ErrorKind::InvalidData, format!("Failed to wrap multihash: {:?}", e))))?; 39 let cid = Cid::new_v1(0x71, multihash); 40 let cid_bytes = cid.to_bytes(); 41 sqlx::query!("INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING", &cid_bytes, data) 42 .execute(&self.pool) 43 .await 44 .map_err(|e| RepoError::storage(e))?; 45 Ok(cid) 46 } 47 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> { 48 crate::metrics::record_block_operation("has"); 49 let cid_bytes = cid.to_bytes(); 50 let row = sqlx::query!("SELECT 1 as one FROM blocks WHERE cid = $1", &cid_bytes) 51 .fetch_optional(&self.pool) 52 .await 53 .map_err(|e| RepoError::storage(e))?; 54 Ok(row.is_some()) 55 } 56 async fn put_many( 57 &self, 58 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send, 59 ) -> Result<(), RepoError> { 60 let blocks: Vec<_> = blocks.into_iter().collect(); 61 if blocks.is_empty() { 62 return Ok(()); 63 } 64 crate::metrics::record_block_operation("put_many"); 65 let cids: Vec<Vec<u8>> = blocks.iter().map(|(cid, _)| cid.to_bytes()).collect(); 66 let data: Vec<&[u8]> = blocks.iter().map(|(_, d)| d.as_ref()).collect(); 67 sqlx::query!( 68 r#" 69 INSERT INTO blocks (cid, data) 70 SELECT * FROM UNNEST($1::bytea[], $2::bytea[]) 71 ON CONFLICT (cid) DO NOTHING 72 "#, 73 &cids, 74 &data as &[&[u8]] 75 ) 76 .execute(&self.pool) 77 .await 78 .map_err(|e| RepoError::storage(e))?; 79 Ok(()) 80 } 81 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> { 82 if cids.is_empty() { 83 return Ok(Vec::new()); 84 } 85 crate::metrics::record_block_operation("get_many"); 86 let cid_bytes: Vec<Vec<u8>> = cids.iter().map(|c| c.to_bytes()).collect(); 87 let rows = sqlx::query!( 88 "SELECT cid, data FROM blocks WHERE cid = ANY($1)", 89 &cid_bytes 90 ) 91 .fetch_all(&self.pool) 92 .await 93 .map_err(|e| RepoError::storage(e))?; 94 let found: std::collections::HashMap<Vec<u8>, Bytes> = rows 95 .into_iter() 96 .map(|row| (row.cid, Bytes::from(row.data))) 97 .collect(); 98 let results = cid_bytes 99 .iter() 100 .map(|cid| found.get(cid).cloned()) 101 .collect(); 102 Ok(results) 103 } 104 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> { 105 self.put_many(commit.blocks).await?; 106 Ok(()) 107 } 108}