29 lines
No EOL
1.2 KiB
Markdown
29 lines
No EOL
1.2 KiB
Markdown
```python
|
|
async def start(self):
    """Connect to RabbitMQ, set up the topology, and consume until cancelled.

    Steps, in order:

    1. Open a robust connection to ``self.rabbitmq_url`` (over TLS when
       ``self.rabbitmq_ssl`` is truthy).
    2. Open a channel and cap unacknowledged deliveries at
       ``self.max_concurrent_tasks`` via the prefetch count.
    3. Declare the durable queue ``self.queue_name`` and the durable topic
       exchange ``self.exchange_name``, and bind the queue to the exchange
       with ``self.routing_key``.
    4. Register ``self.consume_message`` as the queue's consumer callback.
    5. Await a future that is never resolved, so this coroutine does not
       return on its own; it ends only when it is cancelled or raises.

    Side effects: sets ``self.connection`` and ``self.channel``.
    """
    # Connect to RabbitMQ
    if self.rabbitmq_ssl:
        # PROTOCOL_TLS_CLIENT negotiates the highest version both peers
        # support; the explicit floor keeps TLS 1.2 as the minimum, which
        # replaces the deprecated fixed-version PROTOCOL_TLSv1_2 constant.
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
        # NOTE(review): hostname checking and certificate verification are
        # both turned off, so the broker's identity is NOT authenticated.
        # Confirm this is intentional before using it outside a trusted
        # network. check_hostname must be cleared before verify_mode can be
        # set to CERT_NONE.
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        self.connection = await aio_pika.connect_robust(
            self.rabbitmq_url,
            ssl=True,
            ssl_context=ssl_context,
        )
    else:
        self.connection = await aio_pika.connect_robust(self.rabbitmq_url)

    self.channel = await self.connection.channel()
    await self.channel.set_qos(prefetch_count=self.max_concurrent_tasks)

    # Declare exchange and queue
    queue = await self.channel.declare_queue(self.queue_name, durable=True)
    exchange = await self.channel.declare_exchange(
        self.exchange_name,
        ExchangeType.TOPIC,
        durable=True,
    )

    # Bind queue to exchange with routing key
    await queue.bind(exchange, routing_key=self.routing_key)

    # Start consuming messages
    await queue.consume(self.consume_message)
    logging.info("Consumer started and waiting for messages...")

    # Keep the event loop running: this future is never resolved, so the
    # coroutine stays suspended here until it is cancelled.
    await asyncio.Future()
|
|
```