Scalable and distributed custom feed generator, ott - on that topic
at main 129 lines 4.2 kB view raw
1use std::time::Duration; 2 3use ott_embed::pg_client::PgClient; 4use ott_embed::tei_client::TextEmbedding; 5use tokio::{ 6 sync::mpsc::{Receiver, Sender}, 7 time::interval, 8}; 9 10use tokio_stream::StreamExt; 11use tracing::{debug, error, warn}; 12use tracing_subscriber::EnvFilter; 13 14use fluvio::{consumer::ConsumerConfigExtBuilder, Fluvio, Offset}; 15use ott_types::{Embedding, Post}; 16 17const TEI_URL: &str = "http://tei-host-service:8080"; 18const TOPIC: &str = "posts"; 19const PARTITION: u32 = 0; 20 21#[tokio::main] 22async fn main() { 23 tracing_subscriber::fmt() 24 .with_ansi(true) // Colors enabled (default) 25 .with_env_filter(EnvFilter::from_default_env()) 26 .init(); 27 28 let (embed_tx, embed_rx) = tokio::sync::mpsc::channel::<Post>(1000); 29 let (store_tx, store_rx) = tokio::sync::mpsc::channel::<Embedding>(1000); 30 31 let read_task = tokio::spawn(async { read_task(embed_tx).await }); 32 let embed_task = tokio::spawn(async { embed_task(embed_rx, store_tx).await }); 33 let store_task = tokio::spawn(async { store_task(store_rx).await }); 34 35 let _result = tokio::join!(read_task, embed_task, store_task); 36} 37 38async fn read_task(sink: Sender<Post>) { 39 let fluvio = Fluvio::connect() 40 .await 41 .expect("Failed to connect to Fluvio"); 42 43 let config = ConsumerConfigExtBuilder::default() 44 .topic(TOPIC) 45 .partition(PARTITION) 46 .offset_start(Offset::beginning()) 47 .build() 48 .expect("Failed to build consumer config"); 49 let mut stream = fluvio 50 .consumer_with_config(config) 51 .await 52 .expect("Failed to create consumer"); 53 54 warn!("Ready to start consuming posts"); 55 while let Some(message) = stream.next().await 56 && let Ok(record) = message 57 { 58 let post: Post = serde_json::from_slice(record.value()).expect("Invalid post message"); 59 sink.send(post).await.expect("Failed to internally send post"); 60 } 61} 62 63async fn embed_task(mut posts: Receiver<Post>, sink: Sender<Embedding>) { 64 let tei_client = TextEmbedding::new(TEI_URL); 65 66 warn!("Ready to start embedding posts"); 67 while let Some(post) = posts.recv().await { 68 let embedding = tei_client.embed(&post.text).await; 69 match embedding { 70 Ok(vec) => { 71 sink.send(Embedding { 72 uri: post.uri, 73 vector: vec, 74 }) 75 .await 76 .expect("Failed to send embedding between tasks"); 77 } 78 Err(e) => { 79 error!("Failed to embed post! {} {}", post.uri, e); 80 } 81 }; 82 } 83} 84 85async fn store_task(mut embeddings: Receiver<Embedding>) { 86 warn!("Ready to start storing embeddings"); 87 let batch_size = 100; 88 let mut flush_timer = interval(Duration::from_millis(500)); 89 let pg_client = PgClient::new().await.expect("Failed to connect to db"); 90 91 let mut batch = Vec::with_capacity(batch_size); 92 loop { 93 tokio::select! { 94 Some(record) = embeddings.recv() => { 95 batch.push(record); 96 if batch.len() >= batch_size { 97 if let Err(e) = pg_client.insert_embeddings(&batch).await { 98 error!("Insert error: {}", e); 99 } 100 flush_timer.reset(); 101 batch.clear(); 102 debug!("Inserted normally"); 103 } 104 } 105 106 // Flush periodically even if batch isn't full 107 _ = flush_timer.tick() => { 108 if !batch.is_empty() { 109 if let Err(e) = pg_client.insert_embeddings(&batch).await { 110 error!("Insert error: {}", e); 111 } 112 batch.clear(); 113 debug!("Inserted flushed"); 114 } 115 } 116 117 // Channel closed 118 else => { 119 // Final flush 120 if !batch.is_empty() { 121 if let Err(e) = pg_client.insert_embeddings(&batch).await { 122 error!("Final insert error: {}", e); 123 } 124 } 125 break; 126 } 127 } 128 } 129}