Skip to content

Mongo Connector API Reference

MongoDBOperation

Handles MongoDB operations such as connecting to a collection, inserting documents, checking for existing records, and retrieving documents grouped by date.

Parameters:

Name Type Description Default
mongo_db_config MongoDBConfig

Configuration object containing MongoDB connection details.

required
max_retries int

Maximum number of attempts (including the first) to get a MongoDB collection.

3
retry_delay float

Initial delay between retries in seconds. Doubles each attempt.

1.0

Examples:

>>> from datetime import datetime, timezone
>>> from src.entity.config_entity import MongoDBConfig
>>> from src.clients.mongo_connector import MongoDBOperation
>>> config = MongoDBConfig()
>>> with MongoDBOperation(config) as mongo_op:
...     mongo_op.insert_data([{"title": "Example", "published": datetime.now(timezone.utc)}])
...     exists = mongo_op.is_article_link_exists("http://example.com/article")
...     print("Article exists:", exists)
Source code in src/clients/mongo_connector.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
class MongoDBOperation:
    """
    Handles MongoDB operations such as connecting to a collection,
    inserting documents, checking for existing records, and counting
    documents per publication date.

    Parameters
    ----------
    mongo_db_config : MongoDBConfig
        Configuration object containing MongoDB connection details.
    max_retries : int, default=3
        Maximum number of attempts (including the first) to get a MongoDB
        collection. Values below 1 are treated as a single attempt.
    retry_delay : float, default=1.0
        Initial delay between retries in seconds. Doubles each attempt.

    Examples
    --------
    >>> from datetime import datetime, timezone
    >>> from src.entity.config_entity import MongoDBConfig
    >>> from src.clients.mongo_connector import MongoDBOperation
    >>> config = MongoDBConfig()
    >>> with MongoDBOperation(config) as mongo_op:
    ...     mongo_op.insert_data([{"title": "Example", "published": datetime.now(timezone.utc)}])
    ...     exists = mongo_op.is_article_link_exists("http://example.com/article")
    ...     print("Article exists:", exists)
    """

    def __init__(self, mongo_db_config: MongoDBConfig, max_retries: int = 3, retry_delay: float = 1.0):
        """
        Initialize MongoDBOperation with config and establish MongoDB client connection.

        Parameters
        ----------
        mongo_db_config : MongoDBConfig
            Configuration object with MongoDB settings.
        max_retries : int, default=3
            Maximum number of attempts (including the first) for collection
            retrieval.
        retry_delay : float, default=1.0
            Initial delay between retries (seconds).

        Raises
        ------
        CustomException
            If connection fails.
        """
        self.mongo_db_config = mongo_db_config
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.client: Optional[MongoClient] = None
        self._collection = None

        self._connect()


    def _connect(self):
        """
        Establish a MongoDB client connection and verify it with a ``ping``.

        Raises
        ------
        CustomException
            If the client cannot be created or the ping fails (unreachable
            server, invalid URI, authentication failure, ...).
        """
        try:
            logging.info("Establishing MongoDB client connection...")
            self.client = MongoClient(
                self.mongo_db_config.MONGODB_URI,
                serverSelectionTimeoutMS=5000  # Fast fail if not reachable
            )
            # Test the connection
            self.client.admin.command("ping")
            logging.info("MongoDB connection established successfully.")
        except PyMongoError as e:
            # ConnectionFailure only covers network errors. Invalid URIs
            # (ConfigurationError) and auth failures (OperationFailure) are
            # also PyMongoError subclasses and must be reported the same way.
            logging.error(f"MongoDB connection failed: {e}")
            # Do not leak a half-initialised client: close it if it was created.
            if self.client is not None:
                self.client.close()
                self.client = None
            raise CustomException("Failed to connect to MongoDB", sys) from e


    def _get_collection(self) -> Collection:
        """
        Retrieve the MongoDB collection with retry logic and cache it.

        Returns
        -------
        Collection
            The MongoDB collection object.

        Raises
        ------
        CustomException
            If the client is not initialized (e.g. after ``close_connection``)
            or retrieval fails after all attempts.
        """
        # PyMongo Collection objects refuse truth-value testing (bool() raises
        # NotImplementedError), so the cache must be compared against None.
        if self._collection is not None:
            return self._collection

        # A missing client is not transient; retrying would only add delay.
        if self.client is None:
            raise CustomException("MongoDB client is not initialized", sys)

        # Always make at least one attempt, so the method never falls through
        # and implicitly returns None.
        attempts = max(1, self.max_retries)
        collection_name = self.mongo_db_config.MONGODB_COLLECTION_NAME

        for attempt in range(1, attempts + 1):
            try:
                logging.info(f"[Attempt {attempt}] Getting collection: "
                             f"{collection_name}")

                db = self.client[self.mongo_db_config.MONGODB_DATABASE_NAME]
                collection = db[collection_name]

                # Cheap server round-trip to validate access
                collection.estimated_document_count()
            except PyMongoError as e:
                logging.warning(f"Attempt {attempt} failed: {e}")
                if attempt == attempts:
                    logging.error("Exceeded maximum retries for getting collection.")
                    raise CustomException("Failed to get MongoDB collection", sys) from e
                # Exponential back-off: retry_delay, 2*retry_delay, 4*retry_delay, ...
                time.sleep(self.retry_delay * (2 ** (attempt - 1)))
            else:
                logging.info("MongoDB collection retrieved successfully.")
                self._collection = collection
                return collection


    @property
    def collection(self) -> Collection:
        """
        MongoDB collection instance (lazy-loaded with retry).

        Returns
        -------
        Collection
            MongoDB collection object.
        """
        return self._get_collection()


    def get_date_wise_doc_count(self) -> Optional[List[Tuple[Optional[str], int]]]:
        """
        Get the dates and respective document counts, ordered by date.

        Documents are bucketed by the calendar day of their ``published``
        field, rendered by MongoDB's ``$dateToString`` as ``YYYY-MM-DD``.

        Returns
        -------
        list of tuple of (str or None, int) or None
            ``(date_string, document_count)`` tuples sorted by date ascending.
            Documents without a ``published`` value are counted under
            ``None``, because ``$dateToString`` yields null for a
            null or missing date. Returns None if no documents are found.

        Raises
        ------
        CustomException
            If query execution fails. This includes a ``published`` value
            that is not a BSON date, which ``$dateToString`` rejects.

        Examples
        --------
        >>> counts = mongo_op.get_date_wise_doc_count()
        >>> if counts:
        ...     for date, count in counts:
        ...         print(date, count)
        """
        try:
            pipeline = [
                {
                    "$group": {
                        # Bucket by the date part (Y-M-D) of the publish date
                        "_id": {
                            "$dateToString": {
                                "format": "%Y-%m-%d",
                                "date": "$published",
                            }
                        },
                        # Only the count is consumed. Pushing whole documents
                        # ($$ROOT) would drag the entire collection through
                        # the group stage and can exceed MongoDB's
                        # document-size / group memory limits.
                        "count": {"$sum": 1},
                    }
                },
                # YYYY-MM-DD strings sort chronologically
                {"$sort": {"_id": 1}},
            ]
            result = list(self.collection.aggregate(pipeline))
            if not result:
                logging.info("No documents found : None.")
                return None

            date_wise_count = [(group["_id"], group["count"]) for group in result]
            logging.debug(f"Fetched date-wise document counts: {date_wise_count}")
            return date_wise_count
        except Exception as e:
            logging.error(f"Error fetching date-wise document counts: {e}")
            raise CustomException(e, sys)


    def insert_data(self, data: List[Dict]):
        """
        Insert multiple documents into the MongoDB collection.

        An empty batch is a no-op, since ``insert_many`` rejects empty lists.
        Note that PyMongo adds an ``_id`` key to each inserted dict in place.

        Parameters
        ----------
        data : list of dict
            Documents to insert.

        Raises
        ------
        CustomException
            If insertion fails.

        Examples
        --------
        >>> documents = [
        ...     {"title": "Breaking News", "published": datetime(2023, 8, 25)},
        ...     {"title": "Tech Update", "published": datetime(2023, 8, 26)}
        ... ]
        >>> mongo_op.insert_data(documents)
        """
        if not data:
            logging.info("No documents to insert; skipping.")
            return
        try:
            logging.info(f"Inserting data to MongoDB collection {self.mongo_db_config.MONGODB_COLLECTION_NAME}")
            self.collection.insert_many(data)
        except Exception as e:
            logging.error(f"Error while inserting data to MongoDB: {e}")
            raise CustomException(e, sys)


    def is_article_link_exists(self, article_link: str) -> bool:
        """
        Check if a document with the given article link exists.

        Parameters
        ----------
        article_link : str
            URL of the article. It is matched exactly against the field named
            by ``mongo_db_config.MONGO_DOC_ARTICLE_URL_KEY``.

        Returns
        -------
        bool
            True if the document exists, False otherwise.

        Raises
        ------
        CustomException
            If query fails.

        Examples
        --------
        >>> exists = mongo_op.is_article_link_exists("http://example.com/article123")
        >>> print(exists)
        True
        """
        try:
            # Only existence matters, so fetch just _id rather than the full
            # (potentially large) article document.
            doc = self.collection.find_one(
                filter={self.mongo_db_config.MONGO_DOC_ARTICLE_URL_KEY: article_link},
                projection={"_id": 1},
            )
        except Exception as e:
            logging.error(f"Error while fetching data from MongoDB: {e}")
            raise CustomException(e, sys)

        if doc is None:
            logging.info(f"No document exists for Url: {article_link}")
            return False
        logging.info(f"Document exists for Url: {article_link}")
        return True


    def close_connection(self):
        """
        Gracefully close the MongoDB client connection.

        Safe to call repeatedly: it does nothing when no client is open.

        Raises
        ------
        CustomException
            If closing the connection fails.

        Examples
        --------
        >>> mongo_op.close_connection()
        """
        try:
            if self.client is not None:
                logging.info("Closing MongoDB client connection...")
                self.client.close()
                # Drop references so a closed client/collection is never reused.
                self.client = None
                self._collection = None
                logging.info("MongoDB connection closed.")
        except Exception as e:
            logging.error(f"Error closing MongoDB connection: {e}")
            raise CustomException(e, sys)

    def __enter__(self):
        """
        Enter runtime context (for `with` statement).

        Returns
        -------
        MongoDBOperation
            The MongoDBOperation instance.

        Examples
        --------
        >>> with MongoDBOperation(config) as mongo_op:
        ...     mongo_op.insert_data([{"title": "inside context"}])
        """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit runtime context and close MongoDB connection.

        Returns None, so an exception raised inside the ``with`` block is
        not suppressed.

        Parameters
        ----------
        exc_type : type
            Exception type (if raised inside context).
        exc_val : Exception
            Exception instance (if raised).
        exc_tb : traceback
            Traceback object.
        """
        self.close_connection()

collection

MongoDB collection instance (lazy-loaded with retry).

Returns:

Type Description
Collection

MongoDB collection object.

__enter__()

Enter runtime context (for with statement).

Returns:

Type Description
MongoDBOperation

The MongoDBOperation instance.

Examples:

>>> with MongoDBOperation(config) as mongo_op:
...     mongo_op.insert_data([{"title": "inside context"}])
Source code in src/clients/mongo_connector.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def __enter__(self):
    """
    Support the ``with`` statement by handing back this same object.

    No extra setup happens here; the instance is returned unchanged and
    bound to the ``as`` target.

    Returns
    -------
    MongoDBOperation
        ``self``.

    Examples
    --------
    >>> with MongoDBOperation(config) as mongo_op:
    ...     mongo_op.insert_data([{"title": "inside context"}])
    """
    context_value = self
    return context_value

__exit__(exc_type, exc_val, exc_tb)

Exit runtime context and close MongoDB connection.

Parameters:

Name Type Description Default
exc_type type

Exception type (if raised inside context).

required
exc_val Exception

Exception instance (if raised).

required
exc_tb traceback

Traceback object.

required
Source code in src/clients/mongo_connector.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
def __exit__(self, exc_type, exc_val, exc_tb):
    """
    Leave the ``with`` block by closing the MongoDB client.

    The exception details are accepted but not inspected. The implicit
    ``None`` return value means an exception raised inside the block is
    not suppressed.

    Parameters
    ----------
    exc_type : type
        Class of the exception raised in the block, or None.
    exc_val : Exception
        The exception instance, or None.
    exc_tb : traceback
        Traceback of the exception, or None.
    """
    closer = self.close_connection
    closer()

__init__(mongo_db_config, max_retries=3, retry_delay=1.0)

Initialize MongoDBOperation with config and establish MongoDB client connection.

Parameters:

Name Type Description Default
mongo_db_config MongoDBConfig

Configuration object with MongoDB settings.

required
max_retries int

Maximum number of attempts (including the first) for collection retrieval.

3
retry_delay float

Initial delay between retries (seconds).

1.0

Raises:

Type Description
CustomException

If connection fails.

Source code in src/clients/mongo_connector.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def __init__(self, mongo_db_config: MongoDBConfig, max_retries: int = 3, retry_delay: float = 1.0):
    """
    Store the MongoDB settings and open the client connection immediately.

    Parameters
    ----------
    mongo_db_config : MongoDBConfig
        Settings object providing the MongoDB connection details.
    max_retries : int, default=3
        Maximum number of attempts (including the first) used when the
        collection is later retrieved.
    retry_delay : float, default=1.0
        Initial back-off in seconds between collection-retrieval attempts.

    Raises
    ------
    CustomException
        If the connection attempt made by ``_connect`` fails.
    """
    # Runtime handles: no client until _connect assigns one, and the
    # collection is resolved lazily on first use.
    self.client: Optional[MongoClient] = None
    self._collection = None

    # Connection and retry settings.
    self.mongo_db_config = mongo_db_config
    self.max_retries, self.retry_delay = max_retries, retry_delay

    self._connect()

close_connection()

Gracefully close the MongoDB client connection.

Raises:

Type Description
CustomException

If closing the connection fails.

Examples:

>>> mongo_op.close_connection()
Source code in src/clients/mongo_connector.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
def close_connection(self):
    """
    Close the MongoDB client, if one is open, and forget the cached collection.

    When no client is held the call does nothing, so it can be invoked
    more than once.

    Raises
    ------
    CustomException
        Wraps any exception raised while closing the client.

    Examples
    --------
    >>> mongo_op.close_connection()
    """
    try:
        if not self.client:
            return
        logging.info("Closing MongoDB client connection...")
        self.client.close()
        # Drop references so the closed client/collection is not reused.
        self.client = None
        self._collection = None
        logging.info("MongoDB connection closed.")
    except Exception as err:
        logging.error(f"Error closing MongoDB connection: {err}")
        raise CustomException(err, sys)

get_date_wise_doc_count()

Get the dates and respective document counts, ordered by date.

Returns:

Type Description
list of tuple of (str, int) or None

List of tuples containing (date string formatted YYYY-MM-DD, document count), sorted by date ascending. Returns None if no documents are found.

Raises:

Type Description
CustomException

If query execution fails.

Examples:

>>> counts = mongo_op.get_date_wise_doc_count()
>>> if counts:
...     for date, count in counts:
...         print(date, count)
Source code in src/clients/mongo_connector.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def get_date_wise_doc_count(self) -> Optional[List[Tuple[Optional[str], int]]]:
    """
    Get the dates and respective document counts, ordered by date.

    Documents are bucketed by the calendar day of their ``published``
    field, rendered by MongoDB's ``$dateToString`` as ``YYYY-MM-DD``.

    Returns
    -------
    list of tuple of (str or None, int) or None
        ``(date_string, document_count)`` tuples sorted by date ascending.
        Documents without a ``published`` value are counted under ``None``,
        because ``$dateToString`` yields null for a null or missing date.
        Returns None if no documents are found.

    Raises
    ------
    CustomException
        If query execution fails. This includes a ``published`` value that
        is not a BSON date, which ``$dateToString`` rejects.

    Examples
    --------
    >>> counts = mongo_op.get_date_wise_doc_count()
    >>> if counts:
    ...     for date, count in counts:
    ...         print(date, count)
    """
    try:
        pipeline = [
            {
                "$group": {
                    # Bucket by the date part (Y-M-D) of the publish date
                    "_id": {
                        "$dateToString": {
                            "format": "%Y-%m-%d",
                            "date": "$published",
                        }
                    },
                    # Only the count is consumed. Pushing whole documents
                    # ($$ROOT) would drag the entire collection through the
                    # group stage and can exceed MongoDB's document-size /
                    # group memory limits.
                    "count": {"$sum": 1},
                }
            },
            # YYYY-MM-DD strings sort chronologically
            {"$sort": {"_id": 1}},
        ]
        result = list(self.collection.aggregate(pipeline))
        if not result:
            logging.info("No documents found : None.")
            return None

        date_wise_count = [(group["_id"], group["count"]) for group in result]
        logging.debug(f"Fetched date-wise document counts: {date_wise_count}")
        return date_wise_count
    except Exception as e:
        logging.error(f"Error fetching date-wise document counts: {e}")
        raise CustomException(e, sys)

insert_data(data)

Insert multiple documents into the MongoDB collection.

Parameters:

Name Type Description Default
data list of dict

Documents to insert.

required

Raises:

Type Description
CustomException

If insertion fails.

Examples:

>>> documents = [
...     {"title": "Breaking News", "published": datetime(2023, 8, 25)},
...     {"title": "Tech Update", "published": datetime(2023, 8, 26)}
... ]
>>> mongo_op.insert_data(documents)
Source code in src/clients/mongo_connector.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def insert_data(self, data: List[Dict]):
    """
    Insert multiple documents into the MongoDB collection.

    An empty batch is a no-op, since ``insert_many`` rejects empty lists.
    Note that PyMongo adds an ``_id`` key to each inserted dict in place.

    Parameters
    ----------
    data : list of dict
        Documents to insert.

    Raises
    ------
    CustomException
        If insertion fails.

    Examples
    --------
    >>> documents = [
    ...     {"title": "Breaking News", "published": datetime(2023, 8, 25)},
    ...     {"title": "Tech Update", "published": datetime(2023, 8, 26)}
    ... ]
    >>> mongo_op.insert_data(documents)
    """
    if not data:
        logging.info("No documents to insert; skipping.")
        return
    try:
        logging.info(f"Inserting data to MongoDB collection {self.mongo_db_config.MONGODB_COLLECTION_NAME}")
        self.collection.insert_many(data)
    except Exception as e:
        logging.error(f"Error while inserting data to MongoDB: {e}")
        raise CustomException(e, sys)

is_article_link_exists(article_link)

Check if a document with the given article link exists.

Parameters:

Name Type Description Default
article_link str

URL of the article.

required

Returns:

Type Description
bool

True if the document exists, False otherwise.

Raises:

Type Description
CustomException

If query fails.

Examples:

>>> exists = mongo_op.is_article_link_exists("http://example.com/article123")
>>> print(exists)
True
Source code in src/clients/mongo_connector.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def is_article_link_exists(self, article_link: str) -> bool:
    """
    Check if a document with the given article link exists.

    Parameters
    ----------
    article_link : str
        URL of the article. It is matched exactly against the field named by
        ``mongo_db_config.MONGO_DOC_ARTICLE_URL_KEY``.

    Returns
    -------
    bool
        True if the document exists, False otherwise.

    Raises
    ------
    CustomException
        If query fails.

    Examples
    --------
    >>> exists = mongo_op.is_article_link_exists("http://example.com/article123")
    >>> print(exists)
    True
    """
    try:
        # Only existence matters, so fetch just _id rather than the full
        # (potentially large) article document.
        doc = self.collection.find_one(
            filter={self.mongo_db_config.MONGO_DOC_ARTICLE_URL_KEY: article_link},
            projection={"_id": 1},
        )
    except Exception as e:
        logging.error(f"Error while fetching data from MongoDB: {e}")
        raise CustomException(e, sys)

    if doc is None:
        logging.info(f"No document exists for Url: {article_link}")
        return False
    logging.info(f"Document exists for Url: {article_link}")
    return True