diff --git a/README.md b/README.md --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # SWH Tarball Loader -The Software Heritage Tarball Loader is a tool and a library to -uncompress a local tarball and inject into the SWH dataset its tree -representation. +The Software Heritage Tarball Loader is in charge of ingesting the +directory representation of the tarball into the Software Heritage +archive. ## Configuration @@ -11,7 +11,7 @@ *`{/etc/softwareheritage | ~/.config/swh | ~/.swh}`/loader/tar.yml*: ```YAML -extraction_dir: /home/storage/tmp/ +working_dir: /home/storage/tmp/ storage: cls: remote args: @@ -20,112 +20,40 @@ ## API -Load tarball directly from code or python3's toplevel: +### local + +Load local tarball directly from code or python3's toplevel: ``` Python # Fill in those -repo = 'loader-tar.tgz' +repo = 'convert-tryout.tgz' tarpath = '/home/storage/tar/%s' % repo -origin = {'url': 'ftp://%s' % repo, 'type': 'tar'} +origin = {'url': 'file://%s' % repo, 'type': 'tar'} visit_date = 'Tue, 3 May 2017 17:16:32 +0200' -revision = { - 'author': {'name': 'some', 'fullname': 'one', 'email': 'something'}, - 'committer': {'name': 'some', 'fullname': 'one', 'email': 'something'}, - 'message': '1.0 Released', - 'date': None, - 'committer_date': None, - 'type': 'tar', -} +last_modified = 'Tue, 10 May 2016 16:16:32 +0200' import logging logging.basicConfig(level=logging.DEBUG) from swh.loader.tar.tasks import LoadTarRepository l = LoadTarRepository() -l.run_task(tar_path=tarpath, origin=origin, visit_date=visit_date, - revision=revision, branch_name='master') -``` - -## Celery - -Load tarball using celery. - -Providing you have a properly configured celery up and running, the -celery worker configuration file needs to be updated: - -*`{/etc/softwareheritage | ~/.config/swh | ~/.swh}`/worker.yml*: - -``` YAML -task_modules: - - swh.loader.tar.tasks -task_queues: - - swh_loader_tar -``` - -cf. 
https://forge.softwareheritage.org/diffusion/DCORE/browse/master/README.md -for more details - - -## Tar Producer - -Its job is to compulse from a file or a folder a list of existing -tarballs. From this list, compute the corresponding messages to send -to the broker. - -### Configuration - -Message producer's configuration file (`tar.yml`): - -``` YAML -# Mirror's root directory holding tarballs to load into swh -mirror_root_directory: /srv/storage/space/mirrors/gnu.org/gnu/ -# Url scheme prefix used to create the origin url -url_scheme: http://ftp.gnu.org/gnu/ -type: ftp - -# File containing a subset list tarballs from mirror_root_directory to load. -# The file's format is one absolute path name to a tarball per line. -# NOTE: -# - This file must contain data consistent with the mirror_root_directory -# - if this option is not provided, the mirror_root_directory is scanned -# completely as usual -# mirror_subset_archives: /home/storage/missing-archives - -# Randomize blocks of messages and send for consumption -block_messages: 250 -``` - -### Run - -Trigger the message computations: - -```Shell -python3 -m swh.loader.tar.producer --config ~/.swh/producer/tar.yml +l.run_task(origin=origin, visit_date=visit_date, + last_modified=last_modified) ``` -This will walk the `mirror_root_directory` folder and send encountered -tarball messages for the swh-loader-tar to uncompress (through -celery). - -If the `mirror_subset_archives` is provided, the tarball messages will -be computed from such file (the `mirror_root_directory` is still used -so please be consistent). - -If problem arises during tarball message computation, a message will -be output with the tarball that present a problem. - -It will displayed the number of tarball messages sent at the end. 
+### remote -### Dry run +Loading a remote tarball works the same way: -``` Shell -python3 -m swh.loader.tar.producer --config-file ~/.swh/producer/tar.yml --dry-run -``` - -This will do the same as previously described but only display the -number of potential tarball messages computed. - -### Help +```Python +url = 'https://ftp.gnu.org/gnu/8sync/8sync-0.1.0.tar.gz' +origin = {'url': url, 'type': 'tar'} +visit_date = 'Tue, 3 May 2017 17:16:32 +0200' +last_modified = '2016-04-22 16:35' +import logging +logging.basicConfig(level=logging.DEBUG) -``` Shell -python3 -m swh.loader.tar.producer --help +from swh.loader.tar.tasks import LoadTarRepository +l = LoadTarRepository() +l.run_task(origin=origin, visit_date=visit_date, + last_modified=last_modified) ``` diff --git a/debian/control b/debian/control --- a/debian/control +++ b/debian/control @@ -5,7 +5,9 @@ Build-Depends: debhelper (>= 9), dh-python (>= 2), python3-all, - python3-nose, + python3-arrow, + python3-requests, + python3-pytest, python3-setuptools, python3-swh.core (>= 0.0.46~), python3-swh.loader.core (>= 0.0.35~), diff --git a/debian/rules b/debian/rules --- a/debian/rules +++ b/debian/rules @@ -1,7 +1,7 @@ #!/usr/bin/make -f export PYBUILD_NAME=swh.loader.tar -export export PYBUILD_TEST_ARGS=--with-doctest -sv -a !db,!fs +export PYBUILD_TEST_ARGS=-m 'not db and not fs' %: dh $@ --with python3 --buildsystem=pybuild diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,8 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. 
For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html +arrow vcversioner -retrying +requests click python-dateutil diff --git a/resources/producer/tar-gnu.yml b/resources/producer/tar-gnu.yml deleted file mode 100644 --- a/resources/producer/tar-gnu.yml +++ /dev/null @@ -1,22 +0,0 @@ -# Mirror's root directory holding tarballs to load into swh -mirror_root_directory: /srv/softwareheritage/space/mirrors/gnu.org/gnu/ - -# Origin setup's possible scheme url -url_scheme: rsync://ftp.gnu.org/gnu/ - -# Origin type used for tarballs -type: ftp - -# File containing a subset list tarballs from mirror_root_directory to load. -# The file's format is one absolute path name to a tarball per line. -# NOTE: -# - This file must contain data consistent with the mirror_root_directory -# - if this option is not provided, the mirror_root_directory is scanned -# completely as usual -# mirror_subset_archives: /home/storage/missing-archives - -# Retrieval date information (rsync, etc...) -date: Fri, 28 Aug 2015 13:13:26 +0200 - -# Randomize blocks of messages and send for consumption -block_messages: 250 diff --git a/resources/producer/tar-old-gnu.yml b/resources/producer/tar-old-gnu.yml deleted file mode 100644 --- a/resources/producer/tar-old-gnu.yml +++ /dev/null @@ -1,22 +0,0 @@ -# Mirror's root directory holding tarballs to load into swh -mirror_root_directory: /home/storage/space/mirrors/gnu.org/old-gnu/ - -# Origin setup's possible scheme url -url_scheme: rsync://ftp.gnu.org/old-gnu/ - -# Origin type used for tarballs -type: ftp - -# File containing a subset list tarballs from mirror_root_directory to load. -# The file's format is one absolute path name to a tarball per line. 
-# NOTE: -# - This file must contain data consistent with the mirror_root_directory -# - if this option is not provided, the mirror_root_directory is scanned -# completely as usual -# mirror_subset_archives: /home/tony/work/inria/repo/swh-environment/swh-loader-tar/old-gnu-missing - -# Retrieval date information (rsync, etc...) -date: Fri, 28 Aug 2015 13:13:26 +0200 - -# Randomize blocks of messages and send for consumption -block_messages: 100 diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -48,7 +48,7 @@ install_requires=parse_requirements() + parse_requirements('swh'), setup_requires=['vcversioner'], extras_require={'testing': parse_requirements('test')}, - vcversioner={}, + vcversioner={'version_module_paths': ['swh/loader/tar/_version.py']}, include_package_data=True, classifiers=[ "Programming Language :: Python :: 3", diff --git a/swh/loader/tar/.gitignore b/swh/loader/tar/.gitignore new file mode 100644 --- /dev/null +++ b/swh/loader/tar/.gitignore @@ -0,0 +1 @@ +_version.py \ No newline at end of file diff --git a/swh/loader/tar/build.py b/swh/loader/tar/build.py --- a/swh/loader/tar/build.py +++ b/swh/loader/tar/build.py @@ -3,9 +3,7 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import os - -from swh.core import utils +import arrow # Static setup @@ -16,61 +14,36 @@ 'fullname': 'Software Heritage', 'email': 'robot@softwareheritage.org' } -REVISION_MESSAGE = 'synthetic revision message' +REVISION_MESSAGE = 'swh-loader-tar: synthetic revision message' REVISION_TYPE = 'tar' -def compute_origin(url_scheme, url_type, root_dirpath, tarpath): - """Compute the origin. 
- - Args: - - url_scheme: scheme to build the origin's url - - url_type: origin's type - - root_dirpath: the top level root directory path - - tarpath: file's absolute path - - Returns: - Dictionary origin with keys: - - url: origin's url - - type: origin's type - - """ - relative_path = utils.commonname(root_dirpath, tarpath) - return { - 'url': ''.join([url_scheme, - os.path.dirname(relative_path)]), - 'type': url_type, - } - - -def _time_from_path(tarpath): +def _time_from_last_modified(last_modified): """Compute the modification time from the tarpath. Args: - tarpath (str|bytes): Full path to the archive to extract the - date from. + last_modified (str): Last modification time Returns: - dict representing a timestamp with keys seconds and microseconds keys. + dict representing a timestamp with keys {seconds, microseconds} """ - mtime = os.lstat(tarpath).st_mtime - if isinstance(mtime, float): - normalized_time = list(map(int, str(mtime).split('.'))) - else: # assuming int - normalized_time = [mtime, 0] - + last_modified = arrow.get(last_modified) + mtime = last_modified.float_timestamp + normalized_time = list(map(int, str(mtime).split('.'))) return { 'seconds': normalized_time[0], 'microseconds': normalized_time[1] } -def compute_revision(tarpath): +def compute_revision(tarpath, last_modified): """Compute a revision. Args: - tarpath: absolute path to the tarball + tarpath (str): absolute path to the tarball + last_modified (str): Time of last modification read from the + source remote (most probably by the lister) Returns: Revision as dict: @@ -84,7 +57,8 @@ - message: cf. 
REVISION_MESSAGE """ - ts = _time_from_path(tarpath) + ts = _time_from_last_modified(last_modified) + return { 'date': { 'timestamp': ts, @@ -98,4 +72,5 @@ 'committer': SWH_PERSON, 'type': REVISION_TYPE, 'message': REVISION_MESSAGE, + 'synthetic': True, } diff --git a/swh/loader/tar/file.py b/swh/loader/tar/file.py deleted file mode 100644 --- a/swh/loader/tar/file.py +++ /dev/null @@ -1,90 +0,0 @@ -# Copyright (C) 2015-2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import itertools -import os - -from swh.core import tarball -from swh.loader.tar import utils - - -def archives_from_dir(path): - """Given a path to a directory, walk such directory and yield tuple of - tarpath, fname. - - Args: - path: top level directory - - Returns: - Generator of tuple tarpath, filename with tarpath a tarball. - - """ - for dirpath, dirnames, filenames in os.walk(path): - for fname in filenames: - tarpath = os.path.join(dirpath, fname) - if not os.path.exists(tarpath): - continue - - if tarball.is_tarball(tarpath): - yield tarpath, fname - - -def archives_from_file(mirror_file): - """Given a path to a file containing one tarball per line, yield a tuple of - tarpath, fname. - - Args: - mirror_file: path to the file containing list of tarpath. - - Returns: - Generator of tuple tarpath, filename with tarpath a tarball. - - """ - with open(mirror_file, 'r') as f: - for tarpath in f.readlines(): - tarpath = tarpath.strip() - if not os.path.exists(tarpath): - print('WARN: %s does not exist. Skipped.' % tarpath) - continue - - if tarball.is_tarball(tarpath): - yield tarpath, os.path.basename(tarpath) - - -def archives_from(path): - """From path, list tuple of tarpath, fname. - - Args: - path: top directory to list archives from or custom file format. 
- - Returns: - Generator of tuple tarpath, filename with tarpath a tarball. - - """ - if os.path.isfile(path): - yield from archives_from_file(path) - elif os.path.isdir(path): - yield from archives_from_dir(path) - else: - raise ValueError( - 'Input incorrect, %s must be a file or a directory.' % path) - - -def random_archives_from(path, block, limit=None): - """Randomize by size block the archives. - - Returns: - Generator of randomized tuple tarpath, filename with tarpath a tarball. - - """ - random_archives = utils.random_blocks(archives_from(path), - block, - fillvalue=(None, None)) - - if limit: - random_archives = itertools.islice(random_archives, limit) - - for tarpath, fname in ((t, f) for t, f in random_archives if t and f): - yield tarpath, fname diff --git a/swh/loader/tar/loader.py b/swh/loader/tar/loader.py --- a/swh/loader/tar/loader.py +++ b/swh/loader/tar/loader.py @@ -6,148 +6,345 @@ import os import tempfile +import requests import shutil +from urllib.parse import urlparse + +from tempfile import mkdtemp from swh.core import tarball from swh.loader.core.loader import BufferedLoader -from swh.loader.dir import loader +from swh.loader.dir.loader import revision_from, snapshot_from from swh.model.hashutil import MultiHash +from swh.model.from_disk import Directory +from .build import compute_revision -class TarLoader(loader.DirLoader): - """Tarball loader implementation. +try: + from _version import __version__ +except ImportError: + __version__ = 'devel' - This is a subclass of the :class:DirLoader as the main goal of - this class is to first uncompress a tarball, then provide the - uncompressed directory/tree to be loaded by the DirLoader. - This will: +TEMPORARY_DIR_PREFIX_PATTERN = 'swh.loader.tar.' 
+DEBUG_MODE = '** DEBUG MODE **' + + +class LocalResponse: + """Local Response class with iter_content api + + """ + def __init__(self, path): + self.path = path + + def iter_content(self, chunk_size=None): + with open(self.path, 'rb') as f: + for chunk in f: + yield chunk + + +class ArchiveFetcher: + """Http/Local client in charge of downloading archives from a + remote/local server. + + Args: + temp_directory (str): Path to the temporary disk location used + for downloading the release artifacts + + """ + def __init__(self, temp_directory=None): + self.temp_directory = temp_directory + self.session = requests.session() + self.params = { + 'headers': { + 'User-Agent': 'Software Heritage Tar Loader (%s)' % ( + __version__ + ) + } + } + + def download(self, url): + """Download the remote tarball url locally. + + Args: + url (str): Url (file or http*) + + Raises: + ValueError in case of failing to query + + Returns: + Tuple of local (filepath, hashes of filepath) + + """ + url_parsed = urlparse(url) + if url_parsed.scheme == 'file': + path = url_parsed.path + response = LocalResponse(path) + length = os.path.getsize(path) + else: + response = self.session.get(url, **self.params, stream=True) + if response.status_code != 200: + raise ValueError("Fail to query '%s'. Reason: %s" % ( + url, response.status_code)) + length = int(response.headers['content-length']) + + filepath = os.path.join(self.temp_directory, os.path.basename(url)) + + h = MultiHash(length=length) + with open(filepath, 'wb') as f: + for chunk in response.iter_content(chunk_size=None): + h.update(chunk) + f.write(chunk) + + actual_length = os.path.getsize(filepath) + if length != actual_length: + raise ValueError('Error when checking size: %s != %s' % ( + length, actual_length)) + + hashes = { + 'length': length, + **h.hexdigest() + } + return filepath, hashes + + +class BaseTarLoader(BufferedLoader): + """Base Tarball Loader class. 
+ + This factorizes multiple loader implementations: + + - :class:`RemoteTarLoader`: New implementation able to deal with + remote archives. + + - :class:`TarLoader`: Old implementation which dealt with only + local archive. It also was only passing along objects to + persist (revision, etc...) - - creates an origin (if it does not exist) - - creates a fetch_history entry - - creates an origin_visit - - uncompress locally the tarball in a temporary location - - process the content of the tarballs to persist on swh storage - - clean up the temporary location - - write an entry in fetch_history to mark the loading tarball end (success - or failure) """ CONFIG_BASE_FILENAME = 'loader/tar' ADDITIONAL_CONFIG = { - 'extraction_dir': ('string', '/tmp') + 'working_dir': ('string', '/tmp'), + 'debug': ('bool', False), # NOT FOR PRODUCTION } def __init__(self, logging_class='swh.loader.tar.TarLoader', config=None): super().__init__(logging_class=logging_class, config=config) + self.local_cache = None self.dir_path = None + working_dir = self.config['working_dir'] + os.makedirs(working_dir, exist_ok=True) + self.temp_directory = mkdtemp( + suffix='-%s' % os.getpid(), + prefix=TEMPORARY_DIR_PREFIX_PATTERN, + dir=working_dir) + self.client = ArchiveFetcher(temp_directory=self.temp_directory) + os.makedirs(working_dir, 0o755, exist_ok=True) + self.dir_path = tempfile.mkdtemp(prefix='swh.loader.tar-', + dir=self.temp_directory) + self.debug = self.config['debug'] - def load(self, *, tar_path, origin, visit_date, revision, - branch_name=None): - """Load a tarball in `tarpath` in the Software Heritage Archive. 
- - Args: - tar_path: tarball to import - origin (dict): an origin dictionary as returned by - :func:`swh.storage.storage.Storage.origin_get_one` - visit_date (str): the date the origin was visited (as an - isoformatted string) - revision (dict): a revision as passed to - :func:`swh.storage.storage.Storage.revision_add`, excluding the - `id` and `directory` keys (computed from the directory) - branch_name (str): the optional branch_name to use for snapshot + def cleanup(self): + """Clean up temporary disk folders used. """ - # Shortcut super() as we use different arguments than the DirLoader. - return BufferedLoader.load(self, tar_path=tar_path, origin=origin, - visit_date=visit_date, revision=revision, - branch_name=branch_name) + if self.debug: + self.log.warn('%s Will not clean up temp dir %s' % ( + DEBUG_MODE, self.temp_directory + )) + return + if os.path.exists(self.temp_directory): + self.log.debug('Clean up %s' % self.temp_directory) + shutil.rmtree(self.temp_directory) def prepare_origin_visit(self, *, origin, visit_date=None, **kwargs): + """Prepare the origin visit information. + + Args: + origin (dict): Dict with keys {url, type} + visit_date (str): Date representing the date of the + visit. None by default will make it the current time + during the loading process. + + """ self.origin = origin if 'type' not in self.origin: # let the type flow if present self.origin['type'] = 'tar' self.visit_date = visit_date - def prepare(self, *, tar_path, origin, revision, visit_date=None, - branch_name=None): - """1. Uncompress the tarball in a temporary directory. - 2. Compute some metadata to update the revision. 
+ def get_tarball_url_to_retrieve(self): + """Compute the tarball url to allow retrieval """ - # Prepare the extraction path - extraction_dir = self.config['extraction_dir'] - os.makedirs(extraction_dir, 0o755, exist_ok=True) - self.dir_path = tempfile.mkdtemp(prefix='swh.loader.tar-', - dir=extraction_dir) + raise NotImplementedError() - # add checksums in revision + def fetch_data(self): + """Retrieve, uncompress archive and fetch objects from the tarball. + The actual ingestion takes place in the :meth:`store_data` + implementation below. - self.log.info('Uncompress %s to %s' % (tar_path, self.dir_path)) - nature = tarball.uncompress(tar_path, self.dir_path) + """ + url = self.get_tarball_url_to_retrieve() + filepath, hashes = self.client.download(url) + nature = tarball.uncompress(filepath, self.dir_path) - if 'metadata' not in revision: - artifact = MultiHash.from_path(tar_path).hexdigest() - artifact['name'] = os.path.basename(tar_path) - artifact['archive_type'] = nature - artifact['length'] = os.path.getsize(tar_path) - revision['metadata'] = { - 'original_artifact': [artifact], - } + dir_path = self.dir_path.encode('utf-8') + directory = Directory.from_disk(path=dir_path, save_path=True) + objects = directory.collect() + if 'content' not in objects: + objects['content'] = {} + if 'directory' not in objects: + objects['directory'] = {} - branch = branch_name if branch_name else os.path.basename(tar_path) + # compute the full revision (with ids) + revision = self.build_revision(filepath, nature, hashes) + revision = revision_from(directory.hash, revision) + objects['revision'] = { + revision['id']: revision, + } - super().prepare(dir_path=self.dir_path, - origin=origin, - visit_date=visit_date, - revision=revision, - release=None, - branch_name=branch) + snapshot = self.build_snapshot(revision) + objects['snapshot'] = { + snapshot['id']: snapshot + } + self.objects = objects - def cleanup(self): - """Clean up temporary directory where we uncompress the 
tarball. + def store_data(self): + """Store the objects in the swh archive. """ - if self.dir_path and os.path.exists(self.dir_path): - shutil.rmtree(self.dir_path) - - -if __name__ == '__main__': - import click - import logging - logging.basicConfig( - level=logging.DEBUG, - format='%(asctime)s %(process)d %(message)s' - ) - - @click.command() - @click.option('--archive-path', required=1, help='Archive path to load') - @click.option('--origin-url', required=1, help='Origin url to associate') - @click.option('--visit-date', default=None, - help='Visit date time override') - def main(archive_path, origin_url, visit_date): - """Loading archive tryout.""" - import datetime - origin = {'url': origin_url, 'type': 'tar'} - commit_time = int(datetime.datetime.now( - tz=datetime.timezone.utc).timestamp()) - swh_person = { - 'name': 'Software Heritage', - 'fullname': 'Software Heritage', - 'email': 'robot@softwareheritage.org' + objects = self.objects + self.maybe_load_contents(objects['content'].values()) + self.maybe_load_directories(objects['directory'].values()) + self.maybe_load_revisions(objects['revision'].values()) + snapshot = list(objects['snapshot'].values())[0] + self.maybe_load_snapshot(snapshot) + + +class RemoteTarLoader(BaseTarLoader): + """This is able to load from remote/local archive into the swh + archive. + + This will: + + - create an origin (if it does not exist) and a visit + - fetch the tarball in a temporary location + - uncompress it locally in a temporary location + - process the content of the tarball to persist on swh storage + - clean up the temporary location + + """ + def prepare(self, *, last_modified, **kwargs): + """last_modified is the time of last modification of the tarball. + + E.g https://ftp.gnu.org/gnu/8sync/: + [ ] 8sync-0.1.0.tar.gz 2016-04-22 16:35 217K + [ ] 8sync-0.1.0.tar.gz.sig 2016-04-22 16:35 543 + [ ] ... 
+ + Args: + origin (dict): Dict with keys {url, type} + last_modified (str): The date of last modification of the + archive to ingest. + visit_date (str): Date representing the date of the + visit. None by default will make it the current time + during the loading process. + + """ + self.last_modified = last_modified + + def get_tarball_url_to_retrieve(self): + return self.origin['url'] + + def build_revision(self, filepath, nature, hashes): + """Build the revision with identifier + + We use the `last_modified` date provided by the caller to + build the revision. + + """ + return { + **compute_revision(filepath, self.last_modified), + 'metadata': { + 'original_artifact': [{ + 'name': os.path.basename(filepath), + 'archive_type': nature, + **hashes, + }], + } } - revision = { - 'date': {'timestamp': commit_time, 'offset': 0}, - 'committer_date': {'timestamp': commit_time, 'offset': 0}, - 'author': swh_person, - 'committer': swh_person, - 'type': 'tar', - 'message': 'swh-loader-tar: synthetic revision message', - 'metadata': {}, - 'synthetic': True, + + def build_snapshot(self, revision): + """Build the snapshot targeting the revision. + + """ + branch_name = os.path.basename(self.dir_path) + return snapshot_from(revision['id'], branch_name) + + +class LegacyLocalTarLoader(BaseTarLoader): + """This loads local tarball into the swh archive. It's using the + revision and branch provided by the caller as scaffolding to + create the full revision and snapshot (with identifiers). 
+ + This is what's: + - been used to ingest our 2015 rsync copy of gnu.org + - still used by the loader deposit + + This will: + + - create an origin (if it does not exist) and a visit + - uncompress a tarball in a local and temporary location + - process the content of the tarball to persist on swh storage + - associate it to a passed revision and snapshot + - clean up the temporary location + + """ + def prepare(self, *, tar_path, revision, branch_name, **kwargs): + """Prepare the data prior to ingest it in SWH archive. + + Args: + tar_path (str): Path to the archive to ingest + revision (dict): The synthetic revision to associate the + archive to (no identifiers within) + branch_name (str): The branch name to use for the + snapshot. + + """ + self.tar_path = tar_path + self.revision = revision + self.branch_name = branch_name + + def get_tarball_url_to_retrieve(self): + return 'file://%s' % self.tar_path + + def build_revision(self, filepath, nature, hashes): + """Build the revision with identifier + + We use the revision provided by the caller as a scaffolding + revision. + + """ + return { + **self.revision, + 'metadata': { + 'original_artifact': [{ + 'name': os.path.basename(filepath), + 'archive_type': nature, + **hashes, + }], + } } - TarLoader().load(tar_path=archive_path, origin=origin, - visit_date=visit_date, revision=revision, - branch_name='master') - main() + def build_snapshot(self, revision): + """Build the snapshot targeting the revision. + + We use the branch_name provided by the caller as a scaffolding + as well. 
+ + """ + return snapshot_from(revision['id'], self.branch_name) + + +# Aliasing for retro-compatibility +TarLoader = LegacyLocalTarLoader diff --git a/swh/loader/tar/producer.py b/swh/loader/tar/producer.py deleted file mode 100755 --- a/swh/loader/tar/producer.py +++ /dev/null @@ -1,102 +0,0 @@ -# Copyright (C) 2015-2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import click -import dateutil.parser - -from swh.scheduler.utils import get_task - -from swh.core import config -from swh.loader.tar import build, file - - -TASK_QUEUE = 'swh.loader.tar.tasks.LoadTarRepository' - - -def produce_archive_messages_from( - conf, root_dir, visit_date, mirror_file=None, dry_run=False): - """From root_dir, produce archive tarball messages to celery. - - Will print error message when some computation arise on archive - and continue. - - Args: - conf: dictionary holding static metadata - root_dir: top directory to list archives from. 
- visit_date: override origin's visit date of information - mirror_file: a filtering file of tarballs to load - dry_run: will compute but not send messages - - Returns: - Number of messages generated - - """ - - limit = conf.get('limit') - block = int(conf['block_messages']) - count = 0 - - path_source_tarballs = mirror_file if mirror_file else root_dir - - visit_date = dateutil.parser.parse(visit_date) - if not dry_run: - task = get_task(TASK_QUEUE) - - for tarpath, _ in file.random_archives_from( - path_source_tarballs, block, limit): - try: - origin = build.compute_origin( - conf['url_scheme'], conf['type'], root_dir, tarpath) - revision = build.compute_revision(tarpath) - - if not dry_run: - task.delay(tar_path=tarpath, origin=origin, - visit_date=visit_date, - revision=revision) - - count += 1 - except ValueError: - print('Problem with the following archive: %s' % tarpath) - - return count - - -@click.command() -@click.option('--config-file', required=1, - help='Configuration file path') -@click.option('--dry-run/--no-dry-run', default=False, - help='Dry run (print repo only)') -@click.option('--limit', default=None, - help='Number of origins limit to send') -def main(config_file, dry_run, limit): - """Tarball producer of local fs tarballs. - - """ - conf = config.read(config_file) - url_scheme = conf['url_scheme'] - mirror_dir = conf['mirror_root_directory'] - - # remove trailing / in configuration (to ease ulterior computation) - if url_scheme[-1] == '/': - conf['url_scheme'] = url_scheme[0:-1] - - if mirror_dir[-1] == '/': - conf['mirror_root_directory'] = mirror_dir[0:-1] - - if limit: - conf['limit'] = int(limit) - - nb_tarballs = produce_archive_messages_from( - conf=conf, - root_dir=conf['mirror_root_directory'], - visit_date=conf['date'], - mirror_file=conf.get('mirror_subset_archives'), - dry_run=dry_run) - - print('%s tarball(s) sent to worker.' 
% nb_tarballs) - - -if __name__ == '__main__': - main() diff --git a/swh/loader/tar/tasks.py b/swh/loader/tar/tasks.py --- a/swh/loader/tar/tasks.py +++ b/swh/loader/tar/tasks.py @@ -5,23 +5,22 @@ from swh.scheduler.task import Task -from swh.loader.tar.loader import TarLoader +from swh.loader.tar.loader import RemoteTarLoader class LoadTarRepository(Task): - """Import a directory to Software Heritage + """Import a remote or local archive to Software Heritage """ task_queue = 'swh_loader_tar' - def run_task(self, *, tar_path, origin, visit_date, revision, - branch_name=None): + def run_task(self, *, origin, visit_date, last_modified): """Import a tarball into swh. - Args: see :func:`TarLoader.load`. + Args: see :func:`TarLoader.prepare`. + """ - loader = TarLoader() + loader = RemoteTarLoader() loader.log = self.log - return loader.load(tar_path=tar_path, origin=origin, - visit_date=visit_date, revision=revision, - branch_name=branch_name) + return loader.load( + origin=origin, visit_date=visit_date, last_modified=last_modified) diff --git a/swh/loader/tar/tests/test_build.py b/swh/loader/tar/tests/test_build.py --- a/swh/loader/tar/tests/test_build.py +++ b/swh/loader/tar/tests/test_build.py @@ -10,29 +10,12 @@ class TestBuildUtils(unittest.TestCase): - def test_compute_origin(self): - # given - expected_origin = { - 'url': 'rsync://some/url/package-foo', - 'type': 'rsync', - } - - # when - actual_origin = build.compute_origin( - 'rsync://some/url/', - 'rsync', - '/some/root/path/', - '/some/root/path/package-foo/package-foo-1.2.3.tgz') - - # then - self.assertEqual(actual_origin, expected_origin) - - @patch('swh.loader.tar.build._time_from_path') - def test_compute_revision(self, mock_time_from_path): - mock_time_from_path.return_value = 'some-other-time' + @patch('swh.loader.tar.build._time_from_last_modified') + def test_compute_revision(self, mock_time_from_last_modified): + mock_time_from_last_modified.return_value = 'some-other-time' # when - actual_revision 
= build.compute_revision('/some/path') + actual_revision = build.compute_revision('/some/path', 'last-modified') expected_revision = { 'date': { @@ -47,40 +30,29 @@ 'committer': build.SWH_PERSON, 'type': build.REVISION_TYPE, 'message': build.REVISION_MESSAGE, + 'synthetic': True, } # then self.assertEqual(actual_revision, expected_revision) - mock_time_from_path.assert_called_once_with('/some/path') + mock_time_from_last_modified.assert_called_once_with( + 'last-modified') - @patch('swh.loader.tar.build.os') - def test_time_from_path_with_float(self, mock_os): - class MockStat: - st_mtime = 1445348286.8308342 - mock_os.lstat.return_value = MockStat() - - actual_time = build._time_from_path('some/path') + def test_time_from_last_modified_with_float(self): + actual_time = build._time_from_last_modified( + '2015-10-20T13:38:06.830834+00:00') self.assertEqual(actual_time, { 'seconds': 1445348286, - 'microseconds': 8308342 + 'microseconds': 830834 }) - mock_os.lstat.assert_called_once_with('some/path') - - @patch('swh.loader.tar.build.os') - def test_time_from_path_with_int(self, mock_os): - class MockStat: - st_mtime = 1445348286 - - mock_os.lstat.return_value = MockStat() - - actual_time = build._time_from_path('some/path') + def test_time_from_last_modified_with_int(self): + actual_time = build._time_from_last_modified( + '2015-10-20T13:38:06+00:00') self.assertEqual(actual_time, { 'seconds': 1445348286, 'microseconds': 0 }) - - mock_os.lstat.assert_called_once_with('some/path') diff --git a/swh/loader/tar/tests/test_loader.py b/swh/loader/tar/tests/test_loader.py --- a/swh/loader/tar/tests/test_loader.py +++ b/swh/loader/tar/tests/test_loader.py @@ -10,11 +10,13 @@ from swh.model import hashutil from swh.loader.core.tests import BaseLoaderTest -from swh.loader.tar.loader import TarLoader +from swh.loader.tar.build import SWH_PERSON +from swh.loader.tar.loader import RemoteTarLoader, TarLoader TEST_CONFIG = { - 'extraction_dir': '/tmp/tests/loader-tar/', # where to 
extract the tarball + 'working_dir': '/tmp/tests/loader-tar/', # where to extract the tarball + 'debug': False, 'storage': { # we instantiate it but we don't use it in test context 'cls': 'memory', 'args': { @@ -35,13 +37,13 @@ } -class TarLoaderTest(TarLoader): +class RemoteTarLoaderForTest(RemoteTarLoader): def parse_config_file(self, *args, **kwargs): return TEST_CONFIG -class TestTarLoader(BaseLoaderTest): - """Prepare the archive to load +class PrepareDataForTestLoader(BaseLoaderTest): + """Prepare the archive to load (test fixture). """ def setUp(self): @@ -51,10 +53,13 @@ self.tarpath = self.destination_path -class TestTarLoader1(TestTarLoader): +class TestTarLoader1(PrepareDataForTestLoader): + """Test the remote loader + + """ def setUp(self): super().setUp() - self.loader = TarLoaderTest() + self.loader = RemoteTarLoaderForTest() self.storage = self.loader.storage @pytest.mark.fs @@ -64,8 +69,74 @@ """ # given origin = { - 'url': 'file:///tmp/sample-folder', - 'type': 'dir' + 'url': self.repo_url, + 'type': 'tar' + } + + visit_date = 'Tue, 3 May 2016 17:16:32 +0200' + + last_modified = '2018-12-05T12:35:23+00:00' + + # when + self.loader.load( + origin=origin, visit_date=visit_date, last_modified=last_modified) + + # then + self.assertCountContents(8, "3 files + 5 links") + self.assertCountDirectories(6, "4 subdirs + 1 empty + 1 main dir") + self.assertCountRevisions(1, "synthetic revision") + + rev_id = hashutil.hash_to_bytes( + '67a7d7dda748f9a86b56a13d9218d16f5cc9ab3d') + actual_revision = next(self.storage.revision_get([rev_id])) + self.assertTrue(actual_revision['synthetic']) + self.assertEqual(actual_revision['parents'], []) + self.assertEqual(actual_revision['type'], 'tar') + self.assertEqual(actual_revision['message'], + b'swh-loader-tar: synthetic revision message') + self.assertEqual(actual_revision['directory'], + b'\xa7A\xfcM\x96\x8c{\x8e<\x94\xff\x86\xe7\x04\x80\xc5\xc7\xe5r\xa9') # noqa + + self.assertEqual( + 
actual_revision['metadata']['original_artifact'][0], + { + 'sha1_git': 'cc848944a0d3e71d287027347e25467e61b07428', + 'archive_type': 'tar', + 'blake2s256': '5d70923443ad36377cd58e993aff0e3c1b9ef14f796c69569105d3a99c64f075', # noqa + 'name': 'sample-folder.tgz', + 'sha1': '3ca0d0a5c6833113bd532dc5c99d9648d618f65a', + 'length': 555, + 'sha256': '307ebda0071ca5975f618e192c8417161e19b6c8bf581a26061b76dc8e85321d' # noqa + }) + + self.assertCountReleases(0) + self.assertCountSnapshots(1) + + +class TarLoaderForTest(TarLoader): + def parse_config_file(self, *args, **kwargs): + return TEST_CONFIG + + +class TestTarLoader2(PrepareDataForTestLoader): + """Test the legacy tar loader + + """ + + def setUp(self): + super().setUp() + self.loader = TarLoaderForTest() + self.storage = self.loader.storage + + @pytest.mark.fs + def test_load(self): + """Process a new tarball should be ok + + """ + # given + origin = { + 'url': self.repo_url, + 'type': 'tar' } visit_date = 'Tue, 3 May 2016 17:16:32 +0200' @@ -76,12 +147,6 @@ tzinfo=datetime.timezone(datetime.timedelta(hours=1)) ).timestamp()) - swh_person = { - 'name': 'Software Heritage', - 'fullname': 'Software Heritage', - 'email': 'robot@softwareheritage.org' - } - revision_message = 'swh-loader-tar: synthetic revision message' revision_type = 'tar' revision = { @@ -93,8 +158,8 @@ 'timestamp': commit_time, 'offset': 0, }, - 'author': swh_person, - 'committer': swh_person, + 'author': SWH_PERSON, + 'committer': SWH_PERSON, 'type': revision_type, 'message': revision_message, 'synthetic': True, diff --git a/swh/loader/tar/tests/test_tasks.py b/swh/loader/tar/tests/test_tasks.py new file mode 100644 --- /dev/null +++ b/swh/loader/tar/tests/test_tasks.py @@ -0,0 +1,31 @@ +# Copyright (C) 2015-2018 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import 
unittest +from unittest.mock import patch + +from swh.loader.tar.tasks import LoadTarRepository + + +class TestTasks(unittest.TestCase): + def test_check_task_name(self): + task = LoadTarRepository() + self.assertEqual(task.task_queue, 'swh_loader_tar') + + @patch('swh.loader.tar.loader.RemoteTarLoader.load') + def test_task(self, mock_loader): + mock_loader.return_value = {'status': 'eventful'} + task = LoadTarRepository() + + # given + actual_result = task.run_task( + origin='origin', visit_date='visit_date', + last_modified='last_modified') + + self.assertEqual(actual_result, {'status': 'eventful'}) + + mock_loader.assert_called_once_with( + origin='origin', visit_date='visit_date', + last_modified='last_modified')