Server tools to backfill, tail, mirror, and verify PLC logs

Don't exit the program on reqwest / decode errors; retry instead.

ptr.pet 2589c1fa d472cadc

verified
+98 -44
+42 -20
src/poll.rs
··· 139 139 /// 140 140 /// Extracts the final op so it can be used to fetch the following page 141 141 pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> { 142 + use futures::TryStreamExt; 143 + use tokio::io::{AsyncBufReadExt, BufReader}; 144 + use tokio_util::compat::FuturesAsyncReadCompatExt; 145 + 142 146 log::trace!("Getting page: {url}"); 143 147 144 - let ops: Vec<Op> = CLIENT 145 - .get(url) 146 - .send() 147 - .await? 148 - .error_for_status()? 149 - .text() 150 - .await? 151 - .trim() 152 - .split('\n') 153 - .filter_map(|s| { 154 - serde_json::from_str::<Op>(s) 155 - .inspect_err(|e| { 156 - if !s.is_empty() { 157 - log::warn!("failed to parse op: {e} ({s})") 158 - } 159 - }) 160 - .ok() 161 - }) 162 - .collect(); 148 + let res = CLIENT.get(url).send().await?.error_for_status()?; 149 + let stream = Box::pin( 150 + res.bytes_stream() 151 + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) 152 + .into_async_read() 153 + .compat(), 154 + ); 155 + 156 + let mut lines = BufReader::new(stream).lines(); 157 + let mut ops = Vec::new(); 158 + 159 + loop { 160 + match lines.next_line().await { 161 + Ok(Some(line)) => { 162 + let line = line.trim(); 163 + if line.is_empty() { 164 + continue; 165 + } 166 + match serde_json::from_str::<Op>(line) { 167 + Ok(op) => ops.push(op), 168 + Err(e) => log::warn!("failed to parse op: {e} ({line})"), 169 + } 170 + } 171 + Ok(None) => break, 172 + Err(e) => { 173 + log::warn!("transport error mid-page: {}; returning partial page", e); 174 + break; 175 + } 176 + } 177 + } 163 178 164 179 let last_op = ops.last().map(Into::into); 165 180 ··· 214 229 .append_pair("after", &pl.created_at.to_rfc3339()); 215 230 }; 216 231 217 - let (mut page, next_last) = get_page(url).await?; 232 + let (mut page, next_last) = match get_page(url).await { 233 + Ok(res) => res, 234 + Err(e) => { 235 + log::warn!("error polling upstream: {e}"); 236 + continue; 237 + } 238 + }; 239 + 218 240 if let 
Some(ref mut state) = boundary_state { 219 241 state.apply_to_next(&mut page); 220 242 } else {
+56 -24
src/weekly.rs
··· 197 197 dest: mpsc::Sender<ExportPage>, 198 198 ) -> anyhow::Result<()> { 199 199 use futures::TryStreamExt; 200 - let reader = source 201 - .reader_for(week) 202 - .await 203 - .inspect_err(|e| log::error!("week_to_pages reader failed: {e}"))?; 204 - let decoder = GzipDecoder::new(BufReader::new(reader)); 205 - let mut chunks = pin!(LinesStream::new(BufReader::new(decoder).lines()).try_chunks(10000)); 200 + let mut retry_backoff = std::time::Duration::from_secs(2); 206 201 207 - while let Some(chunk) = chunks 208 - .try_next() 209 - .await 210 - .inspect_err(|e| log::error!("failed to get next chunk: {e}"))? 211 - { 212 - let ops: Vec<Op> = chunk 213 - .into_iter() 214 - .filter_map(|s| { 215 - serde_json::from_str::<Op>(&s) 216 - .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})")) 217 - .ok() 218 - }) 219 - .collect(); 220 - let page = ExportPage { ops }; 221 - dest.send(page) 222 - .await 223 - .inspect_err(|e| log::error!("failed to send page: {e}"))?; 202 + loop { 203 + let reader = match source.reader_for(week).await { 204 + Ok(r) => r, 205 + Err(e) => { 206 + log::warn!( 207 + "week_to_pages reader_for failed {e}, retrying in {}s", 208 + retry_backoff.as_secs() 209 + ); 210 + tokio::time::sleep(retry_backoff).await; 211 + retry_backoff = (retry_backoff * 2).min(std::time::Duration::from_secs(300)); 212 + continue; 213 + } 214 + }; 215 + 216 + let decoder = GzipDecoder::new(BufReader::new(reader)); 217 + let mut chunks = pin!(LinesStream::new(BufReader::new(decoder).lines()).try_chunks(10000)); 218 + let mut success = true; 219 + 220 + while let Some(chunk) = match chunks.as_mut().try_next().await { 221 + Ok(Some(c)) => Some(c), 222 + Ok(None) => None, 223 + Err(e) => { 224 + log::warn!( 225 + "failed to get next chunk: {e}, retrying week in {}s", 226 + retry_backoff.as_secs() 227 + ); 228 + tokio::time::sleep(retry_backoff).await; 229 + retry_backoff = (retry_backoff * 2).min(std::time::Duration::from_secs(300)); 230 + success = false; 231 + 
None 232 + } 233 + } { 234 + let ops: Vec<Op> = chunk 235 + .into_iter() 236 + .filter_map(|s| { 237 + serde_json::from_str::<Op>(&s) 238 + .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})")) 239 + .ok() 240 + }) 241 + .collect(); 242 + 243 + if ops.is_empty() { 244 + continue; 245 + } 246 + 247 + let page = ExportPage { ops }; 248 + if let Err(e) = dest.send(page).await { 249 + log::error!("failed to send page (receiver closed): {e}"); 250 + return Err(e.into()); 251 + } 252 + } 253 + 254 + if success { 255 + return Ok(()); 256 + } 224 257 } 225 - Ok(()) 226 258 }