Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9123095
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
9 KB
Subscribers
None
View Options
diff --git a/swh/loader/core/tests/__init__.py b/swh/loader/core/tests/__init__.py
index 0b358d5..9fade8c 100644
--- a/swh/loader/core/tests/__init__.py
+++ b/swh/loader/core/tests/__init__.py
@@ -1,245 +1,262 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import shutil
import subprocess
import tempfile
from unittest import TestCase
from swh.model import hashutil
class BaseLoaderTest(TestCase):
"""Mixin base loader test class.
This allows to uncompress archives (mercurial, svn,
... repositories) into a temporary folder so that the loader can
work with this.
When setUp() is done, the following variable are defined:
- self.repo_url: can be used as an origin_url
- self.destination_path: can be used as a path to a <techno> repository.
+ Args:
+
+ archive_name (str): Name of the archive holding the repository
+ (dump, etc...)
+ filename (str/None): Name of the filename once uncompressed.
+ resources_path (str): Folder name to look for archive
+ prefix_tmp_folder_name (str): Prefix name to name the temporary folder
+ start_path (str): Path from where starting to look for resources
+
"""
- def setUp(self, archive_name, filename, resources_path='resources',
+ def setUp(self, archive_name, filename=None, resources_path='resources',
prefix_tmp_folder_name='', start_path=None):
if not start_path:
raise ValueError('Misuse: You must provide a start_path')
self.tmp_root_path = tempfile.mkdtemp(
prefix=prefix_tmp_folder_name, suffix='-tests')
repo_path = os.path.join(start_path, resources_path, archive_name)
# uncompress the sample folder
subprocess.check_output(
['tar', 'xvf', repo_path, '-C', self.tmp_root_path],
)
- self.repo_url = 'file://' + self.tmp_root_path + '/' + filename
+ if filename:
+ self.repo_url = 'file://' + self.tmp_root_path + '/' + filename
+ else:
+ _filename = os.path.basename(archive_name)
+ self.repo_url = 'file://' + self.tmp_root_path + '/' + _filename
+
# archive holds one folder with name <filename>
- self.destination_path = os.path.join(self.tmp_root_path, filename)
+ if filename:
+ self.destination_path = os.path.join(self.tmp_root_path, filename)
+ else:
+ self.destination_path = self.tmp_root_path
def tearDown(self):
"""Clean up temporary working directory
"""
shutil.rmtree(self.tmp_root_path)
def assertContentsOk(self, expected_contents):
contents = self.loader.all_contents
self.assertEquals(len(contents), len(expected_contents))
for content in contents:
content_id = hashutil.hash_to_hex(content['sha1'])
self.assertIn(content_id, expected_contents)
def assertDirectoriesOk(self, expected_directories):
directories = self.loader.all_directories
self.assertEquals(len(directories), len(expected_directories))
for _dir in directories:
_dir_id = hashutil.hash_to_hex(_dir['id'])
self.assertIn(_dir_id, expected_directories)
def assertReleasesOk(self, expected_releases):
"""Check the loader's releases match the expected releases.
Args:
releases ([dict]): List of dictionaries representing swh releases.
"""
releases = self.loader.all_releases
self.assertEqual(len(releases), len(expected_releases))
for i, rel in enumerate(self.loader.all_releases):
rel_id = hashutil.hash_to_hex(rel['id'])
self.assertEquals(expected_releases[i], rel_id)
def assertRevisionsOk(self, expected_revisions): # noqa: N802
"""Check the loader's revisions match the expected revisions.
Expects self.loader to be instantiated and ready to be
inspected (meaning the loading took place).
Args:
expected_revisions (dict): Dict with key revision id,
value the targeted directory id.
"""
revisions = self.loader.all_revisions
self.assertEqual(len(revisions), len(expected_revisions))
# The last revision being the one used later to start back from
for rev in revisions:
rev_id = hashutil.hash_to_hex(rev['id'])
directory_id = hashutil.hash_to_hex(rev['directory'])
self.assertEquals(expected_revisions[rev_id], directory_id)
def assertSnapshotOk(self, expected_snapshot, expected_branches=[]):
"""Check for snapshot match.
Provide the hashes as hexadecimal, the conversion is done
within the method.
Args:
expected_snapshot (str/dict): Either the snapshot
identifier or the full
snapshot
expected_branches (dict): expected branches or nothing is
the full snapshot is provided
"""
if isinstance(expected_snapshot, dict) and not expected_branches:
expected_snapshot_id = expected_snapshot['id']
expected_branches = expected_snapshot['branches']
else:
expected_snapshot_id = expected_snapshot
snapshots = self.loader.all_snapshots
self.assertEqual(len(snapshots), 1)
snap = snapshots[0]
snap_id = hashutil.hash_to_hex(snap['id'])
self.assertEqual(snap_id, expected_snapshot_id)
def decode_target(target):
if not target:
return target
target_type = target['target_type']
if target_type == 'alias':
decoded_target = target['target'].decode('utf-8')
else:
decoded_target = hashutil.hash_to_hex(target['target'])
return {
'target': decoded_target,
'target_type': target_type
}
branches = {
branch.decode('utf-8'): decode_target(target)
for branch, target in snap['branches'].items()
}
self.assertEqual(expected_branches, branches)
class LoaderNoStorage:
"""Mixin class to inhibit the persistence and keep in memory the data
sent for storage (for testing purposes).
This overrides the core loader's behavior to store in a dict the
swh objects.
cf. HgLoaderNoStorage, SvnLoaderNoStorage, etc...
"""
- def __init__(self):
+ def __init__(self, *args, **kwargs):
super().__init__()
self.all_contents = []
self.all_directories = []
self.all_revisions = []
self.all_releases = []
self.all_snapshots = []
# typed data
- self.objects = {
+ self.__objects = {
'content': self.all_contents,
'directory': self.all_directories,
'revision': self.all_revisions,
'release': self.all_releases,
'snapshot': self.all_snapshots,
}
def _add(self, type, l):
"""Add without duplicates and keeping the insertion order.
Args:
type (str): Type of objects concerned by the action
l ([object]): List of 'type' object
"""
- col = self.objects[type]
+ col = self.__objects[type]
for o in l:
if o in col:
continue
col.extend([o])
def maybe_load_contents(self, all_contents):
self._add('content', all_contents)
def maybe_load_directories(self, all_directories):
self._add('directory', all_directories)
def maybe_load_revisions(self, all_revisions):
self._add('revision', all_revisions)
def maybe_load_releases(self, all_releases):
self._add('release', all_releases)
def maybe_load_snapshot(self, snapshot):
self._add('snapshot', [snapshot])
def send_batch_contents(self, all_contents):
self._add('content', all_contents)
def send_batch_directories(self, all_directories):
self._add('directory', all_directories)
def send_batch_revisions(self, all_revisions):
self._add('revision', all_revisions)
def send_batch_releases(self, all_releases):
self._add('release', all_releases)
def send_snapshot(self, snapshot):
self._add('snapshot', [snapshot])
def _store_origin_visit(self):
pass
def open_fetch_history(self):
pass
def close_fetch_history_success(self, fetch_history_id):
pass
def close_fetch_history_failure(self, fetch_history_id):
pass
def update_origin_visit(self, origin_id, visit, status):
pass
# Override to do nothing at the end
def close_failure(self):
pass
def close_success(self):
pass
def pre_cleanup(self):
pass
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Jun 21, 5:04 PM (1 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3291044
Attached To
rDLDBASE Generic VCS/Package Loader
Event Timeline
Log In to Comment