diff --git a/debian/control b/debian/control
--- a/debian/control
+++ b/debian/control
@@ -5,7 +5,9 @@
 Build-Depends: debhelper (>= 9),
                dh-python (>= 2),
                python3-all,
-               python3-nose,
+               python3-arrow,
+               python3-requests,
+               python3-pytest,
                python3-setuptools,
                python3-swh.core (>= 0.0.46~),
                python3-swh.loader.core (>= 0.0.35~),
diff --git a/debian/rules b/debian/rules
--- a/debian/rules
+++ b/debian/rules
@@ -1,7 +1,7 @@
 #!/usr/bin/make -f
 
 export PYBUILD_NAME=swh.loader.tar
-export export PYBUILD_TEST_ARGS=--with-doctest -sv -a !db,!fs
+export PYBUILD_TEST_ARGS=-m 'not db and not fs'
 
 %:
 	dh $@ --with python3 --buildsystem=pybuild
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,8 @@
 # Add here external Python modules dependencies, one per line. Module names
 # should match https://pypi.python.org/pypi names. For the full spec or
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
+arrow
 vcversioner
-retrying
+requests
 click
 python-dateutil
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -48,7 +48,7 @@
     install_requires=parse_requirements() + parse_requirements('swh'),
     setup_requires=['vcversioner'],
     extras_require={'testing': parse_requirements('test')},
-    vcversioner={},
+    vcversioner={'version_module_paths': ['swh/loader/tar/_version.py']},
     include_package_data=True,
     classifiers=[
         "Programming Language :: Python :: 3",
diff --git a/swh/loader/tar/.gitignore b/swh/loader/tar/.gitignore
new file mode 100644
--- /dev/null
+++ b/swh/loader/tar/.gitignore
@@ -0,0 +1 @@
+_version.py
\ No newline at end of file
diff --git a/swh/loader/tar/build.py b/swh/loader/tar/build.py
--- a/swh/loader/tar/build.py
+++ b/swh/loader/tar/build.py
@@ -3,9 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import os
-
-from swh.core import utils
+import arrow
 
 
 # Static setup
@@ -16,61 +14,36 @@
     'fullname': 'Software Heritage',
     'email': 'robot@softwareheritage.org'
}
-REVISION_MESSAGE = 'synthetic revision message'
+REVISION_MESSAGE = 'swh-loader-tar: synthetic revision message'
 REVISION_TYPE = 'tar'
 
 
-def compute_origin(url_scheme, url_type, root_dirpath, tarpath):
-    """Compute the origin.
-
-    Args:
-        - url_scheme: scheme to build the origin's url
-        - url_type: origin's type
-        - root_dirpath: the top level root directory path
-        - tarpath: file's absolute path
-
-    Returns:
-        Dictionary origin with keys:
-        - url: origin's url
-        - type: origin's type
-
-    """
-    relative_path = utils.commonname(root_dirpath, tarpath)
-    return {
-        'url': ''.join([url_scheme,
-                        os.path.dirname(relative_path)]),
-        'type': url_type,
-    }
-
-
-def _time_from_path(tarpath):
-    """Compute the modification time from the tarpath.
+def _time_from_last_modified(last_modified):
+    """Compute the modification time from the last modified date.
 
     Args:
-        tarpath (str|bytes): Full path to the archive to extract the
-            date from.
+        last_modified (str): Last modification time
 
     Returns:
-        dict representing a timestamp with keys seconds and microseconds keys.
+        dict representing a timestamp with keys {seconds, microseconds}
 
     """
-    mtime = os.lstat(tarpath).st_mtime
-    if isinstance(mtime, float):
-        normalized_time = list(map(int, str(mtime).split('.')))
-    else:  # assuming int
-        normalized_time = [mtime, 0]
-
+    last_modified = arrow.get(last_modified)
+    # use the parsed datetime fields directly; splitting str(float_timestamp)
+    # would drop trailing zeros from the microseconds part (.8 -> 8)
+    normalized_time = [last_modified.timestamp, last_modified.microsecond]
     return {
         'seconds': normalized_time[0],
         'microseconds': normalized_time[1]
     }
 
 
-def compute_revision(tarpath):
+def compute_revision(tarpath, last_modified):
     """Compute a revision.
 
     Args:
-        tarpath: absolute path to the tarball
+        tarpath (str): absolute path to the tarball
+        last_modified (str): Time of last modification read from the
+            source remote (most probably by the lister)
 
     Returns:
         Revision as dict:
@@ -84,7 +57,8 @@
         - message: cf. REVISION_MESSAGE
 
     """
-    ts = _time_from_path(tarpath)
+    ts = _time_from_last_modified(last_modified)
+
     return {
         'date': {
             'timestamp': ts,
@@ -98,4 +72,5 @@
         'committer': SWH_PERSON,
         'type': REVISION_TYPE,
         'message': REVISION_MESSAGE,
+        'synthetic': True,
     }
diff --git a/swh/loader/tar/file.py b/swh/loader/tar/file.py
deleted file mode 100644
--- a/swh/loader/tar/file.py
+++ /dev/null
@@ -1,90 +0,0 @@
-# Copyright (C) 2015-2017  The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import itertools
-import os
-
-from swh.core import tarball
-from swh.loader.tar import utils
-
-
-def archives_from_dir(path):
-    """Given a path to a directory, walk such directory and yield tuple of
-    tarpath, fname.
-
-    Args:
-        path: top level directory
-
-    Returns:
-        Generator of tuple tarpath, filename with tarpath a tarball.
-
-    """
-    for dirpath, dirnames, filenames in os.walk(path):
-        for fname in filenames:
-            tarpath = os.path.join(dirpath, fname)
-            if not os.path.exists(tarpath):
-                continue
-
-            if tarball.is_tarball(tarpath):
-                yield tarpath, fname
-
-
-def archives_from_file(mirror_file):
-    """Given a path to a file containing one tarball per line, yield a tuple of
-    tarpath, fname.
-
-    Args:
-        mirror_file: path to the file containing list of tarpath.
-
-    Returns:
-        Generator of tuple tarpath, filename with tarpath a tarball.
-
-    """
-    with open(mirror_file, 'r') as f:
-        for tarpath in f.readlines():
-            tarpath = tarpath.strip()
-            if not os.path.exists(tarpath):
-                print('WARN: %s does not exist. Skipped.' % tarpath)
-                continue
-
-            if tarball.is_tarball(tarpath):
-                yield tarpath, os.path.basename(tarpath)
-
-
-def archives_from(path):
-    """From path, list tuple of tarpath, fname.
-
-    Args:
-        path: top directory to list archives from or custom file format.
-
-    Returns:
-        Generator of tuple tarpath, filename with tarpath a tarball.
-
-    """
-    if os.path.isfile(path):
-        yield from archives_from_file(path)
-    elif os.path.isdir(path):
-        yield from archives_from_dir(path)
-    else:
-        raise ValueError(
-            'Input incorrect, %s must be a file or a directory.' % path)
-
-
-def random_archives_from(path, block, limit=None):
-    """Randomize by size block the archives.
-
-    Returns:
-        Generator of randomized tuple tarpath, filename with tarpath a tarball.
-
-    """
-    random_archives = utils.random_blocks(archives_from(path),
-                                          block,
-                                          fillvalue=(None, None))
-
-    if limit:
-        random_archives = itertools.islice(random_archives, limit)
-
-    for tarpath, fname in ((t, f) for t, f in random_archives if t and f):
-        yield tarpath, fname
diff --git a/swh/loader/tar/loader.py b/swh/loader/tar/loader.py
--- a/swh/loader/tar/loader.py
+++ b/swh/loader/tar/loader.py
@@ -6,15 +6,108 @@
 import os
 import tempfile
+
+import requests
 import shutil
+
+from tempfile import mkdtemp
+
 from swh.core import tarball
 from swh.loader.core.loader import BufferedLoader
-from swh.loader.dir import loader
+from swh.loader.dir.loader import revision_from, snapshot_from
 from swh.model.hashutil import MultiHash
+from swh.model.from_disk import Directory
+
+from .build import compute_revision
+
+try:
+    from swh.loader.tar._version import __version__
+except ImportError:
+    __version__ = 'devel'
+
+
+TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.tar.'
+DEBUG_MODE = '** DEBUG MODE **'
+
+
+class LocalResponse:
+    """Local response class with a requests-compatible iter_content API
+
+    """
+    def __init__(self, path):
+        self.path = path
+
+    def iter_content(self, chunk_size=None):
+        with open(self.path, 'rb') as f:
+            for chunk in f:
+                yield chunk
+
+
+class ArchiveFetcher:
+    """Http/Local client in charge of downloading archives from a
+    remote/local server.
 
-class TarLoader(loader.DirLoader):
+    Args:
+        temp_directory (str): Path to the temporary disk location used
+            for downloading the release artifacts
+
+    """
+    def __init__(self, temp_directory=None):
+        self.temp_directory = temp_directory
+        self.session = requests.session()
+        self.params = {
+            'headers': {
+                'User-Agent': 'Software Heritage Tar Loader (%s)' % (
+                    __version__
+                )
+            }
+        }
+
+    def download(self, url):
+        """Download the remote tarball url locally.
+
+        Args:
+            url (str): Url (file or http*)
+
+        Raises:
+            ValueError: in case the download fails
+
+        Returns:
+            Tuple of local (filepath, hashes of filepath)
+
+        """
+        if url.startswith('file://'):
+            # drop only the scheme prefix; str.strip('file:') would also
+            # eat those characters from the end of the path
+            path = url[len('file://'):]
+            response = LocalResponse(path)
+            length = os.path.getsize(path)
+        else:
+            response = self.session.get(url, **self.params, stream=True)
+            if response.status_code != 200:
+                raise ValueError("Failed to query '%s'. Reason: %s" % (
+                    url, response.status_code))
+            length = int(response.headers['content-length'])
+
+        filepath = os.path.join(self.temp_directory, os.path.basename(url))
+
+        h = MultiHash(length=length)
+        with open(filepath, 'wb') as f:
+            for chunk in response.iter_content(chunk_size=None):
+                h.update(chunk)
+                f.write(chunk)
+
+        actual_length = os.path.getsize(filepath)
+        if length != actual_length:
+            raise ValueError('Error when checking size: %s != %s' % (
+                length, actual_length))
+
+        hashes = {
+            'length': length,
+            **h.hexdigest()
+        }
+        return filepath, hashes
+
+
+class TarLoader(BufferedLoader):
    """Tarball loader implementation.
-    This is a subclass of the :class:DirLoader as the main goal of
+    This is a subclass of the :class:`BufferedLoader` as the main goal of
@@ -35,33 +128,38 @@
 
     CONFIG_BASE_FILENAME = 'loader/tar'
     ADDITIONAL_CONFIG = {
-        'extraction_dir': ('string', '/tmp')
+        'working_dir': ('string', '/tmp'),
+        'debug': ('bool', False),  # NOT FOR PRODUCTION
     }
 
     def __init__(self, logging_class='swh.loader.tar.TarLoader',
                  config=None):
         super().__init__(logging_class=logging_class, config=config)
+        self.local_cache = None
         self.dir_path = None
+        working_dir = self.config['working_dir']
+        os.makedirs(working_dir, 0o755, exist_ok=True)
+        self.temp_directory = mkdtemp(
+            suffix='-%s' % os.getpid(),
+            prefix=TEMPORARY_DIR_PREFIX_PATTERN,
+            dir=working_dir)
+        self.client = ArchiveFetcher(temp_directory=self.temp_directory)
+        self.dir_path = tempfile.mkdtemp(prefix='swh.loader.tar-',
+                                         dir=self.temp_directory)
+        self.debug = self.config['debug']
 
-    def load(self, *, tar_path, origin, visit_date, revision,
-             branch_name=None):
-        """Load a tarball in `tarpath` in the Software Heritage Archive.
-
-        Args:
-            tar_path: tarball to import
-            origin (dict): an origin dictionary as returned by
-              :func:`swh.storage.storage.Storage.origin_get_one`
-            visit_date (str): the date the origin was visited (as an
-              isoformatted string)
-            revision (dict): a revision as passed to
-              :func:`swh.storage.storage.Storage.revision_add`, excluding the
-              `id` and `directory` keys (computed from the directory)
-            branch_name (str): the optional branch_name to use for snapshot
+    def cleanup(self):
+        """Clean up temporary disk folders used.
 
         """
-        # Shortcut super() as we use different arguments than the DirLoader.
-        return BufferedLoader.load(self, tar_path=tar_path, origin=origin,
-                                   visit_date=visit_date, revision=revision,
-                                   branch_name=branch_name)
+        if self.debug:
+            self.log.warning('%s Will not clean up temp dir %s' % (
+                DEBUG_MODE, self.temp_directory
+            ))
+            return
+        if os.path.exists(self.temp_directory):
+            self.log.debug('Clean up %s' % self.temp_directory)
+            shutil.rmtree(self.temp_directory)
 
     def prepare_origin_visit(self, *, origin, visit_date=None, **kwargs):
         self.origin = origin
@@ -69,85 +167,63 @@
             self.origin['type'] = 'tar'
         self.visit_date = visit_date
 
-    def prepare(self, *, tar_path, origin, revision, visit_date=None,
-                branch_name=None):
-        """1. Uncompress the tarball in a temporary directory.
-           2. Compute some metadata to update the revision.
+    def prepare(self, *args, **kwargs):
+        """last_modified is the time of last modification of the tarball.
+
+        E.g. https://ftp.gnu.org/gnu/8sync/:
+            [ ] 8sync-0.1.0.tar.gz      2016-04-22 16:35  217K
+            [ ] 8sync-0.1.0.tar.gz.sig  2016-04-22 16:35   543
 
         """
-        # Prepare the extraction path
-        extraction_dir = self.config['extraction_dir']
-        os.makedirs(extraction_dir, 0o755, exist_ok=True)
-        self.dir_path = tempfile.mkdtemp(prefix='swh.loader.tar-',
-                                         dir=extraction_dir)
+        self.last_modified = kwargs.get('last_modified')
 
-        # add checksums in revision
+    def fetch_data(self):
+        """Retrieve and uncompress the archive.
-        self.log.info('Uncompress %s to %s' % (tar_path, self.dir_path))
-        nature = tarball.uncompress(tar_path, self.dir_path)
+
+        """
+        # fetch the remote tarball locally
+        url = self.origin['url']
+        filepath, hashes = self.client.download(url)
 
-        if 'metadata' not in revision:
-            artifact = MultiHash.from_path(tar_path).hexdigest()
-            artifact['name'] = os.path.basename(tar_path)
-            artifact['archive_type'] = nature
-            artifact['length'] = os.path.getsize(tar_path)
-            revision['metadata'] = {
-                'original_artifact': [artifact],
+        # uncompress the archive locally
+        self.log.info('Uncompress %s to %s' % (filepath, self.dir_path))
+        nature = tarball.uncompress(filepath, self.dir_path)
+
+        dir_path = self.dir_path.encode('utf-8')
+        directory = Directory.from_disk(path=dir_path, save_path=True)
+        objects = directory.collect()
+        if 'content' not in objects:
+            objects['content'] = {}
+        if 'directory' not in objects:
+            objects['directory'] = {}
+
+        # compute the full revision (with ids)
+        revision = {
+            **compute_revision(filepath, self.last_modified),
+            'metadata': {
+                'original_artifact': [{
+                    'name': os.path.basename(filepath),
+                    'archive_type': nature,
+                    **hashes,
+                }],
             }
-
-        branch = branch_name if branch_name else os.path.basename(tar_path)
-
-        super().prepare(dir_path=self.dir_path,
-                        origin=origin,
-                        visit_date=visit_date,
-                        revision=revision,
-                        release=None,
-                        branch_name=branch)
-
-    def cleanup(self):
-        """Clean up temporary directory where we uncompress the tarball.
-
-        """
-        if self.dir_path and os.path.exists(self.dir_path):
-            shutil.rmtree(self.dir_path)
-
-
-if __name__ == '__main__':
-    import click
-    import logging
-    logging.basicConfig(
-        level=logging.DEBUG,
-        format='%(asctime)s %(process)d %(message)s'
-    )
-
-    @click.command()
-    @click.option('--archive-path', required=1, help='Archive path to load')
-    @click.option('--origin-url', required=1, help='Origin url to associate')
-    @click.option('--visit-date', default=None,
-                  help='Visit date time override')
-    def main(archive_path, origin_url, visit_date):
-        """Loading archive tryout."""
-        import datetime
-        origin = {'url': origin_url, 'type': 'tar'}
-        commit_time = int(datetime.datetime.now(
-            tz=datetime.timezone.utc).timestamp())
-        swh_person = {
-            'name': 'Software Heritage',
-            'fullname': 'Software Heritage',
-            'email': 'robot@softwareheritage.org'
-        }
+        }
-        revision = {
-            'date': {'timestamp': commit_time, 'offset': 0},
-            'committer_date': {'timestamp': commit_time, 'offset': 0},
-            'author': swh_person,
-            'committer': swh_person,
-            'type': 'tar',
-            'message': 'swh-loader-tar: synthetic revision message',
-            'metadata': {},
-            'synthetic': True,
+        revision = revision_from(directory.hash, revision)
+        objects['revision'] = {
+            revision['id']: revision,
         }
-        TarLoader().load(tar_path=archive_path, origin=origin,
-                         visit_date=visit_date, revision=revision,
-                         branch_name='master')
-    main()
+
+        branch_name = os.path.basename(self.dir_path)
+        snapshot = snapshot_from(revision['id'], branch_name)
+        objects['snapshot'] = {
+            snapshot['id']: snapshot
+        }
+        self.objects = objects
+
+    def store_data(self):
+        objects = self.objects
+        self.maybe_load_contents(objects['content'].values())
+        self.maybe_load_directories(objects['directory'].values())
+        self.maybe_load_revisions(objects['revision'].values())
+        snapshot = list(objects['snapshot'].values())[0]
+        self.maybe_load_snapshot(snapshot)
diff --git a/swh/loader/tar/producer.py b/swh/loader/tar/producer.py
deleted file mode 100755
--- a/swh/loader/tar/producer.py
+++ /dev/null
@@ -1,102 +0,0 @@
-# Copyright (C) 2015-2018  The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import click
-import dateutil.parser
-
-from swh.scheduler.utils import get_task
-
-from swh.core import config
-from swh.loader.tar import build, file
-
-
-TASK_QUEUE = 'swh.loader.tar.tasks.LoadTarRepository'
-
-
-def produce_archive_messages_from(
-        conf, root_dir, visit_date, mirror_file=None, dry_run=False):
-    """From root_dir, produce archive tarball messages to celery.
-
-    Will print error message when some computation arise on archive
-    and continue.
-
-    Args:
-        conf: dictionary holding static metadata
-        root_dir: top directory to list archives from.
-        visit_date: override origin's visit date of information
-        mirror_file: a filtering file of tarballs to load
-        dry_run: will compute but not send messages
-
-    Returns:
-        Number of messages generated
-
-    """
-
-    limit = conf.get('limit')
-    block = int(conf['block_messages'])
-    count = 0
-
-    path_source_tarballs = mirror_file if mirror_file else root_dir
-
-    visit_date = dateutil.parser.parse(visit_date)
-    if not dry_run:
-        task = get_task(TASK_QUEUE)
-
-    for tarpath, _ in file.random_archives_from(
-            path_source_tarballs, block, limit):
-        try:
-            origin = build.compute_origin(
-                conf['url_scheme'], conf['type'], root_dir, tarpath)
-            revision = build.compute_revision(tarpath)
-
-            if not dry_run:
-                task.delay(tar_path=tarpath, origin=origin,
-                           visit_date=visit_date,
-                           revision=revision)
-
-            count += 1
-        except ValueError:
-            print('Problem with the following archive: %s' % tarpath)
-
-    return count
-
-
-@click.command()
-@click.option('--config-file', required=1,
-              help='Configuration file path')
-@click.option('--dry-run/--no-dry-run', default=False,
-              help='Dry run (print repo only)')
-@click.option('--limit', default=None,
-              help='Number of origins limit to send')
-def main(config_file, dry_run, limit):
-    """Tarball producer of local fs tarballs.
-
-    """
-    conf = config.read(config_file)
-    url_scheme = conf['url_scheme']
-    mirror_dir = conf['mirror_root_directory']
-
-    # remove trailing / in configuration (to ease ulterior computation)
-    if url_scheme[-1] == '/':
-        conf['url_scheme'] = url_scheme[0:-1]
-
-    if mirror_dir[-1] == '/':
-        conf['mirror_root_directory'] = mirror_dir[0:-1]
-
-    if limit:
-        conf['limit'] = int(limit)
-
-    nb_tarballs = produce_archive_messages_from(
-        conf=conf,
-        root_dir=conf['mirror_root_directory'],
-        visit_date=conf['date'],
-        mirror_file=conf.get('mirror_subset_archives'),
-        dry_run=dry_run)
-
-    print('%s tarball(s) sent to worker.'
-          % nb_tarballs)
-
-
-if __name__ == '__main__':
-    main()
diff --git a/swh/loader/tar/tests/test_build.py b/swh/loader/tar/tests/test_build.py
--- a/swh/loader/tar/tests/test_build.py
+++ b/swh/loader/tar/tests/test_build.py
@@ -10,29 +10,12 @@
 
 class TestBuildUtils(unittest.TestCase):
-    def test_compute_origin(self):
-        # given
-        expected_origin = {
-            'url': 'rsync://some/url/package-foo',
-            'type': 'rsync',
-        }
-
-        # when
-        actual_origin = build.compute_origin(
-            'rsync://some/url/',
-            'rsync',
-            '/some/root/path/',
-            '/some/root/path/package-foo/package-foo-1.2.3.tgz')
-
-        # then
-        self.assertEqual(actual_origin, expected_origin)
-
-    @patch('swh.loader.tar.build._time_from_path')
-    def test_compute_revision(self, mock_time_from_path):
-        mock_time_from_path.return_value = 'some-other-time'
+    @patch('swh.loader.tar.build._time_from_last_modified')
+    def test_compute_revision(self, mock_time_from_last_modified):
+        mock_time_from_last_modified.return_value = 'some-other-time'
 
         # when
-        actual_revision = build.compute_revision('/some/path')
+        actual_revision = build.compute_revision('/some/path', 'last-modified')
 
         expected_revision = {
             'date': {
@@ -47,40 +30,29 @@
             'committer': build.SWH_PERSON,
             'type': build.REVISION_TYPE,
             'message': build.REVISION_MESSAGE,
+            'synthetic': True,
         }
 
         # then
         self.assertEqual(actual_revision, expected_revision)
 
-        mock_time_from_path.assert_called_once_with('/some/path')
+        mock_time_from_last_modified.assert_called_once_with(
+            'last-modified')
 
-    @patch('swh.loader.tar.build.os')
-    def test_time_from_path_with_float(self, mock_os):
-        class MockStat:
-            st_mtime = 1445348286.8308342
-        mock_os.lstat.return_value = MockStat()
-
-        actual_time = build._time_from_path('some/path')
+    def test_time_from_last_modified_with_float(self):
+        actual_time = build._time_from_last_modified(
+            '2015-10-20T13:38:06.830834+00:00')
 
         self.assertEqual(actual_time, {
             'seconds': 1445348286,
-            'microseconds': 8308342
+            'microseconds': 830834
        })
 
-        mock_os.lstat.assert_called_once_with('some/path')
-
-    @patch('swh.loader.tar.build.os')
-    def test_time_from_path_with_int(self, mock_os):
-        class MockStat:
-            st_mtime = 1445348286
-
-        mock_os.lstat.return_value = MockStat()
-
-        actual_time = build._time_from_path('some/path')
+    def test_time_from_last_modified_with_int(self):
+        actual_time = build._time_from_last_modified(
+            '2015-10-20T13:38:06+00:00')
 
         self.assertEqual(actual_time, {
             'seconds': 1445348286,
             'microseconds': 0
         })
-
-        mock_os.lstat.assert_called_once_with('some/path')
diff --git a/swh/loader/tar/tests/test_loader.py b/swh/loader/tar/tests/test_loader.py
--- a/swh/loader/tar/tests/test_loader.py
+++ b/swh/loader/tar/tests/test_loader.py
@@ -14,7 +14,8 @@
 
 TEST_CONFIG = {
-    'extraction_dir': '/tmp/tests/loader-tar/',  # where to extract the tarball
+    'working_dir': '/tmp/tests/loader-tar/',  # where to extract the tarball
+    'debug': False,
     'storage': {  # we instantiate it but we don't use it in test context
         'cls': 'memory',
         'args': {
@@ -64,57 +65,30 @@
 
         """
         # given
         origin = {
-            'url': 'file:///tmp/sample-folder',
-            'type': 'dir'
+            'url': self.repo_url,
+            'type': 'tar'
         }
         visit_date = 'Tue, 3 May 2016 17:16:32 +0200'
 
-        import datetime
-        commit_time = int(datetime.datetime(
-            2018, 12, 5, 13, 35, 23, 0,
-            tzinfo=datetime.timezone(datetime.timedelta(hours=1))
-        ).timestamp())
-
-        swh_person = {
-            'name': 'Software Heritage',
-            'fullname': 'Software Heritage',
-            'email': 'robot@softwareheritage.org'
-        }
-
-        revision_message = 'swh-loader-tar: synthetic revision message'
-        revision_type = 'tar'
-
-        revision = {
-            'date': {
-                'timestamp': commit_time,
-                'offset': 0,
-            },
-            'committer_date': {
-                'timestamp': commit_time,
-                'offset': 0,
-            },
-            'author': swh_person,
-            'committer': swh_person,
-            'type': revision_type,
-            'message': revision_message,
-            'synthetic': True,
-        }
-
-        branch_name = os.path.basename(self.tarpath)
+        last_modified = '2018-12-05T12:35:23+00:00'
 
         # when
-        self.loader.load(tar_path=self.tarpath, origin=origin,
-                         visit_date=visit_date, revision=revision,
-                         branch_name=branch_name)
+        self.loader.load(
+            origin=origin, visit_date=visit_date, last_modified=last_modified)
 
         # then
         self.assertCountContents(8, "3 files + 5 links")
         self.assertCountDirectories(6, "4 subdirs + 1 empty + 1 main dir")
         self.assertCountRevisions(1, "synthetic revision")
 
         rev_id = hashutil.hash_to_bytes(
             '67a7d7dda748f9a86b56a13d9218d16f5cc9ab3d')
+
         actual_revision = next(self.storage.revision_get([rev_id]))
         self.assertTrue(actual_revision['synthetic'])
         self.assertEqual(actual_revision['parents'], [])
         self.assertEqual(actual_revision['type'], 'tar')
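
Note for reviewers: with this diff the loader no longer receives a local `tar_path` and a pre-built `revision`; it downloads the archive itself from `origin['url']` (`file://` or `http(s)://`) and dates the synthetic revision from the `last_modified` value read from the source remote (most probably by the lister). Below is a minimal driver sketch mirroring the updated test; the archive url, visit date and last-modified value are illustrative placeholders, not part of this diff:

```python
# Hypothetical usage sketch, not part of this diff.
from swh.loader.tar.loader import TarLoader

loader = TarLoader()  # reads 'working_dir' and 'debug' from the loader/tar config
loader.load(
    origin={
        # any file:// or http(s):// url pointing at a tarball
        'url': 'file:///tmp/sample-folder.tar.gz',
        'type': 'tar',
    },
    visit_date='Tue, 3 May 2016 17:16:32 +0200',
    # normally provided by the lister; consumed by prepare() and fed into
    # build.compute_revision() to date the synthetic revision
    last_modified='2016-04-22T16:35:00+00:00',
)
```

This is also why `producer.py` and `file.py` are deleted: tarballs are no longer discovered by walking a local mirror, since origins now arrive with their url and last-modified time already known.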