Source code for intergov.use_cases.dispatch_message_to_subscribers
import json
from intergov.loggers import logging
from intergov.serializers import generic_discrete_message as ser
logger = logging.getLogger(__name__)
[docs]class DispatchMessageToSubscribersUseCase:
"""
Used by the callbacks spreader worker.
This is the "fan-out" part of the WebSub,
where each event dispatched
to all the relevant subscribers.
For each event (notification),
it looks-up the relevant subscribers
and dispatches a callback task
so that they will be notified.
There is a downstream delivery processor
that actually makes the callback,
it is insulated from this process
by the delivery outbox message queue.
Note: In this application
the subscription signature
is based on the message predicate.
"""
def __init__(
self, notifications_repo,
delivery_outbox_repo, subscriptions_repo):
self.notifications = notifications_repo
self.delivery_outbox = delivery_outbox_repo
self.subscriptions = subscriptions_repo
def execute(self):
fetched_publish = self.notifications.get_job()
if not fetched_publish:
return None
else:
(publish_msg_id, message_job) = fetched_publish
# message_job is either a Message class
# or a dict with 'message' field which is Message
# which may be empty
# or just a dict which must be sent as a callback directly
message = message_job.get('message')
predicate = message_job.get('predicate') or message.predicate
assert predicate
# find the subscribers for this predicate
subscribers = self._get_subscribers(predicate)
# what is worse, multiple delivery or lost messages?
# here we assume lost messages are worse
# given the delivery outbox is just a queue there aren't many reasons
# for it to fail, real fails will be on the next step - when it's trivial
# to re-process the single message when other ones will be fine.
# (see DeliverCallbackUseCase)
all_OK = True
payload = {}
if message:
payload = json.dumps(message, cls=ser.MessageJSONEncoder)
else:
payload = message_job
for s in subscribers:
job = {
's': s, # subscribed callback url
'payload': payload # the payload to be sent to the callback
}
logger.info("Sceduling notification of \n[%s] with payload \n%s", s, payload)
status = self.delivery_outbox.post_job(job)
if not status:
all_OK = False
if all_OK:
self.notifications.delete(publish_msg_id)
return True
else:
return False
def _get_subscribers(self, predicate):
subscribers = self.subscriptions.search(predicate, layered=True)
if not subscribers:
logger.info("Nobody to notify about the message %s", predicate)
return subscribers