Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

wip: m2m

authored by seoul.systems and committed by tangled.org 1cbfe6e5 8de64a8f

+460 -1
+1 -1
constellation/src/lib.rs
··· 31 31 } 32 32 } 33 33 34 - #[derive(Debug, PartialEq, Serialize, Deserialize)] 34 + #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] 35 35 pub struct RecordId { 36 36 pub did: Did, 37 37 pub collection: String,
+102
constellation/src/server/mod.rs
··· 114 114 }), 115 115 ) 116 116 .route( 117 + "/xrpc/blue.microcosm.links.getManyToMany", 118 + get({ 119 + let store = store.clone(); 120 + move |accept, query| async { 121 + spawn_blocking(|| get_many_to_many(accept, query, store)) 122 + .await 123 + .map_err(to500)? 124 + } 125 + }), 126 + ) 127 + .route( 117 128 "/xrpc/blue.microcosm.links.getBacklinks", 118 129 get({ 119 130 let store = store.clone(); ··· 660 671 accept, 661 672 GetLinkItemsResponse { 662 673 total: paged.total, 674 + linking_records: paged.items, 675 + cursor, 676 + query: (*query).clone(), 677 + }, 678 + )) 679 + } 680 + 681 + #[derive(Clone, Deserialize)] 682 + #[serde(rename_all = "camelCase")] 683 + struct GetManyToManyItemsQuery { 684 + subject: String, 685 + source: String, 686 + /// path to the secondary link in the linking record 687 + path_to_other: String, 688 + /// filter to linking records (join of the m2m) by these DIDs 689 + #[serde(default)] 690 + did: Vec<String>, 691 + /// filter to specific secondary records 692 + #[serde(default)] 693 + other_subject: Vec<String>, 694 + cursor: Option<OpaqueApiCursor>, 695 + #[serde(default = "get_default_cursor_limit")] 696 + limit: u64, 697 + } 698 + #[derive(Template, Serialize)] 699 + #[template(path = "get-many-to-many.html.j2")] 700 + struct GetManyToManyItemsResponse { 701 + linking_records: Vec<(String, Vec<RecordId>)>, 702 + cursor: Option<OpaqueApiCursor>, 703 + #[serde(skip_serializing)] 704 + query: GetManyToManyItemsQuery, 705 + } 706 + fn get_many_to_many( 707 + accept: ExtractAccept, 708 + query: axum_extra::extract::Query<GetManyToManyItemsQuery>, // supports multiple param occurrences 709 + store: impl LinkReader, 710 + ) -> Result<impl IntoResponse, http::StatusCode> { 711 + let after = query 712 + .cursor 713 + .clone() 714 + .map(|oc| ApiKeyedCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 715 + .transpose()? 716 + .map(|c| c.next); 717 + 718 + let limit = query.limit; 719 + if limit > DEFAULT_CURSOR_LIMIT_MAX { 720 + return Err(http::StatusCode::BAD_REQUEST); 721 + } 722 + 723 + let filter_dids: HashSet<Did> = HashSet::from_iter( 724 + query 725 + .did 726 + .iter() 727 + .map(|d| d.trim()) 728 + .filter(|d| !d.is_empty()) 729 + .map(|d| Did(d.to_string())), 730 + ); 731 + 732 + let filter_other_subjects: HashSet<String> = HashSet::from_iter( 733 + query 734 + .other_subject 735 + .iter() 736 + .map(|s| s.trim().to_string()) 737 + .filter(|s| !s.is_empty()), 738 + ); 739 + 740 + let Some((collection, path)) = query.source.split_once(':') else { 741 + return Err(http::StatusCode::BAD_REQUEST); 742 + }; 743 + let path = format!(".{path}"); 744 + 745 + let path_to_other = format!(".{}", query.path_to_other); 746 + 747 + let paged = store 748 + .get_many_to_many( 749 + &query.subject, 750 + collection, 751 + &path, 752 + &path_to_other, 753 + limit, 754 + after, 755 + &filter_dids, 756 + &filter_other_subjects, 757 + ) 758 + .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 759 + 760 + let cursor = paged.next.map(|next| ApiKeyedCursor { next }.into()); 761 + 762 + Ok(acceptable( 763 + accept, 764 + GetManyToManyItemsResponse { 663 765 linking_records: paged.items, 664 766 cursor, 665 767 query: (*query).clone(),
+93
constellation/src/storage/mem_store.rs
··· 234 234 .len() as u64) 235 235 } 236 236 237 + fn get_many_to_many( 238 + &self, 239 + target: &str, 240 + collection: &str, 241 + path: &str, 242 + path_to_other: &str, 243 + limit: u64, 244 + after: Option<String>, 245 + filter_dids: &HashSet<Did>, 246 + filter_to_targets: &HashSet<String>, 247 + ) -> Result<PagedOrderedCollection<(String, Vec<RecordId>), String>> { 248 + let empty_res = Ok(PagedOrderedCollection { 249 + items: Vec::new(), 250 + next: Some("".to_string()), 251 + }); 252 + 253 + // struct MemStorageData { 254 + // dids: HashMap<Did, bool>, 255 + // targets: HashMap<Target, HashMap<Source, Linkers>>, 256 + // links: HashMap<Did, HashMap<RepoId, Vec<(RecordPath, Target)>>>, 257 + // } 258 + let data = self.0.lock().unwrap(); 259 + 260 + let Some(sources) = data.targets.get(&Target::new(target)) else { 261 + return empty_res; 262 + }; 263 + let Some(linkers) = sources.get(&Source::new(collection, path)) else { 264 + return empty_res; 265 + }; 266 + let path_to_other = RecordPath::new(path_to_other); 267 + 268 + // Convert filter_to_targets to Target objects for comparison 269 + let filter_to_target_objs: HashSet<Target> = 270 + HashSet::from_iter(filter_to_targets.iter().map(|s| Target::new(s))); 271 + 272 + let mut grouped_links: HashMap<Target, Vec<RecordId>> = HashMap::new(); 273 + for (did, rkey) in linkers.iter().flatten().cloned() { 274 + // Filter by DID if filter is provided 275 + if !filter_dids.is_empty() && !filter_dids.contains(&did) { 276 + continue; 277 + } 278 + if let Some(fwd_target) = data 279 + .links 280 + .get(&did) 281 + .unwrap_or(&HashMap::new()) 282 + .get(&RepoId { 283 + collection: collection.to_string(), 284 + rkey: rkey.clone(), 285 + }) 286 + .unwrap_or(&Vec::new()) 287 + .iter() 288 + .find_map(|(path, target)| { 289 + if *path == path_to_other 290 + && (filter_to_target_objs.is_empty() 291 + || filter_to_target_objs.contains(target)) 292 + { 293 + Some(target) 294 + } else { 295 + None 296 + } 297 + }) 298 + { 299 + let record_ids = grouped_links.entry(fwd_target.clone()).or_default(); 300 + record_ids.push(RecordId { 301 + did, 302 + collection: collection.to_string(), 303 + rkey: rkey.0, 304 + }); 305 + } 306 + } 307 + 308 + let mut items = grouped_links 309 + .into_iter() 310 + .map(|(t, r)| (t.0, r)) 311 + .collect::<Vec<_>>(); 312 + 313 + items.sort_by(|(a, _), (b, _)| a.cmp(b)); 314 + 315 + items = items 316 + .into_iter() 317 + .skip_while(|(t, _)| after.as_ref().map(|a| t <= a).unwrap_or(false)) 318 + .take(limit as usize) 319 + .collect(); 320 + 321 + let next = if items.len() as u64 >= limit { 322 + items.last().map(|(t, _)| t.clone()) 323 + } else { 324 + None 325 + }; 326 + 327 + Ok(PagedOrderedCollection { items, next }) 328 + } 329 + 237 330 fn get_links( 238 331 &self, 239 332 target: &str,
+12
constellation/src/storage/mod.rs
··· 135 135 fn get_all_record_counts(&self, _target: &str) 136 136 -> Result<HashMap<String, HashMap<String, u64>>>; 137 137 138 + fn get_many_to_many( 139 + &self, 140 + target: &str, 141 + collection: &str, 142 + path: &str, 143 + path_to_other: &str, 144 + limit: u64, 145 + after: Option<String>, 146 + filter_dids: &HashSet<Did>, 147 + filter_to_targets: &HashSet<String>, 148 + ) -> Result<PagedOrderedCollection<(String, Vec<RecordId>), String>>; 149 + 138 150 fn get_all_counts( 139 151 &self, 140 152 _target: &str,
+143
constellation/src/storage/rocks_store.rs
··· 1122 1122 } 1123 1123 } 1124 1124 1125 + fn get_many_to_many( 1126 + &self, 1127 + target: &str, 1128 + collection: &str, 1129 + path: &str, 1130 + path_to_other: &str, 1131 + limit: u64, 1132 + after: Option<String>, 1133 + filter_dids: &HashSet<Did>, 1134 + filter_to_targets: &HashSet<String>, 1135 + ) -> Result<PagedOrderedCollection<(String, Vec<RecordId>), String>> { 1136 + let collection = Collection(collection.to_string()); 1137 + let path = RPath(path.to_string()); 1138 + 1139 + let target_key = TargetKey(Target(target.to_string()), collection.clone(), path); 1140 + 1141 + let after = after.map(|s| s.parse::<u64>().map(TargetId)).transpose()?; 1142 + 1143 + let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 1144 + eprintln!("Target not found for {target_key:?}"); 1145 + return Ok(Default::default()); 1146 + }; 1147 + 1148 + let filter_did_ids: HashMap<DidId, bool> = filter_dids 1149 + .iter() 1150 + .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose()) 1151 + .collect::<Result<Vec<DidIdValue>>>()? 1152 + .into_iter() 1153 + .map(|DidIdValue(id, active)| (id, active)) 1154 + .collect(); 1155 + 1156 + let mut filter_to_target_ids: HashSet<TargetId> = HashSet::new(); 1157 + for t in filter_to_targets { 1158 + for (_, target_id) in self.iter_targets_for_target(&Target(t.to_string())) { 1159 + filter_to_target_ids.insert(target_id); 1160 + } 1161 + } 1162 + 1163 + let linkers = self.get_target_linkers(&target_id)?; 1164 + 1165 + // we want to provide many to many which effectively means that we want to show a specific 1166 + // list of reords that is linked to by a specific number of linkers 1167 + let mut grouped_links: BTreeMap<TargetId, Vec<RecordId>> = BTreeMap::new(); 1168 + for (did_id, rkey) in linkers.0 { 1169 + if did_id.is_empty() { 1170 + continue; 1171 + } 1172 + 1173 + if !filter_did_ids.is_empty() && filter_did_ids.get(&did_id) != Some(&true) { 1174 + continue; 1175 + } 1176 + 1177 + // Make sure the current did is active 1178 + let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else { 1179 + eprintln!("failed to look up did from did_id {did_id:?}"); 1180 + continue; 1181 + }; 1182 + let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? else { 1183 + eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?"); 1184 + continue; 1185 + }; 1186 + if !active { 1187 + continue; 1188 + } 1189 + 1190 + let record_link_key = RecordLinkKey(did_id, collection.clone(), rkey.clone()); 1191 + let Some(targets) = self.get_record_link_targets(&record_link_key)? else { 1192 + continue; 1193 + }; 1194 + 1195 + let Some(fwd_target) = targets 1196 + .0 1197 + .into_iter() 1198 + .filter_map(|RecordLinkTarget(rpath, target_id)| { 1199 + if rpath.0 == path_to_other 1200 + && (filter_to_target_ids.is_empty() 1201 + || filter_to_target_ids.contains(&target_id)) 1202 + { 1203 + Some(target_id) 1204 + } else { 1205 + None 1206 + } 1207 + }) 1208 + .take(1) 1209 + .next() 1210 + else { 1211 + eprintln!("no forward match found."); 1212 + continue; 1213 + }; 1214 + 1215 + // pagination logic mirrors what is currently done in get_many_to_many_counts 1216 + if after.as_ref().map(|a| fwd_target <= *a).unwrap_or(false) { 1217 + continue; 1218 + } 1219 + let page_is_full = grouped_links.len() as u64 >= limit; 1220 + if page_is_full { 1221 + let current_max = grouped_links.keys().next_back().unwrap(); 1222 + if fwd_target > *current_max { 1223 + continue; 1224 + } 1225 + } 1226 + 1227 + // pagination, continued 1228 + let mut should_evict = false; 1229 + let entry = grouped_links.entry(fwd_target.clone()).or_insert_with(|| { 1230 + should_evict = page_is_full; 1231 + Vec::default() 1232 + }); 1233 + entry.push(RecordId { 1234 + did, 1235 + collection: collection.0.clone(), 1236 + rkey: rkey.0, 1237 + }); 1238 + 1239 + if should_evict { 1240 + grouped_links.pop_last(); 1241 + } 1242 + } 1243 + 1244 + let mut items: Vec<(String, Vec<RecordId>)> = Vec::with_capacity(grouped_links.len()); 1245 + for (fwd_target_id, records) in &grouped_links { 1246 + let Some(target_key) = self 1247 + .target_id_table 1248 + .get_val_from_id(&self.db, fwd_target_id.0)? 1249 + else { 1250 + eprintln!("failed to look up target from target_id {fwd_target_id:?}"); 1251 + continue; 1252 + }; 1253 + 1254 + let target_string = target_key.0 .0; 1255 + 1256 + items.push((target_string, records.clone())); 1257 + } 1258 + 1259 + let next = if grouped_links.len() as u64 >= limit { 1260 + grouped_links.keys().next_back().map(|k| format!("{}", k.0)) 1261 + } else { 1262 + None 1263 + }; 1264 + 1265 + Ok(PagedOrderedCollection { items, next }) 1266 + } 1267 + 1125 1268 fn get_links( 1126 1269 &self, 1127 1270 target: &str,
+60
constellation/templates/get-many-to-many.html.j2
··· 1 + {% extends "base.html.j2" %} 2 + {% import "try-it-macros.html.j2" as try_it %} 3 + 4 + {% block title %}Many-to-Many Links{% endblock %} 5 + {% block description %}All {{ query.source }} records with many-to-many links to {{ query.subject }} joining through {{ query.path_to_other }}{% endblock %} 6 + 7 + {% block content %} 8 + 9 + {% call try_it::get_many_to_many(query.subject, query.source, query.path_to_other, query.did, query.other_subject, query.limit) %} 10 + 11 + <h2> 12 + Many-to-many links to <code>{{ query.subject }}</code> 13 + {% if let Some(browseable_uri) = query.subject|to_browseable %} 14 + <small style="font-weight: normal; font-size: 1rem"><a href="{{ browseable_uri }}">browse record</a></small> 15 + {% endif %} 16 + </h2> 17 + 18 + <p><strong>Many-to-many links</strong> from <code>{{ query.source }}</code> joining through <code>{{ query.path_to_other }}</code></p> 19 + 20 + <ul> 21 + <li>See all links to this target at <code>/links/all</code>: <a href="/links/all?target={{ query.subject|urlencode }}">/links/all?target={{ query.subject }}</a></li> 22 + </ul> 23 + 24 + <h3>Many-to-many links, most recent first:</h3> 25 + 26 + {% for (target, records) in linking_records %} 27 + <h4>Target: <code>{{ target }}</code> <small>(<a href="/links/all?target={{ target|urlencode }}">view all links</a>)</small></h4> 28 + {% for record in records %} 29 + <pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ record.did().0 }} 30 + <strong>Collection</strong>: {{ record.collection }} 31 + <strong>RKey</strong>: {{ record.rkey }} 32 + -> <a href="https://pdsls.dev/at://{{ record.did().0 }}/{{ record.collection }}/{{ record.rkey }}">browse record</a></pre> 33 + {% endfor %} 34 + {% endfor %} 35 + 36 + {% if let Some(c) = cursor %} 37 + <form method="get" action="/xrpc/blue.microcosm.links.getManyToMany"> 38 + <input type="hidden" name="subject" value="{{ query.subject }}" /> 39 + <input type="hidden" name="source" value="{{ query.source }}" /> 40 + <input type="hidden" name="pathToOther" value="{{ query.path_to_other }}" /> 41 + {% for did in query.did %} 42 + <input type="hidden" name="did" value="{{ did }}" /> 43 + {% endfor %} 44 + {% for other in query.other_subject %} 45 + <input type="hidden" name="otherSubject" value="{{ other }}" /> 46 + {% endfor %} 47 + <input type="hidden" name="limit" value="{{ query.limit }}" /> 48 + <input type="hidden" name="cursor" value={{ c|json|safe }} /> 49 + <button type="submit">next page&hellip;</button> 50 + </form> 51 + {% else %} 52 + <button disabled><em>end of results</em></button> 53 + {% endif %} 54 + 55 + <details> 56 + <summary>Raw JSON response</summary> 57 + <pre class="code">{{ self|tojson }}</pre> 58 + </details> 59 + 60 + {% endblock %}
+19
constellation/templates/hello.html.j2
··· 83 83 ) %} 84 84 85 85 86 + <h3 class="route"><code>GET /xrpc/blue.microcosm.links.getManyToMany</code></h3> 87 + 88 + <p>A list of many-to-many join records linking to a target and a secondary target.</p> 89 + 90 + <h4>Query parameters:</h4> 91 + 92 + <ul> 93 + <li><p><code>subject</code>: required, must url-encode. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li> 94 + <li><p><code>source</code>: required. Example: <code>app.bsky.feed.like:subject.uri</code></p></li> 95 + <li><p><code>pathToOther</code>: required. Path to the secondary link in the many-to-many record. Example: <code>otherThing.uri</code></p></li> 96 + <li><p><code>did</code>: optional, filter links to those from specific users. Include multiple times to filter by multiple users. Example: <code>did=did:plc:vc7f4oafdgxsihk4cry2xpze&did=did:plc:vc7f4oafdgxsihk4cry2xpze</code></p></li> 97 + <li><p><code>otherSubject</code>: optional, filter secondary links to specific subjects. Include multiple times to filter by multiple subjects. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li> 98 + <li><p><code>limit</code>: optional. Default: <code>16</code>. Maximum: <code>100</code></p></li> 99 + </ul> 100 + 101 + <p style="margin-bottom: 0"><strong>Try it:</strong></p> 102 + {% call try_it::get_many_to_many("at://did:plc:a4pqq234yw7fqbddawjo7y35/app.bsky.feed.post/3m237ilwc372e", "app.bsky.feed.like:subject.uri", "reply.parent.uri", [""], [""], 16) %} 103 + 104 + 86 105 <h3 class="route"><code>GET /links</code></h3> 87 106 88 107 <p>A list of records linking to a target.</p>
+30
constellation/templates/try-it-macros.html.j2
··· 68 68 </script> 69 69 {% endmacro %} 70 70 71 + {% macro get_many_to_many(subject, source, pathToOther, dids, otherSubjects, limit) %} 72 + <form method="get" action="/xrpc/blue.microcosm.links.getManyToMany"> 73 + <pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getManyToMany 74 + ?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." /> 75 + &source= <input type="text" name="source" value="{{ source }}" placeholder="app.bsky.feed.like:subject" /> 76 + &pathToOther= <input type="text" name="pathToOther" value="{{ pathToOther }}" placeholder="otherThing" /> 77 + {%- for did in dids %}{% if !did.is_empty() %} 78 + &did= <input type="text" name="did" value="{{ did }}" placeholder="did:plc:..." />{% endif %}{% endfor %} 79 + <span id="m2m-did-placeholder"></span> <button id="m2m-add-did">+ did filter</button> 80 + {%- for otherSubject in otherSubjects %}{% if !otherSubject.is_empty() %} 81 + &otherSubject= <input type="text" name="otherSubject" value="{{ otherSubject }}" placeholder="at-uri, did, uri..." />{% endif %}{% endfor %} 82 + <span id="m2m-other-placeholder"></span> <button id="m2m-add-other">+ other subject filter</button> 83 + &limit= <input type="number" name="limit" value="{{ limit }}" max="100" placeholder="100" /> <button type="submit">get many-to-many links</button></pre> 84 + </form> 85 + <script> 86 + const m2mAddDidButton = document.getElementById('m2m-add-did'); 87 + const m2mDidPlaceholder = document.getElementById('m2m-did-placeholder'); 88 + m2mAddDidButton.addEventListener('click', e => { 89 + e.preventDefault(); 90 + const i = document.createElement('input'); 91 + i.placeholder = 'did:plc:...'; 92 + i.name = "did" 93 + const p = m2mAddDidButton.parentNode; 94 + p.insertBefore(document.createTextNode('&did= '), m2mDidPlaceholder); 95 + p.insertBefore(i, m2mDidPlaceholder); 96 + p.insertBefore(document.createTextNode('\n '), m2mDidPlaceholder); 97 + }); 98 + </script> 99 + {% endmacro %} 100 + 71 101 {% macro links(target, collection, path, dids, limit) %} 72 102 <form method="get" action="/links"> 73 103 <pre class="code"><strong>GET</strong> /links