diff --git a/swh/deposit/api/private/deposit_read.py b/swh/deposit/api/private/deposit_read.py
index 71baca87..26e05db5 100644
--- a/swh/deposit/api/private/deposit_read.py
+++ b/swh/deposit/api/private/deposit_read.py
@@ -1,226 +1,236 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import json
 import os
 import shutil
 import tempfile

 from contextlib import contextmanager
 from django.http import FileResponse
 from rest_framework import status

 from swh.loader.tar import tarball
 from swh.model import hashutil, identifiers

 from ..common import SWHGetDepositAPI, SWHPrivateAPIView
 from ...models import Deposit, DepositRequest
 from ...models import previous_revision_id


 @contextmanager
 def aggregate_tarballs(extraction_dir, archive_paths):
     """Aggregate multiple tarballs into one and yield the new archive's
     path.

     Args:
         extraction_dir (path): Path to use for the tarballs computation
         archive_paths ([str]): Deposit's archive paths

     Yields:
         Path to the aggregated archive (or to the archive itself when
         only one was deposited)

     """
     if len(archive_paths) > 1:
         # need to rebuild one archive from multiple ones
         os.makedirs(extraction_dir, 0o755, exist_ok=True)
         dir_path = tempfile.mkdtemp(prefix='swh.deposit-',
                                     dir=extraction_dir)

         # root folder to build an aggregated tarball
         aggregated_tarball_rootdir = os.path.join(dir_path, 'aggregate')
         os.makedirs(aggregated_tarball_rootdir, 0o755, exist_ok=True)

         # uncompress all archives in a temporary location
         for archive_path in archive_paths:
             tarball.uncompress(archive_path, aggregated_tarball_rootdir)

         # aggregate the multiple smaller archives into one big tarball
         temp_tarpath = tarball.compress(
             aggregated_tarball_rootdir + '.zip',
             nature='zip',
             dirpath_or_files=aggregated_tarball_rootdir)

         # the uncompressed data can already be cleaned up
         shutil.rmtree(aggregated_tarball_rootdir)

         try:
             yield temp_tarpath
         finally:
             shutil.rmtree(dir_path)
     else:
         # only 1 archive, no need for fancy actions (nor a cleanup step)
         yield archive_paths[0]


 class SWHDepositReadArchives(SWHGetDepositAPI, SWHPrivateAPIView):
     """Dedicated class to read a deposit's raw archives content.

     Only GET is supported.

     """
     ADDITIONAL_CONFIG = {
-        'extraction_dir': ('str', '/tmp/swh-deposit/archive/')
+        'extraction_dir': ('str', '/tmp/swh-deposit/archive/'),
     }

     def __init__(self):
         super().__init__()
         self.extraction_dir = self.config['extraction_dir']
         if not os.path.exists(self.extraction_dir):
             os.makedirs(self.extraction_dir)

     def retrieve_archives(self, deposit_id):
         """Given a deposit identifier, yield the paths of its associated
         archives.

         Yields:
             path to deposited archives

         """
         deposit = Deposit.objects.get(pk=deposit_id)
         deposit_requests = DepositRequest.objects.filter(
             deposit=deposit,
             type=self.deposit_request_types['archive']).order_by('id')

         for deposit_request in deposit_requests:
             yield deposit_request.archive.path

     def process_get(self, req, collection_name, deposit_id):
         """Build a single tarball from the (possibly multiple) archives
         received and stream its content back to the client.

         Args:
             req (Request):
             collection_name (str): Collection owning the deposit
             deposit_id (id): Deposit concerned by the reading

         Returns:
             Tuple status, stream of content, content-type

         """
         archive_paths = list(self.retrieve_archives(deposit_id))
         with aggregate_tarballs(self.extraction_dir, archive_paths) as path:
             return FileResponse(open(path, 'rb'),
                                 status=status.HTTP_200_OK,
                                 content_type='application/octet-stream')


 class SWHDepositReadMetadata(SWHGetDepositAPI, SWHPrivateAPIView):
     """Class in charge of aggregating metadata on a deposit.

     """
+    ADDITIONAL_CONFIG = {
+        'provider': ('dict', {
+            'provider_name': '',
+            'provider_type': 'deposit_client',
+            'provider_url': '',
+            'metadata': {
+            }
+        }),
+        'tool': ('dict', {
+            'tool_name': 'swh-deposit',
+            'tool_version': '0.0.1',
+            'tool_configuration': {
+                'sword_version': '2'
+            }
+        })
+    }
+
+    def __init__(self):
+        super().__init__()
+        self.provider = self.config['provider']
+        self.tool = self.config['tool']
+
     def _aggregate_metadata(self, deposit, metadata_requests):
         """Retrieve and aggregate metadata information.

         """
         metadata = {}
         for req in metadata_requests:
             metadata.update(req.metadata)

         return metadata

     def aggregate(self, deposit, requests):
         """Aggregate multiple pieces of data on a deposit into one unified
         dictionary.

         Args:
             deposit (Deposit): Deposit concerned by the data aggregation.
             requests ([DepositRequest]): List of associated requests which
                 need aggregation.

         Returns:
             Dictionary of data representing the deposit to inject in swh.

         """
         data = {}

         # Retrieve the aggregated metadata information
         metadata = self._aggregate_metadata(deposit, requests)

         # Read information metadata
         data['origin'] = {
             'type': 'deposit',
             'url': deposit.client.url + deposit.external_id,
         }

         # revision
         fullname = deposit.client.get_full_name()
         author_committer = {
             'name': deposit.client.last_name,
             'fullname': fullname,
             'email': deposit.client.email,
         }

+        # metadata provider
+        self.provider['provider_name'] = deposit.client.last_name
+        self.provider['provider_url'] = deposit.client.url
+
         revision_type = 'tar'
         revision_msg = '%s: Deposit %s in collection %s' % (
             fullname, deposit.id, deposit.collection.name)
         complete_date = identifiers.normalize_timestamp(deposit.complete_date)

         data['revision'] = {
             'synthetic': True,
             'date': complete_date,
             'committer_date': complete_date,
             'author': author_committer,
             'committer': author_committer,
             'type': revision_type,
             'message': revision_msg,
             'metadata': metadata,
         }

         parent_revision = previous_revision_id(deposit.swh_id)
         if parent_revision:
             data['revision']['parents'] = [
                 hashutil.hash_to_bytes(parent_revision)
             ]

         data['occurrence'] = {
             'branch': 'master'
         }

-        provider = {
-            'provider_name': deposit.client.last_name,
-            'provider_type': 'deposit_client',
-            'provider_url': deposit.client.url,
-            'metadata': {}
-        }
-
-        tool = {
-            'tool_name': 'swh-deposit',
-            'tool_version': '0.0.1',
-            'tool_configuration': {
-                'sword_version': '2'
-            }
-        }
-
         data['origin_metadata'] = {
-            'provider': provider,
-            'tool': tool,
+            'provider': self.provider,
+            'tool': self.tool,
             'metadata': metadata
         }
-        print(data)

         return data

     def process_get(self, req, collection_name, deposit_id):
         deposit = Deposit.objects.get(pk=deposit_id)
         requests = DepositRequest.objects.filter(
             deposit=deposit,
             type=self.deposit_request_types['metadata'])

         data = self.aggregate(deposit, requests)
         d = ''
         if data:
             d = json.dumps(data)

         return status.HTTP_200_OK, d, 'application/json'
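
Reviewer note: to make the contract between the two sides of this patch concrete, here is a sketch of the aggregated document that SWHDepositReadMetadata serves as JSON and that the loader below consumes. All values are hypothetical (an imagined deposit 42 by client 'Doe' in collection 'hal'); dates are left as None where the real code fills in the normalized completion date.

    author = {
        'name': 'Doe',
        'fullname': 'Jane Doe',
        'email': 'jane.doe@example.org',
    }

    example_payload = {
        'origin': {
            'type': 'deposit',
            # deposit.client.url + deposit.external_id
            'url': 'https://hal.example.org/hal-00000042',
        },
        'revision': {
            'synthetic': True,
            'type': 'tar',
            'message': 'Jane Doe: Deposit 42 in collection hal',
            'author': author,
            'committer': author,
            'date': None,            # normalized deposit.complete_date
            'committer_date': None,  # same as 'date'
            'metadata': {},          # union of the deposit's metadata requests
        },
        'occurrence': {'branch': 'master'},
        'origin_metadata': {
            'provider': {
                'provider_name': 'Doe',
                'provider_type': 'deposit_client',
                'provider_url': 'https://hal.example.org/',
                'metadata': {},
            },
            'tool': {
                'tool_name': 'swh-deposit',
                'tool_version': '0.0.1',
                'tool_configuration': {'sword_version': '2'},
            },
            'metadata': {},
        },
    }
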
diff --git a/swh/deposit/injection/loader.py b/swh/deposit/injection/loader.py
index 8f570140..77619b4c 100644
--- a/swh/deposit/injection/loader.py
+++ b/swh/deposit/injection/loader.py
@@ -1,200 +1,196 @@
 # Copyright (C) 2015-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
 import os
 import requests
 import tempfile

 from swh.model import hashutil
 from swh.loader.tar import loader
 from swh.loader.core.loader import SWHLoader


 class DepositClient:
     """Deposit client to read an archive or metadata, or to update a
     deposit's status.

     """
     def read_archive_to(self, archive_update_url, archive_path, log=None):
         """Retrieve the deposit's archive into a local directory.

         Args:
             archive_update_url (str): url of the deposit archive(s)'s raw
                 content to retrieve locally
             archive_path (str): the local archive's path where to store
                 the raw content

         Returns:
             The path to the local archive to load.

         Raises:
             ValueError if the retrieval fails.

         """
         r = requests.get(archive_update_url, stream=True)
         if r.ok:
             with open(archive_path, 'wb') as f:
                 for chunk in r.iter_content():
                     f.write(chunk)

             return archive_path

         msg = 'Problem when retrieving deposit archive at %s' % (
             archive_update_url, )
         if log:
             log.error(msg)

         raise ValueError(msg)

     def read_metadata(self, metadata_url, log=None):
         """Retrieve the metadata information on a given deposit.

         Args:
             metadata_url (str): The full deposit metadata url to retrieve
                 locally

         Returns:
             The dictionary of metadata for that deposit.

         Raises:
             ValueError if the retrieval fails.

         """
         r = requests.get(metadata_url)
         if r.ok:
             data = r.json()
             return data

         msg = 'Problem when retrieving metadata at %s' % metadata_url
         if log:
             log.error(msg)

         raise ValueError(msg)

     def update_status(self, update_status_url, status, revision_id=None):
         """Update the deposit's status.

         Args:
             update_status_url (str): the full url to use to update the
                 deposit's status
             status (str): The status to update the deposit with
             revision_id (str/None): the revision's identifier to update to

         """
         payload = {'status': status}
         if revision_id:
             payload['revision_id'] = revision_id

         requests.put(update_status_url, json=payload)
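
Reviewer note: a minimal sketch of how this client is driven, assuming a reachable deposit server; the urls and the local path below are hypothetical.

    from swh.deposit.injection.loader import DepositClient

    client = DepositClient()

    # Hypothetical private-api urls for a deposit 42 in collection 'hal'.
    base = 'https://deposit.example.org/1/private/hal/42'

    archive = client.read_archive_to(base + '/raw/', '/tmp/archive.zip')
    metadata = client.read_metadata(base + '/meta/')

    # Both getters raise ValueError (and log when a logger is given) on a
    # non-2xx response, so callers never need to check for None.
    client.update_status(base + '/update/', status='injecting')
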

 class DepositLoader(loader.TarLoader):
     """Deposit loader implementation.

     This is a subclass of :class:`TarLoader`: the main goal of this class
     is to first retrieve the deposit's tarball contents as one archive,
     along with its associated metadata, and then provide said tarball to
     be loaded by the TarLoader.

     This will:

     - retrieve the deposit's archive locally
     - provide the archive to be loaded by the tar loader
     - clean up the temporary location used to retrieve the archive
     - update the deposit's status accordingly

     """
     def __init__(self, client=None):
         super().__init__()
         if client:
             self.client = client
         else:
             self.client = DepositClient()

     def load(self, *, archive_url, deposit_meta_url, deposit_update_url):
         SWHLoader.load(
             self,
             archive_url=archive_url,
             deposit_meta_url=deposit_meta_url,
             deposit_update_url=deposit_update_url)

     def prepare(self, *, archive_url, deposit_meta_url, deposit_update_url):
         """Prepare the injection by first retrieving the deposit's raw
         archive content.

         """
         self.deposit_update_url = deposit_update_url

         temporary_directory = tempfile.TemporaryDirectory()
         self.temporary_directory = temporary_directory
         archive_path = os.path.join(temporary_directory.name, 'archive.zip')
         archive = self.client.read_archive_to(
             archive_url, archive_path, log=self.log)

         metadata = self.client.read_metadata(
             deposit_meta_url, log=self.log)
         origin = metadata['origin']
         visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
         revision = metadata['revision']
         occurrence = metadata['occurrence']

-        self.client.update_status(deposit_update_url, 'injecting')

         super().prepare(tar_path=archive,
                         origin=origin,
                         visit_date=visit_date,
                         revision=revision,
                         occurrences=[occurrence])
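
Reviewer note: end to end, a worker would drive the loader roughly as follows (urls hypothetical, matching the client sketch above).

    from swh.deposit.injection.loader import DepositLoader

    base = 'https://deposit.example.org/1/private/hal/42'

    # prepare() fetches the archive and metadata through DepositClient and
    # defers to TarLoader for the actual loading; post_load() then reports
    # 'success' or 'failure' back to the deposit server.
    DepositLoader().load(archive_url=base + '/raw/',
                         deposit_meta_url=base + '/meta/',
                         deposit_update_url=base + '/update/')
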
""" self.deposit_update_url = deposit_update_url temporary_directory = tempfile.TemporaryDirectory() self.temporary_directory = temporary_directory archive_path = os.path.join(temporary_directory.name, 'archive.zip') archive = self.client.get_archive( archive_url, archive_path, log=self.log) metadata = self.client.get_metadata( deposit_meta_url, log=self.log) origin = metadata['origin'] visit_date = datetime.datetime.now(tz=datetime.timezone.utc) revision = metadata['revision'] occurrence = metadata['occurrence'] - self.client.update_deposit_status(deposit_update_url, 'injecting') super().prepare(tar_path=archive, origin=origin, visit_date=visit_date, revision=revision, occurrences=[occurrence]) def store_metadata(self): """Storing the origin_metadata during the load processus. Fetching tool and metadata_provider from storage and adding the metadata associated to the current origin. """ + origin_id = self.origin_id + visit_date = self.visit_date + provider = self.origin_metadata['provider'] + tool = self.origin_metadata['tool'] + metadata = self.origin_metadata['metadata'] try: - tool_id = self.storage.indexer_configuration_get( - self.origin_metadata['tool']) - provider_id = self.storage.metadata_provider_get_by( - self.origin_metadata['provider']) - - self.storage.origin_metadata_add(self.origin_id, - self.visit_date, - provider_id, - tool_id, - self.origin_metadata['metadata']) + self.send_origin_metadata(self, origin_id, visit_date, provider, + tool, metadata) except: self.log.exception('Problem when storing origin_metadata') def post_load(self, success=True): """Updating the deposit's status according to its loading status. If not successful, we update its status to failure. Otherwise, we update its status to 'success' and pass along its associated revision. """ try: if not success: self.client.update_deposit_status(self.deposit_update_url, status='failure') return # first retrieve the new revision [rev_id] = self.objects['revision'].keys() if rev_id: rev_id_hex = hashutil.hash_to_hex(rev_id) # then update the deposit's status to success with its # revision-id self.client.update_deposit_status(self.deposit_update_url, status='success', revision_id=rev_id_hex) except: self.log.exception( 'Problem when trying to update the deposit\'s status') def cleanup(self): """Clean up temporary directory where we retrieved the tarball. 
""" super().cleanup() self.temporary_directory.cleanup() diff --git a/swh/deposit/tests/__init__.py b/swh/deposit/tests/__init__.py index 5c41f6b3..8fd7474f 100644 --- a/swh/deposit/tests/__init__.py +++ b/swh/deposit/tests/__init__.py @@ -1,57 +1,71 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.deposit.config import setup_django_for from swh.deposit.config import SWHDefaultConfig # noqa from swh.loader.core.loader import SWHLoader TEST_CONFIG = { 'max_upload_size': 500, 'extraction_dir': '/tmp/swh-deposit/test/extraction-dir', + 'provider': { + 'provider_name': '', + 'provider_type': 'deposit_client', + 'provider_url': '', + 'metadata': { + } + }, + 'tool': { + 'tool_name': 'swh-deposit', + 'tool_version': '0.0.1', + 'tool_configuration': { + 'sword_version': '2' + } + } } def parse_deposit_config_file(base_filename=None, config_filename=None, additional_configs=None, global_config=True): return TEST_CONFIG TEST_LOADER_CONFIG = { 'extraction_dir': '/tmp/swh-loader-tar/test/', 'storage': { 'cls': 'remote', 'args': { 'url': 'http://localhost:unexisting-port/', } }, 'send_contents': False, 'send_directories': False, 'send_revisions': False, 'send_releases': False, 'send_occurrences': False, 'content_packet_size': 10, 'content_packet_size_bytes': 100 * 1024 * 1024, 'directory_packet_size': 10, 'revision_packet_size': 10, 'release_packet_size': 10, 'occurrence_packet_size': 10, } def parse_loader_config_file(base_filename=None, config_filename=None, additional_configs=None, global_config=True): return TEST_LOADER_CONFIG # monkey patch classes method permits to override, for tests purposes, # the default configuration without side-effect, i.e do not load the # configuration from disk SWHDefaultConfig.parse_config_file = parse_deposit_config_file SWHLoader.parse_config_file = parse_loader_config_file setup_django_for('testing')