Source code for intergov.processors.rejected_status_updater

import random
import time

from intergov.conf import env_s3_config, env_queue_config
from intergov.repos.message_lake import MessageLakeRepo
from intergov.repos.rejected_message import RejectedMessagesRepo
from intergov.use_cases import RejectPendingMessageUseCase

from intergov.loggers import logging

logger = logging.getLogger('rejected_status_updater')


[docs]class RejectedStatusUpdater(object): """ Iterate over RejectPendingMessageUseCase """ def __init__(self, rejected_message_repo_conf=None, message_lake_repo_conf=None): self._prepare_repos_confs(rejected_message_repo_conf, message_lake_repo_conf) self._prepare_repos() self._prepare_use_case() def _prepare_repos_confs( self, rejected_message_repo_conf=None, message_lake_repo_conf=None ): self.rejected_messages_repo_conf = env_queue_config('PROC_REJECTED_MESSAGES_REPO') self.message_lake_repo_conf = env_s3_config('PROC_MESSAGE_LAKE_REPO') if message_lake_repo_conf: self.message_lake_repo_conf.update(message_lake_repo_conf) if rejected_message_repo_conf: self.rejected_messages_repo_conf.update(rejected_message_repo_conf) def _prepare_repos(self): self.message_lake_repo = MessageLakeRepo(self.message_lake_repo_conf) self.rejected_messages_repo = RejectedMessagesRepo(self.rejected_messages_repo_conf) def _prepare_use_case(self): self.use_case = RejectPendingMessageUseCase( rejected_message_repo=self.rejected_messages_repo, message_lake_repo=self.message_lake_repo ) def __iter__(self): logger.info("Starting the RejectedStatusUpdater") return self def __next__(self): try: result = self.use_case.execute() except Exception as e: logger.exception(e) result = None return result
if __name__ == '__main__': # pragma: no cover # To start it manually, from the base dir: # PYTHONPATH="`pwd`" python intergov/processors/rejected_status_updater/__init__.py for result in RejectedStatusUpdater(): if result is None: # better for tests time.sleep(random.randint(2, 20))