AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 437 lines 12 kB view raw
1use data_encoding::BASE32_NOPAD; 2use fjall::UserKey; 3use jacquard_common::types::crypto::{PublicKey, code_of, encode_uvarint}; 4use jacquard_common::types::string::Did; 5use jacquard_common::types::tid::Tid; 6use jacquard_common::{CowStr, IntoStatic}; 7use miette::{Context, IntoDiagnostic}; 8use serde::{Deserialize, Deserializer, Serialize, Serializer}; 9use smol_str::{SmolStr, SmolStrBuilder, format_smolstr}; 10use std::borrow::Cow; 11use std::fmt::Display; 12 13const S32_CHAR: &str = "234567abcdefghijklmnopqrstuvwxyz"; 14 15#[derive(Clone, Debug, PartialEq, Eq)] 16pub enum TrimmedDid<'s> { 17 Plc([u8; 15]), 18 Web(CowStr<'s>), 19 Other(CowStr<'s>), 20} 21 22const TAG_PLC: u8 = 1; 23const TAG_WEB: u8 = 2; 24 25impl<'s> TrimmedDid<'s> { 26 pub fn len(&self) -> usize { 27 match self { 28 TrimmedDid::Plc(_) => 16, 29 TrimmedDid::Web(s) => 1 + s.len(), 30 TrimmedDid::Other(s) => s.len(), 31 } 32 } 33 34 pub fn into_static(self) -> TrimmedDid<'static> { 35 match self { 36 TrimmedDid::Plc(bytes) => TrimmedDid::Plc(bytes), 37 TrimmedDid::Web(s) => TrimmedDid::Web(s.into_static()), 38 TrimmedDid::Other(s) => TrimmedDid::Other(s.into_static()), 39 } 40 } 41 42 pub fn to_did(&self) -> Did<'static> { 43 match self { 44 TrimmedDid::Plc(_) => { 45 let s = self.to_string(); 46 Did::new_owned(format_smolstr!("did:{}", s)).expect("valid did from plc") 47 } 48 TrimmedDid::Web(s) => { 49 Did::new_owned(format_smolstr!("did:web:{}", s)).expect("valid did from web") 50 } 51 TrimmedDid::Other(s) => { 52 Did::new_owned(format_smolstr!("did:{}", s)).expect("valid did from other") 53 } 54 } 55 } 56 57 pub fn write_to_vec(&self, buf: &mut Vec<u8>) { 58 match self { 59 TrimmedDid::Plc(bytes) => { 60 buf.push(TAG_PLC); 61 buf.extend_from_slice(bytes); 62 } 63 TrimmedDid::Web(s) => { 64 buf.push(TAG_WEB); 65 buf.extend_from_slice(s.as_bytes()); 66 } 67 TrimmedDid::Other(s) => buf.extend_from_slice(s.as_bytes()), 68 } 69 } 70 71 pub fn to_string(&self) -> String { 72 match self { 73 
TrimmedDid::Plc(bytes) => { 74 let mut s = String::with_capacity(28); 75 s.push_str("plc:"); 76 s.push_str(&BASE32_NOPAD.encode(bytes).to_ascii_lowercase()); 77 s 78 } 79 TrimmedDid::Web(s) => { 80 format!("web:{}", s) 81 } 82 TrimmedDid::Other(s) => s.to_string(), 83 } 84 } 85} 86 87impl Display for TrimmedDid<'_> { 88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 89 match self { 90 TrimmedDid::Plc(bytes) => { 91 f.write_str("plc:")?; 92 f.write_str(&BASE32_NOPAD.encode(bytes).to_ascii_lowercase()) 93 } 94 TrimmedDid::Web(s) => { 95 f.write_str("web:")?; 96 f.write_str(s) 97 } 98 TrimmedDid::Other(s) => f.write_str(s), 99 } 100 } 101} 102 103impl<'a> From<&'a Did<'a>> for TrimmedDid<'a> { 104 fn from(did: &'a Did<'a>) -> Self { 105 let s = did.as_str(); 106 if let Some(rest) = s.strip_prefix("did:plc:") { 107 if rest.len() == 24 { 108 // decode 109 if let Ok(bytes) = BASE32_NOPAD.decode(rest.to_ascii_uppercase().as_bytes()) { 110 if bytes.len() == 15 { 111 return TrimmedDid::Plc(bytes.try_into().unwrap()); 112 } 113 } 114 } 115 } else if let Some(rest) = s.strip_prefix("did:web:") { 116 return TrimmedDid::Web(CowStr::Borrowed(rest)); 117 } 118 TrimmedDid::Other(CowStr::Borrowed(s.trim_start_matches("did:"))) 119 } 120} 121 122impl<'a> TryFrom<&'a [u8]> for TrimmedDid<'a> { 123 type Error = miette::Report; 124 125 fn try_from(value: &'a [u8]) -> miette::Result<Self> { 126 if value.is_empty() { 127 miette::bail!("empty did key"); 128 } 129 match value[0] { 130 TAG_PLC => { 131 if value.len() == 16 { 132 let mut arr = [0u8; 15]; 133 arr.copy_from_slice(&value[1..]); 134 return Ok(TrimmedDid::Plc(arr)); 135 } 136 miette::bail!("invalid length for tagged plc did"); 137 } 138 TAG_WEB => { 139 if let Ok(s) = std::str::from_utf8(&value[1..]) { 140 return Ok(TrimmedDid::Web(CowStr::Borrowed(s))); 141 } 142 miette::bail!("invalid utf8 for tagged web did"); 143 } 144 _ => { 145 if let Ok(s) = std::str::from_utf8(value) { 146 return 
Ok(TrimmedDid::Other(CowStr::Borrowed(s))); 147 } 148 miette::bail!("invalid utf8 for other did"); 149 } 150 } 151 } 152} 153 154impl<'a> Into<UserKey> for TrimmedDid<'a> { 155 fn into(self) -> UserKey { 156 let mut vec = Vec::with_capacity(32); 157 self.write_to_vec(&mut vec); 158 UserKey::new(&vec) 159 } 160} 161 162impl<'a> Into<UserKey> for &TrimmedDid<'a> { 163 fn into(self) -> UserKey { 164 let mut vec = Vec::with_capacity(32); 165 self.write_to_vec(&mut vec); 166 UserKey::new(&vec) 167 } 168} 169 170impl Serialize for TrimmedDid<'_> { 171 fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> { 172 let mut vec = Vec::with_capacity(32); 173 self.write_to_vec(&mut vec); 174 serializer.serialize_bytes(&vec) 175 } 176} 177 178impl<'de> Deserialize<'de> for TrimmedDid<'de> { 179 fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> { 180 struct DidVisitor; 181 impl<'de> serde::de::Visitor<'de> for DidVisitor { 182 type Value = TrimmedDid<'de>; 183 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { 184 formatter.write_str("bytes (tagged) or string (legacy)") 185 } 186 187 fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E> 188 where 189 E: serde::de::Error, 190 { 191 TrimmedDid::try_from(v) 192 .map(|td| td.into_static()) 193 .map_err(E::custom) 194 } 195 196 fn visit_borrowed_bytes<E>(self, v: &'de [u8]) -> Result<Self::Value, E> 197 where 198 E: serde::de::Error, 199 { 200 TrimmedDid::try_from(v).map_err(E::custom) 201 } 202 } 203 deserializer.deserialize_any(DidVisitor) 204 } 205} 206 207#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] 208pub struct DbTid(#[serde(with = "serde_bytes")] [u8; 8]); 209 210impl DbTid { 211 pub fn new_from_bytes(bytes: [u8; 8]) -> Self { 212 Self(bytes) 213 } 214 215 pub fn len(&self) -> usize { 216 self.0.len() 217 } 218 219 pub fn as_bytes(&self) -> &[u8] { 220 &self.0 221 } 222 223 pub fn to_tid(&self) -> Tid { 224 
Tid::raw(self.to_smolstr()) 225 } 226 227 fn to_smolstr(&self) -> SmolStr { 228 let mut i = u64::from_be_bytes(self.0); 229 let mut s = SmolStrBuilder::new(); 230 for _ in 0..13 { 231 let c = i & 0x1F; 232 s.push(S32_CHAR.chars().nth(c as usize).unwrap()); 233 i >>= 5; 234 } 235 236 let mut builder = SmolStrBuilder::new(); 237 for c in s.finish().chars().rev() { 238 builder.push(c); 239 } 240 builder.finish() 241 } 242} 243 244pub fn s32decode(s: &str) -> u64 { 245 let mut i: usize = 0; 246 for c in s.chars() { 247 i = i * 32 + S32_CHAR.chars().position(|x| x == c).unwrap(); 248 } 249 i as u64 250} 251 252impl From<&Tid> for DbTid { 253 fn from(tid: &Tid) -> Self { 254 DbTid(s32decode(tid.as_str()).to_be_bytes()) 255 } 256} 257 258impl From<DbTid> for Tid { 259 fn from(val: DbTid) -> Self { 260 val.to_tid() 261 } 262} 263 264impl Display for DbTid { 265 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 266 write!(f, "{}", self.to_smolstr()) 267 } 268} 269 270#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] 271#[repr(u8)] 272pub enum DbAction { 273 Create = 0, 274 Update = 1, 275 Delete = 2, 276} 277 278impl DbAction { 279 pub fn as_str(&self) -> &'static str { 280 match self { 281 DbAction::Create => "create", 282 DbAction::Update => "update", 283 DbAction::Delete => "delete", 284 } 285 } 286} 287 288impl<'a> TryFrom<&'a str> for DbAction { 289 type Error = miette::Report; 290 291 fn try_from(s: &'a str) -> Result<Self, Self::Error> { 292 match s { 293 "create" => Ok(DbAction::Create), 294 "update" => Ok(DbAction::Update), 295 "delete" => Ok(DbAction::Delete), 296 _ => miette::bail!("invalid action: {}", s), 297 } 298 } 299} 300 301impl Display for DbAction { 302 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 303 write!(f, "{}", self.as_str()) 304 } 305} 306 307#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] 308#[serde(untagged)] 309pub enum DbRkey { 310 Tid(DbTid), 311 Str(SmolStr), 
312} 313 314impl DbRkey { 315 pub fn new(s: &str) -> Self { 316 if let Ok(tid) = Tid::new(s) { 317 DbRkey::Tid(DbTid::from(&tid)) 318 } else { 319 DbRkey::Str(SmolStr::from(s)) 320 } 321 } 322 323 pub fn len(&self) -> usize { 324 match self { 325 DbRkey::Tid(tid) => tid.len(), 326 DbRkey::Str(s) => s.len(), 327 } 328 } 329 330 pub fn to_smolstr(&self) -> SmolStr { 331 match self { 332 DbRkey::Tid(tid) => tid.to_smolstr(), 333 DbRkey::Str(s) => s.clone(), 334 } 335 } 336} 337 338impl From<&str> for DbRkey { 339 fn from(s: &str) -> Self { 340 Self::new(s) 341 } 342} 343 344impl From<String> for DbRkey { 345 fn from(s: String) -> Self { 346 Self::new(&s) 347 } 348} 349 350impl From<SmolStr> for DbRkey { 351 fn from(s: SmolStr) -> Self { 352 Self::new(s.as_str()) 353 } 354} 355 356impl Display for DbRkey { 357 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 358 write!(f, "{}", self.to_smolstr()) 359 } 360} 361 362/// did:key:z... → raw multicodec public key bytes 363#[derive(Debug, Clone, Serialize, Deserialize, jacquard_derive::IntoStatic)] 364pub struct DidKey<'b>( 365 #[serde(borrow)] 366 #[serde(with = "serde_bytes")] 367 pub Cow<'b, [u8]>, 368); 369 370impl DidKey<'_> { 371 pub fn from_did_key(s: &str) -> miette::Result<Self> { 372 let multibase_str = s 373 .strip_prefix("did:key:") 374 .ok_or_else(|| miette::miette!("missing did:key: prefix in {s}"))?; 375 let (_base, bytes) = multibase::decode(multibase_str) 376 .into_diagnostic() 377 .wrap_err("invalid multibase in did:key")?; 378 Ok(Self(Cow::Owned(bytes))) 379 } 380 381 pub fn encode(&self) -> String { 382 multibase::encode(multibase::Base::Base58Btc, &self.0) 383 } 384} 385 386impl std::fmt::Display for DidKey<'_> { 387 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 388 write!(f, "did:key:{}", self.encode()) 389 } 390} 391 392impl From<PublicKey<'_>> for DidKey<'static> { 393 fn from(value: PublicKey<'_>) -> Self { 394 let mut bytes = Vec::with_capacity(8 + 
value.bytes.len()); 395 bytes.append(&mut encode_uvarint(code_of(value.codec))); 396 bytes.extend_from_slice(&value.bytes); 397 Self(bytes.into()) 398 } 399} 400 401#[cfg(test)] 402mod tests { 403 use super::*; 404 use jacquard_common::types::tid::Tid; 405 406 #[test] 407 fn test_dbtid_roundtrip() { 408 let tid_str = "3jzfcijpj2z2a"; 409 let tid = Tid::new(tid_str).unwrap(); 410 let db_tid = DbTid::from(&tid); 411 assert_eq!(db_tid.to_tid().as_str(), tid_str); 412 413 let tid_str_2 = "2222222222222"; 414 let tid = Tid::new(tid_str_2).unwrap(); 415 let db_tid = DbTid::from(&tid); 416 assert_eq!(db_tid.to_tid().as_str(), tid_str_2); 417 } 418 419 #[test] 420 fn test_dbrkey() { 421 let tid_str = "3jzfcijpj2z2a"; 422 let rkey = DbRkey::new(tid_str); 423 if let DbRkey::Tid(t) = rkey { 424 assert_eq!(t.to_tid().as_str(), tid_str); 425 } else { 426 panic!("expected tid"); 427 } 428 429 let str_val = "self"; 430 let rkey = DbRkey::new(str_val); 431 if let DbRkey::Str(s) = rkey { 432 assert_eq!(s, str_val); 433 } else { 434 panic!("expected str"); 435 } 436 } 437}