this repo has no description

small indexer tweaks

+75 -45
indexer.py
··· 216 bootstrap_servers: List[str], 217 input_topic: str, 218 group_id: str, 219 ): 220 self.indexer = indexer 221 self.bootstrap_servers = bootstrap_servers 222 self.input_topic = input_topic 223 self.group_id = group_id 224 self.consumer: Optional[AIOKafkaConsumer] = None 225 self._flush_task: Optional[asyncio.Task[Any]] = None 226 227 async def stop(self): 228 if self._flush_task: 229 self._flush_task.cancel() 230 try: ··· 282 finally: 283 prom_metrics.events_handled.labels(kind=kind, status=status).inc() 284 285 async def run(self): 286 self.consumer = AIOKafkaConsumer( 287 self.input_topic, 288 bootstrap_servers=",".join(self.bootstrap_servers), ··· 296 ) 297 await self.consumer.start() 298 logger.info( 299 - f"Started Kafak consumer for topic: {self.bootstrap_servers}, {self.input_topic}" 300 ) 301 302 - if not self.consumer: 303 - raise RuntimeError("Consumer not started, call start() first.") 304 305 try: 306 async for message in self.consumer: 307 - asyncio.ensure_future(self._handle_event(message)) 308 prom_metrics.events_received.inc() 309 except Exception as e: 310 logger.error(f"Error consuming messages: {e}") 311 raise 312 finally: 313 self.indexer.flush_all() 314 315 316 @click.command() 317 - @click.option( 318 - "--ch-host", 319 - ) 320 - @click.option( 321 - "--ch-port", 322 - type=int, 323 - ) 324 - @click.option( 325 - "--ch-user", 326 - ) 327 - @click.option( 328 - "--ch-pass", 329 - ) 330 - @click.option( 331 - "--batch-size", 332 - type=int, 333 - ) 334 - @click.option( 335 - "--bootstrap-servers", 336 - type=List[str], 337 - ) 338 - @click.option( 339 - "--input-topic", 340 - ) 341 - @click.option( 342 - "--group-id", 343 - ) 344 - @click.option( 345 - "--metrics-host", 346 - ) 347 @click.option( 348 - "--metrics-port", 349 - type=int, 350 ) 351 def main( 352 ch_host: Optional[str], 353 ch_port: Optional[int], 354 ch_user: Optional[str], 355 ch_pass: Optional[str], 356 batch_size: Optional[int], 357 - bootstrap_servers: 
Optional[List[str]], 358 input_topic: Optional[str], 359 group_id: Optional[str], 360 metrics_host: Optional[str], ··· 374 ) 375 indexer.init_schema() 376 377 consumer = Consumer( 378 indexer=indexer, 379 - bootstrap_servers=bootstrap_servers or CONFIG.kafka_bootstrap_servers, 380 input_topic=input_topic or CONFIG.kafka_input_topic, 381 group_id=group_id or CONFIG.kafka_group_id, 382 ) 383 384 - try: 385 - asyncio.run(consumer.run()) 386 - except KeyboardInterrupt: 387 - logger.info("Shutting down...") 388 - finally: 389 - asyncio.run(consumer.stop()) 390 391 - pass 392 393 394 if __name__ == "__main__":
··· 216 bootstrap_servers: List[str], 217 input_topic: str, 218 group_id: str, 219 + max_concurrent_tasks: int = 100, 220 ): 221 self.indexer = indexer 222 self.bootstrap_servers = bootstrap_servers 223 self.input_topic = input_topic 224 self.group_id = group_id 225 + self.max_concurrent_tasks = max_concurrent_tasks 226 self.consumer: Optional[AIOKafkaConsumer] = None 227 self._flush_task: Optional[asyncio.Task[Any]] = None 228 + self._semaphore: Optional[asyncio.Semaphore] = None 229 + self._shutdown_event: Optional[asyncio.Event] = None 230 231 async def stop(self): 232 + if self._shutdown_event: 233 + self._shutdown_event.set() 234 + 235 if self._flush_task: 236 self._flush_task.cancel() 237 try: ··· 289 finally: 290 prom_metrics.events_handled.labels(kind=kind, status=status).inc() 291 292 + async def _handle_event_with_semaphore(self, message: ConsumerRecord[Any, Any]): 293 + assert self._semaphore is not None 294 + async with self._semaphore: 295 + await self._handle_event(message) 296 + 297 async def run(self): 298 + self._semaphore = asyncio.Semaphore(self.max_concurrent_tasks) 299 + self._shutdown_event = asyncio.Event() 300 + 301 self.consumer = AIOKafkaConsumer( 302 self.input_topic, 303 bootstrap_servers=",".join(self.bootstrap_servers), ··· 311 ) 312 await self.consumer.start() 313 logger.info( 314 + f"Started Kafka consumer for topic: {self.bootstrap_servers}, {self.input_topic}" 315 ) 316 317 + self._flush_task = asyncio.create_task(self._periodic_flush()) 318 + 319 + pending_tasks: set[asyncio.Task[Any]] = set() 320 321 try: 322 async for message in self.consumer: 323 prom_metrics.events_received.inc() 324 + 325 + task = asyncio.create_task(self._handle_event_with_semaphore(message)) 326 + pending_tasks.add(task) 327 + task.add_done_callback(pending_tasks.discard) 328 + 329 + if len(pending_tasks) >= self.max_concurrent_tasks * 2: 330 + done, pending_tasks_set = await asyncio.wait( 331 + pending_tasks, timeout=0, return_when=asyncio.FIRST_COMPLETED 
332 + ) 333 + pending_tasks = pending_tasks_set 334 + for t in done: 335 + if t.exception(): 336 + logger.error(f"Task failed with exception: {t.exception()}") 337 + 338 except Exception as e: 339 logger.error(f"Error consuming messages: {e}") 340 raise 341 finally: 342 + if pending_tasks: 343 + logger.info( 344 + f"Waiting for {len(pending_tasks)} pending tasks to complete..." 345 + ) 346 + await asyncio.gather(*pending_tasks, return_exceptions=True) 347 self.indexer.flush_all() 348 349 350 @click.command() 351 + @click.option("--ch-host") 352 + @click.option("--ch-port", type=int) 353 + @click.option("--ch-user") 354 + @click.option("--ch-pass") 355 + @click.option("--batch-size", type=int) 356 @click.option( 357 + "--bootstrap-servers", help="Comma-separated list of Kafka bootstrap servers" 358 ) 359 + @click.option("--input-topic") 360 + @click.option("--group-id") 361 + @click.option("--metrics-host") 362 + @click.option("--metrics-port", type=int) 363 def main( 364 ch_host: Optional[str], 365 ch_port: Optional[int], 366 ch_user: Optional[str], 367 ch_pass: Optional[str], 368 batch_size: Optional[int], 369 + bootstrap_servers: Optional[str], 370 input_topic: Optional[str], 371 group_id: Optional[str], 372 metrics_host: Optional[str], ··· 386 ) 387 indexer.init_schema() 388 389 + kafka_servers = ( 390 + bootstrap_servers.split(",") 391 + if bootstrap_servers 392 + else CONFIG.kafka_bootstrap_servers 393 + ) 394 + 395 consumer = Consumer( 396 indexer=indexer, 397 + bootstrap_servers=kafka_servers, 398 input_topic=input_topic or CONFIG.kafka_input_topic, 399 group_id=group_id or CONFIG.kafka_group_id, 400 ) 401 402 + async def run_with_shutdown(): 403 + loop = asyncio.get_event_loop() 404 + 405 + import signal 406 407 + def handle_signal(): 408 + logger.info("Received shutdown signal...") 409 + asyncio.create_task(consumer.stop()) 410 + 411 + for sig in (signal.SIGTERM, signal.SIGINT): 412 + loop.add_signal_handler(sig, handle_signal) 413 + 414 + try: 415 + await 
consumer.run() 416 + except asyncio.CancelledError: 417 + pass 418 + finally: 419 + await consumer.stop() 420 + 421 + asyncio.run(run_with_shutdown()) 422 423 424 if __name__ == "__main__":