Scalable and distributed custom feed generator: ott ("on that topic")
1use serde::{Deserialize, Serialize};
2use tokio::{
3 select,
4 sync::mpsc::{self, Receiver, Sender},
5 time::Duration,
6};
7
8use tokio_stream::StreamExt;
9use tracing::{error, info, warn};
10use tracing_subscriber::EnvFilter;
11
12use fluvio::{
13 consumer::{ConsumerConfigExtBuilder, ConsumerStream},
14 metadata::topic::TopicSpec,
15 Fluvio, Offset,
16};
17use moka::{ops::compute::Op, sync::Cache};
18use ott_types::{Commit, Like, Post, RawPost};
19
/// Topic that promoted posts are produced to; created at startup when missing.
const POSTS_TOPIC: &str = "posts";
/// Topic that raw post commits are consumed from.
const RAW_POSTS_TOPIC: &str = "raw-posts";
/// Topic that like events are consumed from.
const LIKES_TOPIC: &str = "raw-likes";
/// Partition index used when opening the consumer streams.
const PARTITION_NUM: u32 = 0;
24
25#[tokio::main]
26async fn main() {
27 tracing_subscriber::fmt()
28 .with_ansi(true) // Colors enabled (default)
29 .with_env_filter(EnvFilter::from_default_env())
30 .init();
31
32 let posts_cache: Cache<String, Post> = Cache::builder()
33 .time_to_live(Duration::from_secs(60 * 60))
34 .build();
35
36 let fluvio = Fluvio::connect()
37 .await
38 .expect("Failed to connect to Fluvio");
39
40 // Create a topic
41 let admin = fluvio.admin().await;
42 let topics = admin
43 .all::<TopicSpec>()
44 .await
45 .expect("Failed to list topics")
46 .iter()
47 .map(|topic| topic.name.clone())
48 .collect::<Vec<String>>();
49
50 if !topics.contains(&POSTS_TOPIC.to_string()) {
51 warn!("Creating posts topic");
52 let topic_spec = TopicSpec::new_computed(1, 1, None);
53 admin
54 .create(POSTS_TOPIC.to_string(), false, topic_spec)
55 .await
56 .unwrap();
57 };
58
59 let posts_fut = get_topic_stream(RAW_POSTS_TOPIC, PARTITION_NUM, &fluvio);
60 let like_fut = get_topic_stream(LIKES_TOPIC, PARTITION_NUM, &fluvio);
61 let (mut posts_stream, mut like_stream) = tokio::join!(posts_fut, like_fut);
62
63 let (embed_tx, embed_rx) = mpsc::channel::<Post>(1000);
64
65 // Start embedding tracing_subscriber
66 let fut = async move {
67 embed_post(embed_rx).await;
68 };
69 tokio::spawn(fut);
70
71 loop {
72 let pcc = posts_cache.clone();
73 let lcc = posts_cache.clone();
74 select! {
75 Some(Ok(record)) = posts_stream.next() => {
76 let post: RawPost = serde_json::from_slice(record.value()).unwrap();
77 match &post.commit {
78 Commit::Create{record} => {
79 pcc.entry(post.uri.clone())
80 .and_compute_with(|maybe_entry| {
81 if maybe_entry.is_some() {
82 Op::Nop
83 } else {
84 let post = Post{
85 uri: post.uri,
86 did: post.did,
87 text: record.text.to_string(),
88 ..Default::default()};
89 Op::Put(post) // Insert
90 }
91 }
92 );
93 },
94 _ => {
95 info!("Got create or update post");
96 }
97 }
98
99 },
100 Some(Ok(record)) = like_stream.next() => {
101 if let Ok(like) = serde_json::from_slice::<Like>(record.value()) {
102 lcc.entry(like.uri)
103 .and_compute_with(|maybe_entry| {
104 if let Some(entry) = maybe_entry {
105 let mut post = entry.into_value();
106 if post.count < 20 {
107 post.count +=1;
108 Op::Put(post)
109 } else {
110 let tx_clone = embed_tx.clone();
111 tokio::spawn(async move {
112 if let Err(e) = tx_clone.send(post).await {
113 error!("Failed to send post: {}", e);
114 }
115 });
116 Op::Remove
117 }
118 } else {
119 Op::Nop // Skip as post is out of cache
120 }
121 });
122 } else {
123 warn!("Failed deserializing, likely not like commit");
124 };
125 }
126 }
127 }
128}
129
130async fn get_topic_stream(topic: &str, partition: u32, fluvio: &Fluvio) -> impl ConsumerStream {
131 let config = ConsumerConfigExtBuilder::default()
132 .topic(topic)
133 .partition(partition)
134 .offset_start(Offset::beginning())
135 .build()
136 .expect("Failed to build consumer config");
137 let stream = fluvio
138 .consumer_with_config(config)
139 .await
140 .expect("Failed to create consumer");
141 stream
142}
143
/// Embedding vector paired with the URI of the post it belongs to.
///
/// NOTE(review): nothing in the visible code constructs this type yet;
/// presumably it is the intended output of the embedding step — confirm.
#[derive(Debug, Clone, PartialEq)]
struct PostEmbedding {
    /// URI of the post.
    uri: String,
    /// Embedding values.
    vector: Vec<f32>,
}
148
149async fn embed_post(mut post_rx: Receiver<Post>) {
150 let producer = fluvio::producer(POSTS_TOPIC)
151 .await
152 .expect("Failed to create producer");
153
154 while let Some(post) = post_rx.recv().await {
155 producer
156 .send(
157 fluvio::RecordKey::NULL,
158 serde_json::to_string(&post).unwrap(),
159 )
160 .await
161 .expect("Failed to send record");
162 }
163}