#!/usr/bin/env python
"""RabbitMQ messaging examples built on pika's ``BlockingConnection``.

The file combines several formerly standalone snippets:

* ``send_hello`` / ``receive_hello`` -- the minimal producer and consumer for
  the ``hello`` queue on a broker running on localhost.
* ``consume_engine`` -- declares a direct exchange and a durable queue, binds
  them, and processes messages one at a time with manual acknowledgements.
* ``publish_engine`` -- declares a direct exchange and a durable queue, binds
  them, and publishes a fixed number of persistent messages.

Importing the module has no side effects; running it as a script sends one
hello-world message and then starts the hello-world consumer.
"""

import time

import pika


def _open_local_channel():
    """Connect to the broker on localhost and return ``(connection, channel)``."""
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    return connection, connection.channel()


## producer.py
def send_hello(message='Hello World!'):
    """Publish ``message`` to the ``hello`` queue through the default exchange.

    The connection is closed afterwards, even when publishing fails.
    """
    # Connect to RabbitMQ
    connection, channel = _open_local_channel()
    try:
        # Declare a queue (if it doesn't exist)
        channel.queue_declare(queue='hello')

        # Send a message
        channel.basic_publish(exchange='', routing_key='hello', body=message)
        print(f" [x] Sent '{message}'")
    finally:
        # Close the connection
        connection.close()


# ------------------------------------ #

## consumer.py
def callback(ch, method, properties, body):
    """Print the decoded body of a message delivered from the ``hello`` queue."""
    print(f" [x] Received {body.decode()}")


def receive_hello():
    """Consume messages from the ``hello`` queue until interrupted.

    Messages are auto-acknowledged and handed to ``callback``. The call blocks
    inside ``start_consuming()``; the connection is closed on the way out.
    """
    # Connect to RabbitMQ
    connection, channel = _open_local_channel()
    try:
        # Declare a queue (if it doesn't exist)
        channel.queue_declare(queue='hello')

        # Consume messages from the queue
        channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

        print(' [*] Waiting for messages. To exit press CTRL+C')
        channel.start_consuming()
    finally:
        if connection.is_open:
            connection.close()


class consume_engine:
    """Consumer that works through messages from a durable queue one at a time."""

    def __init__(self):
        self._messages = 10
        self._message_interval = 1
        self._queue_name = "some.queue"
        # Single source of truth for the exchange that is declared, bound and
        # (on recovery) unbound.
        self._exchange_name = "some.exchange"
        self._connection = None
        self._channel = None

    def connection(self):
        """Open the blocking connection to the broker and return it."""
        # NOTE(review): credentials and host are hard-coded; consider loading
        # them from configuration or the environment.
        credentials = pika.PlainCredentials('rabbit', 'password')
        parameters = pika.ConnectionParameters(
            'rabbitmq-hostname', 5672, 'vhost_1', credentials, socket_timeout=300
        )
        print(parameters)
        self._connection = pika.BlockingConnection(parameters)
        print("Connected Successfully !!!")
        return self._connection

    def channel(self):
        """Open a channel on the current connection."""
        self._channel = self._connection.channel()
        print("Channel opened...")

    def declare_queue(self):
        """Declare the durable queue, recreating it if the channel is unusable.

        When the current channel is in the wrong state (for example because
        ``declare_exchange`` had its channel closed by the broker), a fresh
        channel is opened and the queue is unbound, deleted and declared again.
        """
        try:
            self._channel.queue_declare(queue=self._queue_name, durable=True)
        except pika.exceptions.ChannelWrongStateError:
            self._channel = self._connection.channel()
            # Unbind from the exchange this engine actually binds to; the
            # original referenced a different, never-declared exchange name.
            self._channel.queue_unbind(
                exchange=self._exchange_name,
                queue=self._queue_name,
                routing_key=self._queue_name,
            )
            self._channel.queue_delete(self._queue_name)
            # NOTE(review): the recreated queue is auto_delete while the
            # first-attempt declaration is not -- confirm this is intended.
            self._channel.queue_declare(
                queue=self._queue_name, durable=True, auto_delete=True
            )
        print("Queue declared....")
        print(' [*] Waiting for messages. To exit press CTRL+C')

    def declare_exchange(self):
        """Declare the direct exchange, tolerating a rejection by the broker."""
        try:
            self._channel.exchange_declare(
                exchange=self._exchange_name, exchange_type='direct'
            )
        except pika.exceptions.ChannelClosedByBroker as error:
            # Best effort: report the rejection instead of hiding it, then
            # carry on. declare_queue() opens a new channel when it finds this
            # one no longer usable.
            print(f"Exchange declaration rejected by broker: {error}")

    def bind_exchange_queue(self):
        """Bind the queue to the exchange, using the queue name as routing key."""
        self._channel.queue_bind(
            exchange=self._exchange_name,
            queue=self._queue_name,
            routing_key=self._queue_name,
        )

    def on_message(self, channel, method, properties, body):
        """Handle one delivery: report it, pause three seconds, acknowledge it."""
        print(f" [x] working on {body!r}")
        time.sleep(3)  # presumably a stand-in for real processing
        print(" [x] Done")
        # Acknowledge on the channel that delivered the message.
        channel.basic_ack(delivery_tag=method.delivery_tag)

    def consume_messages(self):
        """Start consuming from the queue; blocks until consuming stops."""
        # prefetch_count=1: the broker sends the next message only after the
        # previous one has been acknowledged.
        self._channel.basic_qos(prefetch_count=1)
        self._channel.basic_consume(self._queue_name, self.on_message)
        self._channel.start_consuming()

    def run(self):
        """Connect, set up exchange/queue/binding, and consume until stopped.

        The connection is closed when consuming ends or any step raises.
        """
        self.connection()
        try:
            self.channel()
            self.declare_exchange()
            self.declare_queue()
            self.bind_exchange_queue()
            self.consume_messages()
        finally:
            if self._connection.is_open:
                self._connection.close()


class publish_engine:
    """Publisher that sends a fixed number of persistent messages to a queue."""

    def __init__(self):
        self._messages = 5
        self._message_interval = 1
        self._queue_name = "16999403m1.request.queue"
        # Single source of truth for the exchange that is declared, bound and
        # published to.
        self._exchange_name = "new.exchange"
        self._connection = None
        self._channel = None

    def make_connection(self):
        """Open the blocking connection to the broker and return it."""
        # NOTE(review): credentials and host are hard-coded; consider loading
        # them from configuration or the environment.
        credentials = pika.PlainCredentials('rabbitProd', 'dangerous')
        parameters = pika.ConnectionParameters(
            'at-rabbit-hop-1.cec.lab.emc.com', 5672, '/', credentials, socket_timeout=300
        )
        self._connection = pika.BlockingConnection(parameters)
        print("Connected Successfully !!!")
        return self._connection

    def channel(self):
        """Open a channel on the current connection."""
        self._channel = self._connection.channel()
        print("Channel opened...")

    def declare_queue(self):
        """Declare the durable queue that messages are routed to."""
        self._channel.queue_declare(queue=self._queue_name, durable=True)
        print("Queue declared....")

    def declare_exchange(self):
        """Declare the direct exchange that messages are published to."""
        self._channel.exchange_declare(
            exchange=self._exchange_name, exchange_type='direct'
        )

    def bind_exchange_queue(self):
        """Bind the queue to the exchange, using the queue name as routing key."""
        self._channel.queue_bind(
            exchange=self._exchange_name,
            queue=self._queue_name,
            routing_key=self._queue_name,
        )

    def publish_message(self):
        """Publish ``self._messages`` numbered tasks, pausing after each one."""
        for message_count in range(1, self._messages + 1):
            message_body = f"task number {message_count}"
            self._channel.basic_publish(
                exchange=self._exchange_name,
                routing_key=self._queue_name,
                body=message_body,
                properties=pika.BasicProperties(
                    delivery_mode=2,  # make message persistent
                ),
            )
            print(f"Published message {message_count}")
            time.sleep(self._message_interval)

    def close_connection(self):
        """Close the broker connection."""
        self._connection.close()
        print("Closed connection....")

    def run(self):
        """Connect, set up exchange/queue/binding, publish, then disconnect.

        The connection is closed even when a setup or publish step raises.
        """
        self.make_connection()
        try:
            self.channel()
            self.declare_exchange()
            self.declare_queue()
            self.bind_exchange_queue()
            self.publish_message()
        finally:
            self.close_connection()


if __name__ == "__main__":
    # Same order as the original top-level scripts: produce, then consume.
    send_hello()
    receive_hello()

# Check out https://www.rabbitmq.com/tutorials/tutorial-one-python.html