diff --git a/swh/deposit/api/private/deposit_read.py b/swh/deposit/api/private/deposit_read.py index 1a5bdca7..a88f790f 100644 --- a/swh/deposit/api/private/deposit_read.py +++ b/swh/deposit/api/private/deposit_read.py @@ -1,145 +1,144 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import shutil import tempfile from rest_framework import status from swh.loader.tar import tarball from ..common import SWHGetDepositAPI, SWHPrivateAPIView from ...models import Deposit, DepositRequest, TemporaryArchive def aggregate_tarballs(extraction_dir, archive_paths): """Aggregate multiple tarballs into one and returns this new archive's path. Args: extraction_dir (path): Path to use for the tarballs computation archive_paths ([str]): Deposit's archive paths Returns: Tuple (directory to clean up, archive path (aggregated or not)) """ if len(archive_paths) > 1: # need to rebuild one archive # from multiple ones os.makedirs(extraction_dir, 0o755, exist_ok=True) dir_path = tempfile.mkdtemp(prefix='swh.deposit.scheduler-', dir=extraction_dir) # root folder to build an aggregated tarball aggregated_tarball_rootdir = os.path.join(dir_path, 'aggregate') os.makedirs(aggregated_tarball_rootdir, 0o755, exist_ok=True) # uncompress in a temporary location all archives for archive_path in archive_paths: tarball.uncompress(archive_path, aggregated_tarball_rootdir) # Aggregate into one big tarball the multiple smaller ones temp_tarpath = tarball.compress( aggregated_tarball_rootdir + '.zip', nature='zip', dirpath_or_files=aggregated_tarball_rootdir) # clean up temporary uncompressed tarball's on-disk content shutil.rmtree(aggregated_tarball_rootdir) # need to cleanup the temporary tarball when we are done directory_to_cleanup = dir_path else: # only 1 archive, no need to do fancy actions (and no cleanup step) temp_tarpath = archive_paths[0] directory_to_cleanup = None return directory_to_cleanup, temp_tarpath def stream_content(tarpath): """Stream a tarpath's content. Args: tarpath (path): Path to a tarball Raises: ValueError if the tarpath targets something nonexistent """ if not os.path.exists(tarpath): raise ValueError('Development error: %s should exist' % tarpath) with open(tarpath, 'rb') as f: - for chunk in f: - yield chunk + yield from f class SWHDepositReadArchives(SWHGetDepositAPI, SWHPrivateAPIView): """Dedicated class to read a deposit's raw archives content. Only GET is supported. """ ADDITIONAL_CONFIG = { 'extraction_dir': ('str', '/tmp/swh-deposit/archive/') } def __init__(self): super().__init__() self.extraction_dir = self.config['extraction_dir'] if not os.path.exists(self.extraction_dir): os.makedirs(self.extraction_dir) def retrieve_archives(self, deposit_id): """Given a deposit identifier, returns its associated archives' path. Yields: path to deposited archives """ deposit = Deposit.objects.get(pk=deposit_id) deposit_requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive']).order_by('id') for deposit_request in deposit_requests: yield deposit_request.archive.path def cleanup(self, directory_to_cleanup): """Reference the temporary directory holding the archive to be cleaned up. This actually does not clean up but add a reference for a directory to be cleaned up if it exists. Args: directory_to_cleanup (str/None): A reference to a directory to be cleaned up """ if directory_to_cleanup: # Add a temporary directory to be cleaned up in the db model # Another service is in charge of actually cleaning up if os.path.exists(directory_to_cleanup): tmp_archive = TemporaryArchive(path=directory_to_cleanup) tmp_archive.save() def process_get(self, req, collection_name, deposit_id): """Build a unique tarball from the multiple received and stream that content to the client. Args: req (Request): collection_name (str): Collection owning the deposit deposit_id (id): Deposit concerned by the reading Returns: Tuple status, stream of content, content-type """ archive_paths = list(self.retrieve_archives(deposit_id)) directory_to_cleanup, temp_tarpath = aggregate_tarballs( self.extraction_dir, archive_paths) stream = stream_content(temp_tarpath) self.cleanup(directory_to_cleanup) return status.HTTP_200_OK, stream, 'application/octet-stream' diff --git a/swh/deposit/tasks.py b/swh/deposit/tasks.py index 446eeebf..4b088ce4 100644 --- a/swh/deposit/tasks.py +++ b/swh/deposit/tasks.py @@ -1,44 +1,94 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.scheduler.task import Task +import os +import requests +import tempfile +from swh.scheduler.task import Task from swh.loader.tar.loader import TarLoader -def fetch_archive_locally(archive_url): - pass +def retrieve_archive_to(archive_url, archive_path): + """Retrieve the archive from the deposit to a local directory. + + Args: + + archive_url (str): The full deposit archive(s)'s raw content + to retrieve locally + + archive_path (str): the local archive's path where to store + the raw content + + Returns: + The archive path to the local archive to load. + Or None if any problem arose. + + """ + r = requests.get(archive_url, stream=True) + if r.ok: + with open(archive_path, 'wb') as f: + for chunk in r.iter_content(): + f.write(chunk) + + return archive_path + return None + + +def update_deposit_status(archive_url, status): + """Update the deposit's status. + + Args: + archive_url (str): the full deposit's archive + status (str): The status to update the deposit with + + """ + update_url = archive_url.replace('/raw/', '/update/') + requests.put(update_url, + json={'status': status}) class LoadDepositArchive(Task): + """Deposit archive ingestion task described by the following steps: + + 1. Retrieve tarball from deposit's private api and store + locally in a temporary directory + 2. Trigger the ingestion + 3. clean up the temporary directory + 4. Update the deposit's status according to result using the + deposit's private update status api + + """ task_queue = 'swh_deposit_archive' def run_task(self, *, deposit_archive_url, origin, visit_date, revision): """Import a deposit tarball into swh. Args: see :func:`TarLoader.load`. """ - loader = TarLoader() - loader.log = self.log - - # 1. Retrieve tarball from deposit's private api - # 2. Store locally in a temporary directory - # 3. Trigger the ingestion - # 4. clean up the temporary directory - # 5. Update the deposit's status according to result using the - # deposit's private update status api - - tar_path = 'foobar' - - import os - occurrence = os.path.basename(tar_path) - - self.log.info('%s %s %s %s %s' % (deposit_archive_url, origin, - visit_date, revision, - [occurrence])) - # loader.load(tar_path=tar_path, origin=origin, visit_date=visit_date, - # revision=revision, occurrences=[occurrence]) + temporary_directory = tempfile.TemporaryDirectory() + archive_path = os.path.join(temporary_directory.name, 'archive.zip') + archive = retrieve_archive_to(deposit_archive_url, archive_path) + + if not archive: + raise ValueError('Failure to retrieve archive') + + occurrence = {'branch': 'master'} + try: + loader = TarLoader() + loader.log = self.log + loader.load(tar_path=archive_path, + origin=origin, + visit_date=visit_date, + revision=revision, + occurrences=[occurrence]) + status = 'success' + except: + self.log.exception('What happened?') + status = 'failure' + + update_deposit_status(deposit_archive_url, status)