Source code for intergov.use_cases.retrieve_and_store_foreign_documents

import json
from io import BytesIO

import requests

from intergov.domain.country import Country
from intergov.loggers import logging

logger = logging.getLogger(__name__)


class RetrieveAndStoreForeignDocumentsUseCase:
    """
    Processes a single request from the queue to download a remote document.

    The process is recursive: if an object has sub-objects, more jobs are
    posted to download them later.

    .. admonition:: Note

       * returns False if there is no job in the queue
       * returns True in case of success
       * raises exceptions for errors
    """

    def __init__(
            self,
            country,
            object_retrieval_repo,
            object_lake_repo,
            object_acl_repo):
        self.country = country
        self.object_retrieval = object_retrieval_repo
        self.object_lake = object_lake_repo
        self.object_acl_repo = object_acl_repo

    def execute(self):
        retrieval_task = self.object_retrieval.get_job()
        if not retrieval_task:
            return False
        (job_id, job) = retrieval_task
        logger.info(
            "[%s] Running the RetrieveAndStoreForeignDocumentsUseCase for job %s",
            self.country, job
        )
        multihash = job['object']
        sender = Country(job['sender'])

        # 1. Check whether this object is already in the object lake
        exists = True
        try:
            # TODO: replace with a plain existence check instead of reading
            # the whole file; maybe create and use an `.exists(multihash)`
            # method on the object lake
            self.object_lake.get_body(multihash)
        except Exception as e:
            if e.__class__.__name__ == 'NoSuchKey':
                exists = False
            else:
                raise e

        # 2. If not, download it to the object lake
        # (this also schedules downloads of any sub-documents)
        if not exists:
            self._download_remote_obj(sender, multihash)

        # 3. Give the receiver access to the object
        self.object_acl_repo.allow_access_to(
            multihash,
            self.country.name
        )

        # 4. Delete the job as completed
        self.object_retrieval.delete(job_id)
        return True

    def _download_remote_obj(self, sender, mh):
        logger.info("Downloading %s from %s as %s", mh, sender, self.country)
        remote_doc_api_url = sender.object_api_base_url()
        doc_resp = requests.get(
            remote_doc_api_url + mh,
            headers={
                'Authorization': 'JWTBODY {}'.format(
                    json.dumps({
                        "sub": "documents-api",
                        "party": "spider",
                        "country": self.country.name,
                    })
                )
            }
        )
        # TODO: we should process various response codes differently, e.g.:
        #   * 5xx: reschedule for later
        #   * 429: reschedule for later with increasing wait times
        #   * other 4xx: a different strategy per code
        # (put thought into logging/monitoring)
        assert doc_resp.status_code in (200, 201), "{} {}".format(
            doc_resp, doc_resp.content
        )
        # logger.info("For URL %s we got resp %s", remote_doc_api_url, doc_resp)
        self.object_lake.post_from_file_obj(
            mh,
            BytesIO(doc_resp.content)
        )
        # Try to parse the downloaded document for a `links` section
        try:
            json_document = json.loads(doc_resp.content)
        except Exception:
            # not JSON, which is fine
            pass
        else:
            links = json_document.get('links')
            if isinstance(links, list):
                for link in links:
                    # Example link entry:
                    # {"TYPE1": "document",
                    #  "TYPE2": "Exporters Information Form Update",
                    #  "name": "hmmm_6W4jRRG.png",
                    #  "ct": "binary/octet-stream",
                    #  "link": "QmZxJAJhq98T683RQSk3T2wkLBH2nFV4y43iCHRk3DZyWn"}
                    if 'link' in link:
                        link_qmhash = link['link']
                        assert '/' not in link_qmhash
                        assert ':' not in link_qmhash
                        logger.info("Posting sub-job to retrieve %s", link_qmhash)
                        self.object_retrieval.post_job(
                            {
                                'action': 'download-object',
                                'sender': sender.name,
                                'object': link_qmhash,
                                'parent': mh,
                            }
                        )
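

# -----------------------------------------------------------------------
# A minimal sketch of the `.exists(multihash)` helper suggested by the
# TODO in `execute()` above, assuming the object lake is an S3-compatible
# store accessed through boto3. The `client` and `bucket_name` attributes
# are illustrative assumptions, not the project's actual repository API.
# -----------------------------------------------------------------------

import botocore.exceptions


class ObjectLakeRepoWithExists:
    """Hypothetical extension of the object lake repository."""

    def __init__(self, client, bucket_name):
        self.client = client          # a boto3 S3 client (assumption)
        self.bucket_name = bucket_name

    def exists(self, multihash):
        """True if the object is already in the lake; HEAD fetches
        metadata only, avoiding the full download get_body() does."""
        try:
            self.client.head_object(Bucket=self.bucket_name, Key=multihash)
        except botocore.exceptions.ClientError as e:
            # head_object surfaces a missing key as a 404 ClientError
            if e.response['Error']['Code'] in ('404', 'NoSuchKey'):
                return False
            raise
        return True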
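

# -----------------------------------------------------------------------
# A sketch (not the project's implementation) of the differentiated
# response-code handling called for by the TODO in `_download_remote_obj`:
# transient failures are rescheduled with exponential backoff, permanent
# client errors are dropped. The `attempt` job field and the
# `delay_seconds` queue parameter are hypothetical.
# -----------------------------------------------------------------------

def handle_download_response(doc_resp, job, object_retrieval_repo):
    """Return the body on success, or None after rescheduling/dropping."""
    if doc_resp.status_code in (200, 201):
        return doc_resp.content
    attempt = job.get('attempt', 0)
    if doc_resp.status_code == 429 or doc_resp.status_code >= 500:
        # Transient: retry later, doubling the delay each attempt,
        # capped at one hour
        delay = min(60 * 2 ** attempt, 3600)
        object_retrieval_repo.post_job(
            dict(job, attempt=attempt + 1),
            delay_seconds=delay,  # hypothetical queue parameter
        )
        return None
    # Other 4xx: the request itself is bad; log and drop the job
    logger.error(
        "Giving up on job %s: HTTP %s %s",
        job, doc_resp.status_code, doc_resp.content
    )
    return None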
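

# -----------------------------------------------------------------------
# A hypothetical worker loop showing how this use case might be driven;
# the repository instances are assumed to be constructed by the caller,
# and 'AU' is an illustrative country code.
# -----------------------------------------------------------------------

import time


def run_worker(object_retrieval_repo, object_lake_repo, object_acl_repo):
    use_case = RetrieveAndStoreForeignDocumentsUseCase(
        country=Country('AU'),
        object_retrieval_repo=object_retrieval_repo,
        object_lake_repo=object_lake_repo,
        object_acl_repo=object_acl_repo,
    )
    while True:
        if not use_case.execute():  # False means the queue was empty
            time.sleep(5)           # back off before polling again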