diff --git a/swh/deposit/injection/loader.py b/swh/deposit/injection/loader.py index 6043c7fc..9ee652b6 100644 --- a/swh/deposit/injection/loader.py +++ b/swh/deposit/injection/loader.py @@ -1,168 +1,177 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import os import requests import tempfile from swh.model import hashutil from swh.loader.tar import loader from swh.loader.core.loader import SWHLoader -def retrieve_archive_to(archive_update_url, archive_path, log=None): - """Retrieve the archive from the deposit to a local directory. - - Args: - - archive_update_url (str): The full deposit archive(s)'s raw content - to retrieve locally - - archive_path (str): the local archive's path where to store - the raw content - - Returns: - The archive path to the local archive to load. - Or None if any problem arose. +class DepositClient: + """Deposit client to read archive, metadata or update deposit's status. """ - r = requests.get(archive_update_url, stream=True) - if r.ok: - with open(archive_path, 'wb') as f: - for chunk in r.iter_content(): - f.write(chunk) + def read_archive_to(self, archive_update_url, archive_path, log=None): + """Retrieve the archive from the deposit to a local directory. - return archive_path + Args: + archive_update_url (str): The full deposit archive(s)'s raw content + to retrieve locally - msg = 'Problem when retrieving deposit archive at %s' % ( - archive_update_url, ) - if log: - log.error(msg) + archive_path (str): the local archive's path where to store + the raw content - raise ValueError(msg) + Returns: + The archive path to the local archive to load. + Or None if any problem arose. + """ + r = requests.get(archive_update_url, stream=True) + if r.ok: + with open(archive_path, 'wb') as f: + for chunk in r.iter_content(): + f.write(chunk) -def retrieve_metadata(metadata_url, log=None): - """Retrieve the metadata information on a given deposit. + return archive_path - Args: + msg = 'Problem when retrieving deposit archive at %s' % ( + archive_update_url, ) + if log: + log.error(msg) - metadata_url (str): The full deposit metadata url to retrieve - locally + raise ValueError(msg) - Returns: - The dictionary of metadata for that deposit or None if any - problem arose. + def read_metadata(self, metadata_url, log=None): + """Retrieve the metadata information on a given deposit. - """ - r = requests.get(metadata_url) - if r.ok: - data = r.json() + Args: + metadata_url (str): The full deposit metadata url to retrieve + locally - return data + Returns: + The dictionary of metadata for that deposit or None if any + problem arose. - msg = 'Problem when retrieving metadata at %s' % metadata_url - if log: - log.error(msg) + """ + r = requests.get(metadata_url) + if r.ok: + data = r.json() - raise ValueError(msg) + return data + msg = 'Problem when retrieving metadata at %s' % metadata_url + if log: + log.error(msg) -def update_deposit_status(update_status_url, status, revision_id=None): - """Update the deposit's status. + raise ValueError(msg) - Args: - update_status_url (str): the full deposit's archive - status (str): The status to update the deposit with - revision_id (str/None): the revision's identifier to update to + def update_status(self, update_status_url, status, + revision_id=None): + """Update the deposit's status. - """ - payload = {'status': status} - if revision_id: - payload['revision_id'] = revision_id - requests.put(update_status_url, json=payload) + Args: + update_status_url (str): the full deposit's archive + status (str): The status to update the deposit with + revision_id (str/None): the revision's identifier to update to + + """ + payload = {'status': status} + if revision_id: + payload['revision_id'] = revision_id + requests.put(update_status_url, json=payload) class DepositLoader(loader.TarLoader): """Deposit loader implementation. This is a subclass of the :class:TarLoader as the main goal of this class is to first retrieve the deposit's tarball contents as one and its associated metadata. Then provide said tarball to be loaded by the TarLoader. This will: - retrieves the deposit's archive locally - provide the archive to be loaded by the tar loader - clean up the temporary location used to retrieve the archive locally - update the deposit's status accordingly """ + def __init__(self, client=None): + super().__init__() + if client: + self.client = client + else: + self.client = DepositClient() + def load(self, *, archive_url, deposit_meta_url, deposit_update_url): SWHLoader.load( self, archive_url=archive_url, deposit_meta_url=deposit_meta_url, deposit_update_url=deposit_update_url) def prepare(self, *, archive_url, deposit_meta_url, deposit_update_url): """Prepare the injection by first retrieving the deposit's raw archive content. """ self.deposit_update_url = deposit_update_url temporary_directory = tempfile.TemporaryDirectory() self.temporary_directory = temporary_directory archive_path = os.path.join(temporary_directory.name, 'archive.zip') - archive = retrieve_archive_to(archive_url, archive_path, log=self.log) + archive = self.client.get_archive( + archive_url, archive_path, log=self.log) - metadata = retrieve_metadata(deposit_meta_url, log=self.log) + metadata = self.client.get_metadata( + deposit_meta_url, log=self.log) origin = metadata['origin'] visit_date = datetime.datetime.now(tz=datetime.timezone.utc) revision = metadata['revision'] occurrence = metadata['occurrence'] - update_deposit_status(deposit_update_url, 'injecting') + self.client.update_deposit_status(deposit_update_url, 'injecting') super().prepare(tar_path=archive, origin=origin, visit_date=visit_date, revision=revision, occurrences=[occurrence]) def post_load(self, success=True): """Updating the deposit's status according to its loading status. If not successful, we update its status to failure. Otherwise, we update its status to 'success' and pass along its associated revision. """ try: if not success: - update_deposit_status(self.deposit_update_url, - status='failure') + self.client.update_deposit_status(self.deposit_update_url, + status='failure') return # first retrieve the new revision - occs = list(self.storage.occurrence_get(self.origin_id)) - if occs: - occ = occs[0] - revision_id = hashutil.hash_to_hex(occ['target']) + [rev_id] = self.objects['revision'].keys() + if rev_id: + rev_id_hex = hashutil.hash_to_hex(rev_id) # then update the deposit's status to success with its # revision-id - update_deposit_status(self.deposit_update_url, - status='success', - revision_id=revision_id) + self.client.update_deposit_status(self.deposit_update_url, + status='success', + revision_id=rev_id_hex) except: self.log.exception( 'Problem when trying to update the deposit\'s status') def cleanup(self): """Clean up temporary directory where we retrieved the tarball. """ super().cleanup() self.temporary_directory.cleanup() diff --git a/swh/deposit/tests/__init__.py b/swh/deposit/tests/__init__.py index a71164ff..5c41f6b3 100644 --- a/swh/deposit/tests/__init__.py +++ b/swh/deposit/tests/__init__.py @@ -1,26 +1,57 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.deposit.config import setup_django_for from swh.deposit.config import SWHDefaultConfig # noqa +from swh.loader.core.loader import SWHLoader + TEST_CONFIG = { 'max_upload_size': 500, 'extraction_dir': '/tmp/swh-deposit/test/extraction-dir', } -def parse_config_file(base_filename=None, config_filename=None, - additional_configs=None, global_config=True): +def parse_deposit_config_file(base_filename=None, config_filename=None, + additional_configs=None, global_config=True): return TEST_CONFIG -# monkey patch this class method permits to override, for tests -# purposes, the default configuration without side-effect, i.e do not -# load the configuration from disk -SWHDefaultConfig.parse_config_file = parse_config_file +TEST_LOADER_CONFIG = { + 'extraction_dir': '/tmp/swh-loader-tar/test/', + 'storage': { + 'cls': 'remote', + 'args': { + 'url': 'http://localhost:unexisting-port/', + } + }, + 'send_contents': False, + 'send_directories': False, + 'send_revisions': False, + 'send_releases': False, + 'send_occurrences': False, + + 'content_packet_size': 10, + 'content_packet_size_bytes': 100 * 1024 * 1024, + 'directory_packet_size': 10, + 'revision_packet_size': 10, + 'release_packet_size': 10, + 'occurrence_packet_size': 10, +} + + +def parse_loader_config_file(base_filename=None, config_filename=None, + additional_configs=None, global_config=True): + return TEST_LOADER_CONFIG + + +# monkey patch classes method permits to override, for tests purposes, +# the default configuration without side-effect, i.e do not load the +# configuration from disk +SWHDefaultConfig.parse_config_file = parse_deposit_config_file +SWHLoader.parse_config_file = parse_loader_config_file setup_django_for('testing') diff --git a/swh/deposit/tests/test_loader.py b/swh/deposit/tests/test_loader.py new file mode 100644 index 00000000..99d7ede7 --- /dev/null +++ b/swh/deposit/tests/test_loader.py @@ -0,0 +1,216 @@ +# Copyright (C) 2016-2017 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import json +import unittest +import shutil + +from nose.tools import istest +from nose.plugins.attrib import attr +from rest_framework.test import APITestCase + +from swh.model import hashutil +from swh.deposit.injection.loader import DepositLoader, DepositClient +from swh.deposit.config import PRIVATE_GET_RAW_CONTENT +from swh.deposit.config import PRIVATE_GET_DEPOSIT_METADATA +from swh.deposit.config import PRIVATE_PUT_DEPOSIT +from django.core.urlresolvers import reverse + + +from . import TEST_LOADER_CONFIG +from .common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine +from .common import FileSystemCreationRoutine + + +class DepositLoaderInhibitsStorage: + """Mixin class to inhibit the persistence and keep in memory the data + sent for storage. + + cf. SWHDepositLoaderNoStorage + + """ + def __init__(self): + super().__init__() + # typed data + self.state = { + 'origin': [], + 'origin_visit': [], + 'content': [], + 'directory': [], + 'revision': [], + 'release': [], + 'occurrence': [], + } + + def _add(self, type, l): + """Add without duplicates and keeping the insertion order. + + Args: + type (str): Type of objects concerned by the action + l ([object]): List of 'type' object + + """ + col = self.state[type] + for o in l: + if o in col: + continue + col.extend([o]) + + def send_origin(self, origin): + origin.update({'id': 1}) + self._add('origin', [origin]) + return origin['id'] + + def send_origin_visit(self, origin_id, visit_date): + origin_visit = { + 'origin': origin_id, + 'visit_date': visit_date, + 'visit': 1, + } + self._add('origin_visit', [origin_visit]) + return origin_visit + + def maybe_load_contents(self, contents): + self._add('content', contents) + + def maybe_load_directories(self, directories): + self._add('directory', directories) + + def maybe_load_revisions(self, revisions): + self._add('revision', revisions) + + def maybe_load_releases(self, releases): + self._add('release', releases) + + def maybe_load_occurrences(self, occurrences): + self._add('occurrence', occurrences) + + def open_fetch_history(self): + pass + + def close_fetch_history_failure(self, fetch_history_id): + pass + + def close_fetch_history_success(self, fetch_history_id): + pass + + def update_origin_visit(self, origin_id, visit, status): + self.status = status + + # Override to do nothing at the end + def close_failure(self): + pass + + def close_success(self): + pass + + +class TestLoaderUtils(unittest.TestCase): + def assertRevisionsOk(self, expected_revisions): + """Check the loader's revisions match the expected revisions. + + Expects self.loader to be instantiated and ready to be + inspected (meaning the loading took place). + + Args: + expected_revisions (dict): Dict with key revision id, + value the targeted directory id. + + """ + # The last revision being the one used later to start back from + for rev in self.loader.state['revision']: + rev_id = hashutil.hash_to_hex(rev['id']) + directory_id = hashutil.hash_to_hex(rev['directory']) + + self.assertEquals(expected_revisions[rev_id], directory_id) + + +class SWHDepositLoaderNoStorage(DepositLoaderInhibitsStorage, DepositLoader): + """Loader to test. + + It inherits from the actual deposit loader to actually test its + correct behavior. It also inherits from + DepositLoaderInhibitsStorageLoader so that no persistence takes place. + + """ + pass + + +@attr('fs') +class DepositLoaderScenarioTest(APITestCase, WithAuthTestCase, + BasicTestCase, CommonCreationRoutine, + FileSystemCreationRoutine, TestLoaderUtils): + + def setUp(self): + super().setUp() + + self.server = 'http://localhost/' + + # 1. create a deposit with archive and metadata + self.deposit_id = self.create_simple_binary_deposit() + + me = self + + class SWHDepositTestClient(DepositClient): + def get_archive(self, archive_update_url, archive_path, + log=None): + r = me.client.get(archive_update_url) + # import os + # os.makedirs(os.path.dirname(archive_path), exist_ok=True) + with open(archive_path, 'wb') as f: + for chunk in r.streaming_content: + f.write(chunk) + + return archive_path + + def get_metadata(self, metadata_url, log=None): + r = me.client.get(metadata_url) + return json.loads(r.content.decode('utf-8')) + + def update_deposit_status(self, update_status_url, status, + revision_id=None): + payload = {'status': status} + if revision_id: + payload['revision_id'] = revision_id + me.client.put(update_status_url, + content_type='application/json', + data=json.dumps(payload)) + + # 2. setup loader with no persistence + self.loader = SWHDepositLoaderNoStorage() + # and a basic client which accesses the data + # setuped in that test + self.loader.client = SWHDepositTestClient() + + def tearDown(self): + super().tearDown() + shutil.rmtree(TEST_LOADER_CONFIG['extraction_dir']) + + @istest + def inject_deposit_ready(self): + """Load a deposit which is ready + + """ + args = [self.collection.name, self.deposit_id] + + archive_url = reverse(PRIVATE_GET_RAW_CONTENT, args=args) + deposit_meta_url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=args) + deposit_update_url = reverse(PRIVATE_PUT_DEPOSIT, args=args) + + # when + self.loader.load(archive_url=archive_url, + deposit_meta_url=deposit_meta_url, + deposit_update_url=deposit_update_url) + + # then + self.assertEquals(len(self.loader.state['content']), 1) + self.assertEquals(len(self.loader.state['directory']), 1) + self.assertEquals(len(self.loader.state['revision']), 1) + self.assertEquals(len(self.loader.state['release']), 0) + self.assertEquals(len(self.loader.state['occurrence']), 1) + + # FIXME enrich state introspection + # expected_revisions = {} + # self.assertRevisionsOk(expected_revisions)