this repo has no description
at main 7.0 kB view raw
1use bytes::Bytes; 2use cid::Cid; 3use jacquard_repo::error::RepoError; 4use jacquard_repo::repo::CommitData; 5use jacquard_repo::storage::BlockStore; 6use multihash::Multihash; 7use sha2::{Digest, Sha256}; 8use sqlx::PgPool; 9use std::collections::HashSet; 10use std::sync::{Arc, Mutex}; 11 12#[derive(Clone)] 13pub struct PostgresBlockStore { 14 pool: PgPool, 15} 16 17impl PostgresBlockStore { 18 pub fn new(pool: PgPool) -> Self { 19 Self { pool } 20 } 21 22 pub fn pool(&self) -> &PgPool { 23 &self.pool 24 } 25} 26 27impl BlockStore for PostgresBlockStore { 28 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> { 29 let cid_bytes = cid.to_bytes(); 30 let row = sqlx::query!("SELECT data FROM blocks WHERE cid = $1", &cid_bytes) 31 .fetch_optional(&self.pool) 32 .await 33 .map_err(RepoError::storage)?; 34 match row { 35 Some(row) => Ok(Some(Bytes::from(row.data))), 36 None => Ok(None), 37 } 38 } 39 40 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> { 41 let mut hasher = Sha256::new(); 42 hasher.update(data); 43 let hash = hasher.finalize(); 44 let multihash = Multihash::wrap(0x12, &hash).map_err(|e| { 45 RepoError::storage(std::io::Error::new( 46 std::io::ErrorKind::InvalidData, 47 format!("Failed to wrap multihash: {:?}", e), 48 )) 49 })?; 50 let cid = Cid::new_v1(0x71, multihash); 51 let cid_bytes = cid.to_bytes(); 52 sqlx::query!( 53 "INSERT INTO blocks (cid, data) VALUES ($1, $2) ON CONFLICT (cid) DO NOTHING", 54 &cid_bytes, 55 data 56 ) 57 .execute(&self.pool) 58 .await 59 .map_err(RepoError::storage)?; 60 Ok(cid) 61 } 62 63 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> { 64 let cid_bytes = cid.to_bytes(); 65 let row = sqlx::query!("SELECT 1 as one FROM blocks WHERE cid = $1", &cid_bytes) 66 .fetch_optional(&self.pool) 67 .await 68 .map_err(RepoError::storage)?; 69 Ok(row.is_some()) 70 } 71 72 async fn put_many( 73 &self, 74 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send, 75 ) -> Result<(), RepoError> { 76 let blocks: Vec<_> = blocks.into_iter().collect(); 77 if blocks.is_empty() { 78 return Ok(()); 79 } 80 let cids: Vec<Vec<u8>> = blocks.iter().map(|(cid, _)| cid.to_bytes()).collect(); 81 let data: Vec<&[u8]> = blocks.iter().map(|(_, d)| d.as_ref()).collect(); 82 sqlx::query!( 83 r#" 84 INSERT INTO blocks (cid, data) 85 SELECT * FROM UNNEST($1::bytea[], $2::bytea[]) 86 ON CONFLICT (cid) DO NOTHING 87 "#, 88 &cids, 89 &data as &[&[u8]] 90 ) 91 .execute(&self.pool) 92 .await 93 .map_err(RepoError::storage)?; 94 Ok(()) 95 } 96 97 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> { 98 if cids.is_empty() { 99 return Ok(Vec::new()); 100 } 101 let cid_bytes: Vec<Vec<u8>> = cids.iter().map(|c| c.to_bytes()).collect(); 102 let rows = sqlx::query!( 103 "SELECT cid, data FROM blocks WHERE cid = ANY($1)", 104 &cid_bytes 105 ) 106 .fetch_all(&self.pool) 107 .await 108 .map_err(RepoError::storage)?; 109 let found: std::collections::HashMap<Vec<u8>, Bytes> = rows 110 .into_iter() 111 .map(|row| (row.cid, Bytes::from(row.data))) 112 .collect(); 113 let results = cid_bytes 114 .iter() 115 .map(|cid| found.get(cid).cloned()) 116 .collect(); 117 Ok(results) 118 } 119 120 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> { 121 self.put_many(commit.blocks).await?; 122 Ok(()) 123 } 124} 125 126#[derive(Clone)] 127pub struct TrackingBlockStore { 128 inner: PostgresBlockStore, 129 written_cids: Arc<Mutex<Vec<Cid>>>, 130 read_cids: Arc<Mutex<HashSet<Cid>>>, 131} 132 133impl TrackingBlockStore { 134 pub fn new(store: PostgresBlockStore) -> Self { 135 Self { 136 inner: store, 137 written_cids: Arc::new(Mutex::new(Vec::new())), 138 read_cids: Arc::new(Mutex::new(HashSet::new())), 139 } 140 } 141 142 pub fn get_written_cids(&self) -> Vec<Cid> { 143 match self.written_cids.lock() { 144 Ok(guard) => guard.clone(), 145 Err(poisoned) => poisoned.into_inner().clone(), 146 } 147 } 148 149 pub fn get_read_cids(&self) -> Vec<Cid> { 150 match self.read_cids.lock() { 151 Ok(guard) => guard.iter().cloned().collect(), 152 Err(poisoned) => poisoned.into_inner().iter().cloned().collect(), 153 } 154 } 155 156 pub fn get_all_relevant_cids(&self) -> Vec<Cid> { 157 let written = self.get_written_cids(); 158 let read = self.get_read_cids(); 159 let mut all: HashSet<Cid> = written.into_iter().collect(); 160 all.extend(read); 161 all.into_iter().collect() 162 } 163} 164 165impl BlockStore for TrackingBlockStore { 166 async fn get(&self, cid: &Cid) -> Result<Option<Bytes>, RepoError> { 167 let result = self.inner.get(cid).await?; 168 if result.is_some() { 169 match self.read_cids.lock() { 170 Ok(mut guard) => { 171 guard.insert(*cid); 172 } 173 Err(poisoned) => { 174 poisoned.into_inner().insert(*cid); 175 } 176 } 177 } 178 Ok(result) 179 } 180 181 async fn put(&self, data: &[u8]) -> Result<Cid, RepoError> { 182 let cid = self.inner.put(data).await?; 183 match self.written_cids.lock() { 184 Ok(mut guard) => guard.push(cid), 185 Err(poisoned) => poisoned.into_inner().push(cid), 186 } 187 Ok(cid) 188 } 189 190 async fn has(&self, cid: &Cid) -> Result<bool, RepoError> { 191 self.inner.has(cid).await 192 } 193 194 async fn put_many( 195 &self, 196 blocks: impl IntoIterator<Item = (Cid, Bytes)> + Send, 197 ) -> Result<(), RepoError> { 198 let blocks: Vec<_> = blocks.into_iter().collect(); 199 let cids: Vec<Cid> = blocks.iter().map(|(cid, _)| *cid).collect(); 200 self.inner.put_many(blocks).await?; 201 match self.written_cids.lock() { 202 Ok(mut guard) => guard.extend(cids), 203 Err(poisoned) => poisoned.into_inner().extend(cids), 204 } 205 Ok(()) 206 } 207 208 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Bytes>>, RepoError> { 209 let results = self.inner.get_many(cids).await?; 210 cids.iter() 211 .zip(results.iter()) 212 .filter(|(_, result)| result.is_some()) 213 .for_each(|(cid, _)| match self.read_cids.lock() { 214 Ok(mut guard) => { 215 guard.insert(*cid); 216 } 217 Err(poisoned) => { 218 poisoned.into_inner().insert(*cid); 219 } 220 }); 221 Ok(results) 222 } 223 224 async fn apply_commit(&self, commit: CommitData) -> Result<(), RepoError> { 225 self.put_many(commit.blocks).await?; 226 Ok(()) 227 } 228}