diff --git a/swh/loader/tar/loader.py b/swh/loader/tar/loader.py index d0da44d..52db7b6 100644 --- a/swh/loader/tar/loader.py +++ b/swh/loader/tar/loader.py @@ -1,237 +1,328 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import tempfile import requests import shutil from tempfile import mkdtemp from swh.core import tarball from swh.loader.core.loader import BufferedLoader from swh.loader.dir.loader import revision_from, snapshot_from from swh.model.hashutil import MultiHash from swh.model.from_disk import Directory from .build import compute_revision try: from _version import __version__ except ImportError: __version__ = 'devel' TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.tar.' DEBUG_MODE = '** DEBUG MODE **' class LocalResponse: """Local Response class with iter_content api """ def __init__(self, path): self.path = path def iter_content(self, chunk_size=None): with open(self.path, 'rb') as f: for chunk in f: yield chunk class ArchiveFetcher: """Http/Local client in charge of downloading archives from a remote/local server. Args: temp_directory (str): Path to the temporary disk location used for downloading the release artifacts """ def __init__(self, temp_directory=None): self.temp_directory = temp_directory self.session = requests.session() self.params = { 'headers': { 'User-Agent': 'Software Heritage Tar Loader (%s)' % ( __version__ ) } } def download(self, url): """Download the remote tarball url locally. Args: url (str): Url (file or http*) Raises: ValueError in case of failing to query Returns: Tuple of local (filepath, hashes of filepath) """ if url.startswith('file://'): # FIXME: How to improve this path = url.strip('file:').replace('///', '/') response = LocalResponse(path) length = os.path.getsize(path) else: response = self.session.get(url, **self.params, stream=True) if response.status_code != 200: raise ValueError("Fail to query '%s'. Reason: %s" % ( url, response.status_code)) length = int(response.headers['content-length']) filepath = os.path.join(self.temp_directory, os.path.basename(url)) h = MultiHash(length=length) with open(filepath, 'wb') as f: for chunk in response.iter_content(chunk_size=None): h.update(chunk) f.write(chunk) actual_length = os.path.getsize(filepath) if length != actual_length: raise ValueError('Error when checking size: %s != %s' % ( length, actual_length)) hashes = { 'length': length, **h.hexdigest() } return filepath, hashes -class TarLoader(BufferedLoader): - """Tarball loader implementation. +class BaseTarLoader(BufferedLoader): + """Base Tarball Loader class. - This is a subclass of the :class:DirLoader as the main goal of - this class is to first uncompress a tarball, then provide the - uncompressed directory/tree to be loaded by the DirLoader. + This factorizes multiple loader implementations: - This will: + - :class:`RemoteTarLoader`: New implementation able to deal with + remote archives. + + - :class:`TarLoader`: Old implementation which dealt with only + local archive. It also was only passing along objects to + persist (revision, etc...) - - creates an origin (if it does not exist) - - creates a fetch_history entry - - creates an origin_visit - - uncompress locally the tarball in a temporary location - - process the content of the tarballs to persist on swh storage - - clean up the temporary location - - write an entry in fetch_history to mark the loading tarball end (success - or failure) """ CONFIG_BASE_FILENAME = 'loader/tar' ADDITIONAL_CONFIG = { 'working_dir': ('string', '/tmp'), 'debug': ('bool', False), # NOT FOR PRODUCTION } def __init__(self, logging_class='swh.loader.tar.TarLoader', config=None): super().__init__(logging_class=logging_class, config=config) self.local_cache = None self.dir_path = None working_dir = self.config['working_dir'] os.makedirs(working_dir, exist_ok=True) self.temp_directory = mkdtemp( suffix='-%s' % os.getpid(), prefix=TEMPORARY_DIR_PREFIX_PATTERN, dir=working_dir) self.client = ArchiveFetcher(temp_directory=self.temp_directory) os.makedirs(working_dir, 0o755, exist_ok=True) self.dir_path = tempfile.mkdtemp(prefix='swh.loader.tar-', dir=self.temp_directory) self.debug = self.config['debug'] def cleanup(self): """Clean up temporary disk folders used. """ if self.debug: self.log.warn('%s Will not clean up temp dir %s' % ( DEBUG_MODE, self.temp_directory )) return if os.path.exists(self.temp_directory): self.log.debug('Clean up %s' % self.temp_directory) shutil.rmtree(self.temp_directory) def prepare_origin_visit(self, *, origin, visit_date=None, **kwargs): self.origin = origin if 'type' not in self.origin: # let the type flow if present self.origin['type'] = 'tar' self.visit_date = visit_date - def prepare(self, *, origin, last_modified, visit_date=None): - """last_modified is the time of last modification of the tarball. - - E.g https://ftp.gnu.org/gnu/8sync/: - [ ] 8sync-0.1.0.tar.gz 2016-04-22 16:35 217K - [ ] 8sync-0.1.0.tar.gz.sig 2016-04-22 16:35 543 - - Args: - origin (dict): Dict with keys {url, type} - last_modified (str): The date of last modification of the - archive to ingest. - visit_date (str): Date representing the date of the - visit. None by default will make it the current time - during the loading process. + def compute_tarball_url_to_retrieve(self): + """Compute the tarball url to allow retrieval """ - self.last_modified = last_modified + pass def fetch_data(self): - """Retrieve and uncompress the archive. + """Retrieve, uncompress archive and fetch objects from the archive. """ - # fetch the remote tarball locally - url = self.origin['url'] + url = self.compute_tarball_url_to_retrieve() filepath, hashes = self.client.download(url) - - # add checksums in revision - self.log.info('Uncompress %s to %s' % (filepath, self.dir_path)) nature = tarball.uncompress(filepath, self.dir_path) dir_path = self.dir_path.encode('utf-8') directory = Directory.from_disk(path=dir_path, save_path=True) objects = directory.collect() if 'content' not in objects: objects['content'] = {} if 'directory' not in objects: objects['directory'] = {} # compute the full revision (with ids) - revision = { - **compute_revision(filepath, self.last_modified), - 'metadata': { - 'original_artifact': [{ - 'name': os.path.basename(filepath), - 'archive_type': nature, - **hashes, - }], - } - } + revision = self.build_revision(filepath, nature, hashes) revision = revision_from(directory.hash, revision) objects['revision'] = { revision['id']: revision, } - branch_name = os.path.basename(self.dir_path) - snapshot = snapshot_from(revision['id'], branch_name) + snapshot = self.build_snapshot(revision) objects['snapshot'] = { snapshot['id']: snapshot } self.objects = objects def store_data(self): objects = self.objects self.maybe_load_contents(objects['content'].values()) self.maybe_load_directories(objects['directory'].values()) self.maybe_load_revisions(objects['revision'].values()) snapshot = list(objects['snapshot'].values())[0] self.maybe_load_snapshot(snapshot) + + +class RemoteTarLoader(BaseTarLoader): + """Tarball loader implementation. + + This will: + + - create an origin (if it does not exist) and a visit + - fetch the tarball in a temporary location + - uncompress it locally in a temporary location + - process the content of the tarballs to persist on swh storage + - clean up the temporary location + + """ + def prepare(self, *, origin, last_modified, visit_date=None): + """last_modified is the time of last modification of the tarball. + + E.g https://ftp.gnu.org/gnu/8sync/: + [ ] 8sync-0.1.0.tar.gz 2016-04-22 16:35 217K + [ ] 8sync-0.1.0.tar.gz.sig 2016-04-22 16:35 543 + + Args: + origin (dict): Dict with keys {url, type} + last_modified (str): The date of last modification of the + archive to ingest. + visit_date (str): Date representing the date of the + visit. None by default will make it the current time + during the loading process. + + """ + self.last_modified = last_modified + + def compute_tarball_url_to_retrieve(self): + return self.origin['url'] + + def build_revision(self, filepath, nature, hashes): + """Build the revision with identifier + + We use the `last_modified` date provided by the caller to + build the revision. + + """ + return { + **compute_revision(filepath, self.last_modified), + 'metadata': { + 'original_artifact': [{ + 'name': os.path.basename(filepath), + 'archive_type': nature, + **hashes, + }], + } + } + + def build_snapshot(self, revision): + """Build the snapshot targetting the revision. + + """ + branch_name = os.path.basename(self.dir_path) + return snapshot_from(revision['id'], branch_name) + + +class TarLoader(BaseTarLoader): + """Old Tarball loader implementation. + + This will: + + - create an origin (if it does not exist) and a visit + - uncompress a tarball in a local and temporary location + - process the content of the tarballs to persist on swh storage + - associate it to a passed revision and snapshot + - clean up the temporary location + + """ + def prepare(self, *, tar_path, origin, revision, branch_name, + visit_date=None): + """Prepare the data prior to ingest it in SWH archive. + + Args: + tar_path (str): Path to the archive to ingest + origin (dict): Dict with keys {url, type} + revision (dict): The synthetic revision to associate the + archive to (no identifiers within) + branch_name (str): The branch name to use for the + snapshot. + visit_date (str): Date representing the date of the + visit. None by default will make it the current time + during the loading process. + + """ + self.tar_path = tar_path + self.revision = revision + self.branch_name = branch_name + + def compute_tarball_url_to_retrieve(self): + return 'file://%s' % self.tar_path + + def build_revision(self, filepath, nature, hashes): + """Build the revision with identifier + + We use the revision provided by the caller as a scaffolding + revision. + + """ + return { + **self.revision, + 'metadata': { + 'original_artifact': [{ + 'name': os.path.basename(filepath), + 'archive_type': nature, + **hashes, + }], + } + } + + def build_snapshot(self, revision): + """Build the snapshot targetting the revision. + + We use the branch_name provided by the caller as a scaffolding + as well. + + """ + return snapshot_from(revision['id'], self.branch_name) diff --git a/swh/loader/tar/tasks.py b/swh/loader/tar/tasks.py index 4aeec02..cba07fb 100644 --- a/swh/loader/tar/tasks.py +++ b/swh/loader/tar/tasks.py @@ -1,26 +1,26 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.task import Task -from swh.loader.tar.loader import TarLoader +from swh.loader.tar.loader import RemoteTarLoader class LoadTarRepository(Task): """Import a remote or local archive to Software Heritage """ task_queue = 'swh_loader_tar' def run_task(self, *, origin, visit_date, last_modified): """Import a tarball into swh. Args: see :func:`TarLoader.prepare`. """ - loader = TarLoader() + loader = RemoteTarLoader() loader.log = self.log return loader.load( origin=origin, visit_date=visit_date, last_modified=last_modified) diff --git a/swh/loader/tar/tests/test_loader.py b/swh/loader/tar/tests/test_loader.py index 87b4a77..ba8a9e6 100644 --- a/swh/loader/tar/tests/test_loader.py +++ b/swh/loader/tar/tests/test_loader.py @@ -1,109 +1,204 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest from swh.model import hashutil from swh.loader.core.tests import BaseLoaderTest -from swh.loader.tar.loader import TarLoader +from swh.loader.tar.build import SWH_PERSON +from swh.loader.tar.loader import RemoteTarLoader, TarLoader TEST_CONFIG = { 'working_dir': '/tmp/tests/loader-tar/', # where to extract the tarball 'debug': False, 'storage': { # we instantiate it but we don't use it in test context 'cls': 'memory', 'args': { } }, 'send_contents': True, 'send_directories': True, 'send_revisions': True, 'send_releases': True, 'send_snapshot': True, 'content_packet_size': 100, 'content_packet_block_size_bytes': 104857600, 'content_packet_size_bytes': 1073741824, 'directory_packet_size': 250, 'revision_packet_size': 100, 'release_packet_size': 100, 'content_size_limit': 1000000000 } -class TarLoaderTest(TarLoader): +class RemoteTarLoaderForTest(RemoteTarLoader): def parse_config_file(self, *args, **kwargs): return TEST_CONFIG -class TestTarLoader(BaseLoaderTest): - """Prepare the archive to load +class PrepareDataForTestLoader(BaseLoaderTest): + """Prepare the archive to load (test fixture). """ def setUp(self): super().setUp('sample-folder.tgz', start_path=os.path.dirname(__file__), uncompress_archive=False) self.tarpath = self.destination_path -class TestTarLoader1(TestTarLoader): +class TestTarLoader1(PrepareDataForTestLoader): + """Test the remote loader + + """ def setUp(self): super().setUp() - self.loader = TarLoaderTest() + self.loader = RemoteTarLoaderForTest() self.storage = self.loader.storage @pytest.mark.fs def test_load(self): """Process a new tarball should be ok """ # given origin = { 'url': self.repo_url, 'type': 'tar' } visit_date = 'Tue, 3 May 2016 17:16:32 +0200' last_modified = '2018-12-05T12:35:23+00:00' # when self.loader.load( origin=origin, visit_date=visit_date, last_modified=last_modified) # then self.assertCountContents(8, "3 files + 5 links") self.assertCountDirectories(6, "4 subdirs + 1 empty + 1 main dir") self.assertCountRevisions(1, "synthetic revision") rev_id = hashutil.hash_to_bytes( '67a7d7dda748f9a86b56a13d9218d16f5cc9ab3d') actual_revision = next(self.storage.revision_get([rev_id])) self.assertTrue(actual_revision['synthetic']) self.assertEqual(actual_revision['parents'], []) self.assertEqual(actual_revision['type'], 'tar') self.assertEqual(actual_revision['message'], b'swh-loader-tar: synthetic revision message') self.assertEqual(actual_revision['directory'], b'\xa7A\xfcM\x96\x8c{\x8e<\x94\xff\x86\xe7\x04\x80\xc5\xc7\xe5r\xa9') # noqa self.assertEqual( actual_revision['metadata']['original_artifact'][0], { 'sha1_git': 'cc848944a0d3e71d287027347e25467e61b07428', 'archive_type': 'tar', 'blake2s256': '5d70923443ad36377cd58e993aff0e3c1b9ef14f796c69569105d3a99c64f075', # noqa 'name': 'sample-folder.tgz', 'sha1': '3ca0d0a5c6833113bd532dc5c99d9648d618f65a', 'length': 555, 'sha256': '307ebda0071ca5975f618e192c8417161e19b6c8bf581a26061b76dc8e85321d' # noqa }) self.assertCountReleases(0) self.assertCountSnapshots(1) + + +class TarLoaderForTest(TarLoader): + def parse_config_file(self, *args, **kwargs): + return TEST_CONFIG + + +class TestTarLoader2(PrepareDataForTestLoader): + """Test the legacy tar loader + + """ + + def setUp(self): + super().setUp() + self.loader = TarLoaderForTest() + self.storage = self.loader.storage + + @pytest.mark.fs + def test_load(self): + """Process a new tarball should be ok + + """ + # given + origin = { + 'url': self.repo_url, + 'type': 'tar' + } + + visit_date = 'Tue, 3 May 2016 17:16:32 +0200' + + import datetime + commit_time = int(datetime.datetime( + 2018, 12, 5, 13, 35, 23, 0, + tzinfo=datetime.timezone(datetime.timedelta(hours=1)) + ).timestamp()) + + revision_message = 'swh-loader-tar: synthetic revision message' + revision_type = 'tar' + revision = { + 'date': { + 'timestamp': commit_time, + 'offset': 0, + }, + 'committer_date': { + 'timestamp': commit_time, + 'offset': 0, + }, + 'author': SWH_PERSON, + 'committer': SWH_PERSON, + 'type': revision_type, + 'message': revision_message, + 'synthetic': True, + } + + branch_name = os.path.basename(self.tarpath) + + # when + self.loader.load(tar_path=self.tarpath, origin=origin, + visit_date=visit_date, revision=revision, + branch_name=branch_name) + + # then + self.assertCountContents(8, "3 files + 5 links") + self.assertCountDirectories(6, "4 subdirs + 1 empty + 1 main dir") + self.assertCountRevisions(1, "synthetic revision") + + rev_id = hashutil.hash_to_bytes( + '67a7d7dda748f9a86b56a13d9218d16f5cc9ab3d') + actual_revision = next(self.storage.revision_get([rev_id])) + self.assertTrue(actual_revision['synthetic']) + self.assertEqual(actual_revision['parents'], []) + self.assertEqual(actual_revision['type'], 'tar') + self.assertEqual(actual_revision['message'], + b'swh-loader-tar: synthetic revision message') + self.assertEqual(actual_revision['directory'], + b'\xa7A\xfcM\x96\x8c{\x8e<\x94\xff\x86\xe7\x04\x80\xc5\xc7\xe5r\xa9') # noqa + + self.assertEqual( + actual_revision['metadata']['original_artifact'][0], + { + 'sha1_git': 'cc848944a0d3e71d287027347e25467e61b07428', + 'archive_type': 'tar', + 'blake2s256': '5d70923443ad36377cd58e993aff0e3c1b9ef14f796c69569105d3a99c64f075', # noqa + 'name': 'sample-folder.tgz', + 'sha1': '3ca0d0a5c6833113bd532dc5c99d9648d618f65a', + 'length': 555, + 'sha256': '307ebda0071ca5975f618e192c8417161e19b6c8bf581a26061b76dc8e85321d' # noqa + }) + + self.assertCountReleases(0) + self.assertCountSnapshots(1) diff --git a/swh/loader/tar/tests/test_tasks.py b/swh/loader/tar/tests/test_tasks.py index aaaa157..2fa9a30 100644 --- a/swh/loader/tar/tests/test_tasks.py +++ b/swh/loader/tar/tests/test_tasks.py @@ -1,31 +1,31 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from unittest.mock import patch from swh.loader.tar.tasks import LoadTarRepository class TestTasks(unittest.TestCase): def test_check_task_name(self): task = LoadTarRepository() self.assertEqual(task.task_queue, 'swh_loader_tar') - @patch('swh.loader.tar.loader.TarLoader.load') + @patch('swh.loader.tar.loader.RemoteTarLoader.load') def test_task(self, mock_loader): mock_loader.return_value = {'status': 'eventful'} task = LoadTarRepository() # given actual_result = task.run_task( origin='origin', visit_date='visit_date', last_modified='last_modified') self.assertEqual(actual_result, {'status': 'eventful'}) mock_loader.assert_called_once_with( origin='origin', visit_date='visit_date', last_modified='last_modified')