Page Menu
Software Heritage
Configure Global Search
Log In
No One
View File
Edit File
Delete File
View Transforms
Mute Notifications
Award Token
Flag For Later
21 KB
View Options
diff --git a/ b/
--- a/
+++ b/
@@ -10,6 +10,7 @@
- [swh-loader-dir](
- [swh-loader-git](
- [swh-loader-mercurial](
+- [swh-loader-pypi](
- [swh-loader-svn](
- [swh-loader-tar](
diff --git a/swh/loader/core/tests/ b/swh/loader/core/tests/
--- a/swh/loader/core/tests/
+++ b/swh/loader/core/tests/
@@ -0,0 +1,271 @@
+# Copyright (C) 2018 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+import os
+import shutil
+import subprocess
+import tempfile
+from unittest import TestCase
+from nose.plugins.attrib import attr
+from swh.model import hashutil
class BaseLoaderTest(TestCase):
    """Mixin base loader test class.

    This allows to uncompress archives (mercurial, svn, git,
    ... repositories) into a temporary folder so that the loader under
    test can work with this.

    When setUp() is done, the following variables are defined:

    - self.repo_url: can be used as an origin_url for example
    - self.destination_path: can be used as a path to ingest the
      <techno> repository.
    - self.tmp_root_path: temporary folder, removed by tearDown()

    The assert* helpers read the loaded objects through
    self.loader.state(<type>); self.loader is not set by this class.

    """
    def setUp(self, archive_name, *, start_path, filename=None,
              resources_path='resources', prefix_tmp_folder_name=''):
        """Uncompress the archive into a fresh temporary folder.

        Args:
            archive_name (str): Name of the archive holding the repository
                (folder, repository, dump, etc...)
            start_path (str): (mandatory) Path from where starting to look
                for resources
            filename (Optional[str]): Name of the filename/folder once the
                archive is uncompressed. When the filename is not
                provided, the archive name is used as a derivative. This
                is used both for the self.repo_url and
                self.destination_path computation (this one only when
                provided)
            resources_path (str): Folder name to look for archive
            prefix_tmp_folder_name (str): Prefix name to name the
                temporary folder

        Raises:
            subprocess.CalledProcessError: when tar exits with an error.

        """
        tmp_root_path = tempfile.mkdtemp(
            prefix=prefix_tmp_folder_name, suffix='-tests')
        repo_path = os.path.join(start_path, resources_path, archive_name)

        # uncompress folder/repositories/dump for the loader to ingest
        try:
            subprocess.check_output(
                ['tar', 'xf', repo_path, '-C', tmp_root_path])
        except BaseException:
            # tearDown() is not run when setUp() fails, so the temporary
            # folder must be removed here or it would be leaked.
            shutil.rmtree(tmp_root_path, ignore_errors=True)
            raise

        # build the origin url (or some derivative form)
        _fname = filename if filename else os.path.basename(archive_name)
        self.repo_url = 'file://' + tmp_root_path + '/' + _fname

        # where is the data to ingest?
        if filename:
            # archive holds one folder with name <filename>
            self.destination_path = os.path.join(tmp_root_path, filename)
        else:
            self.destination_path = tmp_root_path
        self.tmp_root_path = tmp_root_path

    def tearDown(self):
        """Clean up temporary working directory

        """
        shutil.rmtree(self.tmp_root_path)

    def state(self, _type):
        """Return the objects of type `_type` held by self.loader."""
        return self.loader.state(_type)

    def _assertCountOk(self, type, expected_length):
        """Check the loader holds `expected_length` objects of `type`."""
        self.assertEqual(len(self.state(type)), expected_length)

    def assertCountContents(self, len_expected_contents):
        self._assertCountOk('content', len_expected_contents)

    def assertCountDirectories(self, len_expected_directories):
        self._assertCountOk('directory', len_expected_directories)

    def assertCountReleases(self, len_expected_releases):
        self._assertCountOk('release', len_expected_releases)

    def assertCountRevisions(self, len_expected_revisions):
        self._assertCountOk('revision', len_expected_revisions)

    def assertCountSnapshots(self, len_expected_snapshot):
        self._assertCountOk('snapshot', len_expected_snapshot)

    def assertContentsOk(self, expected_contents):
        """Check the loader's contents match the expected contents.

        Args:
            expected_contents ([str]): Hexadecimal sha1 of the contents;
                the order is not checked.

        """
        self._assertCountOk('content', len(expected_contents))
        for content in self.state('content'):
            content_id = hashutil.hash_to_hex(content['sha1'])
            self.assertIn(content_id, expected_contents)

    def assertDirectoriesOk(self, expected_directories):
        """Check the loader's directories match the expected directories.

        Args:
            expected_directories ([str]): Hexadecimal directory ids; the
                order is not checked.

        """
        self._assertCountOk('directory', len(expected_directories))
        for _dir in self.state('directory'):
            _dir_id = hashutil.hash_to_hex(_dir['id'])
            self.assertIn(_dir_id, expected_directories)

    def assertReleasesOk(self, expected_releases):
        """Check the loader's releases match the expected releases.

        Args:
            expected_releases ([str]): Hexadecimal release ids, in the
                order the loader is expected to hold them.

        """
        self._assertCountOk('release', len(expected_releases))
        for i, rel in enumerate(self.state('release')):
            rel_id = hashutil.hash_to_hex(rel['id'])
            self.assertEqual(expected_releases[i], rel_id)

    def assertRevisionsOk(self, expected_revisions):
        """Check the loader's revisions match the expected revisions.

        Expects self.loader to be instantiated and ready to be
        inspected (meaning the loading took place).

        Args:
            expected_revisions (dict): Dict with key revision id,
                value the targeted directory id.

        """
        self._assertCountOk('revision', len(expected_revisions))
        for rev in self.state('revision'):
            rev_id = hashutil.hash_to_hex(rev['id'])
            directory_id = hashutil.hash_to_hex(rev['directory'])
            # Report an unexpected revision as a test failure rather than
            # letting the lookup below raise a KeyError.
            self.assertIn(rev_id, expected_revisions)
            self.assertEqual(expected_revisions[rev_id], directory_id)

    def assertSnapshotOk(self, expected_snapshot, expected_branches=None):
        """Check for snapshot match.

        Provide the hashes as hexadecimal, the conversion is done
        within the method.

        Args:
            expected_snapshot (str/dict): Either the snapshot
                identifier or the full snapshot
            expected_branches (dict): expected branches, or nothing if
                the full snapshot is provided

        """
        if isinstance(expected_snapshot, dict) and not expected_branches:
            expected_snapshot_id = expected_snapshot['id']
            expected_branches = expected_snapshot['branches']
        else:
            expected_snapshot_id = expected_snapshot
            if expected_branches is None:
                # Only an identifier was given: expect no branch at all.
                expected_branches = {}

        snapshots = self.state('snapshot')
        self.assertEqual(len(snapshots), 1)

        snap = snapshots[0]
        snap_id = hashutil.hash_to_hex(snap['id'])
        self.assertEqual(snap_id, expected_snapshot_id)

        def decode_target(target):
            # Falsy targets (e.g. None) are compared as is.
            if not target:
                return target
            target_type = target['target_type']

            if target_type == 'alias':
                # an alias targets a branch name, not a hash
                decoded_target = target['target'].decode('utf-8')
            else:
                decoded_target = hashutil.hash_to_hex(target['target'])

            return {
                'target': decoded_target,
                'target_type': target_type
            }

        branches = {
            branch.decode('utf-8'): decode_target(target)
            for branch, target in snap['branches'].items()
        }
        self.assertEqual(expected_branches, branches)
class LoaderNoStorage:
    """Mixin class keeping in memory what a loader would send to storage.

    Every storage-facing hook of the core loader is overridden: the
    object-sending ones record their input in per-type lists (readable
    through state()), the bookkeeping ones do nothing.

    cf. HgLoaderNoStorage, SvnLoaderNoStorage, etc...

    """
    def __init__(self, *args, **kwargs):
        # Arguments are accepted but not forwarded to the next class.
        super().__init__()
        kinds = ('content', 'directory', 'revision', 'release', 'snapshot')
        self.__state = {kind: [] for kind in kinds}

    def state(self, type):
        """Return the list of recorded objects for the given type."""
        return self.__state[type]

    def _add(self, type, l):
        """Record objects, skipping the ones already recorded.

        Insertion order is preserved.

        Args:
            type (str): Type of objects concerned by the action
            l ([object]): List of 'type' object

        """
        recorded = self.state(type)
        for candidate in l:
            if candidate not in recorded:
                recorded.append(candidate)

    # -- hooks recording the objects they receive

    def maybe_load_contents(self, all_contents):
        self._add('content', all_contents)

    def maybe_load_directories(self, all_directories):
        self._add('directory', all_directories)

    def maybe_load_revisions(self, all_revisions):
        self._add('revision', all_revisions)

    def maybe_load_releases(self, all_releases):
        self._add('release', all_releases)

    def maybe_load_snapshot(self, snapshot):
        self._add('snapshot', [snapshot])

    def send_batch_contents(self, all_contents):
        self._add('content', all_contents)

    def send_batch_directories(self, all_directories):
        self._add('directory', all_directories)

    def send_batch_revisions(self, all_revisions):
        self._add('revision', all_revisions)

    def send_batch_releases(self, all_releases):
        self._add('release', all_releases)

    def send_snapshot(self, snapshot):
        self._add('snapshot', [snapshot])

    # -- hooks turned into no-ops

    def _store_origin_visit(self):
        return None

    def open_fetch_history(self):
        return None

    def close_fetch_history_success(self, fetch_history_id):
        return None

    def close_fetch_history_failure(self, fetch_history_id):
        return None

    def update_origin_visit(self, origin_id, visit, status):
        return None

    def close_failure(self):
        return None

    def close_success(self):
        return None

    def pre_cleanup(self):
        return None
diff --git a/swh/loader/core/tests/ b/swh/loader/core/tests/
new file mode 100644
--- /dev/null
+++ b/swh/loader/core/tests/
@@ -0,0 +1,343 @@
+# Copyright (C) 2018 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+from nose.tools import istest
+from . import BaseLoaderTest, LoaderNoStorage
+from swh.model.hashutil import hash_to_bytes
class DummyBaseLoaderTest(BaseLoaderTest):
    """Base class for the fixture self-tests.

    It bypasses the archive handling of BaseLoaderTest: setUp() only
    seeds placeholder input data and tearDown() does nothing.

    """
    def setUp(self):
        # Voluntarily no super().setUp(): there is no archive to uncompress.
        (self.in_contents, self.in_directories,
         self.in_revisions, self.in_releases) = (
            [1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12])
        self.in_snapshot = 13

    def tearDown(self):
        # Voluntarily no super().tearDown(): setUp() created no folder.
        return None
class LoadTest1(DummyBaseLoaderTest):
    """Check that LoaderNoStorage records what is sent through it."""
    def setUp(self):
        super().setUp()
        self.loader = LoaderNoStorage()

    @istest
    def stateful_loader(self):
        """Stateful loader accumulates in place the sent data

        Note: Those behaviors should be somehow merged but that's
        another story.

        """
        self.loader.maybe_load_directories(self.in_directories)
        self.loader.maybe_load_revisions(self.in_revisions)
        self.loader.maybe_load_releases(self.in_releases)

        # assertEqual: the assertEquals alias is deprecated and no longer
        # exists in recent unittest versions.
        self.assertEqual(len(self.state('content')), 0)
        self.assertEqual(
            len(self.state('directory')), len(self.in_directories))
        self.assertEqual(
            len(self.state('revision')), len(self.in_revisions))
        self.assertEqual(
            len(self.state('release')), len(self.in_releases))
        self.assertEqual(len(self.state('snapshot')), 0)

    @istest
    def stateless_loader(self):
        """Stateless loader accumulates in place the sent data as well

        Note: Those behaviors should be somehow merged but that's
        another story.

        """
        self.loader.send_batch_contents(self.in_contents)
        self.loader.send_snapshot(self.in_snapshot)

        self.assertEqual(len(self.state('content')), len(self.in_contents))
        self.assertEqual(len(self.state('directory')), 0)
        self.assertEqual(len(self.state('revision')), 0)
        self.assertEqual(len(self.state('release')), 0)
        self.assertEqual(len(self.state('snapshot')), 1)
class LoadTestContent(DummyBaseLoaderTest):
    """Exercise the content assertions against LoaderNoStorage."""
    def setUp(self):
        super().setUp()
        self.loader = LoaderNoStorage()

        self.content_id0 = '34973274ccef6ab4dfaaf86599792fa9c3fe4689'
        self.content_id1 = '61c2b3a30496d329e21af70dd2d7e097046d07b7'
        self.expected_contents = [self.content_id0, self.content_id1]

        # data trimmed to the only key the content assertions need
        self.in_contents = [
            {'sha1': hash_to_bytes(hex_id)}
            for hex_id in self.expected_contents]

    @istest
    def maybe_load_contents(self):
        """Contents loaded with maybe_load_contents should be found

        """
        self.loader.maybe_load_contents(self.in_contents)

        nb_expected = len(self.expected_contents)
        self.assertCountContents(nb_expected)
        self.assertContentsOk(self.expected_contents)

    @istest
    def send_batch_contents(self):
        """Contents sent with send_batch_contents should be found

        """
        self.loader.send_batch_contents(self.in_contents)

        nb_expected = len(self.expected_contents)
        self.assertCountContents(nb_expected)
        self.assertContentsOk(self.expected_contents)

    @istest
    def failing(self):
        """Comparing against wrong contents should fail.

        """
        self.loader.send_batch_contents(self.in_contents)

        self.assertRaises(AssertionError, self.assertContentsOk, [])
class LoadTestDirectory(DummyBaseLoaderTest):
    """Exercise the directory assertions against LoaderNoStorage."""
    def setUp(self):
        super().setUp()
        self.loader = LoaderNoStorage()

        self.directory_id0 = '44e45d56f88993aae6a0198013efa80716fd8921'
        self.directory_id1 = '54e45d56f88993aae6a0198013efa80716fd8920'
        self.directory_id2 = '43e45d56f88993aae6a0198013efa80716fd8920'

        # trimmed data to the bare necessities
        self.in_directories = [{
            'id': hash_to_bytes(self.directory_id0),
        }, {
            'id': hash_to_bytes(self.directory_id1),
        }, {
            'id': hash_to_bytes(self.directory_id2),
        }]
        self.expected_directories = [
            self.directory_id0, self.directory_id1, self.directory_id2]

    @istest
    def maybe_load_directories(self):
        """Loading directories should be ok

        """
        self.loader.maybe_load_directories(self.in_directories)
        self.assertCountDirectories(len(self.expected_directories))
        self.assertDirectoriesOk(self.expected_directories)

    @istest
    def send_batch_directories(self):
        """Sending directories should be ok 2

        """
        self.loader.send_batch_directories(self.in_directories)
        self.assertCountDirectories(len(self.expected_directories))
        self.assertDirectoriesOk(self.expected_directories)

    @istest
    def failing(self):
        """Comparing wrong directories should fail.

        """
        # This test used to send revisions and call assertRevisionsOk,
        # which left the directory failure path untested.
        self.loader.send_batch_directories(self.in_directories)
        with self.assertRaises(AssertionError):
            self.assertDirectoriesOk([])
class LoadTestRelease(DummyBaseLoaderTest):
    """Exercise the release assertions against LoaderNoStorage."""
    def setUp(self):
        super().setUp()
        self.loader = LoaderNoStorage()

        self.release_id0 = '44e45d56f88993aae6a0198013efa80716fd8921'
        self.release_id1 = '54e45d56f88993aae6a0198013efa80716fd8920'
        self.release_id2 = '43e45d56f88993aae6a0198013efa80716fd8920'
        self.expected_releases = [
            self.release_id0, self.release_id1, self.release_id2]

        # data trimmed to the only key the release assertions need
        self.in_releases = [
            {'id': hash_to_bytes(hex_id)}
            for hex_id in self.expected_releases]

    @istest
    def maybe_load_releases(self):
        """Releases loaded with maybe_load_releases should be found

        """
        self.loader.maybe_load_releases(self.in_releases)

        nb_expected = len(self.expected_releases)
        self.assertCountReleases(nb_expected)
        self.assertReleasesOk(self.expected_releases)

    @istest
    def send_batch_releases(self):
        """Releases sent with send_batch_releases should be found

        """
        self.loader.send_batch_releases(self.in_releases)

        nb_expected = len(self.expected_releases)
        self.assertCountReleases(nb_expected)
        self.assertReleasesOk(self.expected_releases)

    @istest
    def failing(self):
        """Comparing against wrong releases should fail.

        """
        self.loader.send_batch_releases(self.in_releases)

        self.assertRaises(AssertionError, self.assertReleasesOk, [])
class LoadTestRevision(DummyBaseLoaderTest):
    """Exercise the revision assertions against LoaderNoStorage."""
    def setUp(self):
        super().setUp()
        self.loader = LoaderNoStorage()

        # (revision id, targeted directory id), as hexadecimal strings
        rev_dir_pairs = [
            ('44e45d56f88993aae6a0198013efa80716fd8921',
             '34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
            ('54e45d56f88993aae6a0198013efa80716fd8920',
             '61c2b3a30496d329e21af70dd2d7e097046d07b7'),
            ('43e45d56f88993aae6a0198013efa80716fd8920',
             '33e45d56f88993aae6a0198013efa80716fd8921'),
        ]

        # data trimmed to the keys the revision assertions need
        self.in_revisions = [
            {'id': hash_to_bytes(rev), 'directory': hash_to_bytes(directory)}
            for rev, directory in rev_dir_pairs]
        self.expected_revisions = dict(rev_dir_pairs)

    @istest
    def maybe_load_revisions(self):
        """Revisions loaded with maybe_load_revisions should be found

        """
        self.loader.maybe_load_revisions(self.in_revisions)

        nb_expected = len(self.expected_revisions)
        self.assertCountRevisions(nb_expected)
        self.assertRevisionsOk(self.expected_revisions)

    @istest
    def send_batch_revisions(self):
        """Revisions sent with send_batch_revisions should be found

        """
        self.loader.send_batch_revisions(self.in_revisions)

        nb_expected = len(self.expected_revisions)
        self.assertCountRevisions(nb_expected)
        self.assertRevisionsOk(self.expected_revisions)

    @istest
    def failing(self):
        """Comparing against wrong revisions should fail.

        """
        self.loader.send_batch_revisions(self.in_revisions)

        self.assertRaises(AssertionError, self.assertRevisionsOk, [])
class LoadTestSnapshot(DummyBaseLoaderTest):
    """Exercise the snapshot assertion against LoaderNoStorage."""
    def setUp(self):
        super().setUp()
        self.loader = LoaderNoStorage()

        snapshot_id = '44e45d56f88993aae6a0198013efa80716fd8921'
        revision_id = '54e45d56f88993aae6a0198013efa80716fd8920'
        release_id = '43e45d56f88993aae6a0198013efa80716fd8920'

        # What the loader receives: bytes for ids, branch names and
        # alias targets (data trimmed to the bare necessities).
        raw_branches = {
            b'default': {
                'target_type': 'revision',
                'target': hash_to_bytes(revision_id),
            },
            b'master': {
                'target_type': 'release',
                'target': hash_to_bytes(release_id),
            },
            b'HEAD': {
                'target_type': 'alias',
                'target': b'master',
            },
        }
        self.in_snapshot = {
            'id': hash_to_bytes(snapshot_id),
            'branches': raw_branches,
        }

        # What the test compares against: the same snapshot with
        # hexadecimal ids and text branch names.
        decoded_branches = {
            'default': {
                'target_type': 'revision',
                'target': revision_id,
            },
            'master': {
                'target_type': 'release',
                'target': release_id,
            },
            'HEAD': {
                'target_type': 'alias',
                'target': 'master',
            },
        }
        self.expected_snapshot = {
            'id': snapshot_id,
            'branches': decoded_branches,
        }

    @istest
    def maybe_load_snapshots(self):
        """Snapshot loaded with maybe_load_snapshot should be found

        """
        self.loader.maybe_load_snapshot(self.in_snapshot)

        self.assertCountSnapshots(1)
        # full snapshot form, then identifier + branches form
        self.assertSnapshotOk(self.expected_snapshot)
        snapshot_id = self.expected_snapshot['id']
        branches = self.expected_snapshot['branches']
        self.assertSnapshotOk(snapshot_id, expected_branches=branches)

    @istest
    def send_batch_snapshots(self):
        """Snapshot sent with send_snapshot should be found

        """
        self.loader.send_snapshot(self.in_snapshot)

        self.assertCountSnapshots(1)
        # full snapshot form, then identifier + branches form
        self.assertSnapshotOk(self.expected_snapshot)
        snapshot_id = self.expected_snapshot['id']
        branches = self.expected_snapshot['branches']
        self.assertSnapshotOk(snapshot_id, expected_branches=branches)

    @istest
    def failing(self):
        """Comparing wrong snapshot should fail.

        """
        self.loader.send_snapshot(self.in_snapshot)

        branches = self.expected_snapshot['branches']
        self.assertRaises(
            AssertionError, self.assertSnapshotOk,
            'wrong', expected_branches=branches)
File Metadata
Mime Type
Dec 21 2024, 11:20 AM (11 w, 4 d ago)
Storage Engine
Storage Format
Raw Data
Storage Handle
Attached To
D467: loader.core.tests: Add loader core fixtures to ease loaders' tests
Event Timeline
Log In to Comment