diff --git a/swh/loader/tar/build.py b/swh/loader/tar/build.py
index 47e5415..02fd167 100755
--- a/swh/loader/tar/build.py
+++ b/swh/loader/tar/build.py
@@ -1,116 +1,101 @@
-# Copyright (C) 2015-2017 The Software Heritage developers
+# Copyright (C) 2015-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os

 from swh.core import utils


 # Static setup
 EPOCH = 0
 UTC_OFFSET = 0
 SWH_PERSON = {
     'name': 'Software Heritage',
     'fullname': 'Software Heritage',
     'email': 'robot@softwareheritage.org'
 }
 REVISION_MESSAGE = 'synthetic revision message'
 REVISION_TYPE = 'tar'


 def compute_origin(url_scheme, url_type, root_dirpath, tarpath):
     """Compute the origin.

     Args:
         - url_scheme: scheme to build the origin's url
         - url_type: origin's type
         - root_dirpath: the top level root directory path
         - tarpath: file's absolute path

     Returns:
         Dictionary origin with keys:
         - url: origin's url
         - type: origin's type

     """
     relative_path = utils.commonname(root_dirpath, tarpath)
     return {
         'url': ''.join([url_scheme, os.path.dirname(relative_path)]),
         'type': url_type,
     }


-def compute_occurrence(tarpath):
-    """Compute the occurrence using the tarpath's ctime.
-
-    Args:
-        tarpath: file's path
-
-    Returns:
-        Occurrence dictionary.
-
-    """
-    return {
-        'branch': os.path.basename(tarpath),
-    }
-
-
 def _time_from_path(tarpath):
     """Compute the modification time from the tarpath.

     Args:
         tarpath (str|bytes): Full path to the archive to extract the
             date from.

     Returns:
         dict representing a timestamp with keys seconds and microseconds keys.

     """
     mtime = os.lstat(tarpath).st_mtime
     if isinstance(mtime, float):
         normalized_time = list(map(int, str(mtime).split('.')))
     else:  # assuming int
         normalized_time = [mtime, 0]

     return {
         'seconds': normalized_time[0],
         'microseconds': normalized_time[1]
     }


 def compute_revision(tarpath):
     """Compute a revision.

     Args:
         tarpath: absolute path to the tarball

     Returns:
         Revision as dict:
         - date (dict): the modification timestamp as returned by
           _time_from_path function
         - committer_date: the modification timestamp as returned by
           _time_from_path function
         - author: cf. SWH_PERSON
         - committer: cf. SWH_PERSON
         - type: cf. REVISION_TYPE
         - message: cf. REVISION_MESSAGE

     """
     ts = _time_from_path(tarpath)
     return {
         'date': {
             'timestamp': ts,
             'offset': UTC_OFFSET,
         },
         'committer_date': {
             'timestamp': ts,
             'offset': UTC_OFFSET,
         },
         'author': SWH_PERSON,
         'committer': SWH_PERSON,
         'type': REVISION_TYPE,
         'message': REVISION_MESSAGE,
     }
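For readers skimming the patch: the two remaining helpers above are meant to be combined by the producer further down. A minimal sketch of that combination, with a made-up mirror root and tarball path (not part of the patch), could look like this:

    from swh.loader.tar import build

    root_dir = '/srv/mirror'                    # hypothetical mirror root
    tarpath = '/srv/mirror/pkg/pkg-1.0.tar.gz'  # hypothetical tarball path

    # the origin url is derived from the tarball's directory under root_dir
    origin = build.compute_origin('rsync://mirror', 'tar', root_dir, tarpath)
    # date/committer_date come from the tarball's mtime via _time_from_path
    revision = build.compute_revision(tarpath)
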
diff --git a/swh/loader/tar/loader.py b/swh/loader/tar/loader.py
index d173d3c..abf186d 100644
--- a/swh/loader/tar/loader.py
+++ b/swh/loader/tar/loader.py
@@ -1,107 +1,110 @@
 # Copyright (C) 2015-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os
 import tempfile
 import shutil

 from swh.core import tarball
 from swh.loader.core.loader import SWHLoader
 from swh.loader.dir import loader
 from swh.loader.tar import utils
 from swh.model import hashutil


 class TarLoader(loader.DirLoader):
     """Tarball loader implementation.

     This is a subclass of the :class:DirLoader as the main goal of
     this class is to first uncompress a tarball, then provide the
     uncompressed directory/tree to be loaded by the DirLoader.

     This will:
     - creates an origin (if it does not exist)
     - creates a fetch_history entry
     - creates an origin_visit
     - uncompress locally the tarball in a temporary location
     - process the content of the tarballs to persist on swh storage
     - clean up the temporary location
     - write an entry in fetch_history to mark the loading tarball end
       (success or failure)

     """
     CONFIG_BASE_FILENAME = 'loader/tar'

     ADDITIONAL_CONFIG = {
         'extraction_dir': ('string', '/tmp')
     }

     def __init__(self, logging_class='swh.loader.tar.TarLoader',
                  config=None):
         super().__init__(logging_class=logging_class, config=config)

-    def load(self, *, tar_path, origin, visit_date, revision, occurrences):
+    def load(self, *, tar_path, origin, visit_date, revision,
+             branch_name=None):
         """Load a tarball in `tarpath` in the Software Heritage Archive.

         Args:
             tar_path: tarball to import
             origin (dict): an origin dictionary as returned by
                 :func:`swh.storage.storage.Storage.origin_get_one`
             visit_date (str): the date the origin was visited (as an
                 isoformatted string)
             revision (dict): a revision as passed to
                 :func:`swh.storage.storage.Storage.revision_add`, excluding
                 the `id` and `directory` keys (computed from the directory)
-            occurrences (list of dicts): the occurrences to create in the
-                generated origin visit. Each dict contains a 'branch' key with
-                the branch name as value.
+            branch_name (str): the optional branch_name to use for snapshot
+
         """
         # Shortcut super() as we use different arguments than the DirLoader.
         return SWHLoader.load(self, tar_path=tar_path, origin=origin,
                               visit_date=visit_date, revision=revision,
-                              occurrences=occurrences)
+                              branch_name=branch_name)

-    def prepare(self, *, tar_path, origin, visit_date, revision, occurrences):
+    def prepare(self, *, tar_path, origin, visit_date, revision,
+                branch_name=None):
         """1. Uncompress the tarball in a temporary directory.
            2. Compute some metadata to update the revision.

         """
         if 'type' not in origin:  # let the type flow if present
             origin['type'] = 'tar'

         # Prepare the extraction path
         extraction_dir = self.config['extraction_dir']
         os.makedirs(extraction_dir, 0o755, exist_ok=True)
         dir_path = tempfile.mkdtemp(prefix='swh.loader.tar-',
                                     dir=extraction_dir)

         # add checksums in revision
         self.log.info('Uncompress %s to %s' % (tar_path, dir_path))
         nature = tarball.uncompress(tar_path, dir_path)

         if 'metadata' not in revision:
             artifact = utils.convert_to_hex(hashutil.hash_path(tar_path))
             artifact['name'] = os.path.basename(tar_path)
             artifact['archive_type'] = nature
             artifact['length'] = os.path.getsize(tar_path)
             revision['metadata'] = {
                 'original_artifact': [artifact],
             }

+        branch = branch_name if branch_name else os.path.basename(tar_path)
+
         super().prepare(dir_path=dir_path,
                         origin=origin,
                         visit_date=visit_date,
                         revision=revision,
                         release=None,
-                        occurrences=occurrences)
+                        branch_name=branch)

     def cleanup(self):
         """Clean up temporary directory where we uncompress the tarball.

         """
         dir_path = self.dir_path
         if dir_path and os.path.exists(dir_path):
             shutil.rmtree(dir_path)
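As a quick illustration of the new calling convention (mirroring the updated integration test further down), a caller now passes an optional branch_name instead of a list of occurrences. The path and dates below are placeholders, and a real run still needs a storage configuration; this is only a sketch:

    from swh.loader.tar import build
    from swh.loader.tar.loader import TarLoader

    tar_path = '/tmp/sample-folder.tgz'   # placeholder tarball
    loader = TarLoader()
    loader.load(tar_path=tar_path,
                origin={'url': 'file:///tmp/sample-folder', 'type': 'dir'},
                visit_date='Tue, 3 May 2016 17:16:32 +0200',
                revision=build.compute_revision(tar_path),
                # optional: when omitted, prepare() falls back to
                # os.path.basename(tar_path)
                branch_name='sample-folder.tgz')
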
""" dir_path = self.dir_path if dir_path and os.path.exists(dir_path): shutil.rmtree(dir_path) diff --git a/swh/loader/tar/producer.py b/swh/loader/tar/producer.py index 53daabf..21db54f 100755 --- a/swh/loader/tar/producer.py +++ b/swh/loader/tar/producer.py @@ -1,103 +1,102 @@ -# Copyright (C) 2015-2017 The Software Heritage developers +# Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import dateutil.parser from swh.scheduler.utils import get_task from swh.core import config from swh.loader.tar import build, file TASK_QUEUE = 'swh.loader.tar.tasks.LoadTarRepository' def produce_archive_messages_from( conf, root_dir, visit_date, mirror_file=None, dry_run=False): """From root_dir, produce archive tarball messages to celery. Will print error message when some computation arise on archive and continue. Args: conf: dictionary holding static metadata root_dir: top directory to list archives from. visit_date: override origin's visit date of information mirror_file: a filtering file of tarballs to load dry_run: will compute but not send messages Returns: Number of messages generated """ limit = conf.get('limit') block = int(conf['block_messages']) count = 0 path_source_tarballs = mirror_file if mirror_file else root_dir visit_date = dateutil.parser.parse(visit_date) if not dry_run: task = get_task(TASK_QUEUE) for tarpath, _ in file.random_archives_from( path_source_tarballs, block, limit): try: origin = build.compute_origin( conf['url_scheme'], conf['type'], root_dir, tarpath) revision = build.compute_revision(tarpath) - occurrence = build.compute_occurrence(tarpath) if not dry_run: task.delay(tar_path=tarpath, origin=origin, - visit_date=visit_date, revision=revision, - occurrences=[occurrence]) + visit_date=visit_date, + revision=revision) count += 1 except ValueError: print('Problem with the following archive: %s' % tarpath) return count @click.command() @click.option('--config-file', required=1, help='Configuration file path') @click.option('--dry-run/--no-dry-run', default=False, help='Dry run (print repo only)') @click.option('--limit', default=None, help='Number of origins limit to send') def main(config_file, dry_run, limit): """Tarball producer of local fs tarballs. """ conf = config.read(config_file) url_scheme = conf['url_scheme'] mirror_dir = conf['mirror_root_directory'] # remove trailing / in configuration (to ease ulterior computation) if url_scheme[-1] == '/': conf['url_scheme'] = url_scheme[0:-1] if mirror_dir[-1] == '/': conf['mirror_root_directory'] = mirror_dir[0:-1] if limit: conf['limit'] = int(limit) nb_tarballs = produce_archive_messages_from( conf=conf, root_dir=conf['mirror_root_directory'], visit_date=conf['date'], mirror_file=conf.get('mirror_subset_archives'), dry_run=dry_run) print('%s tarball(s) sent to worker.' 
diff --git a/swh/loader/tar/tasks.py b/swh/loader/tar/tasks.py
index 5429fcb..a4cd70d 100644
--- a/swh/loader/tar/tasks.py
+++ b/swh/loader/tar/tasks.py
@@ -1,26 +1,27 @@
 # Copyright (C) 2015-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from swh.scheduler.task import Task

 from swh.loader.tar.loader import TarLoader


 class LoadTarRepository(Task):
     """Import a directory to Software Heritage

     """
     task_queue = 'swh_loader_tar'

-    def run_task(self, *, tar_path, origin, visit_date, revision,
-                 occurrences):
+    def run_task(self, *, tar_path, origin, visit_date, revision,
+                 branch_name=None):
         """Import a tarball into swh.

         Args: see :func:`TarLoader.load`.

         """
         loader = TarLoader()
         loader.log = self.log
         return loader.load(tar_path=tar_path, origin=origin,
                            visit_date=visit_date, revision=revision,
-                           occurrences=occurrences)
+                           branch_name=branch_name)
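For completeness, enqueueing this task by hand with the new keyword (normally the producer above does this) could look like the following sketch; the path and origin values are placeholders:

    from swh.scheduler.utils import get_task
    from swh.loader.tar import build

    tarpath = '/srv/mirror/pkg/pkg-1.0.tar.gz'   # placeholder tarball
    task = get_task('swh.loader.tar.tasks.LoadTarRepository')
    task.delay(tar_path=tarpath,
               origin={'url': 'rsync://mirror/pkg', 'type': 'tar'},
               visit_date='2018-01-01',
               revision=build.compute_revision(tarpath),
               branch_name='pkg-1.0.tar.gz')     # new optional argument
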
diff --git a/swh/loader/tar/tests/test_build.py b/swh/loader/tar/tests/test_build.py
index 94e2235..592f792 100644
--- a/swh/loader/tar/tests/test_build.py
+++ b/swh/loader/tar/tests/test_build.py
@@ -1,106 +1,92 @@
-# Copyright (C) 2015-2017 The Software Heritage developers
+# Copyright (C) 2015-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import unittest

 from nose.tools import istest
 from unittest.mock import patch

 from swh.loader.tar import build


 class TestBuildUtils(unittest.TestCase):
     @istest
     def compute_origin(self):
         # given
         expected_origin = {
             'url': 'rsync://some/url/package-foo',
             'type': 'rsync',
         }

         # when
         actual_origin = build.compute_origin(
             'rsync://some/url/', 'rsync', '/some/root/path/',
             '/some/root/path/package-foo/package-foo-1.2.3.tgz')

         # then
         self.assertEquals(actual_origin, expected_origin)

-    @istest
-    def compute_occurrence(self):
-        # given
-        expected_occurrence = {
-            'branch': b'package-bar.tgz',
-        }
-
-        # when
-        actual_occurrence = build.compute_occurrence(
-            b'/path/to/package-bar.tgz')
-
-        # then
-        self.assertEquals(actual_occurrence, expected_occurrence)
-
     @patch('swh.loader.tar.build._time_from_path')
     @istest
     def compute_revision(self, mock_time_from_path):
         mock_time_from_path.return_value = 'some-other-time'

         # when
         actual_revision = build.compute_revision('/some/path')

         expected_revision = {
             'date': {
                 'timestamp': 'some-other-time',
                 'offset': build.UTC_OFFSET,
             },
             'committer_date': {
                 'timestamp': 'some-other-time',
                 'offset': build.UTC_OFFSET,
             },
             'author': build.SWH_PERSON,
             'committer': build.SWH_PERSON,
             'type': build.REVISION_TYPE,
             'message': build.REVISION_MESSAGE,
         }

         # then
         self.assertEquals(actual_revision, expected_revision)
         mock_time_from_path.assert_called_once_with('/some/path')

     @patch('swh.loader.tar.build.os')
     @istest
     def time_from_path_with_float(self, mock_os):
         class MockStat:
             st_mtime = 1445348286.8308342
         mock_os.lstat.return_value = MockStat()

         actual_time = build._time_from_path('some/path')

         self.assertEquals(actual_time, {
             'seconds': 1445348286,
             'microseconds': 8308342
         })

         mock_os.lstat.assert_called_once_with('some/path')

     @patch('swh.loader.tar.build.os')
     @istest
     def time_from_path_with_int(self, mock_os):
         class MockStat:
             st_mtime = 1445348286
         mock_os.lstat.return_value = MockStat()

         actual_time = build._time_from_path('some/path')

         self.assertEquals(actual_time, {
             'seconds': 1445348286,
             'microseconds': 0
         })

         mock_os.lstat.assert_called_once_with('some/path')
diff --git a/swh/loader/tar/tests/test_loader.py b/swh/loader/tar/tests/test_loader.py
index 971afc7..d64d7d5 100644
--- a/swh/loader/tar/tests/test_loader.py
+++ b/swh/loader/tar/tests/test_loader.py
@@ -1,211 +1,208 @@
-# Copyright (C) 2017 The Software Heritage developers
+# Copyright (C) 2017-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os

 from unittest import TestCase

 from nose.plugins.attrib import attr
 from nose.tools import istest

 from swh.loader.tar.loader import TarLoader


 class LoaderNoStorageForTest:
     """Mixin class to inhibit the persistence and keep in memory the data
     sent for storage.

     cf. SWHTarLoaderNoStorage

     """
     def __init__(self):
         super().__init__()
         # Init the state
         self.all_contents = []
         self.all_directories = []
         self.all_revisions = []
         self.all_releases = []
-        self.all_occurrences = []
+        self.all_snapshots = []

     def send_origin(self, origin):
         self.origin = origin

     def send_origin_visit(self, origin_id, ts):
         self.origin_visit = {
             'origin': origin_id,
             'ts': ts,
             'visit': 1,
         }
         return self.origin_visit

     def update_origin_visit(self, origin_id, visit, status):
         self.status = status
         self.origin_visit = visit

     def maybe_load_contents(self, all_contents):
         self.all_contents.extend(all_contents)

     def maybe_load_directories(self, all_directories):
         self.all_directories.extend(all_directories)

     def maybe_load_revisions(self, all_revisions):
         self.all_revisions.extend(all_revisions)

     def maybe_load_releases(self, releases):
         self.all_releases.extend(releases)

-    def maybe_load_occurrences(self, all_occurrences):
-        self.all_occurrences.extend(all_occurrences)
+    def maybe_load_snapshot(self, snapshot):
+        self.all_snapshots.append(snapshot)

     def open_fetch_history(self):
         return 1

     def close_fetch_history_success(self, fetch_history_id):
         pass

     def close_fetch_history_failure(self, fetch_history_id):
         pass


 TEST_CONFIG = {
     'extraction_dir': '/tmp/tests/loader-tar/',  # where to extract the tarball
     'storage': {  # we instantiate it but we don't use it in test context
         'cls': 'remote',
         'args': {
             'url': 'http://127.0.0.1:9999',  # somewhere that does not exist
         }
     },
     'send_contents': False,
     'send_directories': False,
     'send_revisions': False,
     'send_releases': False,
-    'send_occurrences': False,
+    'send_snapshot': False,
     'content_packet_size': 100,
     'content_packet_block_size_bytes': 104857600,
     'content_packet_size_bytes': 1073741824,
     'directory_packet_size': 250,
     'revision_packet_size': 100,
     'release_packet_size': 100,
-    'occurrence_packet_size': 100,
 }


 def parse_config_file(base_filename=None, config_filename=None,
                       additional_configs=None, global_config=True):
     return TEST_CONFIG


 # Inhibit side-effect loading configuration from disk
 TarLoader.parse_config_file = parse_config_file


 class SWHTarLoaderNoStorage(LoaderNoStorageForTest, TarLoader):
     """A TarLoader with no persistence.

     Context: Load a tarball with a persistent-less tarball loader

     """
     pass


 PATH_TO_DATA = '../../../../..'

 class SWHTarLoaderITTest(TestCase):
     def setUp(self):
         super().setUp()

         self.loader = SWHTarLoaderNoStorage()

     @attr('fs')
     @istest
     def load(self):
         """Process a new tarball should be ok

         """
         # given
         start_path = os.path.dirname(__file__)
         tarpath = os.path.join(
             start_path, PATH_TO_DATA,
             'swh-storage-testdata/dir-folders/sample-folder.tgz')

         origin = {
             'url': 'file:///tmp/sample-folder',
             'type': 'dir'
         }

         visit_date = 'Tue, 3 May 2016 17:16:32 +0200'

         import datetime
         commit_time = int(datetime.datetime.now(
             tz=datetime.timezone.utc).timestamp()
         )

         swh_person = {
             'name': 'Software Heritage',
             'fullname': 'Software Heritage',
             'email': 'robot@softwareheritage.org'
         }

         revision_message = 'swh-loader-tar: synthetic revision message'
         revision_type = 'tar'
         revision = {
             'date': {
                 'timestamp': commit_time,
                 'offset': 0,
             },
             'committer_date': {
                 'timestamp': commit_time,
                 'offset': 0,
             },
             'author': swh_person,
             'committer': swh_person,
             'type': revision_type,
             'message': revision_message,
             'synthetic': True,
         }

-        occurrence = {
-            'branch': os.path.basename(tarpath),
-        }
+        branch_name = os.path.basename(tarpath)

         # when
         self.loader.load(tar_path=tarpath, origin=origin,
                          visit_date=visit_date, revision=revision,
-                         occurrences=[occurrence])
+                         branch_name=branch_name)

         # then
         self.assertEquals(len(self.loader.all_contents), 8,
                           "8 contents: 3 files + 5 links")
         self.assertEquals(len(self.loader.all_directories), 6,
                           "6 directories: 4 subdirs + 1 empty + 1 main dir")
         self.assertEquals(len(self.loader.all_revisions), 1,
                           "synthetic revision")

         actual_revision = self.loader.all_revisions[0]
         self.assertTrue(actual_revision['synthetic'])
         self.assertEquals(actual_revision['parents'], [])
         self.assertEquals(actual_revision['type'], 'tar')
         self.assertEquals(actual_revision['message'],
                           b'swh-loader-tar: synthetic revision message')
         self.assertEquals(actual_revision['directory'],
                           b'\xa7A\xfcM\x96\x8c{\x8e<\x94\xff\x86\xe7\x04\x80\xc5\xc7\xe5r\xa9')  # noqa

         self.assertEquals(
             actual_revision['metadata']['original_artifact'][0],
             {
                 'sha1_git': 'cc848944a0d3e71d287027347e25467e61b07428',
                 'archive_type': 'tar',
                 'blake2s256': '5d70923443ad36377cd58e993aff0e3c1b9ef14f796c69569105d3a99c64f075',  # noqa
                 'name': 'sample-folder.tgz',
                 'sha1': '3ca0d0a5c6833113bd532dc5c99d9648d618f65a',
                 'length': 555,
                 'sha256': '307ebda0071ca5975f618e192c8417161e19b6c8bf581a26061b76dc8e85321d'  # noqa
             })

         self.assertEquals(len(self.loader.all_releases), 0)
-        self.assertEquals(len(self.loader.all_occurrences), 1)
+        self.assertEquals(len(self.loader.all_snapshots), 1)
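Note on the test change: the loader now records a single snapshot instead of a list of occurrences. The snapshot itself is assembled by the underlying dir loader and its exact structure is not shown in this patch; as a rough illustration only (field names assumed from the swh.model snapshot layout, not taken from this diff), the branch_name ends up naming one branch pointing at the synthetic revision:

    revision_id = b'\x00' * 20  # placeholder sha1_git of the synthetic revision
    snapshot = {
        'branches': {
            b'sample-folder.tgz': {        # branch_name (tarball basename)
                'target': revision_id,
                'target_type': 'revision',
            }
        }
    }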