//! ott ("on that topic"): a scalable, distributed custom feed generator.
1use std::time::Duration;
2
3use ott_embed::pg_client::PgClient;
4use ott_embed::tei_client::TextEmbedding;
5use tokio::{
6 sync::mpsc::{Receiver, Sender},
7 time::interval,
8};
9
10use tokio_stream::StreamExt;
11use tracing::{debug, error, warn};
12use tracing_subscriber::EnvFilter;
13
14use fluvio::{consumer::ConsumerConfigExtBuilder, Fluvio, Offset};
15use ott_types::{Embedding, Post};
16
/// Fluvio topic that carries incoming `Post` records (JSON-encoded).
const TOPIC: &str = "posts";
/// The single topic partition this consumer reads from.
const PARTITION: u32 = 0;
/// Base URL of the text-embeddings service used by `TextEmbedding`.
const TEI_URL: &str = "http://tei-host-service:8080";
20
21#[tokio::main]
22async fn main() {
23 tracing_subscriber::fmt()
24 .with_ansi(true) // Colors enabled (default)
25 .with_env_filter(EnvFilter::from_default_env())
26 .init();
27
28 let (embed_tx, embed_rx) = tokio::sync::mpsc::channel::<Post>(1000);
29 let (store_tx, store_rx) = tokio::sync::mpsc::channel::<Embedding>(1000);
30
31 let read_task = tokio::spawn(async { read_task(embed_tx).await });
32 let embed_task = tokio::spawn(async { embed_task(embed_rx, store_tx).await });
33 let store_task = tokio::spawn(async { store_task(store_rx).await });
34
35 let _result = tokio::join!(read_task, embed_task, store_task);
36}
37
38async fn read_task(sink: Sender<Post>) {
39 let fluvio = Fluvio::connect()
40 .await
41 .expect("Failed to connect to Fluvio");
42
43 let config = ConsumerConfigExtBuilder::default()
44 .topic(TOPIC)
45 .partition(PARTITION)
46 .offset_start(Offset::beginning())
47 .build()
48 .expect("Failed to build consumer config");
49 let mut stream = fluvio
50 .consumer_with_config(config)
51 .await
52 .expect("Failed to create consumer");
53
54 warn!("Ready to start consuming posts");
55 while let Some(message) = stream.next().await
56 && let Ok(record) = message
57 {
58 let post: Post = serde_json::from_slice(record.value()).expect("Invalid post message");
59 sink.send(post).await.expect("Failed to internally send post");
60 }
61}
62
63async fn embed_task(mut posts: Receiver<Post>, sink: Sender<Embedding>) {
64 let tei_client = TextEmbedding::new(TEI_URL);
65
66 warn!("Ready to start embedding posts");
67 while let Some(post) = posts.recv().await {
68 let embedding = tei_client.embed(&post.text).await;
69 match embedding {
70 Ok(vec) => {
71 sink.send(Embedding {
72 uri: post.uri,
73 vector: vec,
74 })
75 .await
76 .expect("Failed to send embedding between tasks");
77 }
78 Err(e) => {
79 error!("Failed to embed post! {} {}", post.uri, e);
80 }
81 };
82 }
83}
84
85async fn store_task(mut embeddings: Receiver<Embedding>) {
86 warn!("Ready to start storing embeddings");
87 let batch_size = 100;
88 let mut flush_timer = interval(Duration::from_millis(500));
89 let pg_client = PgClient::new().await.expect("Failed to connect to db");
90
91 let mut batch = Vec::with_capacity(batch_size);
92 loop {
93 tokio::select! {
94 Some(record) = embeddings.recv() => {
95 batch.push(record);
96 if batch.len() >= batch_size {
97 if let Err(e) = pg_client.insert_embeddings(&batch).await {
98 error!("Insert error: {}", e);
99 }
100 flush_timer.reset();
101 batch.clear();
102 debug!("Inserted normally");
103 }
104 }
105
106 // Flush periodically even if batch isn't full
107 _ = flush_timer.tick() => {
108 if !batch.is_empty() {
109 if let Err(e) = pg_client.insert_embeddings(&batch).await {
110 error!("Insert error: {}", e);
111 }
112 batch.clear();
113 debug!("Inserted flushed");
114 }
115 }
116
117 // Channel closed
118 else => {
119 // Final flush
120 if !batch.is_empty() {
121 if let Err(e) = pg_client.insert_embeddings(&batch).await {
122 error!("Final insert error: {}", e);
123 }
124 }
125 break;
126 }
127 }
128 }
129}