
diff --git a/swh/loader/core/tests/__init__.py b/swh/loader/core/tests/__init__.py
index 0b358d5..9fade8c 100644
--- a/swh/loader/core/tests/__init__.py
+++ b/swh/loader/core/tests/__init__.py
@@ -1,245 +1,262 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import shutil
import subprocess
import tempfile
from unittest import TestCase
from swh.model import hashutil
class BaseLoaderTest(TestCase):
"""Mixin base loader test class.
This allows uncompressing archives (mercurial, svn,
... repositories) into a temporary folder so that the loader
can work with them.
When setUp() is done, the following variables are defined:
- self.repo_url: can be used as an origin_url
- self.destination_path: can be used as a path to a <techno> repository.
+ Args:
+
+ archive_name (str): Name of the archive holding the repository
+ (dump, etc...)
+ filename (str/None): Name of the folder holding the repository
+ once the archive is uncompressed
+ resources_path (str): Name of the folder in which to look for the
+ archive
+ prefix_tmp_folder_name (str): Prefix for the temporary folder's name
+ start_path (str): Path from which to start looking for resources
+
"""
- def setUp(self, archive_name, filename, resources_path='resources',
+ def setUp(self, archive_name, filename=None, resources_path='resources',
prefix_tmp_folder_name='', start_path=None):
if not start_path:
raise ValueError('Misuse: You must provide a start_path')
self.tmp_root_path = tempfile.mkdtemp(
prefix=prefix_tmp_folder_name, suffix='-tests')
repo_path = os.path.join(start_path, resources_path, archive_name)
# uncompress the sample folder
subprocess.check_output(
['tar', 'xvf', repo_path, '-C', self.tmp_root_path],
)
- self.repo_url = 'file://' + self.tmp_root_path + '/' + filename
+ if filename:
+ self.repo_url = 'file://' + self.tmp_root_path + '/' + filename
+ else:
+ _filename = os.path.basename(archive_name)
+ self.repo_url = 'file://' + self.tmp_root_path + '/' + _filename
+
# when provided, the archive holds one folder with name <filename>
- self.destination_path = os.path.join(self.tmp_root_path, filename)
+ if filename:
+ self.destination_path = os.path.join(self.tmp_root_path, filename)
+ else:
+ self.destination_path = self.tmp_root_path
def tearDown(self):
"""Clean up temporary working directory
"""
shutil.rmtree(self.tmp_root_path)
def assertContentsOk(self, expected_contents):
contents = self.loader.all_contents
self.assertEqual(len(contents), len(expected_contents))
for content in contents:
content_id = hashutil.hash_to_hex(content['sha1'])
self.assertIn(content_id, expected_contents)
def assertDirectoriesOk(self, expected_directories):
directories = self.loader.all_directories
self.assertEqual(len(directories), len(expected_directories))
for _dir in directories:
_dir_id = hashutil.hash_to_hex(_dir['id'])
self.assertIn(_dir_id, expected_directories)
def assertReleasesOk(self, expected_releases):
"""Check the loader's releases match the expected releases.
Args:
expected_releases ([str]): List of expected release identifiers
(hexadecimal).
"""
releases = self.loader.all_releases
self.assertEqual(len(releases), len(expected_releases))
for i, rel in enumerate(self.loader.all_releases):
rel_id = hashutil.hash_to_hex(rel['id'])
self.assertEqual(expected_releases[i], rel_id)
def assertRevisionsOk(self, expected_revisions): # noqa: N802
"""Check the loader's revisions match the expected revisions.
Expects self.loader to be instantiated and ready to be
inspected (meaning the loading took place).
Args:
expected_revisions (dict): Dict mapping each revision id to
its targeted directory id.
"""
revisions = self.loader.all_revisions
self.assertEqual(len(revisions), len(expected_revisions))
# The last revision is the one used later to start back from
for rev in revisions:
rev_id = hashutil.hash_to_hex(rev['id'])
directory_id = hashutil.hash_to_hex(rev['directory'])
self.assertEqual(expected_revisions[rev_id], directory_id)
def assertSnapshotOk(self, expected_snapshot, expected_branches=[]):
"""Check for snapshot match.
Provide the hashes as hexadecimal; the conversion is done
within the method.
Args:
expected_snapshot (str/dict): Either the snapshot
identifier or the full snapshot
expected_branches (dict): Expected branches, or nothing if
the full snapshot is provided
"""
if isinstance(expected_snapshot, dict) and not expected_branches:
expected_snapshot_id = expected_snapshot['id']
expected_branches = expected_snapshot['branches']
else:
expected_snapshot_id = expected_snapshot
snapshots = self.loader.all_snapshots
self.assertEqual(len(snapshots), 1)
snap = snapshots[0]
snap_id = hashutil.hash_to_hex(snap['id'])
self.assertEqual(snap_id, expected_snapshot_id)
def decode_target(target):
if not target:
return target
target_type = target['target_type']
if target_type == 'alias':
decoded_target = target['target'].decode('utf-8')
else:
decoded_target = hashutil.hash_to_hex(target['target'])
return {
'target': decoded_target,
'target_type': target_type
}
branches = {
branch.decode('utf-8'): decode_target(target)
for branch, target in snap['branches'].items()
}
self.assertEqual(expected_branches, branches)
class LoaderNoStorage:
"""Mixin class to inhibit the persistence and keep in memory the data
sent for storage (for testing purposes).
This overrides the core loader's behavior to store in a dict the
swh objects.
cf. HgLoaderNoStorage, SvnLoaderNoStorage, etc...
"""
- def __init__(self):
+ def __init__(self, *args, **kwargs):
super().__init__()
self.all_contents = []
self.all_directories = []
self.all_revisions = []
self.all_releases = []
self.all_snapshots = []
# typed data
- self.objects = {
+ self.__objects = {
'content': self.all_contents,
'directory': self.all_directories,
'revision': self.all_revisions,
'release': self.all_releases,
'snapshot': self.all_snapshots,
}
def _add(self, type, l):
"""Add without duplicates and keeping the insertion order.
Args:
type (str): Type of objects concerned by the action
l ([object]): List of 'type' object
"""
- col = self.objects[type]
+ col = self.__objects[type]
for o in l:
if o in col:
continue
col.append(o)
def maybe_load_contents(self, all_contents):
self._add('content', all_contents)
def maybe_load_directories(self, all_directories):
self._add('directory', all_directories)
def maybe_load_revisions(self, all_revisions):
self._add('revision', all_revisions)
def maybe_load_releases(self, all_releases):
self._add('release', all_releases)
def maybe_load_snapshot(self, snapshot):
self._add('snapshot', [snapshot])
def send_batch_contents(self, all_contents):
self._add('content', all_contents)
def send_batch_directories(self, all_directories):
self._add('directory', all_directories)
def send_batch_revisions(self, all_revisions):
self._add('revision', all_revisions)
def send_batch_releases(self, all_releases):
self._add('release', all_releases)
def send_snapshot(self, snapshot):
self._add('snapshot', [snapshot])
def _store_origin_visit(self):
pass
def open_fetch_history(self):
pass
def close_fetch_history_success(self, fetch_history_id):
pass
def close_fetch_history_failure(self, fetch_history_id):
pass
def update_origin_visit(self, origin_id, visit, status):
pass
# Override to do nothing at the end
def close_failure(self):
pass
def close_success(self):
pass
def pre_cleanup(self):
pass
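
For reference, here is a minimal sketch of how a loader test suite could combine the two mixins after this change. The MyLoader class, its load() signature, the archive name, and the expected hashes are hypothetical placeholders for illustration, not part of this diff:

import os

from swh.loader.core.tests import BaseLoaderTest, LoaderNoStorage
from swh.loader.my import MyLoader  # hypothetical loader under test


class MyLoaderNoStorage(LoaderNoStorage, MyLoader):
    """MyLoader with persistence inhibited; objects stay in memory."""


class MyLoaderITest(BaseLoaderTest):
    def setUp(self):
        # filename is now optional: when omitted, repo_url is derived
        # from archive_name and destination_path is the temporary root.
        super().setUp(archive_name='sample-repo.tgz',
                      prefix_tmp_folder_name='swh.loader.my.',
                      start_path=os.path.dirname(__file__))
        self.loader = MyLoaderNoStorage()

    def test_load(self):
        # The load() entry point and its arguments depend on the
        # concrete loader; this call is illustrative only.
        self.loader.load(origin_url=self.repo_url,
                         directory=self.destination_path)
        # LoaderNoStorage captured everything the loader tried to
        # store, so the assert* helpers can inspect it:
        self.assertRevisionsOk({
            '<revision-id-hex>': '<directory-id-hex>',  # placeholders
        })

Listing LoaderNoStorage first among the bases ensures its no-op storage methods take precedence over the concrete loader's in the method resolution order.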
