diff --git a/swh/loader/core/tests/__init__.py b/swh/loader/core/tests/__init__.py
index 782618c..07d008a 100644
--- a/swh/loader/core/tests/__init__.py
+++ b/swh/loader/core/tests/__init__.py
@@ -1,227 +1,227 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import pytest
import shutil
import subprocess
import tempfile
from unittest import TestCase
from swh.model import hashutil
from swh.model.hashutil import hash_to_bytes
class BaseLoaderStorageTest:
def _assertCountEqual(self, type, expected_length, msg=None):
"""Check typed 'type' state to have the same expected length.
"""
self.storage.refresh_stat_counters()
self.assertEqual(self.storage.stat_counters()[type],
expected_length, msg=msg)
def assertCountContents(self, len_expected_contents, msg=None):
self._assertCountEqual('content', len_expected_contents, msg=msg)
def assertCountDirectories(self, len_expected_directories, msg=None):
self._assertCountEqual('directory', len_expected_directories,
msg=msg)
def assertCountReleases(self, len_expected_releases, msg=None):
self._assertCountEqual('release', len_expected_releases, msg=msg)
def assertCountRevisions(self, len_expected_revisions, msg=None):
self._assertCountEqual('revision', len_expected_revisions, msg=msg)
def assertCountSnapshots(self, len_expected_snapshot, msg=None):
self._assertCountEqual('snapshot', len_expected_snapshot, msg=msg)
def assertContentsContain(self, expected_contents):
"""Check the provided content are a subset of the stored ones.
Args:
expected_contents ([sha1]): List of content ids"""
self._assertCountEqual('content', len(expected_contents))
missing = list(self.storage.content_missing(
{'sha1': hash_to_bytes(content_hash)}
for content_hash in expected_contents))
self.assertEqual(missing, [])
def assertDirectoriesContain(self, expected_directories):
"""Check the provided directories are a subset of the stored ones.
Args:
expected_directories ([sha1]): List of directory ids."""
self._assertCountEqual('directory', len(expected_directories))
missing = list(self.storage.directory_missing(
hash_to_bytes(dir_) for dir_ in expected_directories))
self.assertEqual(missing, [])
def assertReleasesContain(self, expected_releases):
"""Check the provided releases are a subset of the stored ones.
Args:
releases (list): list of swh releases' identifiers.
"""
self._assertCountEqual('release', len(expected_releases))
missing = list(self.storage.release_missing(
hash_to_bytes(rel) for rel in expected_releases))
self.assertEqual(missing, [])
def assertRevisionsContain(self, expected_revisions):
"""Check the provided revisions are a subset of the stored ones.
Expects self.loader to be instantiated and ready to be
inspected (meaning the loading took place).
Args:
expected_revisions (dict): Dict with key revision id,
value the targeted directory id.
"""
self._assertCountEqual('revision', len(expected_revisions))
revs = list(self.storage.revision_get(
hashutil.hash_to_bytes(rev_id) for rev_id in expected_revisions))
self.assertNotIn(None, revs)
self.assertEqual(
{rev['id']: rev['directory'] for rev in revs},
{hash_to_bytes(rev_id): hash_to_bytes(rev_dir)
for (rev_id, rev_dir) in expected_revisions.items()})
def assertSnapshotEqual(self, expected_snapshot, expected_branches=[]):
"""Check for snapshot match.
Provide the hashes as hexadecimal, the conversion is done
within the method.
Args:
expected_snapshot (str/dict): Either the snapshot
identifier or the full
snapshot
expected_branches (dict): expected branches or nothing is
the full snapshot is provided
"""
if isinstance(expected_snapshot, dict) and not expected_branches:
expected_snapshot_id = expected_snapshot['id']
expected_branches = expected_snapshot['branches']
else:
expected_snapshot_id = expected_snapshot
self._assertCountEqual('snapshot', 1)
snap = self.storage.snapshot_get(hash_to_bytes(expected_snapshot_id))
self.assertIsNotNone(snap)
def decode_target(target):
if not target:
return target
target_type = target['target_type']
if target_type == 'alias':
decoded_target = target['target'].decode('utf-8')
else:
decoded_target = hashutil.hash_to_hex(target['target'])
return {
'target': decoded_target,
'target_type': target_type
}
branches = {
branch.decode('utf-8'): decode_target(target)
for branch, target in snap['branches'].items()
}
self.assertEqual(expected_branches, branches)
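# Illustrative usage sketch (the hex identifiers below are hypothetical):
# passing the full snapshot dict, e.g.
#   self.assertSnapshotEqual({
#       'id': 'a23699280a82a043f8c0994cf1631b568f716f95',
#       'branches': {
#           'master': {'target': '9fbd21adbac36be869514e82e2e98505dc47219c',
#                      'target_type': 'revision'},
#       },
#   })
# is equivalent to passing the identifier and the branches separately.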
def assertOriginMetadataContains(self, origin_type, origin_url,
expected_origin_metadata):
"""Check the storage contains this metadata for the given origin.
Args:
origin_type (str): type of origin ('deposit', 'git', 'svn', ...)
origin_url (str): URL of the origin
expected_origin_metadata (dict):
Extrinsic metadata of the origin
<https://forge.softwareheritage.org/T1344>
"""
origin = self.storage.origin_get(
dict(type=origin_type, url=origin_url))
- results = self.storage.origin_metadata_get_by(origin)
+ results = self.storage.origin_metadata_get_by(origin['id'])
self.assertEqual(len(results), 1, results)
result = results[0]
self.assertEqual(result['metadata'], expected_origin_metadata)
@pytest.mark.fs
class BaseLoaderTest(TestCase, BaseLoaderStorageTest):
"""Mixin base loader test class.
This uncompresses archives (mercurial, svn, git,
... repositories) into a temporary folder so that the loader under
test can work on them.
When setUp() is done, the following variables are defined:
- self.repo_url: can be used as an origin_url, for example
- self.destination_path: can be used as a path to ingest the
<techno> repository.
Args:
archive_name (str): Name of the archive holding the repository
(folder, repository, dump, etc.)
start_path (str): (mandatory) Path from which to start looking
for resources
filename (Optional[str]): Name of the file/folder once the
archive is uncompressed. When not provided, it is
derived from the archive name. It is used to compute
both self.repo_url and self.destination_path (the
latter only when filename is provided)
resources_path (str): Name of the folder in which to look for
the archive
prefix_tmp_folder_name (str): Prefix for the temporary folder's name
uncompress_archive (bool): Whether to uncompress the archive
(defaults to True). Set to False to use
the tarball as-is.
"""
def setUp(self, archive_name, *, start_path, filename=None,
resources_path='resources', prefix_tmp_folder_name='',
uncompress_archive=True):
super().setUp()
repo_path = os.path.join(start_path, resources_path, archive_name)
if not uncompress_archive:
# In that case, simply use the archive's path as-is
self.destination_path = repo_path
self.tmp_root_path = None
self.repo_url = 'file://' + repo_path
return
tmp_root_path = tempfile.mkdtemp(
prefix=prefix_tmp_folder_name, suffix='-tests')
# uncompress the folder/repository/dump for the loader to ingest
subprocess.check_output(['tar', 'xf', repo_path, '-C', tmp_root_path])
# build the origin url (or some derivative form)
_fname = filename if filename else os.path.basename(archive_name)
self.repo_url = 'file://' + tmp_root_path + '/' + _fname
# where is the data to ingest?
if filename:
# archive holds one folder with name <filename>
self.destination_path = os.path.join(tmp_root_path, filename)
else:
self.destination_path = tmp_root_path
self.tmp_root_path = tmp_root_path
def tearDown(self):
"""Clean up temporary working directory
"""
if self.tmp_root_path and os.path.exists(self.tmp_root_path):
shutil.rmtree(self.tmp_root_path)
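
Illustrative sketch, not part of the diff: a concrete loader test is expected
to subclass BaseLoaderTest and forward its archive layout to setUp(). The
class name, archive name and folder name below are hypothetical.

    class GitLoaderTest(BaseLoaderTest):
        def setUp(self):
            # setUp() looks for 'resources/repo.tgz' under start_path,
            # then uncompresses it into a temporary folder
            super().setUp('repo.tgz',
                          start_path=os.path.dirname(__file__),
                          filename='repo')
            # self.repo_url is now a file:// origin URL, and
            # self.destination_path points at the uncompressed repository
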
diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py
index 4d38641..121e8c3 100644
--- a/swh/loader/core/tests/test_loader.py
+++ b/swh/loader/core/tests/test_loader.py
@@ -1,258 +1,291 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from swh.model.hashutil import hash_to_bytes
from swh.loader.core.loader import BufferedLoader, UnbufferedLoader
from . import BaseLoaderTest
class DummyLoader:
def cleanup(self):
pass
def prepare(self):
pass
def fetch_data(self):
pass
def store_data(self):
pass
def prepare_origin_visit(self):
origin = self.storage.origin_get(
self._test_prepare_origin_visit_data['origin'])
self.origin = origin
self.origin_id = origin['id']
self.origin_url = origin['url']
self.visit_date = datetime.datetime.utcnow()
self.storage.origin_visit_add(origin['id'], self.visit_date)
def parse_config_file(self, *args, **kwargs):
return {
'storage': {
'cls': 'memory',
'args': {
}
},
'send_contents': True,
'send_directories': True,
'send_revisions': True,
'send_releases': True,
'send_snapshot': True,
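# note: packet sizes of 2 (and 8 bytes for contents) keep the
# BufferedLoader's flush thresholds low, so the buffering tests
# below can trigger a flush with just two objects of a given type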
'content_packet_size': 2,
'content_packet_size_bytes': 8,
'directory_packet_size': 2,
'revision_packet_size': 2,
'release_packet_size': 2,
'content_size_limit': 10000,
}
class DummyUnbufferedLoader(DummyLoader, UnbufferedLoader):
pass
class DummyBufferedLoader(DummyLoader, BufferedLoader):
pass
class DummyBaseLoaderTest(BaseLoaderTest):
def setUp(self):
self.loader = self.loader_class(logging_class='dummyloader')
# deliberately do not call super().setUp()
self.storage = self.loader.storage
contents = [
{
'id': '34973274ccef6ab4dfaaf86599792fa9c3fe4689',
'sha1': '34973274ccef6ab4dfaaf86599792fa9c3fe4689',
'sha1_git': b'bar1',
'sha256': b'baz1',
'blake2s256': b'qux1',
'status': 'visible',
'data': b'data1',
'length': 5,
},
{
'id': '61c2b3a30496d329e21af70dd2d7e097046d07b7',
'sha1': '61c2b3a30496d329e21af70dd2d7e097046d07b7',
'sha1_git': b'bar2',
'sha256': b'baz2',
'blake2s256': b'qux2',
'status': 'visible',
'data': b'data2',
'length': 5,
},
]
self.expected_contents = [content['id'] for content in contents]
self.in_contents = contents.copy()
for content in self.in_contents:
content['sha1'] = hash_to_bytes(content['sha1'])
self.in_directories = [
{'id': hash_to_bytes(id_)}
for id_ in [
'44e45d56f88993aae6a0198013efa80716fd8921',
'54e45d56f88993aae6a0198013efa80716fd8920',
'43e45d56f88993aae6a0198013efa80716fd8920',
]
]
self.in_revisions = [
{
'id': b'rev1',
'date': None,
},
{
'id': b'rev2',
'date': None,
},
]
self.in_releases = [
{
'id': b'rel1',
'date': None,
},
{
'id': b'rel2',
'date': None,
},
]
- self.in_origins = [
- {
- 'type': 'git',
- 'url': 'http://example.com/',
- },
- ]
+ self.in_origin = {
+ 'type': 'git',
+ 'url': 'http://example.com/',
+ }
self.in_snapshot = {
'id': b'snap1',
'branches': {},
}
+ self.in_provider = {
+ 'provider_name': 'Test Provider',
+ 'provider_type': 'test_provider',
+ 'provider_url': 'http://example.org/metadata_provider',
+ 'metadata': {'working': True},
+ }
+ self.in_tool = {
+ 'name': 'Test Tool',
+ 'version': 'v1.2.3',
+ 'configuration': {'in_the_Matrix': 'maybe'},
+ }
- self.storage.origin_add(self.in_origins)
+ self.storage.origin_add([self.in_origin])
# used by prepare_origin_visit() when it gets called
self.loader._test_prepare_origin_visit_data = {
- 'origin': self.in_origins[0],
+ 'origin': self.in_origin,
}
def tearDown(self):
# deliberately do not call super().tearDown()
pass
class CoreUnbufferedLoaderTest(DummyBaseLoaderTest):
loader_class = DummyUnbufferedLoader
def test_unbuffered_loader(self):
self.loader.load() # initialize the loader
self.loader.send_contents(self.in_contents[0:1])
self.loader.send_directories(self.in_directories[0:1])
self.loader.send_revisions(self.in_revisions[0:1])
self.loader.send_releases(self.in_releases[0:1])
self.assertCountContents(1)
self.assertCountDirectories(1)
self.assertCountRevisions(1)
self.assertCountReleases(1)
self.loader.send_contents(self.in_contents[1:])
self.loader.send_directories(self.in_directories[1:])
self.loader.send_revisions(self.in_revisions[1:])
self.loader.send_releases(self.in_releases[1:])
self.assertCountContents(len(self.in_contents))
self.assertCountDirectories(len(self.in_directories))
self.assertCountRevisions(len(self.in_revisions))
self.assertCountReleases(len(self.in_releases))
class CoreBufferedLoaderTest(DummyBaseLoaderTest):
loader_class = DummyBufferedLoader
def test_buffered_loader(self):
self.loader.load() # initialize the loader
self.loader.maybe_load_contents(self.in_contents[0:1])
self.loader.maybe_load_directories(self.in_directories[0:1])
self.loader.maybe_load_revisions(self.in_revisions[0:1])
self.loader.maybe_load_releases(self.in_releases[0:1])
self.assertCountContents(0)
self.assertCountDirectories(0)
self.assertCountRevisions(0)
self.assertCountReleases(0)
self.loader.maybe_load_contents(self.in_contents[1:])
self.loader.maybe_load_directories(self.in_directories[1:])
self.loader.maybe_load_revisions(self.in_revisions[1:])
self.loader.maybe_load_releases(self.in_releases[1:])
self.assertCountContents(len(self.in_contents))
self.assertCountDirectories(len(self.in_directories))
self.assertCountRevisions(len(self.in_revisions))
self.assertCountReleases(len(self.in_releases))
def test_directory_cascade(self):
"""Checks that sending a directory triggers sending contents"""
self.loader.load() # initialize the loader
self.loader.maybe_load_contents(self.in_contents[0:1])
self.loader.maybe_load_directories(self.in_directories)
self.assertCountContents(1)
self.assertCountDirectories(len(self.in_directories))
def test_revision_cascade(self):
"""Checks that sending a revision triggers sending contents and
directories."""
self.loader.load() # initialize the loader
self.loader.maybe_load_contents(self.in_contents[0:1])
self.loader.maybe_load_directories(self.in_directories[0:1])
self.loader.maybe_load_revisions(self.in_revisions)
self.assertCountContents(1)
self.assertCountDirectories(1)
self.assertCountRevisions(len(self.in_revisions))
def test_release_cascade(self):
"""Checks that sending a release triggers sending revisions,
contents, and directories."""
self.loader.load() # initialize the loader
self.loader.maybe_load_contents(self.in_contents[0:1])
self.loader.maybe_load_directories(self.in_directories[0:1])
self.loader.maybe_load_revisions(self.in_revisions[0:1])
self.loader.maybe_load_releases(self.in_releases)
self.assertCountContents(1)
self.assertCountDirectories(1)
self.assertCountRevisions(1)
self.assertCountReleases(len(self.in_releases))
def test_snapshot_cascade(self):
"""Checks that sending a snapshot triggers sending releases,
revisions, contents, and directories."""
self.loader.load() # initialize the loader
self.loader.maybe_load_contents(self.in_contents[0:1])
self.loader.maybe_load_directories(self.in_directories[0:1])
self.loader.maybe_load_revisions(self.in_revisions[0:1])
self.loader.maybe_load_releases(self.in_releases[0:1])
self.loader.maybe_load_snapshot(self.in_snapshot)
self.assertCountContents(1)
self.assertCountDirectories(1)
self.assertCountRevisions(1)
self.assertCountReleases(1)
self.assertCountSnapshots(1)
+
+ def test_origin_metadata(self):
+ self.loader.load()
+
+ provider_id = self.loader.send_provider(self.in_provider)
+ tool_id = self.loader.send_tool(self.in_tool)
+
+ self.loader.send_origin_metadata(
+ self.loader.origin_id, self.loader.visit_date, provider_id,
+ tool_id, {'test_metadata': 'foobar'})
+
+ self.assertOriginMetadataContains(
+ self.in_origin['type'], self.in_origin['url'],
+ {'test_metadata': 'foobar'})
+
+ with self.assertRaises(AssertionError):
+ self.assertOriginMetadataContains(
+ self.in_origin['type'], self.in_origin['url'],
+ {'test_metadata': 'foobarbaz'})
+
+ with self.assertRaises(Exception):
+ self.assertOriginMetadataContains(
+ self.in_origin['type'], self.in_origin['url'] + 'blah',
+ {'test_metadata': 'foobar'})
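
For reference, a condensed sketch of how the dummy loaders above get their
origin: prepare_origin_visit() reads the _test_prepare_origin_visit_data
attribute that DummyBaseLoaderTest.setUp() installs. All values are taken
from the tests above; nothing here is part of the diff.

    loader = DummyBufferedLoader(logging_class='dummyloader')
    storage = loader.storage  # in-memory storage, per parse_config_file()
    origin = {'type': 'git', 'url': 'http://example.com/'}
    storage.origin_add([origin])
    loader._test_prepare_origin_visit_data = {'origin': origin}
    loader.load()  # triggers prepare_origin_visit(), which resolves the origin
    assert loader.origin_url == 'http://example.com/'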
