diff --git a/debian/control b/debian/control index c9868dd..975d137 100644 --- a/debian/control +++ b/debian/control @@ -1,25 +1,25 @@ Source: swh-loader-tar Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python, python3-all, python3-nose, python3-setuptools, python3-swh.core (>= 0.0.14~), python3-swh.model (>= 0.0.15~), python3-swh.scheduler (>= 0.0.14~), python3-swh.storage (>= 0.0.83~), - python3-swh.loader.dir (>= 0.0.26~), + python3-swh.loader.dir (>= 0.0.28~), python3-vcversioner Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/diffusion/DLDTAR/ Package: python3-swh.loader.tar Architecture: all Depends: python3-swh.core (>= 0.0.14~), python3-swh.storage (>= 0.0.83~), - python3-swh.loader.dir (>= 0.0.26~), python3-swh.scheduler (>= 0.0.14~), + python3-swh.loader.dir (>= 0.0.28~), python3-swh.scheduler (>= 0.0.14~), ${misc:Depends}, ${python3:Depends} Description: Software Heritage Tarball Loader diff --git a/requirements-swh.txt b/requirements-swh.txt index 0053026..e439e32 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,5 +1,5 @@ swh.core >= 0.0.14 swh.model >= 0.0.15 swh.scheduler >= 0.0.14 swh.storage >= 0.0.83 -swh.loader.dir >= 0.0.26 +swh.loader.dir >= 0.0.28 diff --git a/resources/producer/tar-gnu.yml b/resources/producer/tar-gnu.yml index fc920f4..8ac4b49 100644 --- a/resources/producer/tar-gnu.yml +++ b/resources/producer/tar-gnu.yml @@ -1,22 +1,22 @@ # Mirror's root directory holding tarballs to load into swh -mirror_root_directory: /home/storage/space/mirrors/gnu.org/gnu/ +mirror_root_directory: /srv/softwareheritage/space/mirrors/gnu.org/gnu/ # Origin setup's possible scheme url url_scheme: rsync://ftp.gnu.org/gnu/ # Origin type used for tarballs type: ftp # File containing a subset list tarballs from mirror_root_directory to load. # The file's format is one absolute path name to a tarball per line. 
# NOTE: # - This file must contain data consistent with the mirror_root_directory # - if this option is not provided, the mirror_root_directory is scanned # completely as usual # mirror_subset_archives: /home/storage/missing-archives # Retrieval date information (rsync, etc...) date: Fri, 28 Aug 2015 13:13:26 +0200 # Randomize blocks of messages and send for consumption block_messages: 250 diff --git a/swh/loader/tar/loader.py b/swh/loader/tar/loader.py index 3bff96c..14bc949 100644 --- a/swh/loader/tar/loader.py +++ b/swh/loader/tar/loader.py @@ -1,105 +1,99 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import tempfile import shutil +from swh.loader.core.loader import SWHLoader from swh.loader.dir import loader from swh.loader.tar import tarball, utils from swh.model import hashutil class TarLoader(loader.DirLoader): """A tarball loader: - creates an origin if it does not exist - creates a fetch_history entry - creates an origin_visit - uncompress locally the tarballs in a temporary location - process the content of the tarballs to persist on swh storage - clean up the temporary location - write an entry in fetch_history to mark the loading tarball end (success or failure) - - Args: - tarpath: path to the tarball to uncompress - origin (dict): Dictionary with the following keys: - - - url: url origin we fetched - - type: type of the origin - - visit_date (str): To override the visit date - revision (dict): Dictionary of information needed, keys are: - - - author_name: revision's author name - - author_email: revision's author email - - author_date: timestamp (e.g. 1444054085) - - author_offset: date offset e.g. 
-0220, +0100 - - committer_name: revision's committer name - - committer_email: revision's committer email - - committer_date: timestamp - - committer_offset: date offset e.g. -0220, +0100 - - type: type of revision dir, tar - - message: synthetic message for the revision - - occurrences (dict): List of occurrence dictionary, with the following - keys: - - - branch: occurrence's branch name - - authority_id: authority id (e.g. 1 for swh) - - validity: validity date (e.g. 2015-01-01 00:00:00+00) - """ CONFIG_BASE_FILENAME = 'loader/tar' ADDITIONAL_CONFIG = { 'extraction_dir': ('string', '/tmp') } def __init__(self): super().__init__(logging_class='swh.loader.tar.TarLoader') - def prepare(self, *args, **kwargs): + def load(self, *, tar_path, origin, visit_date, revision, occurrences): + """Load a tarball in `tar_path` in the Software Heritage Archive. + + Args: + tar_path: tarball to import + origin (dict): an origin dictionary as returned by + :func:`swh.storage.storage.Storage.origin_get_one` + visit_date (str): the date the origin was visited (as an + isoformatted string) + revision (dict): a revision as passed to + :func:`swh.storage.storage.Storage.revision_add`, excluding the + `id` and `directory` keys (computed from the directory) + occurrences (list of dicts): the occurrences to create in the + generated origin visit. Each dict contains a 'branch' key with + the branch name as value. + """ + # Shortcut super() as we use different arguments than the DirLoader. + SWHLoader.load(self, tar_path=tar_path, origin=origin, + visit_date=visit_date, revision=revision, + occurrences=occurrences) + + def prepare(self, *, tar_path, origin, visit_date, revision, occurrences): """1. Uncompress the tarball in a temporary directory. 2. Compute some metadata to update the revision. 
""" - tarpath, origin, visit_date, revision, occs = args - if 'type' not in origin: # let the type flow if present origin['type'] = 'tar' # Prepare the extraction path extraction_dir = self.config['extraction_dir'] os.makedirs(extraction_dir, 0o755, exist_ok=True) dir_path = tempfile.mkdtemp(prefix='swh.loader.tar-', dir=extraction_dir) # add checksums in revision - artifact = utils.convert_to_hex(hashutil.hash_path(tarpath)) - artifact['name'] = os.path.basename(tarpath) + artifact = utils.convert_to_hex(hashutil.hash_path(tar_path)) + artifact['name'] = os.path.basename(tar_path) - self.log.info('Uncompress %s to %s' % (tarpath, dir_path)) - nature = tarball.uncompress(tarpath, dir_path) + self.log.info('Uncompress %s to %s' % (tar_path, dir_path)) + nature = tarball.uncompress(tar_path, dir_path) artifact['archive_type'] = nature - artifact['length'] = os.path.getsize(tarpath) + artifact['length'] = os.path.getsize(tar_path) revision['metadata'] = { 'original_artifact': [artifact], } - self.dir_path = dir_path - - super().prepare(dir_path, origin, visit_date, revision, None, occs) + super().prepare(dir_path=dir_path, + origin=origin, + visit_date=visit_date, + revision=revision, + release=None, + occurrences=occurrences) def cleanup(self): """Clean up temporary directory where we uncompress the tarball. 
""" dir_path = self.dir_path if dir_path and os.path.exists(dir_path): shutil.rmtree(dir_path) diff --git a/swh/loader/tar/producer.py b/swh/loader/tar/producer.py index a13765c..53daabf 100755 --- a/swh/loader/tar/producer.py +++ b/swh/loader/tar/producer.py @@ -1,101 +1,103 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import dateutil.parser from swh.scheduler.utils import get_task from swh.core import config from swh.loader.tar import build, file TASK_QUEUE = 'swh.loader.tar.tasks.LoadTarRepository' def produce_archive_messages_from( conf, root_dir, visit_date, mirror_file=None, dry_run=False): """From root_dir, produce archive tarball messages to celery. Will print error message when some computation arise on archive and continue. Args: conf: dictionary holding static metadata root_dir: top directory to list archives from. 
visit_date: override origin's visit date of information mirror_file: a filtering file of tarballs to load dry_run: will compute but not send messages Returns: Number of messages generated """ limit = conf.get('limit') block = int(conf['block_messages']) count = 0 path_source_tarballs = mirror_file if mirror_file else root_dir visit_date = dateutil.parser.parse(visit_date) if not dry_run: task = get_task(TASK_QUEUE) for tarpath, _ in file.random_archives_from( path_source_tarballs, block, limit): try: origin = build.compute_origin( conf['url_scheme'], conf['type'], root_dir, tarpath) revision = build.compute_revision(tarpath) occurrence = build.compute_occurrence(tarpath) if not dry_run: - task.delay(tarpath, origin, visit_date, revision, [occurrence]) + task.delay(tar_path=tarpath, origin=origin, + visit_date=visit_date, revision=revision, + occurrences=[occurrence]) count += 1 except ValueError: print('Problem with the following archive: %s' % tarpath) return count @click.command() @click.option('--config-file', required=1, help='Configuration file path') @click.option('--dry-run/--no-dry-run', default=False, help='Dry run (print repo only)') @click.option('--limit', default=None, help='Number of origins limit to send') def main(config_file, dry_run, limit): """Tarball producer of local fs tarballs. """ conf = config.read(config_file) url_scheme = conf['url_scheme'] mirror_dir = conf['mirror_root_directory'] # remove trailing / in configuration (to ease ulterior computation) if url_scheme[-1] == '/': conf['url_scheme'] = url_scheme[0:-1] if mirror_dir[-1] == '/': conf['mirror_root_directory'] = mirror_dir[0:-1] if limit: conf['limit'] = int(limit) nb_tarballs = produce_archive_messages_from( conf=conf, root_dir=conf['mirror_root_directory'], visit_date=conf['date'], mirror_file=conf.get('mirror_subset_archives'), dry_run=dry_run) print('%s tarball(s) sent to worker.' 
% nb_tarballs) if __name__ == '__main__': main() diff --git a/swh/loader/tar/tasks.py b/swh/loader/tar/tasks.py index 3bfd650..eb45604 100644 --- a/swh/loader/tar/tasks.py +++ b/swh/loader/tar/tasks.py @@ -1,28 +1,25 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.task import Task from swh.loader.tar.loader import TarLoader class LoadTarRepository(Task): """Import a directory to Software Heritage """ task_queue = 'swh_loader_tar' - def run_task(self, tarpath, origin, visit_date, revision, occurrences): + def run_task(self, *, tar_path, origin, visit_date, revision, occurrences): """Import a tarball into swh. - Args: - - tarpath: path to a tarball file - - origin, visit_date, revision, release, occurrences: - cf. swh.loader.dir.loader.prepare docstring - + Args: see :func:`TarLoader.load`. """ loader = TarLoader() loader.log = self.log - loader.load(tarpath, origin, visit_date, revision, occurrences) + loader.load(tar_path=tar_path, origin=origin, visit_date=visit_date, + revision=revision, occurrences=occurrences) diff --git a/swh/loader/tar/tests/test_loader.py b/swh/loader/tar/tests/test_loader.py index 4e7d53c..ddf5bc4 100644 --- a/swh/loader/tar/tests/test_loader.py +++ b/swh/loader/tar/tests/test_loader.py @@ -1,207 +1,209 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from nose.tools import istest from unittest import TestCase from swh.loader.tar.loader import TarLoader class LoaderNoStorageForTest: """Mixin class to inhibit the persistence and keep in memory the data sent for storage. cf. 
SWHTarLoaderNoStorage """ def __init__(self): super().__init__() # Init the state self.all_contents = [] self.all_directories = [] self.all_revisions = [] self.all_releases = [] self.all_occurrences = [] def send_origin(self, origin): self.origin = origin def send_origin_visit(self, origin_id, ts): self.origin_visit = { 'origin': origin_id, 'ts': ts, 'visit': 1, } return self.origin_visit def update_origin_visit(self, origin_id, visit, status): self.status = status self.origin_visit = visit def maybe_load_contents(self, all_contents): self.all_contents.extend(all_contents) def maybe_load_directories(self, all_directories): self.all_directories.extend(all_directories) def maybe_load_revisions(self, all_revisions): self.all_revisions.extend(all_revisions) def maybe_load_releases(self, releases): self.all_releases.extend(releases) def maybe_load_occurrences(self, all_occurrences): self.all_occurrences.extend(all_occurrences) def open_fetch_history(self): return 1 def close_fetch_history_success(self, fetch_history_id): pass def close_fetch_history_failure(self, fetch_history_id): pass TEST_CONFIG = { 'extraction_dir': '/tmp/tests/loader-tar/', # where to extract the tarball 'storage': { # we instantiate it but we don't use it in test context 'cls': 'remote', 'args': { 'url': 'http://127.0.0.1:9999', # somewhere that does not exist } }, 'send_contents': False, 'send_directories': False, 'send_revisions': False, 'send_releases': False, 'send_occurrences': False, 'content_packet_size': 100, 'content_packet_block_size_bytes': 104857600, 'content_packet_size_bytes': 1073741824, 'directory_packet_size': 250, 'revision_packet_size': 100, 'release_packet_size': 100, 'occurrence_packet_size': 100, } def parse_config_file(base_filename=None, config_filename=None, additional_configs=None, global_config=True): return TEST_CONFIG # Inhibit side-effect loading configuration from disk TarLoader.parse_config_file = parse_config_file class SWHTarLoaderNoStorage(LoaderNoStorageForTest, 
TarLoader): """A TarLoader with no persistence. Context: Load a tarball with a persistent-less tarball loader """ pass PATH_TO_DATA = '../../../../..' class SWHTarLoaderITTest(TestCase): def setUp(self): super().setUp() self.loader = SWHTarLoaderNoStorage() @istest def load(self): """Process a new tarball should be ok """ # given start_path = os.path.dirname(__file__) tarpath = os.path.join( start_path, PATH_TO_DATA, 'swh-storage-testdata/dir-folders/sample-folder.tgz') origin = { 'url': 'file:///tmp/sample-folder', 'type': 'dir' } visit_date = 'Tue, 3 May 2016 17:16:32 +0200' import datetime commit_time = int(datetime.datetime.now( tz=datetime.timezone.utc).timestamp() ) swh_person = { 'name': 'Software Heritage', 'fullname': 'Software Heritage', 'email': 'robot@softwareheritage.org' } revision_message = 'swh-loader-tar: synthetic revision message' revision_type = 'tar' revision = { 'date': { 'timestamp': commit_time, 'offset': 0, }, 'committer_date': { 'timestamp': commit_time, 'offset': 0, }, 'author': swh_person, 'committer': swh_person, 'type': revision_type, 'message': revision_message, 'metadata': {}, 'synthetic': True, } occurrence = { 'branch': os.path.basename(tarpath), } # when - self.loader.load(tarpath, origin, visit_date, revision, [occurrence]) + self.loader.load(tar_path=tarpath, origin=origin, + visit_date=visit_date, revision=revision, + occurrences=[occurrence]) # then self.assertEquals(len(self.loader.all_contents), 8) self.assertEquals(len(self.loader.all_directories), 6) self.assertEquals(len(self.loader.all_revisions), 1) actual_revision = self.loader.all_revisions[0] self.assertEquals(actual_revision['synthetic'], True) self.assertEquals(actual_revision['parents'], []) self.assertEquals(actual_revision['type'], 'tar') self.assertEquals(actual_revision['message'], b'swh-loader-tar: synthetic revision message') self.assertEquals(actual_revision['directory'], b'\x18U\xe5?K\x98,\xdb&9\x0f\xd3/h\xf5{\xfb,\xc3\xd5') # noqa self.assertEquals( 
actual_revision['metadata']['original_artifact'][0], { 'sha1_git': 'cc848944a0d3e71d287027347e25467e61b07428', 'archive_type': 'tar', 'blake2s256': '5d70923443ad36377cd58e993aff0e3c1b9ef14f796c69569105d3a99c64f075', # noqa 'name': 'sample-folder.tgz', 'sha1': '3ca0d0a5c6833113bd532dc5c99d9648d618f65a', 'length': 555, 'sha256': '307ebda0071ca5975f618e192c8417161e19b6c8bf581a26061b76dc8e85321d' # noqa }) self.assertEquals(len(self.loader.all_releases), 0) self.assertEquals(len(self.loader.all_occurrences), 1)