diff --git a/swh/loader/dir/loader.py b/swh/loader/dir/loader.py index dcc90cd..a5f5e35 100644 --- a/swh/loader/dir/loader.py +++ b/swh/loader/dir/loader.py @@ -1,248 +1,269 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import os import uuid from swh.loader.core import loader from swh.model.identifiers import (release_identifier, revision_identifier, snapshot_identifier, identifier_to_bytes) from swh.model.from_disk import Directory from . import converters +def revision_from(directory_hash, revision): + full_rev = dict(revision) + full_rev['directory'] = directory_hash + full_rev = converters.commit_to_revision(full_rev) + full_rev['id'] = identifier_to_bytes(revision_identifier(full_rev)) + return full_rev + + +def release_from(revision_hash, release): + full_rel = dict(release) + full_rel['target'] = revision_hash + full_rel['target_type'] = 'revision' + full_rel = converters.annotated_tag_to_release(full_rel) + full_rel['id'] = identifier_to_bytes(release_identifier(full_rel)) + return full_rel + + +def snapshot_from(origin_id, visit, revision_hash, branch): + """Build a snapshot from an origin, a visit, a revision, and a branch. + + """ + if isinstance(branch, str): + branch = branch.encode('utf-8') + + snapshot = { + 'id': None, + 'branches': { + branch: { + 'target': revision_hash, + 'target_type': 'revision', + 'origin': origin_id, + 'visit': visit, + } + } + } + snap_id = identifier_to_bytes(snapshot_identifier(snapshot)) + snapshot['id'] = snap_id + return snapshot + + class DirLoader(loader.SWHLoader): """A bulk loader for a directory.""" CONFIG_BASE_FILENAME = 'loader/dir' def __init__(self, logging_class='swh.loader.dir.DirLoader', config=None): super().__init__(logging_class=logging_class, config=config) - def list_repo_objs(self, *, dir_path, revision, release): + def list_objs(self, *, + dir_path, origin, visit, revision, release, branch_name): """List all objects from dir_path. Args: - dir_path: the directory to list - revision: revision dictionary representation - release: release dictionary representation + dir_path (str): the directory to list + origin (dict): origin information + visit (dict): visit information + revision (dict): revision dictionary representation + release (dict): release dictionary representation + branch_name (str): branch name Returns: dict: a mapping from object types ('content', 'directory', - 'revision', 'release') with a dictionary mapping each object's id - to the object - """ - def _revision_from(tree_hash, revision): - full_rev = dict(revision) - full_rev['directory'] = tree_hash - full_rev = converters.commit_to_revision(full_rev) - full_rev['id'] = identifier_to_bytes(revision_identifier(full_rev)) - return full_rev - - def _release_from(revision_hash, release): - full_rel = dict(release) - full_rel['target'] = revision_hash - full_rel['target_type'] = 'revision' - full_rel = converters.annotated_tag_to_release(full_rel) - full_rel['id'] = identifier_to_bytes(release_identifier(full_rel)) - return full_rel + 'revision', 'release', 'snapshot') with a dictionary + mapping each object's id to the object + """ log_id = str(uuid.uuid4()) sdir_path = dir_path.decode('utf-8') log_data = { 'swh_type': 'dir_list_objs_end', 'swh_repo': sdir_path, 'swh_id': log_id, } self.log.debug("Started listing {swh_repo}".format(**log_data), extra=log_data) directory = Directory.from_disk(path=dir_path, save_path=True) - objects = directory.collect() - tree_hash = directory.hash - full_rev = _revision_from(tree_hash, revision) + full_rev = revision_from(directory.hash, revision) + rev_id = full_rev['id'] + objects['revision'] = { + rev_id: full_rev + } - objects['revision'] = {full_rev['id']: full_rev} objects['release'] = {} - if release and 'name' in release: - full_rel = _release_from(full_rev['id'], release) - objects['release'][full_rel['id']] = release + full_rel = release_from(rev_id, release) + objects['release'][full_rel['id']] = full_rel + + snapshot = snapshot_from( + origin['id'], visit['id'], rev_id, branch_name) + + objects['snapshot'] = { + snapshot['id']: snapshot + } log_data.update({ 'swh_num_%s' % key: len(values) for key, values in objects.items() }) self.log.debug(("Done listing the objects in {swh_repo}: " "{swh_num_content} contents, " "{swh_num_directory} directories, " "{swh_num_revision} revisions, " - "{swh_num_release} releases").format(**log_data), + "{swh_num_release} releases, " + "{swh_num_snapshot} snapshot").format(**log_data), extra=log_data) return objects def load(self, *, dir_path, origin, visit_date, revision, release, branch_name=None): """Load the content of the directory to the archive. Args: dir_path: root of the directory to import origin (dict): an origin dictionary as returned by :func:`swh.storage.storage.Storage.origin_get_one` visit_date (str): the date the origin was visited (as an isoformatted string) revision (dict): a revision as passed to :func:`swh.storage.storage.Storage.revision_add`, excluding the `id` and `directory` keys (computed from the directory) release (dict): a release as passed to :func:`swh.storage.storage.Storage.release_add`, excluding the `id`, `target` and `target_type` keys (computed from the revision)' branch_name (str): the optional branch_name to use for snapshot """ # Yes, this is entirely redundant, but it allows us to document the # arguments and the entry point. return super().load(dir_path=dir_path, origin=origin, visit_date=visit_date, revision=revision, release=release, branch_name=branch_name) def prepare(self, *, dir_path, origin, visit_date, revision, release, branch_name=None): """Prepare the loader for directory loading. Args: identical to :func:`load`. """ self.dir_path = dir_path self.origin = origin self.visit_date = visit_date self.revision = revision self.release = release branch = branch_name if branch_name else os.path.basename(dir_path) self.branch_name = branch if not os.path.exists(self.dir_path): - warn_msg = 'Skipping inexistant directory %s' % self.dir_path + warn_msg = 'Skipping inexistent directory %s' % self.dir_path self.log.error(warn_msg, extra={ 'swh_type': 'dir_repo_list_refs', 'swh_repo': self.dir_path, 'swh_num_refs': 0, }) raise ValueError(warn_msg) if isinstance(self.dir_path, str): self.dir_path = os.fsencode(self.dir_path) def get_origin(self): return self.origin # set in prepare method def cleanup(self): """Nothing to clean up. """ pass def fetch_data(self): - def _snapshot_from(origin_id, visit, revision_hash, branch): - if isinstance(branch, str): - branch = branch.encode('utf-8') - - snapshot = { - 'id': None, - 'branches': { - branch: { - 'target': revision_hash, - 'target_type': 'revision', - 'origin': origin_id, - 'visit': visit, - } - } - } - snap_id = identifier_to_bytes(snapshot_identifier(snapshot)) - snapshot['id'] = snap_id - return snapshot - - # to load the repository, walk all objects, compute their hashes - self.objects = self.list_repo_objs( - dir_path=self.dir_path, revision=self.revision, - release=self.release) + """Walk the directory, load all objects with their hashes. - [rev_id] = self.objects['revision'].keys() + Sets self.objects reference with results. - snapshot = _snapshot_from( - self.origin_id, self.visit, rev_id, self.branch_name) - - # Update objects with release and occurrences - self.objects['snapshot'] = snapshot + """ + visit = {'id': self.visit} + self.origin['id'] = self.origin_id + self.objects = self.list_objs(dir_path=self.dir_path, + origin=self.origin, + visit=visit, + revision=self.revision, + release=self.release, + branch_name=self.branch_name) def store_data(self): objects = self.objects self.maybe_load_contents(objects['content'].values()) self.maybe_load_directories(objects['directory'].values()) self.maybe_load_revisions(objects['revision'].values()) self.maybe_load_releases(objects['release'].values()) - self.maybe_load_snapshot(objects['snapshot']) + snapshot = list(objects['snapshot'].values())[0] + self.maybe_load_snapshot(snapshot) @click.command() @click.option('--dir-path', required=1, help='Directory path to load') @click.option('--origin-url', required=1, help='Origin url for that directory') @click.option('--visit-date', default=None, help='Visit date time override') def main(dir_path, origin_url, visit_date): """Debugging purpose.""" d = DirLoader() origin = { 'url': origin_url, 'type': 'dir' } import datetime commit_time = int(datetime.datetime.now( tz=datetime.timezone.utc).timestamp() ) swh_person = { 'name': 'Software Heritage', 'fullname': 'Software Heritage', 'email': 'robot@softwareheritage.org' } revision_message = 'swh-loader-dir: synthetic revision message' revision_type = 'tar' revision = { 'date': { 'timestamp': commit_time, 'offset': 0, }, 'committer_date': { 'timestamp': commit_time, 'offset': 0, }, 'author': swh_person, 'committer': swh_person, 'type': revision_type, 'message': revision_message, 'metadata': {}, 'synthetic': True, } release = None d.load(dir_path=dir_path, origin=origin, visit_date=visit_date, revision=revision, release=release) if __name__ == '__main__': main() diff --git a/swh/loader/dir/tests/test_loader.py b/swh/loader/dir/tests/test_loader.py index 1b7c7dc..7dbd1d4 100644 --- a/swh/loader/dir/tests/test_loader.py +++ b/swh/loader/dir/tests/test_loader.py @@ -1,307 +1,317 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import shutil import subprocess import tempfile import unittest from nose.tools import istest from nose.plugins.attrib import attr from swh.loader.dir.loader import DirLoader @attr('fs') class InitTestLoader(unittest.TestCase): @classmethod def setUpClass(cls): super().setUpClass() cls.tmp_root_path = tempfile.mkdtemp().encode('utf-8') start_path = os.path.dirname(__file__).encode('utf-8') sample_folder_archive = os.path.join(start_path, b'../../../../..', b'swh-storage-testdata', b'dir-folders', b'sample-folder.tgz') cls.root_path = os.path.join(cls.tmp_root_path) # uncompress the sample folder subprocess.check_output( ['tar', 'xvf', sample_folder_archive, '-C', cls.tmp_root_path], ) @classmethod def tearDownClass(cls): super().tearDownClass() shutil.rmtree(cls.tmp_root_path) class DirLoaderListRepoObject(InitTestLoader): def setUp(self): super().setUp() self.info = { 'storage': { 'cls': 'remote', 'args': { 'url': 'http://localhost:5002/', } }, 'content_size_limit': 104857600, 'log_db': 'dbname=softwareheritage-log', 'directory_packet_size': 25000, 'content_packet_size': 10000, 'send_contents': True, 'send_directories': True, 'content_packet_size_bytes': 1073741824, 'send_revisions': True, 'revision_packet_size': 100000, 'content_packet_block_size_bytes': 104857600, 'send_snaphot': True, 'release_packet_size': 100000, 'send_releases': True } self.origin = { 'url': 'file:///dev/null', 'type': 'dir', + 'id': 10, } + self.visit = {'id': 1} + self.revision = { 'author': { 'name': 'swh author', 'email': 'swh@inria.fr', 'fullname': 'swh' }, 'date': { 'timestamp': 1444054085, 'offset': 0 }, 'committer': { 'name': 'swh committer', 'email': 'swh@inria.fr', 'fullname': 'swh' }, 'committer_date': { 'timestamp': 1444054085, 'offset': 0, }, 'type': 'tar', 'message': 'synthetic revision', 'metadata': {'foo': 'bar'}, } self.release = { 'name': 'v0.0.1', 'date': { 'timestamp': 1444054085, 'offset': 0, }, 'author': { 'name': 'swh author', 'fullname': 'swh', 'email': 'swh@inria.fr', }, 'message': 'synthetic release', } self.dirloader = DirLoader(config=self.info) @istest def load_without_storage(self): + """List directory objects without loading should be ok""" # when - objects = self.dirloader.list_repo_objs( + objects = self.dirloader.list_objs( dir_path=self.root_path, + origin=self.origin, + visit=self.visit, revision=self.revision, - release=self.release) + release=self.release, + branch_name=b'master') # then - self.assertEquals(len(objects), 4, - "4 objects types, blob, tree, revision, release") + self.assertEquals(len(objects), 5, + "5 obj types: con, dir, rev, rel, snap") self.assertEquals(len(objects['content']), 8, "8 contents: 3 files + 5 links") self.assertEquals(len(objects['directory']), 6, "6 directories: 5 subdirs + 1 empty") self.assertEquals(len(objects['revision']), 1, "synthetic revision") self.assertEquals(len(objects['release']), 1, "synthetic release") + self.assertEquals(len(objects['snapshot']), 1, "snapshot") class LoaderNoStorageForTest: """Mixin class to inhibit the persistence and keep in memory the data sent for storage. cf. SWHDirLoaderNoStorage """ def __init__(self): super().__init__() # Init the state self.all_contents = [] self.all_directories = [] self.all_revisions = [] self.all_releases = [] self.all_snapshots = [] def send_origin(self, origin): + origin['id'] = 1 self.origin = origin + return self.origin def send_origin_visit(self, origin_id, ts): - self.origin_visit = { + origin_visit = { 'origin': origin_id, 'ts': ts, 'visit': 1, } - return self.origin_visit + return origin_visit def update_origin_visit(self, origin_id, visit, status): self.status = status self.origin_visit = visit def maybe_load_contents(self, all_contents): self.all_contents.extend(all_contents) def maybe_load_directories(self, all_directories): self.all_directories.extend(all_directories) def maybe_load_revisions(self, all_revisions): self.all_revisions.extend(all_revisions) def maybe_load_releases(self, releases): self.all_releases.extend(releases) def maybe_load_snapshot(self, snapshot): self.all_snapshots.append(snapshot) def open_fetch_history(self): return 1 def close_fetch_history_success(self, fetch_history_id): pass def close_fetch_history_failure(self, fetch_history_id): pass TEST_CONFIG = { 'extraction_dir': '/tmp/tests/loader-tar/', # where to extract the tarball 'storage': { # we instantiate it but we don't use it in test context 'cls': 'remote', 'args': { 'url': 'http://127.0.0.1:9999', # somewhere that does not exist } }, 'send_contents': False, 'send_directories': False, 'send_revisions': False, 'send_releases': False, 'send_snapshot': False, 'content_packet_size': 100, 'content_packet_block_size_bytes': 104857600, 'content_packet_size_bytes': 1073741824, 'directory_packet_size': 250, 'revision_packet_size': 100, 'release_packet_size': 100, } def parse_config_file(base_filename=None, config_filename=None, additional_configs=None, global_config=True): return TEST_CONFIG # Inhibit side-effect loading configuration from disk DirLoader.parse_config_file = parse_config_file class SWHDirLoaderNoStorage(LoaderNoStorageForTest, DirLoader): """A DirLoader with no persistence. Context: Load a tarball with a persistent-less tarball loader """ pass class SWHDirLoaderITTest(InitTestLoader): def setUp(self): super().setUp() self.loader = SWHDirLoaderNoStorage() @istest def load(self): """Process a new tarball should be ok """ # given origin = { 'url': 'file:///tmp/sample-folder', 'type': 'dir' } visit_date = 'Tue, 3 May 2016 17:16:32 +0200' import datetime commit_time = int(datetime.datetime.now( tz=datetime.timezone.utc).timestamp() ) swh_person = { 'name': 'Software Heritage', 'fullname': 'Software Heritage', 'email': 'robot@softwareheritage.org' } revision_message = 'swh-loader-dir: synthetic revision message' revision_type = 'tar' revision = { 'date': { 'timestamp': commit_time, 'offset': 0, }, 'committer_date': { 'timestamp': commit_time, 'offset': 0, }, 'author': swh_person, 'committer': swh_person, 'type': revision_type, 'message': revision_message, 'metadata': {}, 'synthetic': True, } branch = os.path.basename(self.root_path) # when self.loader.load( dir_path=self.root_path, origin=origin, visit_date=visit_date, revision=revision, release=None, branch_name=branch) # then self.assertEquals(len(self.loader.all_contents), 8) self.assertEquals(len(self.loader.all_directories), 6) self.assertEquals(len(self.loader.all_revisions), 1) actual_revision = self.loader.all_revisions[0] self.assertEquals(actual_revision['synthetic'], True) self.assertEquals(actual_revision['parents'], []) self.assertEquals(actual_revision['type'], 'tar') self.assertEquals(actual_revision['message'], b'swh-loader-dir: synthetic revision message') self.assertEquals(len(self.loader.all_releases), 0) self.assertEquals(len(self.loader.all_snapshots), 1)