Source code for intergov.processors.message_updater
import time
import requests
from http import HTTPStatus
from intergov.conf import env_queue_config
from intergov.repos.message_updates import MessageUpdatesRepo
from intergov.domain.wire_protocols import generic_discrete as gd
from intergov.loggers import logging
from intergov.processors.common.env import (
MESSAGE_PATCH_API_ENDPOINT
)
logger = logging.getLogger('channel_message_updater')
[docs]class MessageUpdater(object):
"""
Iterate over message update jobs:
* get a job from the queue
* after some job validation, update the message using the API
* if sucessful, delete the job from the queue
* if unsucessful, increment retry counter and reschedule attempt
"""
# TODO: FIXME: push business logic into a testable use_case object
# maybe also put the "update job" into a request model
# TODO: tar-pit algorithm on retrys?
# (prevent thundering herd after outage)
def _prepare_message_updates_repo(self, conf):
message_updates_repo_conf = env_queue_config('BCH_MESSAGE_UPDATES')
if conf:
message_updates_repo_conf.update(conf)
self.message_updates_repo = MessageUpdatesRepo(message_updates_repo_conf)
def _patch_message(self, job):
msg = job['message']
patch = job['patch']
retry = job.get('retry', 0)
retry_max = job.get('retry_max', 2)
sender = msg[gd.SENDER_KEY]
sender_ref = msg[gd.SENDER_REF_KEY]
logger.info(
'Patching message[sender:%s, sender_ref:%s, patch:%s]',
sender,
sender_ref,
patch
)
resp = requests.patch(
MESSAGE_PATCH_API_ENDPOINT.format(
sender=sender,
sender_ref=sender_ref
),
json=patch
)
if resp.status_code == HTTPStatus.NOT_FOUND:
if retry + 1 > retry_max:
# this should probably be at least WARN level
logger.info('Message not found. Max retries reached.')
return True
logger.info('Message not found. Schedule retry')
job['retry'] = retry + 1
self.message_updates_repo.post_job(job, delay_seconds=30)
return True
if resp.status_code == HTTPStatus.CONFLICT:
logger.info('Patch causing conflic with the current message state.')
return True
if resp.status_code != HTTPStatus.OK:
raise RuntimeError(
f"Can't patch the message reason:{resp.text}"
)
logger.info('Message patched successfully.')
return True
def __init__(
self,
message_updates_repo_conf=None
):
self._prepare_message_updates_repo(message_updates_repo_conf)
def __iter__(self):
logger.info('Starting channel message updater')
return self
def __next__(self):
try:
result = self.message_updates_repo.get_job()
if not result:
return None
job_queue_id, job_payload = result
if self._patch_message(job_payload):
logger.info('Deleting the job.')
return self.message_updates_repo.delete(job_queue_id)
except Exception as e:
logger.exception(e)
return None
if __name__ == '__main__':
for result in MessageUpdater():
if result is None:
time.sleep(1)