Skip to content

RabbitMQ Connector API Reference

RabbitMQClient

Client for interacting with a RabbitMQ message broker.

Handles connection setup, publishing messages, queue declaration, and graceful shutdown with retry logic and error handling.

Examples:

Basic usage:

>>> from src.entity.config_entity import RabbitMQConfig
>>> from src.clients.rabbitmq_connector import RabbitMQClient
>>> config = RabbitMQConfig()
>>> client = RabbitMQClient(config)
>>> client.declare_queue('my_queue')
>>> client.publish('my_queue', {"message": "Hello, world!"})
>>> client.close()

Context manager usage:

>>> with RabbitMQClient(config) as client:
...     client.declare_queue('my_queue')
...     client.publish('my_queue', {"message": "Hello, with context manager!"})
Source code in src/clients/rabbitmq_connector.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
class RabbitMQClient:
    """
    Client for interacting with a RabbitMQ message broker.

    Handles connection setup, publishing messages, queue declaration,
    and graceful shutdown with retry logic and error handling.

    Examples
    --------
    Basic usage:

    >>> from src.entity.config_entity import RabbitMQConfig
    >>> from src.clients.rabbitmq_connector import RabbitMQClient
    >>> config = RabbitMQConfig()
    >>> client = RabbitMQClient(config)
    >>> client.declare_queue('my_queue')
    >>> client.publish('my_queue', {"message": "Hello, world!"})
    >>> client.close()

    Context manager usage:

    >>> with RabbitMQClient(config) as client:
    ...     client.declare_queue('my_queue')
    ...     client.publish('my_queue', {"message": "Hello, with context manager!"})
    """

    def __init__(self, rabbitmq_config: RabbitMQConfig):
        """
        Initialize RabbitMQ client and establish a connection.

        The connection and channel are created with retry logic.

        Parameters
        ----------
        rabbitmq_config : RabbitMQConfig
            Configuration object with connection parameters.

        Examples
        --------
        >>> client = RabbitMQClient(config)
        >>> assert client.connection.is_open
        """
        self.rabbitmq_config = rabbitmq_config
        self.connection: Optional[pika.BlockingConnection] = None
        self.channel: Optional[pika.adapters.blocking_connection.BlockingChannel] = None
        self._connect()


    def _connect(self):
        """
        Establish a connection to RabbitMQ with retries and exponential backoff.

        Retries using configuration parameters. Logs connection attempts
        and raises a `CustomException` on failure.

        Raises
        ------
        CustomException
            If unable to connect after all retry attempts or unexpected errors occur.
        """
        start_time = time.perf_counter()
        try:
            attempt = 0
            while attempt < self.rabbitmq_config.RETRIES:
                try:
                    credentials = pika.PlainCredentials(
                        username=self.rabbitmq_config.RABBITMQ_USER,
                        password=self.rabbitmq_config.RABBITMQ_PASSWORD
                    )

                    parameters = pika.ConnectionParameters(
                        host=self.rabbitmq_config.RABBITMQ_HOST,
                        port=self.rabbitmq_config.RABBITMQ_PORT,
                        virtual_host=self.rabbitmq_config.RABBITMQ_VHOST,
                        credentials=credentials,
                        heartbeat=600,
                        blocked_connection_timeout=300,
                    )

                    self.connection = pika.BlockingConnection(parameters)
                    self.channel = self.connection.channel()
                    logging.info(
                        "Connected to RabbitMQ successfully.",
                        extra={
                            "service": service_name,
                            "host": self.rabbitmq_config.RABBITMQ_HOST,
                            "duration_ms": calculate_duration(start_time),
                        }
                    )
                    break

                except AMQPConnectionError as e:
                    attempt += 1
                    logging.warning(
                        f"RabbitMQ connection attempt {attempt} failed",
                        extra={
                            "service": service_name,
                            "host": self.rabbitmq_config.RABBITMQ_HOST,
                            "stack_trace": str(e),
                            "duration_ms": calculate_duration(start_time),
                        }
                    )
                    time.sleep((self.rabbitmq_config.RETRY_BACKOFF_FACTOR ** attempt)  + random.uniform(0, 0.5))  # exponential backoff # nosec B311

                except Exception as e:
                    logging.error(
                        "Unexpected error connecting to RabbitMQ",
                        extra={
                            "service": service_name,
                            "host": self.rabbitmq_config.RABBITMQ_HOST,
                            "stack_trace": str(e),
                            "duration_ms": calculate_duration(start_time),
                        }
                    )
                    raise

            else:
                self.close()
                logging.error(
                        f"Failed to connect to RabbitMQ after {attempt} retries.",
                        extra={
                            "service": service_name,
                            "host": self.rabbitmq_config.RABBITMQ_HOST,
                            "duration_ms": calculate_duration(start_time),
                        }
                    )
                raise ConnectionError("Failed to connect to RabbitMQ after retries.")

        except Exception as e:
            logging.error(
                f"Error in RabbitMQ connection: {e}",
                extra={
                    "service": service_name,
                    "host": self.rabbitmq_config.RABBITMQ_HOST,
                    "stack_trace": str(e),
                    "duration_ms": calculate_duration(start_time),
                }
            )
            raise CustomException(e, sys)


    def declare_queue(self, queue_name: str, durable: bool = True) -> None:
        """
        Declare a queue on the RabbitMQ server.

        Parameters
        ----------
        queue_name : str
            Name of the queue to declare.
        durable : bool, optional
            Whether the queue should survive broker restarts (default True).

        Raises
        ------
        CustomException
            If the queue declaration fails.

        Examples
        --------
        >>> client.declare_queue('task_queue')
        """
        start_time = time.perf_counter()
        try:
            assert self.channel is not None
            self.channel.queue_declare(queue=queue_name, durable=durable)
            logging.info(
                f"Declared queue: {queue_name}",
                extra={
                    "service": service_name,
                    "host": self.rabbitmq_config.RABBITMQ_HOST,
                    "duration_ms": calculate_duration(start_time),
                }
            )
        except Exception as e:
            logging.error(
                f"Failed to declare queue '{queue_name}': {e}",
                extra= {
                    "service": service_name,
                    "host": self.rabbitmq_config.RABBITMQ_HOST,
                    "stack_trace": str(e),
                    "duration_ms": calculate_duration(start_time),
                }
            )
            raise CustomException(e, sys)


    def publish(self, queue_name: str, message: dict) -> None:
        """
        Publish a message to a specified queue.

        Converts datetime objects in the message to ISO format automatically.

        Parameters
        ----------
        queue_name : str
            Target queue name.
        message : dict
            The message body (must be a dictionary).

        Raises
        ------
        CustomException
            If message publishing fails.

        Examples
        --------
        >>> client.publish('task_queue', {"task": "process_data"})
        """
        start_time = time.perf_counter()
        try:
            if not isinstance(message, dict):
                raise CustomException("Message must be a dict", sys)
            elif isinstance(message.get("published"), datetime):
                message["published"] = message["published"].isoformat()  # or .strftime("%Y-%m-%d")
            assert self.channel is not None
            self.channel.basic_publish(
                exchange='',
                routing_key=queue_name,
                body=json.dumps(message),
                properties=pika.BasicProperties(
                    delivery_mode=2  # Make message persistent
                )
            )
            logging.info(
                f"Message published: {message}",
                extra= {
                    "service": service_name,
                    "host": self.rabbitmq_config.RABBITMQ_HOST,
                    "duration_ms": calculate_duration(start_time)
                }

            )
        except Exception as e:
            logging.error(
                f"Error publishing message: {e}",
                extra= {
                    "service": service_name,
                    "host": self.rabbitmq_config.RABBITMQ_HOST,
                    "stack_trace": str(e),
                    "duration_ms": calculate_duration(start_time)
                }
            )
            raise CustomException(e, sys)


    def consume(
            self,
            queue_name: str,
            callback: Callable[[pika.channel.Channel, pika.spec.Basic.Deliver, pika.spec.BasicProperties, bytes], None],
            auto_ack: bool = False,
            prefetch_count: int = 1
        ) -> None:
        """
        Start consuming messages from a queue.

        Parameters
        ----------
        queue_name : str
            Queue to consume from.
        callback : callable
            Function to process messages. Signature:
            callback(channel, method, properties, body)
        auto_ack : bool, optional
            Automatically acknowledge messages (default False).
        prefetch_count : int, optional
            Number of messages to prefetch (default 1).

        Raises
        ------
        CustomException
            If consuming messages fails.
        """
        start_time = time.perf_counter()
        try:
            assert self.channel is not None
            self.channel.basic_qos(prefetch_count=prefetch_count)
            self.channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=auto_ack)
            logging.info(
                f"Started consuming from queue: {queue_name}",
                extra= {
                    "service": service_name,
                    "host": self.rabbitmq_config.RABBITMQ_HOST,
                    "duration_ms": calculate_duration(start_time)
                }
            )
            self.channel.start_consuming()
        except KeyboardInterrupt:
            logging.info(
                "Consumer interrupted by user.",
                extra= {
                    "service": service_name,
                    "host": self.rabbitmq_config.RABBITMQ_HOST,
                    "duration_ms": calculate_duration(start_time)
                }
            )
            self.close()
            return
        except ChannelClosedByBroker as e:
            logging.error(
                f"Channel closed by broker: {e}",
                extra= {
                    "service": service_name,
                    "host": self.rabbitmq_config.RABBITMQ_HOST,
                    "stack_trace": str(e),
                    "duration_ms": calculate_duration(start_time)
                }
            )
            self.close()
            raise CustomException(e, sys)
        except Exception as e:
            logging.error(
                f"Error during consuming messages: {e}",
                extra= {
                    "service": service_name,
                    "host": self.rabbitmq_config.RABBITMQ_HOST,
                    "stack_trace": str(e),
                    "duration_ms": calculate_duration(start_time)
                }
            )
            self.close()
            raise CustomException(e, sys)


    def close(self) -> None:
        """
        Gracefully close the channel and connection.

        Ensures all resources are released properly. Should be called when
        RabbitMQ operations are complete.

        Raises
        ------
        CustomException
            If closing the channel or connection fails.

        Examples
        --------
        >>> client.close()
        """
        start_time = time.perf_counter()
        try:
            if self.channel and self.channel.is_open:
                self.channel.close()
            if self.connection and self.connection.is_open:
                self.connection.close()
            logging.info(
                "RabbitMQ connection closed.",
                extra= {
                    "service": service_name,
                    "host": self.rabbitmq_config.RABBITMQ_HOST,
                    "duration_ms": calculate_duration(start_time)
                }
            )
        except Exception as e:
            logging.warning(
                f"Error while closing RabbitMQ connection: {e}",
                extra= {
                    "service": service_name,
                    "host": self.rabbitmq_config.RABBITMQ_HOST,
                    "stack_trace": str(e),
                    "duration_ms": calculate_duration(start_time)
                }
            )
            raise CustomException(e, sys)


    def __enter__(self) -> "RabbitMQClient":
        """
        Enter runtime context for `with` statement.

        Returns
        -------
        RabbitMQClient
            The client instance itself.

        Examples
        --------
        >>> with RabbitMQClient(config) as client:
        ...     client.declare_queue('my_queue')
        ...     client.publish('my_queue', {"message": "Hello"})
        """
        return self


    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """
        Exit runtime context and close the connection.

        Parameters
        ----------
        exc_type : type
            Exception type, if any.
        exc_value : Exception
            Exception instance, if any.
        traceback : traceback
            Traceback object, if any.
        """
        self.close()

__enter__()

Enter runtime context for with statement.

Returns:

Type Description
RabbitMQClient

The client instance itself.

Examples:

>>> with RabbitMQClient(config) as client:
...     client.declare_queue('my_queue')
...     client.publish('my_queue', {"message": "Hello"})
Source code in src/clients/rabbitmq_connector.py
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
def __enter__(self) -> "RabbitMQClient":
    """
    Enter runtime context for `with` statement.

    Returns
    -------
    RabbitMQClient
        The client instance itself.

    Examples
    --------
    >>> with RabbitMQClient(config) as client:
    ...     client.declare_queue('my_queue')
    ...     client.publish('my_queue', {"message": "Hello"})
    """
    return self

__exit__(exc_type, exc_value, traceback)

Exit runtime context and close the connection.

Parameters:

Name Type Description Default
exc_type type

Exception type, if any.

required
exc_value Exception

Exception instance, if any.

required
traceback traceback

Traceback object, if any.

required
Source code in src/clients/rabbitmq_connector.py
404
405
406
407
408
409
410
411
412
413
414
415
416
417
def __exit__(self, exc_type, exc_value, traceback) -> None:
    """
    Exit runtime context and close the connection.

    Parameters
    ----------
    exc_type : type
        Exception type, if any.
    exc_value : Exception
        Exception instance, if any.
    traceback : traceback
        Traceback object, if any.
    """
    self.close()

__init__(rabbitmq_config)

Initialize RabbitMQ client and establish a connection.

The connection and channel are created with retry logic.

Parameters:

Name Type Description Default
rabbitmq_config RabbitMQConfig

Configuration object with connection parameters.

required

Examples:

>>> client = RabbitMQClient(config)
>>> assert client.connection.is_open
Source code in src/clients/rabbitmq_connector.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def __init__(self, rabbitmq_config: RabbitMQConfig):
    """
    Initialize RabbitMQ client and establish a connection.

    The connection and channel are created with retry logic.

    Parameters
    ----------
    rabbitmq_config : RabbitMQConfig
        Configuration object with connection parameters.

    Examples
    --------
    >>> client = RabbitMQClient(config)
    >>> assert client.connection.is_open
    """
    self.rabbitmq_config = rabbitmq_config
    self.connection: Optional[pika.BlockingConnection] = None
    self.channel: Optional[pika.adapters.blocking_connection.BlockingChannel] = None
    self._connect()

close()

Gracefully close the channel and connection.

Ensures all resources are released properly. Should be called when RabbitMQ operations are complete.

Raises:

Type Description
CustomException

If closing the channel or connection fails.

Examples:

>>> client.close()
Source code in src/clients/rabbitmq_connector.py
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
def close(self) -> None:
    """
    Gracefully close the channel and connection.

    Ensures all resources are released properly. Should be called when
    RabbitMQ operations are complete.

    Raises
    ------
    CustomException
        If closing the channel or connection fails.

    Examples
    --------
    >>> client.close()
    """
    start_time = time.perf_counter()
    try:
        if self.channel and self.channel.is_open:
            self.channel.close()
        if self.connection and self.connection.is_open:
            self.connection.close()
        logging.info(
            "RabbitMQ connection closed.",
            extra= {
                "service": service_name,
                "host": self.rabbitmq_config.RABBITMQ_HOST,
                "duration_ms": calculate_duration(start_time)
            }
        )
    except Exception as e:
        logging.warning(
            f"Error while closing RabbitMQ connection: {e}",
            extra= {
                "service": service_name,
                "host": self.rabbitmq_config.RABBITMQ_HOST,
                "stack_trace": str(e),
                "duration_ms": calculate_duration(start_time)
            }
        )
        raise CustomException(e, sys)

consume(queue_name, callback, auto_ack=False, prefetch_count=1)

Start consuming messages from a queue.

Parameters:

Name Type Description Default
queue_name str

Queue to consume from.

required
callback callable

Function to process messages. Signature: callback(channel, method, properties, body)

required
auto_ack bool

Automatically acknowledge messages (default False).

False
prefetch_count int

Number of messages to prefetch (default 1).

1

Raises:

Type Description
CustomException

If consuming messages fails.

Source code in src/clients/rabbitmq_connector.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
def consume(
        self,
        queue_name: str,
        callback: Callable[[pika.channel.Channel, pika.spec.Basic.Deliver, pika.spec.BasicProperties, bytes], None],
        auto_ack: bool = False,
        prefetch_count: int = 1
    ) -> None:
    """
    Start consuming messages from a queue.

    Parameters
    ----------
    queue_name : str
        Queue to consume from.
    callback : callable
        Function to process messages. Signature:
        callback(channel, method, properties, body)
    auto_ack : bool, optional
        Automatically acknowledge messages (default False).
    prefetch_count : int, optional
        Number of messages to prefetch (default 1).

    Raises
    ------
    CustomException
        If consuming messages fails.
    """
    start_time = time.perf_counter()
    try:
        assert self.channel is not None
        self.channel.basic_qos(prefetch_count=prefetch_count)
        self.channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=auto_ack)
        logging.info(
            f"Started consuming from queue: {queue_name}",
            extra= {
                "service": service_name,
                "host": self.rabbitmq_config.RABBITMQ_HOST,
                "duration_ms": calculate_duration(start_time)
            }
        )
        self.channel.start_consuming()
    except KeyboardInterrupt:
        logging.info(
            "Consumer interrupted by user.",
            extra= {
                "service": service_name,
                "host": self.rabbitmq_config.RABBITMQ_HOST,
                "duration_ms": calculate_duration(start_time)
            }
        )
        self.close()
        return
    except ChannelClosedByBroker as e:
        logging.error(
            f"Channel closed by broker: {e}",
            extra= {
                "service": service_name,
                "host": self.rabbitmq_config.RABBITMQ_HOST,
                "stack_trace": str(e),
                "duration_ms": calculate_duration(start_time)
            }
        )
        self.close()
        raise CustomException(e, sys)
    except Exception as e:
        logging.error(
            f"Error during consuming messages: {e}",
            extra= {
                "service": service_name,
                "host": self.rabbitmq_config.RABBITMQ_HOST,
                "stack_trace": str(e),
                "duration_ms": calculate_duration(start_time)
            }
        )
        self.close()
        raise CustomException(e, sys)

declare_queue(queue_name, durable=True)

Declare a queue on the RabbitMQ server.

Parameters:

Name Type Description Default
queue_name str

Name of the queue to declare.

required
durable bool

Whether the queue should survive broker restarts (default True).

True

Raises:

Type Description
CustomException

If the queue declaration fails.

Examples:

>>> client.declare_queue('task_queue')
Source code in src/clients/rabbitmq_connector.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def declare_queue(self, queue_name: str, durable: bool = True) -> None:
    """
    Declare a queue on the RabbitMQ server.

    Parameters
    ----------
    queue_name : str
        Name of the queue to declare.
    durable : bool, optional
        Whether the queue should survive broker restarts (default True).

    Raises
    ------
    CustomException
        If the queue declaration fails.

    Examples
    --------
    >>> client.declare_queue('task_queue')
    """
    start_time = time.perf_counter()
    try:
        assert self.channel is not None
        self.channel.queue_declare(queue=queue_name, durable=durable)
        logging.info(
            f"Declared queue: {queue_name}",
            extra={
                "service": service_name,
                "host": self.rabbitmq_config.RABBITMQ_HOST,
                "duration_ms": calculate_duration(start_time),
            }
        )
    except Exception as e:
        logging.error(
            f"Failed to declare queue '{queue_name}': {e}",
            extra= {
                "service": service_name,
                "host": self.rabbitmq_config.RABBITMQ_HOST,
                "stack_trace": str(e),
                "duration_ms": calculate_duration(start_time),
            }
        )
        raise CustomException(e, sys)

publish(queue_name, message)

Publish a message to a specified queue.

Converts datetime objects in the message to ISO format automatically.

Parameters:

Name Type Description Default
queue_name str

Target queue name.

required
message dict

The message body (must be a dictionary).

required

Raises:

Type Description
CustomException

If message publishing fails.

Examples:

>>> client.publish('task_queue', {"task": "process_data"})
Source code in src/clients/rabbitmq_connector.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def publish(self, queue_name: str, message: dict) -> None:
    """
    Publish a message to a specified queue.

    Converts datetime objects in the message to ISO format automatically.

    Parameters
    ----------
    queue_name : str
        Target queue name.
    message : dict
        The message body (must be a dictionary).

    Raises
    ------
    CustomException
        If message publishing fails.

    Examples
    --------
    >>> client.publish('task_queue', {"task": "process_data"})
    """
    start_time = time.perf_counter()
    try:
        if not isinstance(message, dict):
            raise CustomException("Message must be a dict", sys)
        elif isinstance(message.get("published"), datetime):
            message["published"] = message["published"].isoformat()  # or .strftime("%Y-%m-%d")
        assert self.channel is not None
        self.channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2  # Make message persistent
            )
        )
        logging.info(
            f"Message published: {message}",
            extra= {
                "service": service_name,
                "host": self.rabbitmq_config.RABBITMQ_HOST,
                "duration_ms": calculate_duration(start_time)
            }

        )
    except Exception as e:
        logging.error(
            f"Error publishing message: {e}",
            extra= {
                "service": service_name,
                "host": self.rabbitmq_config.RABBITMQ_HOST,
                "stack_trace": str(e),
                "duration_ms": calculate_duration(start_time)
            }
        )
        raise CustomException(e, sys)