diff --git a/swh/loader/tar/loader.py b/swh/loader/tar/loader.py index 763d37b..095f6bb 100644 --- a/swh/loader/tar/loader.py +++ b/swh/loader/tar/loader.py @@ -1,350 +1,346 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import tempfile import requests import shutil from urllib.parse import urlparse from tempfile import mkdtemp from swh.core import tarball from swh.loader.core.loader import BufferedLoader from swh.loader.dir.loader import revision_from, snapshot_from from swh.model.hashutil import MultiHash, HASH_BLOCK_SIZE from swh.model.from_disk import Directory from .build import compute_revision try: from _version import __version__ except ImportError: __version__ = 'devel' TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.tar.' DEBUG_MODE = '** DEBUG MODE **' class LocalResponse: """Local Response class with iter_content api """ def __init__(self, path): self.path = path def iter_content(self, chunk_size=None): with open(self.path, 'rb') as f: for chunk in f: yield chunk class ArchiveFetcher: """Http/Local client in charge of downloading archives from a remote/local server. Args: temp_directory (str): Path to the temporary disk location used for downloading the release artifacts """ def __init__(self, temp_directory=None): self.temp_directory = temp_directory self.session = requests.session() self.params = { 'headers': { 'User-Agent': 'Software Heritage Tar Loader (%s)' % ( __version__ ) } } def download(self, url): """Download the remote tarball url locally. Args: url (str): Url (file or http*) Raises: ValueError in case of failing to query Returns: Tuple of local (filepath, hashes of filepath) """ url_parsed = urlparse(url) if url_parsed.scheme == 'file': path = url_parsed.path response = LocalResponse(path) length = os.path.getsize(path) else: response = self.session.get(url, **self.params, stream=True) if response.status_code != 200: raise ValueError("Fail to query '%s'. Reason: %s" % ( url, response.status_code)) length = int(response.headers['content-length']) filepath = os.path.join(self.temp_directory, os.path.basename(url)) h = MultiHash(length=length) with open(filepath, 'wb') as f: for chunk in response.iter_content(chunk_size=HASH_BLOCK_SIZE): h.update(chunk) f.write(chunk) actual_length = os.path.getsize(filepath) if length != actual_length: raise ValueError('Error when checking size: %s != %s' % ( length, actual_length)) hashes = { 'length': length, **h.hexdigest() } return filepath, hashes class BaseTarLoader(BufferedLoader): """Base Tarball Loader class. This factorizes multiple loader implementations: - :class:`RemoteTarLoader`: New implementation able to deal with remote archives. - :class:`TarLoader`: Old implementation which dealt with only local archive. It also was only passing along objects to persist (revision, etc...) """ CONFIG_BASE_FILENAME = 'loader/tar' ADDITIONAL_CONFIG = { 'working_dir': ('string', '/tmp'), 'debug': ('bool', False), # NOT FOR PRODUCTION } def __init__(self, logging_class='swh.loader.tar.TarLoader', config=None): super().__init__(logging_class=logging_class, config=config) self.local_cache = None self.dir_path = None working_dir = self.config['working_dir'] os.makedirs(working_dir, exist_ok=True) self.temp_directory = mkdtemp( suffix='-%s' % os.getpid(), prefix=TEMPORARY_DIR_PREFIX_PATTERN, dir=working_dir) self.client = ArchiveFetcher(temp_directory=self.temp_directory) os.makedirs(working_dir, 0o755, exist_ok=True) self.dir_path = tempfile.mkdtemp(prefix='swh.loader.tar-', dir=self.temp_directory) self.debug = self.config['debug'] def cleanup(self): """Clean up temporary disk folders used. """ if self.debug: self.log.warn('%s Will not clean up temp dir %s' % ( DEBUG_MODE, self.temp_directory )) return if os.path.exists(self.temp_directory): self.log.debug('Clean up %s' % self.temp_directory) shutil.rmtree(self.temp_directory) def prepare_origin_visit(self, *, origin, visit_date=None, **kwargs): """Prepare the origin visit information. Args: origin (dict): Dict with keys {url, type} visit_date (str): Date representing the date of the visit. None by default will make it the current time during the loading process. """ self.origin = origin if 'type' not in self.origin: # let the type flow if present self.origin['type'] = 'tar' self.visit_date = visit_date def get_tarball_url_to_retrieve(self): """Compute the tarball url to allow retrieval """ raise NotImplementedError() def fetch_data(self): """Retrieve, uncompress archive and fetch objects from the tarball. The actual ingestion takes place in the :meth:`store_data` implementation below. """ url = self.get_tarball_url_to_retrieve() filepath, hashes = self.client.download(url) nature = tarball.uncompress(filepath, self.dir_path) dir_path = self.dir_path.encode('utf-8') directory = Directory.from_disk(path=dir_path, save_path=True) objects = directory.collect() if 'content' not in objects: objects['content'] = {} if 'directory' not in objects: objects['directory'] = {} # compute the full revision (with ids) revision = self.build_revision(filepath, nature, hashes) revision = revision_from(directory.hash, revision) objects['revision'] = { revision['id']: revision, } snapshot = self.build_snapshot(revision) objects['snapshot'] = { snapshot['id']: snapshot } self.objects = objects def store_data(self): """Store the objects in the swh archive. """ objects = self.objects self.maybe_load_contents(objects['content'].values()) self.maybe_load_directories(objects['directory'].values()) self.maybe_load_revisions(objects['revision'].values()) snapshot = list(objects['snapshot'].values())[0] self.maybe_load_snapshot(snapshot) class RemoteTarLoader(BaseTarLoader): """This is able to load from remote/local archive into the swh archive. This will: - create an origin (if it does not exist) and a visit - fetch the tarball in a temporary location - uncompress it locally in a temporary location - process the content of the tarball to persist on swh storage - clean up the temporary location """ def prepare(self, *, last_modified, **kwargs): """last_modified is the time of last modification of the tarball. E.g https://ftp.gnu.org/gnu/8sync/: [ ] 8sync-0.1.0.tar.gz 2016-04-22 16:35 217K [ ] 8sync-0.1.0.tar.gz.sig 2016-04-22 16:35 543 [ ] ... Args: origin (dict): Dict with keys {url, type} last_modified (str): The date of last modification of the archive to ingest. visit_date (str): Date representing the date of the visit. None by default will make it the current time during the loading process. """ self.last_modified = last_modified def get_tarball_url_to_retrieve(self): return self.origin['url'] def build_revision(self, filepath, nature, hashes): """Build the revision with identifier We use the `last_modified` date provided by the caller to build the revision. """ return { **compute_revision(filepath, self.last_modified), 'metadata': { 'original_artifact': [{ 'name': os.path.basename(filepath), 'archive_type': nature, **hashes, }], } } def build_snapshot(self, revision): """Build the snapshot targeting the revision. """ branch_name = os.path.basename(self.dir_path) return snapshot_from(revision['id'], branch_name) class LegacyLocalTarLoader(BaseTarLoader): """This loads local tarball into the swh archive. It's using the revision and branch provided by the caller as scaffolding to create the full revision and snapshot (with identifiers). This is what's: - been used to ingest our 2015 rsync copy of gnu.org - still used by the loader deposit This will: - create an origin (if it does not exist) and a visit - uncompress a tarball in a local and temporary location - process the content of the tarball to persist on swh storage - associate it to a passed revision and snapshot - clean up the temporary location """ def prepare(self, *, tar_path, revision, branch_name, **kwargs): """Prepare the data prior to ingest it in SWH archive. Args: tar_path (str): Path to the archive to ingest revision (dict): The synthetic revision to associate the archive to (no identifiers within) branch_name (str): The branch name to use for the snapshot. """ self.tar_path = tar_path self.revision = revision self.branch_name = branch_name def get_tarball_url_to_retrieve(self): return 'file://%s' % self.tar_path def build_revision(self, filepath, nature, hashes): """Build the revision with identifier We use the revision provided by the caller as a scaffolding revision. """ return { **self.revision, 'metadata': { 'original_artifact': [{ 'name': os.path.basename(filepath), 'archive_type': nature, **hashes, }], } } def build_snapshot(self, revision): """Build the snapshot targeting the revision. We use the branch_name provided by the caller as a scaffolding as well. """ return snapshot_from(revision['id'], self.branch_name) - - -# Aliasing for retro-compatibility -TarLoader = LegacyLocalTarLoader diff --git a/swh/loader/tar/tests/test_loader.py b/swh/loader/tar/tests/test_loader.py index d75ef52..30a2e70 100644 --- a/swh/loader/tar/tests/test_loader.py +++ b/swh/loader/tar/tests/test_loader.py @@ -1,239 +1,239 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest import requests_mock from swh.model import hashutil from swh.loader.core.tests import BaseLoaderTest from swh.loader.tar.build import SWH_PERSON -from swh.loader.tar.loader import RemoteTarLoader, TarLoader +from swh.loader.tar.loader import RemoteTarLoader, LegacyLocalTarLoader TEST_CONFIG = { 'working_dir': '/tmp/tests/loader-tar/', # where to extract the tarball 'debug': False, 'storage': { # we instantiate it but we don't use it in test context 'cls': 'memory', 'args': { } }, 'send_contents': True, 'send_directories': True, 'send_revisions': True, 'send_releases': True, 'send_snapshot': True, 'content_packet_size': 100, 'content_packet_block_size_bytes': 104857600, 'content_packet_size_bytes': 1073741824, 'directory_packet_size': 250, 'revision_packet_size': 100, 'release_packet_size': 100, 'content_size_limit': 1000000000 } class RemoteTarLoaderForTest(RemoteTarLoader): def parse_config_file(self, *args, **kwargs): return TEST_CONFIG @pytest.mark.fs class PrepareDataForTestLoader(BaseLoaderTest): """Prepare the archive to load (test fixture). """ def setUp(self): super().setUp('sample-folder.tgz', start_path=os.path.dirname(__file__), uncompress_archive=False) self.tarpath = self.destination_path def assert_data_ok(self): # then self.assertCountContents(8, "3 files + 5 links") self.assertCountDirectories(6, "4 subdirs + 1 empty + 1 main dir") self.assertCountRevisions(1, "synthetic revision") rev_id = hashutil.hash_to_bytes( '67a7d7dda748f9a86b56a13d9218d16f5cc9ab3d') actual_revision = next(self.storage.revision_get([rev_id])) self.assertTrue(actual_revision['synthetic']) self.assertEqual(actual_revision['parents'], []) self.assertEqual(actual_revision['type'], 'tar') self.assertEqual(actual_revision['message'], b'swh-loader-tar: synthetic revision message') self.assertEqual(actual_revision['directory'], b'\xa7A\xfcM\x96\x8c{\x8e<\x94\xff\x86\xe7\x04\x80\xc5\xc7\xe5r\xa9') # noqa self.assertEqual( actual_revision['metadata']['original_artifact'][0], { 'sha1_git': 'cc848944a0d3e71d287027347e25467e61b07428', 'archive_type': 'tar', 'blake2s256': '5d70923443ad36377cd58e993aff0e3c1b9ef14f796c69569105d3a99c64f075', # noqa 'name': 'sample-folder.tgz', 'sha1': '3ca0d0a5c6833113bd532dc5c99d9648d618f65a', 'length': 555, 'sha256': '307ebda0071ca5975f618e192c8417161e19b6c8bf581a26061b76dc8e85321d' # noqa }) self.assertCountReleases(0) self.assertCountSnapshots(1) class TestRemoteTarLoader(PrepareDataForTestLoader): """Test the remote loader scenario (local/remote) """ def setUp(self): super().setUp() self.loader = RemoteTarLoaderForTest() self.storage = self.loader.storage def test_load_local(self): """Load a local tarball should result in persisted swh data """ # given origin = { 'url': self.repo_url, 'type': 'tar' } visit_date = 'Tue, 3 May 2016 17:16:32 +0200' last_modified = '2018-12-05T12:35:23+00:00' # when self.loader.load( origin=origin, visit_date=visit_date, last_modified=last_modified) # then self.assert_data_ok() @requests_mock.Mocker() def test_load_remote(self, mock_requests): """Load a remote tarball should result in persisted swh data """ # setup the mock to stream the content of the tarball local_url = self.repo_url.replace('file:///', '/') url = 'https://nowhere.org/%s' % local_url with open(local_url, 'rb') as f: data = f.read() mock_requests.get(url, content=data, headers={ 'content-length': str(len(data)) }) # given origin = { 'url': url, 'type': 'tar' } visit_date = 'Tue, 3 May 2016 17:16:32 +0200' last_modified = '2018-12-05T12:35:23+00:00' # when self.loader.load( origin=origin, visit_date=visit_date, last_modified=last_modified) self.assert_data_ok() @requests_mock.Mocker() def test_load_remote_download_failure(self, mock_requests): """Load a remote tarball with download failure should result in no data """ # setup the mock to stream the content of the tarball local_url = self.repo_url.replace('file:///', '/') url = 'https://nowhere.org/%s' % local_url with open(local_url, 'rb') as f: data = f.read() wrong_length = len(data) - 10 mock_requests.get(url, content=data, headers={ 'content-length': str(wrong_length) }) # given origin = { 'url': url, 'type': 'tar' } visit_date = 'Tue, 3 May 2016 17:16:32 +0200' last_modified = '2018-12-05T12:35:23+00:00' # when r = self.loader.load( origin=origin, visit_date=visit_date, last_modified=last_modified) self.assertEqual(r, {'status': 'failed'}) self.assertCountContents(0) self.assertCountDirectories(0) self.assertCountRevisions(0) self.assertCountSnapshots(0) -class TarLoaderForTest(TarLoader): +class TarLoaderForTest(LegacyLocalTarLoader): def parse_config_file(self, *args, **kwargs): return TEST_CONFIG class TestTarLoader(PrepareDataForTestLoader): """Test the legacy tar loader """ def setUp(self): super().setUp() self.loader = TarLoaderForTest() self.storage = self.loader.storage def test_load(self): """Load a local tarball should result in persisted swh data """ # given origin = { 'url': self.repo_url, 'type': 'tar' } visit_date = 'Tue, 3 May 2016 17:16:32 +0200' import datetime commit_time = int(datetime.datetime( 2018, 12, 5, 13, 35, 23, 0, tzinfo=datetime.timezone(datetime.timedelta(hours=1)) ).timestamp()) revision_message = 'swh-loader-tar: synthetic revision message' revision_type = 'tar' revision = { 'date': { 'timestamp': commit_time, 'offset': 0, }, 'committer_date': { 'timestamp': commit_time, 'offset': 0, }, 'author': SWH_PERSON, 'committer': SWH_PERSON, 'type': revision_type, 'message': revision_message, 'synthetic': True, } branch_name = os.path.basename(self.tarpath) # when self.loader.load(tar_path=self.tarpath, origin=origin, visit_date=visit_date, revision=revision, branch_name=branch_name) # then self.assert_data_ok()