this repo has no description
1use bytes::Bytes; 2use cid::Cid; 3use jacquard_repo::error::RepoError; 4use jacquard_repo::repo::CommitData; 5use jacquard_repo::storage::BlockStore; 6use multihash::Multihash; 7use sha2::{Digest, Sha256}; 8use sqlx::PgPool; 9 10pub mod tracking; 11 12#[derive(Clone)] 13pub struct PostgresBlockStore { 14 pool: PgPool, 15} 16 17impl PostgresBlockStore { 18 pub fn new(pool: PgPool) -> Self { 19 Self { pool } 20 } 21} 22 23impl BlockStore for PostgresBlockStore { 24 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> { 25 crate::metrics::record_block_operation("get"); 26 let cid_bytes = cid.to_bytes(); 27 let row = sqlx::query!("SELECT data FROM blocks WHERE cid = $1", &cid_bytes) 28 .fetch_optional(&self.pool) 29 .await 30 .map_err(|e| RepoError::storage(e))?; 31 32 match row { 33 Some(row) => Ok(Some(Bytes::from(row.data))), 34 None => Ok(None), 35 } 36 } 37 38 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> { 39 crate::metrics::record_block_operation("put"); 40 let mut hasher = Sha256::new(); 41 hasher.update(data); 42 let hash = hasher.finalize(); 43 let multihash = Multihash::wrap(0x12, &hash) 44 .map_err(|e| RepoError::storage(std::io::Error::new(std::io::ErrorKind::InvalidData, format!("Failed to wrap multihash: {:?}", e))))?; 45 let cid = Cid::new_v1(0x71, multihash); 46 let cid_bytes = cid.to_bytes(); 47 48 sqlx::query!("INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING", &cid_bytes, data) 49 .execute(&self.pool) 50 .await 51 .map_err(|e| RepoError::storage(e))?; 52 53 Ok(cid) 54 } 55 56 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> { 57 crate::metrics::record_block_operation("has"); 58 let cid_bytes = cid.to_bytes(); 59 let row = sqlx::query!("SELECT 1 as one FROM blocks WHERE cid = $1", &cid_bytes) 60 .fetch_optional(&self.pool) 61 .await 62 .map_err(|e| RepoError::storage(e))?; 63 64 Ok(row.is_some()) 65 } 66 67 async fn put_many( 68 &self, 69 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send, 70 ) -> Result<(), RepoError> { 71 let blocks: Vec<_> = blocks.into_iter().collect(); 72 if blocks.is_empty() { 73 return Ok(()); 74 } 75 76 crate::metrics::record_block_operation("put_many"); 77 let cids: Vec<Vec<u8>> = blocks.iter().map(|(cid, _)| cid.to_bytes()).collect(); 78 let data: Vec<&[u8]> = blocks.iter().map(|(_, d)| d.as_ref()).collect(); 79 80 sqlx::query!( 81 r#" 82 INSERT INTO blocks (cid, data) 83 SELECT * FROM UNNEST($1::bytea[], $2::bytea[]) 84 ON CONFLICT (cid) DO NOTHING 85 "#, 86 &cids, 87 &data as &[&[u8]] 88 ) 89 .execute(&self.pool) 90 .await 91 .map_err(|e| RepoError::storage(e))?; 92 93 Ok(()) 94 } 95 96 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> { 97 if cids.is_empty() { 98 return Ok(Vec::new()); 99 } 100 101 crate::metrics::record_block_operation("get_many"); 102 let cid_bytes: Vec<Vec<u8>> = cids.iter().map(|c| c.to_bytes()).collect(); 103 104 let rows = sqlx::query!( 105 "SELECT cid, data FROM blocks WHERE cid = ANY($1)", 106 &cid_bytes 107 ) 108 .fetch_all(&self.pool) 109 .await 110 .map_err(|e| RepoError::storage(e))?; 111 112 let found: std::collections::HashMap<Vec<u8>, Bytes> = rows 113 .into_iter() 114 .map(|row| (row.cid, Bytes::from(row.data))) 115 .collect(); 116 117 let results = cid_bytes 118 .iter() 119 .map(|cid| found.get(cid).cloned()) 120 .collect(); 121 122 Ok(results) 123 } 124 125 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> { 126 self.put_many(commit.blocks).await?; 127 Ok(()) 128 } 129}