# NOTE(review): The original artifact was a git unified diff whose newlines
# were stripped, rendering the patch unusable (unified diff is line-oriented).
# Below is the reconstructed POST-patch content of the two files it touched,
# separated by file-boundary markers. The patch itself made two changes:
#   1. __init__.py: pass origin['id'] (not the origin dict) to
#      storage.origin_metadata_get_by().
#   2. test_loader.py: replace the in_origins list with a single in_origin
#      dict, add in_provider/in_tool fixtures, and add test_origin_metadata.

# ===========================================================================
# File: swh/loader/core/tests/__init__.py  (reconstructed post-patch)
# ===========================================================================

# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import os
import pytest
import shutil
import subprocess
import tempfile

from unittest import TestCase

from swh.model import hashutil
from swh.model.hashutil import hash_to_bytes


class BaseLoaderStorageTest:
    """Assertion helpers over ``self.storage`` for loader tests.

    Expects the using test class to define ``self.storage`` (a swh storage
    instance) and to provide unittest's ``assert*`` methods (i.e. to also
    inherit from ``unittest.TestCase``).
    """

    def _assertCountEqual(self, type, expected_length, msg=None):
        """Check typed 'type' state to have the same expected length.

        Args:
            type (str): stat counter key ('content', 'directory', ...)
            expected_length (int): expected value of that counter
            msg: optional assertion message passed through to assertEqual
        """
        # refresh_stat_counters() is needed so stat_counters() reflects
        # writes performed earlier in the test.
        self.storage.refresh_stat_counters()
        self.assertEqual(self.storage.stat_counters()[type],
                         expected_length, msg=msg)

    def assertCountContents(self, len_expected_contents, msg=None):
        self._assertCountEqual('content', len_expected_contents, msg=msg)

    def assertCountDirectories(self, len_expected_directories, msg=None):
        self._assertCountEqual('directory', len_expected_directories,
                               msg=msg)

    def assertCountReleases(self, len_expected_releases, msg=None):
        self._assertCountEqual('release', len_expected_releases, msg=msg)

    def assertCountRevisions(self, len_expected_revisions, msg=None):
        self._assertCountEqual('revision', len_expected_revisions, msg=msg)

    def assertCountSnapshots(self, len_expected_snapshot, msg=None):
        self._assertCountEqual('snapshot', len_expected_snapshot, msg=msg)

    def assertContentsContain(self, expected_contents):
        """Check the provided content are a subset of the stored ones.

        Args:
            expected_contents ([sha1]): List of content ids
        """
        self._assertCountEqual('content', len(expected_contents))
        missing = list(self.storage.content_missing(
            {'sha1': hash_to_bytes(content_hash)}
            for content_hash in expected_contents))
        self.assertEqual(missing, [])

    def assertDirectoriesContain(self, expected_directories):
        """Check the provided directories are a subset of the stored ones.

        Args:
            expected_directories ([sha1]): List of directory ids.
        """
        self._assertCountEqual('directory', len(expected_directories))
        missing = list(self.storage.directory_missing(
            hash_to_bytes(dir_) for dir_ in expected_directories))
        self.assertEqual(missing, [])

    def assertReleasesContain(self, expected_releases):
        """Check the provided releases are a subset of the stored ones.

        Args:
            releases (list): list of swh releases' identifiers.
        """
        self._assertCountEqual('release', len(expected_releases))
        missing = list(self.storage.release_missing(
            hash_to_bytes(rel) for rel in expected_releases))
        self.assertEqual(missing, [])

    def assertRevisionsContain(self, expected_revisions):
        """Check the provided revisions are a subset of the stored ones.

        Expects self.loader to be instantiated and ready to be inspected
        (meaning the loading took place).

        Args:
            expected_revisions (dict): Dict with key revision id, value the
                targeted directory id.
        """
        self._assertCountEqual('revision', len(expected_revisions))
        revs = list(self.storage.revision_get(
            hashutil.hash_to_bytes(rev_id) for rev_id in expected_revisions))
        # revision_get yields None for unknown ids; none may be missing.
        self.assertNotIn(None, revs)
        self.assertEqual(
            {rev['id']: rev['directory'] for rev in revs},
            {hash_to_bytes(rev_id): hash_to_bytes(rev_dir)
             for (rev_id, rev_dir) in expected_revisions.items()})

    # NOTE(review): mutable default argument; harmless here because
    # expected_branches is only rebound, never mutated — kept as-is to stay
    # faithful to the reconstructed source.
    def assertSnapshotEqual(self, expected_snapshot, expected_branches=[]):
        """Check for snapshot match.

        Provide the hashes as hexadecimal, the conversion is done within
        the method.

        Args:
            expected_snapshot (str/dict): Either the snapshot identifier or
                the full snapshot
            expected_branches (dict): expected branches or nothing is the
                full snapshot is provided
        """
        if isinstance(expected_snapshot, dict) and not expected_branches:
            expected_snapshot_id = expected_snapshot['id']
            expected_branches = expected_snapshot['branches']
        else:
            expected_snapshot_id = expected_snapshot

        self._assertCountEqual('snapshot', 1)
        snap = self.storage.snapshot_get(hash_to_bytes(expected_snapshot_id))
        self.assertIsNotNone(snap)

        def decode_target(target):
            # Convert a stored branch target back to the hex/str form used
            # in the expected data; alias targets are utf-8 branch names.
            if not target:
                return target
            target_type = target['target_type']

            if target_type == 'alias':
                decoded_target = target['target'].decode('utf-8')
            else:
                decoded_target = hashutil.hash_to_hex(target['target'])

            return {
                'target': decoded_target,
                'target_type': target_type
            }

        branches = {
            branch.decode('utf-8'): decode_target(target)
            for branch, target in snap['branches'].items()
        }
        self.assertEqual(expected_branches, branches)

    def assertOriginMetadataContains(self, origin_type, origin_url,
                                     expected_origin_metadata):
        """Check the storage contains this metadata for the given origin.

        Args:
            origin_type (str): type of origin ('deposit', 'git', 'svn', ...)
            origin_url (str): URL of the origin
            expected_origin_metadata (dict): Extrinsic metadata of the origin
        """
        origin = self.storage.origin_get(
            dict(type=origin_type, url=origin_url))
        # Patched line: the storage API takes the origin id, not the dict.
        results = self.storage.origin_metadata_get_by(origin['id'])
        self.assertEqual(len(results), 1, results)
        result = results[0]
        self.assertEqual(result['metadata'], expected_origin_metadata)


@pytest.mark.fs
class BaseLoaderTest(TestCase, BaseLoaderStorageTest):
    """Mixin base loader test class.

    This allows to uncompress archives (mercurial, svn, git, ...
    repositories) into a temporary folder so that the loader under test can
    work with this.

    When setUp() is done, the following variables are defined:
    - self.repo_url: can be used as an origin_url for example
    - self.destination_path: can be used as a path to ingest the repository.

    Args:
        archive_name (str): Name of the archive holding the repository
            (folder, repository, dump, etc...)
        start_path (str): (mandatory) Path from where starting to look for
            resources
        filename (Optional[str]): Name of the filename/folder once the
            archive is uncompressed. When the filename is not provided, the
            archive name is used as a derivative. This is used both for the
            self.repo_url and self.destination_path computation (this one
            only when provided)
        resources_path (str): Folder name to look for archive
        prefix_tmp_folder_name (str): Prefix name to name the temporary
            folder
        uncompress_archive (bool): Uncompress the archive passed as
            parameters (default to True). It so happens we could avoid doing
            anything to the tarball.
    """

    def setUp(self, archive_name, *, start_path, filename=None,
              resources_path='resources', prefix_tmp_folder_name='',
              uncompress_archive=True):
        super().setUp()
        repo_path = os.path.join(start_path, resources_path, archive_name)
        if not uncompress_archive:
            # In that case, simply sets the archive's path
            self.destination_path = repo_path
            self.tmp_root_path = None
            self.repo_url = 'file://' + repo_path
            return
        tmp_root_path = tempfile.mkdtemp(
            prefix=prefix_tmp_folder_name, suffix='-tests')
        # uncompress folder/repositories/dump for the loader to ingest
        subprocess.check_output(['tar', 'xf', repo_path, '-C', tmp_root_path])
        # build the origin url (or some derivative form)
        _fname = filename if filename else os.path.basename(archive_name)
        self.repo_url = 'file://' + tmp_root_path + '/' + _fname
        # where is the data to ingest?
        if filename:
            # archive holds one folder with name <filename>
            self.destination_path = os.path.join(tmp_root_path, filename)
        else:
            self.destination_path = tmp_root_path
        self.tmp_root_path = tmp_root_path

    def tearDown(self):
        """Clean up temporary working directory
        """
        if self.tmp_root_path and os.path.exists(self.tmp_root_path):
            shutil.rmtree(self.tmp_root_path)


# ===========================================================================
# File: swh/loader/core/tests/test_loader.py  (reconstructed post-patch)
# ===========================================================================

# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime

from swh.model.hashutil import hash_to_bytes

from swh.loader.core.loader import BufferedLoader, UnbufferedLoader

from . import BaseLoaderTest


class DummyLoader:
    """Loader stub: all pipeline hooks are no-ops so the Buffered /
    Unbuffered base classes can be exercised directly against an in-memory
    storage."""

    def cleanup(self):
        pass

    def prepare(self):
        pass

    def fetch_data(self):
        pass

    def store_data(self):
        pass

    def prepare_origin_visit(self):
        # Resolves the origin injected by the test (see
        # _test_prepare_origin_visit_data in DummyBaseLoaderTest.setUp) and
        # opens a visit on it.
        origin = self.storage.origin_get(
            self._test_prepare_origin_visit_data['origin'])
        self.origin = origin
        self.origin_id = origin['id']
        self.origin_url = origin['url']
        self.visit_date = datetime.datetime.utcnow()
        self.storage.origin_visit_add(origin['id'], self.visit_date)

    def parse_config_file(self, *args, **kwargs):
        # Hard-coded in-memory configuration; small packet sizes so the
        # buffering behavior is observable with two-element fixtures.
        return {
            'storage': {
                'cls': 'memory',
                'args': {
                }
            },
            'send_contents': True,
            'send_directories': True,
            'send_revisions': True,
            'send_releases': True,
            'send_snapshot': True,
            'content_packet_size': 2,
            'content_packet_size_bytes': 8,
            'directory_packet_size': 2,
            'revision_packet_size': 2,
            'release_packet_size': 2,
            'content_size_limit': 10000,
        }


class DummyUnbufferedLoader(DummyLoader, UnbufferedLoader):
    pass


class DummyBufferedLoader(DummyLoader, BufferedLoader):
    pass


class DummyBaseLoaderTest(BaseLoaderTest):
    """Shared fixtures (contents, directories, revisions, releases, origin,
    snapshot, metadata provider/tool) for the loader tests below.

    Subclasses must set ``loader_class``.
    """

    def setUp(self):
        self.loader = self.loader_class(logging_class='dummyloader')
        # do not call voluntarily super().setUp()
        self.storage = self.loader.storage
        contents = [
            {
                'id': '34973274ccef6ab4dfaaf86599792fa9c3fe4689',
                'sha1': '34973274ccef6ab4dfaaf86599792fa9c3fe4689',
                'sha1_git': b'bar1',
                'sha256': b'baz1',
                'blake2s256': b'qux1',
                'status': 'visible',
                'data': b'data1',
                'length': 5,
            },
            {
                'id': '61c2b3a30496d329e21af70dd2d7e097046d07b7',
                'sha1': '61c2b3a30496d329e21af70dd2d7e097046d07b7',
                'sha1_git': b'bar2',
                'sha256': b'baz2',
                'blake2s256': b'qux2',
                'status': 'visible',
                'data': b'data2',
                'length': 5,
            },
        ]
        self.expected_contents = [content['id'] for content in contents]
        self.in_contents = contents.copy()
        for content in self.in_contents:
            content['sha1'] = hash_to_bytes(content['sha1'])
        self.in_directories = [
            {'id': hash_to_bytes(id_)}
            for id_ in [
                '44e45d56f88993aae6a0198013efa80716fd8921',
                '54e45d56f88993aae6a0198013efa80716fd8920',
                '43e45d56f88993aae6a0198013efa80716fd8920',
            ]
        ]
        self.in_revisions = [
            {
                'id': b'rev1',
                'date': None,
            },
            {
                'id': b'rev2',
                'date': None,
            },
        ]
        self.in_releases = [
            {
                'id': b'rel1',
                'date': None,
            },
            {
                'id': b'rel2',
                'date': None,
            },
        ]
        self.in_origin = {
            'type': 'git',
            'url': 'http://example.com/',
        }
        self.in_snapshot = {
            'id': b'snap1',
            'branches': {},
        }
        self.in_provider = {
            'provider_name': 'Test Provider',
            'provider_type': 'test_provider',
            'provider_url': 'http://example.org/metadata_provider',
            'metadata': {'working': True},
        }
        self.in_tool = {
            'name': 'Test Tool',
            'version': 'v1.2.3',
            'configuration': {'in_the_Matrix': 'maybe'},
        }

        self.storage.origin_add([self.in_origin])

        # used by prepare_origin_visit() when it gets called
        self.loader._test_prepare_origin_visit_data = {
            'origin': self.in_origin,
        }

    def tearDown(self):
        # do not call voluntarily super().tearDown()
        pass


class CoreUnbufferedLoaderTest(DummyBaseLoaderTest):
    loader_class = DummyUnbufferedLoader

    def test_unbuffered_loader(self):
        """Unbuffered loader: every send_* call lands in storage at once."""
        self.loader.load()  # initialize the loader

        self.loader.send_contents(self.in_contents[0:1])
        self.loader.send_directories(self.in_directories[0:1])
        self.loader.send_revisions(self.in_revisions[0:1])
        self.loader.send_releases(self.in_releases[0:1])

        self.assertCountContents(1)
        self.assertCountDirectories(1)
        self.assertCountRevisions(1)
        self.assertCountReleases(1)

        self.loader.send_contents(self.in_contents[1:])
        self.loader.send_directories(self.in_directories[1:])
        self.loader.send_revisions(self.in_revisions[1:])
        self.loader.send_releases(self.in_releases[1:])

        self.assertCountContents(len(self.in_contents))
        self.assertCountDirectories(len(self.in_directories))
        self.assertCountRevisions(len(self.in_revisions))
        self.assertCountReleases(len(self.in_releases))


class CoreBufferedLoaderTest(DummyBaseLoaderTest):
    loader_class = DummyBufferedLoader

    def test_buffered_loader(self):
        """Buffered loader: a single item stays below the packet size (2),
        so nothing is flushed until the second batch arrives."""
        self.loader.load()  # initialize the loader

        self.loader.maybe_load_contents(self.in_contents[0:1])
        self.loader.maybe_load_directories(self.in_directories[0:1])
        self.loader.maybe_load_revisions(self.in_revisions[0:1])
        self.loader.maybe_load_releases(self.in_releases[0:1])

        self.assertCountContents(0)
        self.assertCountDirectories(0)
        self.assertCountRevisions(0)
        self.assertCountReleases(0)

        self.loader.maybe_load_contents(self.in_contents[1:])
        self.loader.maybe_load_directories(self.in_directories[1:])
        self.loader.maybe_load_revisions(self.in_revisions[1:])
        self.loader.maybe_load_releases(self.in_releases[1:])

        self.assertCountContents(len(self.in_contents))
        self.assertCountDirectories(len(self.in_directories))
        self.assertCountRevisions(len(self.in_revisions))
        self.assertCountReleases(len(self.in_releases))

    def test_directory_cascade(self):
        """Checks that sending a directory triggers sending contents"""
        self.loader.load()  # initialize the loader

        self.loader.maybe_load_contents(self.in_contents[0:1])
        self.loader.maybe_load_directories(self.in_directories)

        self.assertCountContents(1)
        self.assertCountDirectories(len(self.in_directories))

    def test_revision_cascade(self):
        """Checks that sending a revision triggers sending contents
        and directories."""
        self.loader.load()  # initialize the loader

        self.loader.maybe_load_contents(self.in_contents[0:1])
        self.loader.maybe_load_directories(self.in_directories[0:1])
        self.loader.maybe_load_revisions(self.in_revisions)

        self.assertCountContents(1)
        self.assertCountDirectories(1)
        self.assertCountRevisions(len(self.in_revisions))

    def test_release_cascade(self):
        """Checks that sending a release triggers sending revisions,
        contents, and directories."""
        self.loader.load()  # initialize the loader

        self.loader.maybe_load_contents(self.in_contents[0:1])
        self.loader.maybe_load_directories(self.in_directories[0:1])
        self.loader.maybe_load_revisions(self.in_revisions[0:1])
        self.loader.maybe_load_releases(self.in_releases)

        self.assertCountContents(1)
        self.assertCountDirectories(1)
        self.assertCountRevisions(1)
        self.assertCountReleases(len(self.in_releases))

    def test_snapshot_cascade(self):
        """Checks that sending a snapshot triggers sending releases,
        revisions, contents, and directories."""
        self.loader.load()  # initialize the loader

        self.loader.maybe_load_contents(self.in_contents[0:1])
        self.loader.maybe_load_directories(self.in_directories[0:1])
        self.loader.maybe_load_revisions(self.in_revisions[0:1])
        self.loader.maybe_load_releases(self.in_releases[0:1])
        self.loader.maybe_load_snapshot(self.in_snapshot)

        self.assertCountContents(1)
        self.assertCountDirectories(1)
        self.assertCountRevisions(1)
        self.assertCountReleases(1)
        self.assertCountSnapshots(1)

    def test_origin_metadata(self):
        """Stores provider/tool/metadata for the visited origin, then checks
        the assertion helper both succeeds and fails as expected."""
        self.loader.load()

        provider_id = self.loader.send_provider(self.in_provider)
        tool_id = self.loader.send_tool(self.in_tool)

        self.loader.send_origin_metadata(
            self.loader.origin_id, self.loader.visit_date, provider_id,
            tool_id, {'test_metadata': 'foobar'})

        self.assertOriginMetadataContains(
            self.in_origin['type'], self.in_origin['url'],
            {'test_metadata': 'foobar'})

        # Wrong metadata -> the equality assertion inside the helper fails.
        with self.assertRaises(AssertionError):
            self.assertOriginMetadataContains(
                self.in_origin['type'], self.in_origin['url'],
                {'test_metadata': 'foobarbaz'})

        # Unknown origin URL -> origin_get yields no usable origin; the
        # helper blows up before the metadata comparison.
        with self.assertRaises(Exception):
            self.assertOriginMetadataContains(
                self.in_origin['type'], self.in_origin['url'] + 'blah',
                {'test_metadata': 'foobar'})