Source code for intergov.use_cases.process_message
from intergov.domain.wire_protocols.generic_discrete import Message
from intergov.loggers import logging # NOQA
logger = logging.getLogger(__name__)
[docs]class ProcessMessageUseCase:
"""
Used by the message processing background worker.
Gets one message from the channel inbox
and does number of things with it.
* dispatch document retreval job
(if the message is from a foreign source)
* dispatch message sending task to channel-outbox
(if the message is from a domestic source)
* ensure the message is stored in the message lake
* ensure the access control lists are updated
for this message
* dispatch any WebSub events required for this message
Note: the inbound message may have come from
one of two sources: it may be a message from within
this country, or it may be a message sent from another country.
This use-case works with either message,
however it needs to know which country it is working as
to get the logic right
(that is why it takes a country parameter
when it is instantiated).
"""
def __init__(
self,
country,
bc_inbox_repo,
message_lake_repo,
object_acl_repo,
object_retreval_repo,
notifications_repo, # TODO: rename to notifications_repo
blockchain_outbox_repo):
self.country = country
self.bc_inbox_repo = bc_inbox_repo
self.message_lake_repo = message_lake_repo
self.object_acl_repo = object_acl_repo
self.object_retreval_repo = object_retreval_repo
self.notifications_repo = notifications_repo
self.blockchain_outbox_repo = blockchain_outbox_repo
def execute(self):
# Get the message from the bc_inbox_repo (which is a events queue)
fetched_bc_inbox = self.bc_inbox_repo.get()
if not fetched_bc_inbox:
return None
else:
# let it be procssed
(queue_message_id, message) = fetched_bc_inbox
logger.info("Received message to process: %s", message)
# TODO: if something is fine and something is failed then first
# steps will be done again
# fine for object storage but not for queues
try:
ml_OK = self.message_lake_repo.post(message)
except Exception as e:
logger.exception(e)
ml_OK = False
try:
acl_OK = self.object_acl_repo.post(message)
except Exception as e:
logger.exception(e)
acl_OK = False
# publish outbox is for notifications to internal clients
# and in fact it's worthy only for received messages, not for sent
# so ideally we shouldn't notify ourselves about our messages
# or may be we do if local apps want to know about it?...
try:
# we delay it a little to make sure the message has got to the repo
# and remove status because notifications don't need it
message_without_status = Message.from_dict(
message.to_dict(exclude=['status'])
)
pub_OK = self.notifications_repo.post(
message_without_status,
delay_seconds=3
)
except Exception as e:
logger.exception(e)
pub_OK = False
# blockchain part - pass the message to the blockchain worker
# so it can be shared to the foreign parties
if message.status == 'pending':
# not received from the foreign party = must be sent
logger.info("Sending this message out to the world")
try:
outbox_OK = self.blockchain_outbox_repo.post(message)
except Exception as e:
logger.exception(e)
outbox_OK = False
else:
outbox_OK = True
ret_OK = True
if message.status == 'received':
# might need to download remote documents using the
# Documents Spider
if message.sender != self.country:
# if it's not loopback message (test installations only)
logger.info(
"Scheduling download remote documents for %s", message
)
try:
ret_OK = self.object_retreval_repo.post_job({
"action": "download-object",
"sender": message.sender,
"object": message.obj
})
except Exception as e:
logger.exception(e)
ret_OK = False
else:
logger.info(
"Seems that this message is loopback (sent by us back to us)"
)
if ml_OK and acl_OK and ret_OK and pub_OK and outbox_OK:
self.bc_inbox_repo.delete(queue_message_id)
return True
else:
logger.error("Task processing failed, will try again later")
# what TODO with the failed ones?
# the problem is the fact that we have submitted message
# to some repos and some other failed,
# which means we must retry just failed submissions
# and it may introduce a tricky state when some external message
# processors will get info from the one source and won't get it
# from the another. They should wait then.
return False