F9339681 (text/x-diff, 19 KB, attached to rDLDBASE Generic VCS/Package Loader)
diff --git a/swh/loader/core/tests/__init__.py b/swh/loader/core/tests/__init__.py
index 782618c..07d008a 100644
--- a/swh/loader/core/tests/__init__.py
+++ b/swh/loader/core/tests/__init__.py
@@ -1,227 +1,227 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import pytest
import shutil
import subprocess
import tempfile
from unittest import TestCase
from swh.model import hashutil
from swh.model.hashutil import hash_to_bytes
class BaseLoaderStorageTest:
def _assertCountEqual(self, type, expected_length, msg=None):
"""Check typed 'type' state to have the same expected length.
"""
self.storage.refresh_stat_counters()
self.assertEqual(self.storage.stat_counters()[type],
expected_length, msg=msg)
def assertCountContents(self, len_expected_contents, msg=None):
self._assertCountEqual('content', len_expected_contents, msg=msg)
def assertCountDirectories(self, len_expected_directories, msg=None):
self._assertCountEqual('directory', len_expected_directories,
msg=msg)
def assertCountReleases(self, len_expected_releases, msg=None):
self._assertCountEqual('release', len_expected_releases, msg=msg)
def assertCountRevisions(self, len_expected_revisions, msg=None):
self._assertCountEqual('revision', len_expected_revisions, msg=msg)
def assertCountSnapshots(self, len_expected_snapshot, msg=None):
self._assertCountEqual('snapshot', len_expected_snapshot, msg=msg)
def assertContentsContain(self, expected_contents):
"""Check the provided content are a subset of the stored ones.
Args:
expected_contents ([sha1]): List of content ids"""
self._assertCountEqual('content', len(expected_contents))
missing = list(self.storage.content_missing(
{'sha1': hash_to_bytes(content_hash)}
for content_hash in expected_contents))
self.assertEqual(missing, [])
def assertDirectoriesContain(self, expected_directories):
"""Check the provided directories are a subset of the stored ones.
Args:
expected_directories ([sha1]): List of directory ids."""
self._assertCountEqual('directory', len(expected_directories))
missing = list(self.storage.directory_missing(
hash_to_bytes(dir_) for dir_ in expected_directories))
self.assertEqual(missing, [])
def assertReleasesContain(self, expected_releases):
"""Check the provided releases are a subset of the stored ones.
Args:
            expected_releases (list): list of SWH release identifiers.
"""
self._assertCountEqual('release', len(expected_releases))
missing = list(self.storage.release_missing(
hash_to_bytes(rel) for rel in expected_releases))
self.assertEqual(missing, [])
def assertRevisionsContain(self, expected_revisions):
"""Check the provided revisions are a subset of the stored ones.
        Expects self.loader to be instantiated and ready to be
        inspected (i.e., the loading has already taken place).
Args:
expected_revisions (dict): Dict with key revision id,
value the targeted directory id.
"""
self._assertCountEqual('revision', len(expected_revisions))
revs = list(self.storage.revision_get(
hashutil.hash_to_bytes(rev_id) for rev_id in expected_revisions))
self.assertNotIn(None, revs)
self.assertEqual(
{rev['id']: rev['directory'] for rev in revs},
{hash_to_bytes(rev_id): hash_to_bytes(rev_dir)
for (rev_id, rev_dir) in expected_revisions.items()})
def assertSnapshotEqual(self, expected_snapshot, expected_branches=[]):
"""Check for snapshot match.
Provide the hashes as hexadecimal, the conversion is done
within the method.
Args:
expected_snapshot (str/dict): Either the snapshot
identifier or the full
snapshot
expected_branches (dict): expected branches or nothing is
the full snapshot is provided
"""
if isinstance(expected_snapshot, dict) and not expected_branches:
expected_snapshot_id = expected_snapshot['id']
expected_branches = expected_snapshot['branches']
else:
expected_snapshot_id = expected_snapshot
self._assertCountEqual('snapshot', 1)
snap = self.storage.snapshot_get(hash_to_bytes(expected_snapshot_id))
self.assertIsNotNone(snap)
def decode_target(target):
if not target:
return target
target_type = target['target_type']
if target_type == 'alias':
decoded_target = target['target'].decode('utf-8')
else:
decoded_target = hashutil.hash_to_hex(target['target'])
return {
'target': decoded_target,
'target_type': target_type
}
branches = {
branch.decode('utf-8'): decode_target(target)
for branch, target in snap['branches'].items()
}
self.assertEqual(expected_branches, branches)
def assertOriginMetadataContains(self, origin_type, origin_url,
expected_origin_metadata):
"""Check the storage contains this metadata for the given origin.
Args:
origin_type (str): type of origin ('deposit', 'git', 'svn', ...)
origin_url (str): URL of the origin
expected_origin_metadata (dict):
Extrinsic metadata of the origin
<https://forge.softwareheritage.org/T1344>
"""
origin = self.storage.origin_get(
dict(type=origin_type, url=origin_url))
- results = self.storage.origin_metadata_get_by(origin)
+ results = self.storage.origin_metadata_get_by(origin['id'])
self.assertEqual(len(results), 1, results)
result = results[0]
self.assertEqual(result['metadata'], expected_origin_metadata)
@pytest.mark.fs
class BaseLoaderTest(TestCase, BaseLoaderStorageTest):
"""Mixin base loader test class.
This allows to uncompress archives (mercurial, svn, git,
... repositories) into a temporary folder so that the loader under
test can work with this.
When setUp() is done, the following variables are defined:
- self.repo_url: can be used as an origin_url for example
- self.destination_path: can be used as a path to ingest the
<techno> repository.
Args:
archive_name (str): Name of the archive holding the repository
(folder, repository, dump, etc...)
start_path (str): (mandatory) Path from where starting to look
for resources
filename (Optional[str]): Name of the filename/folder once the
archive is uncompressed. When the filename is not
provided, the archive name is used as a derivative. This
is used both for the self.repo_url and
self.destination_path computation (this one only when
provided)
resources_path (str): Folder name to look for archive
prefix_tmp_folder_name (str): Prefix name to name the temporary folder
uncompress_archive (bool): Uncompress the archive passed as
parameters (default to True). It so
happens we could avoid doing
anything to the tarball.
"""
def setUp(self, archive_name, *, start_path, filename=None,
resources_path='resources', prefix_tmp_folder_name='',
uncompress_archive=True):
super().setUp()
repo_path = os.path.join(start_path, resources_path, archive_name)
if not uncompress_archive:
            # In that case, simply use the archive's path as-is
self.destination_path = repo_path
self.tmp_root_path = None
self.repo_url = 'file://' + repo_path
return
tmp_root_path = tempfile.mkdtemp(
prefix=prefix_tmp_folder_name, suffix='-tests')
# uncompress folder/repositories/dump for the loader to ingest
subprocess.check_output(['tar', 'xf', repo_path, '-C', tmp_root_path])
# build the origin url (or some derivative form)
_fname = filename if filename else os.path.basename(archive_name)
self.repo_url = 'file://' + tmp_root_path + '/' + _fname
# where is the data to ingest?
if filename:
# archive holds one folder with name <filename>
self.destination_path = os.path.join(tmp_root_path, filename)
else:
self.destination_path = tmp_root_path
self.tmp_root_path = tmp_root_path
def tearDown(self):
"""Clean up temporary working directory
"""
if self.tmp_root_path and os.path.exists(self.tmp_root_path):
shutil.rmtree(self.tmp_root_path)
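For context before the second file's diff: BaseLoaderTest above is meant to be subclassed by loader-specific test suites. The sketch below shows how its setUp() is intended to be invoked; the ExampleLoaderTest name and the example-repo.tgz fixture are illustrative assumptions, not part of this patch.

import os.path

from swh.loader.core.tests import BaseLoaderTest


class ExampleLoaderTest(BaseLoaderTest):
    """Illustrative subclass exercising the setUp() helper above."""

    def setUp(self):
        # Uncompresses resources/example-repo.tgz (a hypothetical
        # fixture) into a temporary folder; self.repo_url and
        # self.destination_path are then ready for the loader under test.
        super().setUp('example-repo.tgz',
                      start_path=os.path.dirname(__file__),
                      prefix_tmp_folder_name='swh.loader.example.')

    def test_fixture_is_available(self):
        # The uncompressed repository is addressable as a local origin.
        self.assertTrue(self.repo_url.startswith('file://'))
        self.assertTrue(os.path.exists(self.destination_path))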
diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py
index 4d38641..121e8c3 100644
--- a/swh/loader/core/tests/test_loader.py
+++ b/swh/loader/core/tests/test_loader.py
@@ -1,258 +1,291 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from swh.model.hashutil import hash_to_bytes
from swh.loader.core.loader import BufferedLoader, UnbufferedLoader
from . import BaseLoaderTest
class DummyLoader:
def cleanup(self):
pass
def prepare(self):
pass
def fetch_data(self):
pass
def store_data(self):
pass
def prepare_origin_visit(self):
origin = self.storage.origin_get(
self._test_prepare_origin_visit_data['origin'])
self.origin = origin
self.origin_id = origin['id']
self.origin_url = origin['url']
self.visit_date = datetime.datetime.utcnow()
self.storage.origin_visit_add(origin['id'], self.visit_date)
def parse_config_file(self, *args, **kwargs):
return {
'storage': {
'cls': 'memory',
'args': {
}
},
'send_contents': True,
'send_directories': True,
'send_revisions': True,
'send_releases': True,
'send_snapshot': True,
'content_packet_size': 2,
'content_packet_size_bytes': 8,
'directory_packet_size': 2,
'revision_packet_size': 2,
'release_packet_size': 2,
'content_size_limit': 10000,
}
class DummyUnbufferedLoader(DummyLoader, UnbufferedLoader):
pass
class DummyBufferedLoader(DummyLoader, BufferedLoader):
pass
class DummyBaseLoaderTest(BaseLoaderTest):
def setUp(self):
self.loader = self.loader_class(logging_class='dummyloader')
        # deliberately do not call super().setUp()
self.storage = self.loader.storage
contents = [
{
'id': '34973274ccef6ab4dfaaf86599792fa9c3fe4689',
'sha1': '34973274ccef6ab4dfaaf86599792fa9c3fe4689',
'sha1_git': b'bar1',
'sha256': b'baz1',
'blake2s256': b'qux1',
'status': 'visible',
'data': b'data1',
'length': 5,
},
{
'id': '61c2b3a30496d329e21af70dd2d7e097046d07b7',
'sha1': '61c2b3a30496d329e21af70dd2d7e097046d07b7',
'sha1_git': b'bar2',
'sha256': b'baz2',
'blake2s256': b'qux2',
'status': 'visible',
'data': b'data2',
'length': 5,
},
]
self.expected_contents = [content['id'] for content in contents]
self.in_contents = contents.copy()
for content in self.in_contents:
content['sha1'] = hash_to_bytes(content['sha1'])
self.in_directories = [
{'id': hash_to_bytes(id_)}
for id_ in [
'44e45d56f88993aae6a0198013efa80716fd8921',
'54e45d56f88993aae6a0198013efa80716fd8920',
'43e45d56f88993aae6a0198013efa80716fd8920',
]
]
self.in_revisions = [
{
'id': b'rev1',
'date': None,
},
{
'id': b'rev2',
'date': None,
},
]
self.in_releases = [
{
'id': b'rel1',
'date': None,
},
{
'id': b'rel2',
'date': None,
},
]
- self.in_origins = [
- {
- 'type': 'git',
- 'url': 'http://example.com/',
- },
- ]
+ self.in_origin = {
+ 'type': 'git',
+ 'url': 'http://example.com/',
+ }
self.in_snapshot = {
'id': b'snap1',
'branches': {},
}
+ self.in_provider = {
+ 'provider_name': 'Test Provider',
+ 'provider_type': 'test_provider',
+ 'provider_url': 'http://example.org/metadata_provider',
+ 'metadata': {'working': True},
+ }
+ self.in_tool = {
+ 'name': 'Test Tool',
+ 'version': 'v1.2.3',
+ 'configuration': {'in_the_Matrix': 'maybe'},
+ }
- self.storage.origin_add(self.in_origins)
+ self.storage.origin_add([self.in_origin])
# used by prepare_origin_visit() when it gets called
self.loader._test_prepare_origin_visit_data = {
- 'origin': self.in_origins[0],
+ 'origin': self.in_origin,
}
def tearDown(self):
        # deliberately do not call super().tearDown()
pass
class CoreUnbufferedLoaderTest(DummyBaseLoaderTest):
loader_class = DummyUnbufferedLoader
def test_unbuffered_loader(self):
self.loader.load() # initialize the loader
self.loader.send_contents(self.in_contents[0:1])
self.loader.send_directories(self.in_directories[0:1])
self.loader.send_revisions(self.in_revisions[0:1])
self.loader.send_releases(self.in_releases[0:1])
self.assertCountContents(1)
self.assertCountDirectories(1)
self.assertCountRevisions(1)
self.assertCountReleases(1)
self.loader.send_contents(self.in_contents[1:])
self.loader.send_directories(self.in_directories[1:])
self.loader.send_revisions(self.in_revisions[1:])
self.loader.send_releases(self.in_releases[1:])
self.assertCountContents(len(self.in_contents))
self.assertCountDirectories(len(self.in_directories))
self.assertCountRevisions(len(self.in_revisions))
self.assertCountReleases(len(self.in_releases))
class CoreBufferedLoaderTest(DummyBaseLoaderTest):
loader_class = DummyBufferedLoader
def test_buffered_loader(self):
self.loader.load() # initialize the loader
self.loader.maybe_load_contents(self.in_contents[0:1])
self.loader.maybe_load_directories(self.in_directories[0:1])
self.loader.maybe_load_revisions(self.in_revisions[0:1])
self.loader.maybe_load_releases(self.in_releases[0:1])
self.assertCountContents(0)
self.assertCountDirectories(0)
self.assertCountRevisions(0)
self.assertCountReleases(0)
self.loader.maybe_load_contents(self.in_contents[1:])
self.loader.maybe_load_directories(self.in_directories[1:])
self.loader.maybe_load_revisions(self.in_revisions[1:])
self.loader.maybe_load_releases(self.in_releases[1:])
self.assertCountContents(len(self.in_contents))
self.assertCountDirectories(len(self.in_directories))
self.assertCountRevisions(len(self.in_revisions))
self.assertCountReleases(len(self.in_releases))
def test_directory_cascade(self):
"""Checks that sending a directory triggers sending contents"""
self.loader.load() # initialize the loader
self.loader.maybe_load_contents(self.in_contents[0:1])
self.loader.maybe_load_directories(self.in_directories)
self.assertCountContents(1)
self.assertCountDirectories(len(self.in_directories))
def test_revision_cascade(self):
"""Checks that sending a revision triggers sending contents and
directories."""
self.loader.load() # initialize the loader
self.loader.maybe_load_contents(self.in_contents[0:1])
self.loader.maybe_load_directories(self.in_directories[0:1])
self.loader.maybe_load_revisions(self.in_revisions)
self.assertCountContents(1)
self.assertCountDirectories(1)
self.assertCountRevisions(len(self.in_revisions))
def test_release_cascade(self):
"""Checks that sending a release triggers sending revisions,
contents, and directories."""
self.loader.load() # initialize the loader
self.loader.maybe_load_contents(self.in_contents[0:1])
self.loader.maybe_load_directories(self.in_directories[0:1])
self.loader.maybe_load_revisions(self.in_revisions[0:1])
self.loader.maybe_load_releases(self.in_releases)
self.assertCountContents(1)
self.assertCountDirectories(1)
self.assertCountRevisions(1)
self.assertCountReleases(len(self.in_releases))
def test_snapshot_cascade(self):
"""Checks that sending a snapshot triggers sending releases,
revisions, contents, and directories."""
self.loader.load() # initialize the loader
self.loader.maybe_load_contents(self.in_contents[0:1])
self.loader.maybe_load_directories(self.in_directories[0:1])
self.loader.maybe_load_revisions(self.in_revisions[0:1])
self.loader.maybe_load_releases(self.in_releases[0:1])
self.loader.maybe_load_snapshot(self.in_snapshot)
self.assertCountContents(1)
self.assertCountDirectories(1)
self.assertCountRevisions(1)
self.assertCountReleases(1)
self.assertCountSnapshots(1)
+
+ def test_origin_metadata(self):
+ self.loader.load()
+
+ provider_id = self.loader.send_provider(self.in_provider)
+ tool_id = self.loader.send_tool(self.in_tool)
+
+ self.loader.send_origin_metadata(
+ self.loader.origin_id, self.loader.visit_date, provider_id,
+ tool_id, {'test_metadata': 'foobar'})
+
+ self.assertOriginMetadataContains(
+ self.in_origin['type'], self.in_origin['url'],
+ {'test_metadata': 'foobar'})
+
+ with self.assertRaises(AssertionError):
+ self.assertOriginMetadataContains(
+ self.in_origin['type'], self.in_origin['url'],
+ {'test_metadata': 'foobarbaz'})
+
+ with self.assertRaises(Exception):
+ self.assertOriginMetadataContains(
+ self.in_origin['type'], self.in_origin['url'] + 'blah',
+ {'test_metadata': 'foobar'})
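The functional fix in this diff is the one-line change in assertOriginMetadataContains: storage.origin_metadata_get_by expects the origin's id, not the full origin dict. As a condensed recap, the round-trip the new test_origin_metadata exercises looks roughly like the sketch below (it reuses the fixtures defined in the patch above and is illustrative, not additional patch content).

class OriginMetadataExampleTest(DummyBaseLoaderTest):
    loader_class = DummyBufferedLoader

    def test_metadata_round_trip(self):
        self.loader.load()
        # Register the provider and tool that the metadata refers to.
        provider_id = self.loader.send_provider(self.in_provider)
        tool_id = self.loader.send_tool(self.in_tool)
        self.loader.send_origin_metadata(
            self.loader.origin_id, self.loader.visit_date, provider_id,
            tool_id, {'test_metadata': 'foobar'})
        origin = self.storage.origin_get(self.in_origin)
        # The fixed call: pass origin['id'], not the origin dict itself.
        results = self.storage.origin_metadata_get_by(origin['id'])
        self.assertEqual(results[0]['metadata'],
                         {'test_metadata': 'foobar'})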