Scalable and distributed custom feed generator, ott - on that topic
at main 163 lines 5.5 kB view raw
1use serde::{Deserialize, Serialize}; 2use tokio::{ 3 select, 4 sync::mpsc::{self, Receiver, Sender}, 5 time::Duration, 6}; 7 8use tokio_stream::StreamExt; 9use tracing::{error, info, warn}; 10use tracing_subscriber::EnvFilter; 11 12use fluvio::{ 13 consumer::{ConsumerConfigExtBuilder, ConsumerStream}, 14 metadata::topic::TopicSpec, 15 Fluvio, Offset, 16}; 17use moka::{ops::compute::Op, sync::Cache}; 18use ott_types::{Commit, Like, Post, RawPost}; 19 20const LIKES_TOPIC: &str = "raw-likes"; 21const RAW_POSTS_TOPIC: &str = "raw-posts"; 22const POSTS_TOPIC: &str = "posts"; 23const PARTITION_NUM: u32 = 0; 24 25#[tokio::main] 26async fn main() { 27 tracing_subscriber::fmt() 28 .with_ansi(true) // Colors enabled (default) 29 .with_env_filter(EnvFilter::from_default_env()) 30 .init(); 31 32 let posts_cache: Cache<String, Post> = Cache::builder() 33 .time_to_live(Duration::from_secs(60 * 60)) 34 .build(); 35 36 let fluvio = Fluvio::connect() 37 .await 38 .expect("Failed to connect to Fluvio"); 39 40 // Create a topic 41 let admin = fluvio.admin().await; 42 let topics = admin 43 .all::<TopicSpec>() 44 .await 45 .expect("Failed to list topics") 46 .iter() 47 .map(|topic| topic.name.clone()) 48 .collect::<Vec<String>>(); 49 50 if !topics.contains(&POSTS_TOPIC.to_string()) { 51 warn!("Creating posts topic"); 52 let topic_spec = TopicSpec::new_computed(1, 1, None); 53 admin 54 .create(POSTS_TOPIC.to_string(), false, topic_spec) 55 .await 56 .unwrap(); 57 }; 58 59 let posts_fut = get_topic_stream(RAW_POSTS_TOPIC, PARTITION_NUM, &fluvio); 60 let like_fut = get_topic_stream(LIKES_TOPIC, PARTITION_NUM, &fluvio); 61 let (mut posts_stream, mut like_stream) = tokio::join!(posts_fut, like_fut); 62 63 let (embed_tx, embed_rx) = mpsc::channel::<Post>(1000); 64 65 // Start embedding tracing_subscriber 66 let fut = async move { 67 embed_post(embed_rx).await; 68 }; 69 tokio::spawn(fut); 70 71 loop { 72 let pcc = posts_cache.clone(); 73 let lcc = posts_cache.clone(); 74 select! { 75 Some(Ok(record)) = posts_stream.next() => { 76 let post: RawPost = serde_json::from_slice(record.value()).unwrap(); 77 match &post.commit { 78 Commit::Create{record} => { 79 pcc.entry(post.uri.clone()) 80 .and_compute_with(|maybe_entry| { 81 if maybe_entry.is_some() { 82 Op::Nop 83 } else { 84 let post = Post{ 85 uri: post.uri, 86 did: post.did, 87 text: record.text.to_string(), 88 ..Default::default()}; 89 Op::Put(post) // Insert 90 } 91 } 92 ); 93 }, 94 _ => { 95 info!("Got create or update post"); 96 } 97 } 98 99 }, 100 Some(Ok(record)) = like_stream.next() => { 101 if let Ok(like) = serde_json::from_slice::<Like>(record.value()) { 102 lcc.entry(like.uri) 103 .and_compute_with(|maybe_entry| { 104 if let Some(entry) = maybe_entry { 105 let mut post = entry.into_value(); 106 if post.count < 20 { 107 post.count +=1; 108 Op::Put(post) 109 } else { 110 let tx_clone = embed_tx.clone(); 111 tokio::spawn(async move { 112 if let Err(e) = tx_clone.send(post).await { 113 error!("Failed to send post: {}", e); 114 } 115 }); 116 Op::Remove 117 } 118 } else { 119 Op::Nop // Skip as post is out of cache 120 } 121 }); 122 } else { 123 warn!("Failed deserializing, likely not like commit"); 124 }; 125 } 126 } 127 } 128} 129 130async fn get_topic_stream(topic: &str, partition: u32, fluvio: &Fluvio) -> impl ConsumerStream { 131 let config = ConsumerConfigExtBuilder::default() 132 .topic(topic) 133 .partition(partition) 134 .offset_start(Offset::beginning()) 135 .build() 136 .expect("Failed to build consumer config"); 137 let stream = fluvio 138 .consumer_with_config(config) 139 .await 140 .expect("Failed to create consumer"); 141 stream 142} 143 144struct PostEmbedding { 145 uri: String, 146 vector: Vec<f32>, 147} 148 149async fn embed_post(mut post_rx: Receiver<Post>) { 150 let producer = fluvio::producer(POSTS_TOPIC) 151 .await 152 .expect("Failed to create producer"); 153 154 while let Some(post) = post_rx.recv().await { 155 producer 156 .send( 157 fluvio::RecordKey::NULL, 158 serde_json::to_string(&post).unwrap(), 159 ) 160 .await 161 .expect("Failed to send record"); 162 } 163}