diff --git a/swh/loader/dir/loader.py b/swh/loader/dir/loader.py
index b2858d1..dcc90cd 100644
--- a/swh/loader/dir/loader.py
+++ b/swh/loader/dir/loader.py
@@ -1,249 +1,248 @@
 # Copyright (C) 2015-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import click
 import os
 import uuid

 from swh.loader.core import loader
 from swh.model.identifiers import (release_identifier, revision_identifier,
-                                   identifier_to_bytes)
+                                   snapshot_identifier, identifier_to_bytes)
 from swh.model.from_disk import Directory

 from . import converters


 class DirLoader(loader.SWHLoader):
     """A bulk loader for a directory."""
     CONFIG_BASE_FILENAME = 'loader/dir'

     def __init__(self, logging_class='swh.loader.dir.DirLoader',
                  config=None):
         super().__init__(logging_class=logging_class, config=config)

     def list_repo_objs(self, *, dir_path, revision, release):
         """List all objects from dir_path.

         Args:
             dir_path: the directory to list
             revision: revision dictionary representation
             release: release dictionary representation

         Returns:
             dict: a mapping from object types ('content', 'directory',
             'revision', 'release') with a dictionary mapping each object's
             id to the object

         """
         def _revision_from(tree_hash, revision):
             full_rev = dict(revision)
             full_rev['directory'] = tree_hash
             full_rev = converters.commit_to_revision(full_rev)
             full_rev['id'] = identifier_to_bytes(revision_identifier(full_rev))
             return full_rev

         def _release_from(revision_hash, release):
             full_rel = dict(release)
             full_rel['target'] = revision_hash
             full_rel['target_type'] = 'revision'
             full_rel = converters.annotated_tag_to_release(full_rel)
             full_rel['id'] = identifier_to_bytes(release_identifier(full_rel))
             return full_rel

         log_id = str(uuid.uuid4())
         sdir_path = dir_path.decode('utf-8')
         log_data = {
             'swh_type': 'dir_list_objs_end',
             'swh_repo': sdir_path,
             'swh_id': log_id,
         }
         self.log.debug("Started listing {swh_repo}".format(**log_data),
                        extra=log_data)

         directory = Directory.from_disk(path=dir_path, save_path=True)
         objects = directory.collect()

         tree_hash = directory.hash
         full_rev = _revision_from(tree_hash, revision)
         objects['revision'] = {full_rev['id']: full_rev}

         objects['release'] = {}
         if release and 'name' in release:
             full_rel = _release_from(full_rev['id'], release)
             objects['release'][full_rel['id']] = release

         log_data.update({
             'swh_num_%s' % key: len(values)
             for key, values in objects.items()
         })
         self.log.debug(("Done listing the objects in {swh_repo}: "
                         "{swh_num_content} contents, "
                         "{swh_num_directory} directories, "
                         "{swh_num_revision} revisions, "
                         "{swh_num_release} releases").format(**log_data),
                        extra=log_data)

         return objects

     def load(self, *, dir_path, origin, visit_date, revision, release,
-             occurrences):
+             branch_name=None):
         """Load the content of the directory to the archive.

         Args:
             dir_path: root of the directory to import
             origin (dict): an origin dictionary as returned by
                 :func:`swh.storage.storage.Storage.origin_get_one`
             visit_date (str): the date the origin was visited (as an
                 isoformatted string)
             revision (dict): a revision as passed to
                 :func:`swh.storage.storage.Storage.revision_add`, excluding
                 the `id` and `directory` keys (computed from the directory)
             release (dict): a release as passed to
                 :func:`swh.storage.storage.Storage.release_add`, excluding
                 the `id`, `target` and `target_type` keys (computed from the
                 revision)
-            occurrences (list of dicts): the occurrences to create in the
-                generated origin visit. Each dict contains a 'branch' key with
-                the branch name as value.
+            branch_name (str): optional branch name to use for the snapshot
+
         """
         # Yes, this is entirely redundant, but it allows us to document the
         # arguments and the entry point.
         return super().load(dir_path=dir_path, origin=origin,
                             visit_date=visit_date, revision=revision,
-                            release=release, occurrences=occurrences)
+                            release=release, branch_name=branch_name)

     def prepare(self, *, dir_path, origin, visit_date, revision, release,
-                occurrences):
-        """Prepare the loader for loading of the directory.
+                branch_name=None):
+        """Prepare the loader for directory loading.

         Args:
             identical to :func:`load`.
+
         """
         self.dir_path = dir_path
         self.origin = origin
         self.visit_date = visit_date
         self.revision = revision
         self.release = release
-        self.stub_occurrences = occurrences
+
+        branch = branch_name if branch_name else os.path.basename(dir_path)
+        self.branch_name = branch

         if not os.path.exists(self.dir_path):
             warn_msg = 'Skipping inexistant directory %s' % self.dir_path
             self.log.error(warn_msg, extra={
                 'swh_type': 'dir_repo_list_refs',
                 'swh_repo': self.dir_path,
                 'swh_num_refs': 0,
             })
             raise ValueError(warn_msg)

         if isinstance(self.dir_path, str):
             self.dir_path = os.fsencode(self.dir_path)

     def get_origin(self):
         return self.origin  # set in prepare method

     def cleanup(self):
         """Nothing to clean up.
""" pass def fetch_data(self): - def _occurrence_from(origin_id, visit, revision_hash, occurrence): - occ = dict(occurrence) - occ.update({ - 'target': revision_hash, - 'target_type': 'revision', - 'origin': origin_id, - 'visit': visit, - }) - if isinstance(occ['branch'], str): - occ['branch'] = occ['branch'].encode('utf-8') - return occ - - def _occurrences_from(origin_id, visit, revision_hash, occurrences): - occs = {} - for i, occurrence in enumerate(occurrences): - occs[i] = _occurrence_from(origin_id, - visit, - revision_hash, - occurrence) - - return occs + def _snapshot_from(origin_id, visit, revision_hash, branch): + if isinstance(branch, str): + branch = branch.encode('utf-8') + + snapshot = { + 'id': None, + 'branches': { + branch: { + 'target': revision_hash, + 'target_type': 'revision', + 'origin': origin_id, + 'visit': visit, + } + } + } + snap_id = identifier_to_bytes(snapshot_identifier(snapshot)) + snapshot['id'] = snap_id + return snapshot # to load the repository, walk all objects, compute their hashes self.objects = self.list_repo_objs( dir_path=self.dir_path, revision=self.revision, release=self.release) [rev_id] = self.objects['revision'].keys() + snapshot = _snapshot_from( + self.origin_id, self.visit, rev_id, self.branch_name) + # Update objects with release and occurrences - self.objects['occurrence'] = _occurrences_from( - self.origin_id, self.visit, rev_id, self.stub_occurrences) + self.objects['snapshot'] = snapshot def store_data(self): objects = self.objects self.maybe_load_contents(objects['content'].values()) self.maybe_load_directories(objects['directory'].values()) self.maybe_load_revisions(objects['revision'].values()) self.maybe_load_releases(objects['release'].values()) - self.maybe_load_occurrences(objects['occurrence'].values()) + self.maybe_load_snapshot(objects['snapshot']) @click.command() @click.option('--dir-path', required=1, help='Directory path to load') @click.option('--origin-url', required=1, help='Origin url for that directory') @click.option('--visit-date', default=None, help='Visit date time override') def main(dir_path, origin_url, visit_date): """Debugging purpose.""" d = DirLoader() origin = { 'url': origin_url, 'type': 'dir' } import datetime commit_time = int(datetime.datetime.now( tz=datetime.timezone.utc).timestamp() ) swh_person = { 'name': 'Software Heritage', 'fullname': 'Software Heritage', 'email': 'robot@softwareheritage.org' } revision_message = 'swh-loader-dir: synthetic revision message' revision_type = 'tar' revision = { 'date': { 'timestamp': commit_time, 'offset': 0, }, 'committer_date': { 'timestamp': commit_time, 'offset': 0, }, 'author': swh_person, 'committer': swh_person, 'type': revision_type, 'message': revision_message, 'metadata': {}, 'synthetic': True, } release = None - occurrence = { - 'branch': os.path.basename(dir_path), - } - d.load(dir_path=dir_path, origin=origin, visit_date=visit_date, - revision=revision, release=release, occurrences=[occurrence]) + d.load(dir_path=dir_path, origin=origin, + visit_date=visit_date, revision=revision, + release=release) if __name__ == '__main__': main() diff --git a/swh/loader/dir/tasks.py b/swh/loader/dir/tasks.py index f10214e..30dad8c 100644 --- a/swh/loader/dir/tasks.py +++ b/swh/loader/dir/tasks.py @@ -1,26 +1,26 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information 

 from swh.loader.dir.loader import DirLoader
 from swh.scheduler.task import Task


 class LoadDirRepository(Task):
     """Import a directory to Software Heritage

     """
     task_queue = 'swh_loader_dir'

     def run_task(self, *, dir_path, origin, visit_date, revision, release,
-                 occurrences):
+                 branch_name=None):
         """Import a directory dir_path with origin at visit_date time.
-        Providing the revision, release, and occurrences.
+        Providing the revision, release, and optional branch_name.

         """
         loader = DirLoader()
         loader.log = self.log

         return loader.load(dir_path=dir_path, origin=origin,
                            visit_date=visit_date, revision=revision,
-                           release=release, occurrences=occurrences)
+                           release=release, branch_name=branch_name)
diff --git a/swh/loader/dir/tests/test_loader.py b/swh/loader/dir/tests/test_loader.py
index e136448..1b7c7dc 100644
--- a/swh/loader/dir/tests/test_loader.py
+++ b/swh/loader/dir/tests/test_loader.py
@@ -1,318 +1,307 @@
-# Copyright (C) 2015 The Software Heritage developers
+# Copyright (C) 2015-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os
 import shutil
 import subprocess
 import tempfile
 import unittest

 from nose.tools import istest
 from nose.plugins.attrib import attr

 from swh.loader.dir.loader import DirLoader


 @attr('fs')
 class InitTestLoader(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
         super().setUpClass()

         cls.tmp_root_path = tempfile.mkdtemp().encode('utf-8')

         start_path = os.path.dirname(__file__).encode('utf-8')
         sample_folder_archive = os.path.join(start_path,
                                              b'../../../../..',
                                              b'swh-storage-testdata',
                                              b'dir-folders',
                                              b'sample-folder.tgz')

         cls.root_path = os.path.join(cls.tmp_root_path)

         # uncompress the sample folder
         subprocess.check_output(
             ['tar', 'xvf', sample_folder_archive, '-C', cls.tmp_root_path],
         )

     @classmethod
     def tearDownClass(cls):
         super().tearDownClass()
         shutil.rmtree(cls.tmp_root_path)


 class DirLoaderListRepoObject(InitTestLoader):

     def setUp(self):
         super().setUp()

         self.info = {
             'storage': {
                 'cls': 'remote',
                 'args': {
                     'url': 'http://localhost:5002/',
                 }
             },
             'content_size_limit': 104857600,
             'log_db': 'dbname=softwareheritage-log',
             'directory_packet_size': 25000,
             'content_packet_size': 10000,
             'send_contents': True,
             'send_directories': True,
             'content_packet_size_bytes': 1073741824,
-            'occurrence_packet_size': 100000,
             'send_revisions': True,
             'revision_packet_size': 100000,
             'content_packet_block_size_bytes': 104857600,
-            'send_occurrences': True,
+            'send_snapshot': True,
             'release_packet_size': 100000,
             'send_releases': True
         }

         self.origin = {
             'url': 'file:///dev/null',
             'type': 'dir',
         }

-        self.occurrence = {
-            'branch': 'master',
-            'authority_id': 1,
-            'validity': '2015-01-01 00:00:00+00',
-        }
-
         self.revision = {
             'author': {
                 'name': 'swh author',
                 'email': 'swh@inria.fr',
                 'fullname': 'swh'
             },
             'date': {
                 'timestamp': 1444054085,
                 'offset': 0
             },
             'committer': {
                 'name': 'swh committer',
                 'email': 'swh@inria.fr',
                 'fullname': 'swh'
             },
             'committer_date': {
                 'timestamp': 1444054085,
                 'offset': 0,
             },
             'type': 'tar',
             'message': 'synthetic revision',
             'metadata': {'foo': 'bar'},
         }

         self.release = {
             'name': 'v0.0.1',
             'date': {
                 'timestamp': 1444054085,
                 'offset': 0,
             },
             'author': {
                 'name': 'swh author',
                 'fullname': 'swh',
                 'email': 'swh@inria.fr',
             },
             'message': 'synthetic release',
         }

         self.dirloader = DirLoader(config=self.info)

     @istest
     def load_without_storage(self):
         # when
         objects = self.dirloader.list_repo_objs(
             dir_path=self.root_path,
             revision=self.revision,
             release=self.release)

         # then
         self.assertEquals(len(objects), 4,
                           "4 objects types, blob, tree, revision, release")
         self.assertEquals(len(objects['content']), 8,
                           "8 contents: 3 files + 5 links")
         self.assertEquals(len(objects['directory']), 6,
                           "6 directories: 5 subdirs + 1 empty")
         self.assertEquals(len(objects['revision']), 1, "synthetic revision")
         self.assertEquals(len(objects['release']), 1, "synthetic release")


 class LoaderNoStorageForTest:
     """Mixin class to inhibit the persistence and keep in memory the data
     sent for storage.

     cf. SWHDirLoaderNoStorage

     """
     def __init__(self):
         super().__init__()
         # Init the state
         self.all_contents = []
         self.all_directories = []
         self.all_revisions = []
         self.all_releases = []
-        self.all_occurrences = []
+        self.all_snapshots = []

     def send_origin(self, origin):
         self.origin = origin

     def send_origin_visit(self, origin_id, ts):
         self.origin_visit = {
             'origin': origin_id,
             'ts': ts,
             'visit': 1,
         }
         return self.origin_visit

     def update_origin_visit(self, origin_id, visit, status):
         self.status = status
         self.origin_visit = visit

     def maybe_load_contents(self, all_contents):
         self.all_contents.extend(all_contents)

     def maybe_load_directories(self, all_directories):
         self.all_directories.extend(all_directories)

     def maybe_load_revisions(self, all_revisions):
         self.all_revisions.extend(all_revisions)

     def maybe_load_releases(self, releases):
         self.all_releases.extend(releases)

-    def maybe_load_occurrences(self, all_occurrences):
-        self.all_occurrences.extend(all_occurrences)
+    def maybe_load_snapshot(self, snapshot):
+        self.all_snapshots.append(snapshot)

     def open_fetch_history(self):
         return 1

     def close_fetch_history_success(self, fetch_history_id):
         pass

     def close_fetch_history_failure(self, fetch_history_id):
         pass


 TEST_CONFIG = {
     'extraction_dir': '/tmp/tests/loader-tar/',  # where to extract the tarball
     'storage': {  # we instantiate it but we don't use it in test context
         'cls': 'remote',
         'args': {
             'url': 'http://127.0.0.1:9999',  # somewhere that does not exist
         }
     },
     'send_contents': False,
     'send_directories': False,
     'send_revisions': False,
     'send_releases': False,
-    'send_occurrences': False,
+    'send_snapshot': False,
     'content_packet_size': 100,
     'content_packet_block_size_bytes': 104857600,
     'content_packet_size_bytes': 1073741824,
     'directory_packet_size': 250,
     'revision_packet_size': 100,
     'release_packet_size': 100,
-    'occurrence_packet_size': 100,
 }


 def parse_config_file(base_filename=None, config_filename=None,
                       additional_configs=None, global_config=True):
     return TEST_CONFIG


 # Inhibit side-effect loading configuration from disk
 DirLoader.parse_config_file = parse_config_file


 class SWHDirLoaderNoStorage(LoaderNoStorageForTest, DirLoader):
     """A DirLoader with no persistence.

     Context: Load a tarball with a persistent-less tarball loader

     """
     pass


 class SWHDirLoaderITTest(InitTestLoader):

     def setUp(self):
         super().setUp()

         self.loader = SWHDirLoaderNoStorage()

     @istest
     def load(self):
         """Process a new tarball should be ok

         """
         # given
         origin = {
             'url': 'file:///tmp/sample-folder',
             'type': 'dir'
         }

         visit_date = 'Tue, 3 May 2016 17:16:32 +0200'

         import datetime
         commit_time = int(datetime.datetime.now(
             tz=datetime.timezone.utc).timestamp()
         )

         swh_person = {
             'name': 'Software Heritage',
             'fullname': 'Software Heritage',
             'email': 'robot@softwareheritage.org'
         }

         revision_message = 'swh-loader-dir: synthetic revision message'
         revision_type = 'tar'
         revision = {
             'date': {
                 'timestamp': commit_time,
                 'offset': 0,
             },
             'committer_date': {
                 'timestamp': commit_time,
                 'offset': 0,
             },
             'author': swh_person,
             'committer': swh_person,
             'type': revision_type,
             'message': revision_message,
             'metadata': {},
             'synthetic': True,
         }

-        occurrence = {
-            'branch': os.path.basename(self.root_path),
-        }
+        branch = os.path.basename(self.root_path)

         # when
         self.loader.load(
             dir_path=self.root_path, origin=origin, visit_date=visit_date,
-            revision=revision, release=None, occurrences=[occurrence],
-        )
+            revision=revision, release=None, branch_name=branch)

         # then
         self.assertEquals(len(self.loader.all_contents), 8)
         self.assertEquals(len(self.loader.all_directories), 6)
         self.assertEquals(len(self.loader.all_revisions), 1)

         actual_revision = self.loader.all_revisions[0]
         self.assertEquals(actual_revision['synthetic'], True)
         self.assertEquals(actual_revision['parents'], [])
         self.assertEquals(actual_revision['type'], 'tar')
         self.assertEquals(actual_revision['message'],
                           b'swh-loader-dir: synthetic revision message')

         self.assertEquals(len(self.loader.all_releases), 0)
-        self.assertEquals(len(self.loader.all_occurrences), 1)
+        self.assertEquals(len(self.loader.all_snapshots), 1)