diff --git a/.gitignore b/.gitignore index 25bc741..d570526 100644 --- a/.gitignore +++ b/.gitignore @@ -1,10 +1,11 @@ *.pyc *.sw? *~ .coverage .eggs/ __pycache__ *.egg-info/ build/ dist/ version.txt +.tox/ diff --git a/PKG-INFO b/PKG-INFO index 3c8a85d..1a14a8a 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,29 +1,29 @@ Metadata-Version: 2.1 Name: swh.loader.core -Version: 0.0.34 +Version: 0.0.35 Summary: Software Heritage Base Loader Home-page: https://forge.softwareheritage.org/diffusion/DLDBASE Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-loader-core Description: SWH-loader-core =============== The Software Heritage Core Loader is a low-level loading utilities and helpers used by other loaders. The main entry points are classes: - :class:`swh.loader.core.loader.SWHLoader` for stateful loaders - :class:`swh.loader.core.loader.SWHStatelessLoader` for stateless loaders Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/requirements-test.txt b/requirements-test.txt index f3c7e8e..e079f8a 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1 +1 @@ -nose +pytest diff --git a/swh.loader.core.egg-info/PKG-INFO b/swh.loader.core.egg-info/PKG-INFO index 3c8a85d..1a14a8a 100644 --- a/swh.loader.core.egg-info/PKG-INFO +++ b/swh.loader.core.egg-info/PKG-INFO @@ -1,29 +1,29 @@ Metadata-Version: 2.1 Name: swh.loader.core -Version: 0.0.34 +Version: 0.0.35 Summary: Software Heritage Base Loader Home-page: https://forge.softwareheritage.org/diffusion/DLDBASE Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-loader-core Description: SWH-loader-core =============== The Software Heritage Core Loader is a low-level loading utilities and helpers used by other loaders. The main entry points are classes: - :class:`swh.loader.core.loader.SWHLoader` for stateful loaders - :class:`swh.loader.core.loader.SWHStatelessLoader` for stateless loaders Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.loader.core.egg-info/SOURCES.txt b/swh.loader.core.egg-info/SOURCES.txt index e2c01e6..99af5be 100644 --- a/swh.loader.core.egg-info/SOURCES.txt +++ b/swh.loader.core.egg-info/SOURCES.txt @@ -1,39 +1,40 @@ .gitignore AUTHORS LICENSE MANIFEST.in Makefile README.md requirements-swh.txt requirements-test.txt requirements.txt setup.py +tox.ini version.txt debian/changelog debian/compat debian/control debian/copyright debian/rules debian/source/format docs/.gitignore docs/Makefile docs/conf.py docs/index.rst docs/_static/.placeholder docs/_templates/.placeholder swh/__init__.py swh.loader.core.egg-info/PKG-INFO swh.loader.core.egg-info/SOURCES.txt swh.loader.core.egg-info/dependency_links.txt swh.loader.core.egg-info/requires.txt swh.loader.core.egg-info/top_level.txt swh/loader/__init__.py swh/loader/core/__init__.py swh/loader/core/converters.py swh/loader/core/loader.py swh/loader/core/queue.py swh/loader/core/utils.py swh/loader/core/tests/__init__.py swh/loader/core/tests/test_converters.py swh/loader/core/tests/test_loader.py swh/loader/core/tests/test_queue.py \ No newline at end of file diff --git a/swh.loader.core.egg-info/requires.txt b/swh.loader.core.egg-info/requires.txt index 814769b..fdc0825 100644 --- a/swh.loader.core.egg-info/requires.txt +++ b/swh.loader.core.egg-info/requires.txt @@ -1,9 +1,9 @@ psutil retrying swh.core swh.model>=0.0.18 swh.storage>=0.0.97 vcversioner [testing] -nose +pytest diff --git a/swh/loader/core/tests/__init__.py b/swh/loader/core/tests/__init__.py index 6eed382..b29129a 100644 --- a/swh/loader/core/tests/__init__.py +++ b/swh/loader/core/tests/__init__.py @@ -1,285 +1,285 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os +import pytest import shutil import subprocess import tempfile from unittest import TestCase -from nose.plugins.attrib import attr from swh.model import hashutil -@attr('fs') +@pytest.mark.fs class BaseLoaderTest(TestCase): """Mixin base loader test class. This allows to uncompress archives (mercurial, svn, git, ... repositories) into a temporary folder so that the loader under test can work with this. When setUp() is done, the following variables are defined: - self.repo_url: can be used as an origin_url for example - self.destination_path: can be used as a path to ingest the repository. Args: archive_name (str): Name of the archive holding the repository (folder, repository, dump, etc...) start_path (str): (mandatory) Path from where starting to look for resources filename (Optional[str]): Name of the filename/folder once the archive is uncompressed. When the filename is not provided, the archive name is used as a derivative. This is used both for the self.repo_url and self.destination_path computation (this one only when provided) resources_path (str): Folder name to look for archive prefix_tmp_folder_name (str): Prefix name to name the temporary folder uncompress_archive (bool): Uncompress the archive passed as parameters (default to True). It so happens we could avoid doing anything to the tarball. """ def setUp(self, archive_name, *, start_path, filename=None, resources_path='resources', prefix_tmp_folder_name='', uncompress_archive=True): repo_path = os.path.join(start_path, resources_path, archive_name) if not uncompress_archive: # In that case, simply sets the archive's path self.destination_path = repo_path self.tmp_root_path = None return tmp_root_path = tempfile.mkdtemp( prefix=prefix_tmp_folder_name, suffix='-tests') # uncompress folder/repositories/dump for the loader to ingest subprocess.check_output(['tar', 'xf', repo_path, '-C', tmp_root_path]) # build the origin url (or some derivative form) _fname = filename if filename else os.path.basename(archive_name) self.repo_url = 'file://' + tmp_root_path + '/' + _fname # where is the data to ingest? if filename: # archive holds one folder with name self.destination_path = os.path.join(tmp_root_path, filename) else: self.destination_path = tmp_root_path self.tmp_root_path = tmp_root_path def tearDown(self): """Clean up temporary working directory """ if self.tmp_root_path and os.path.exists(self.tmp_root_path): shutil.rmtree(self.tmp_root_path) def state(self, _type): return self.loader.state(_type) def _assertCountOk(self, type, expected_length, msg=None): """Check typed 'type' state to have the same expected length. """ self.assertEqual(len(self.state(type)), expected_length, msg=msg) def assertCountContents(self, len_expected_contents, msg=None): self._assertCountOk('content', len_expected_contents, msg=msg) def assertCountDirectories(self, len_expected_directories, msg=None): self._assertCountOk('directory', len_expected_directories, msg=msg) def assertCountReleases(self, len_expected_releases, msg=None): self._assertCountOk('release', len_expected_releases, msg=msg) def assertCountRevisions(self, len_expected_revisions, msg=None): self._assertCountOk('revision', len_expected_revisions, msg=msg) def assertCountSnapshots(self, len_expected_snapshot, msg=None): self._assertCountOk('snapshot', len_expected_snapshot, msg=msg) def assertContentsOk(self, expected_contents): self._assertCountOk('content', len(expected_contents)) for content in self.state('content'): content_id = hashutil.hash_to_hex(content['sha1']) self.assertIn(content_id, expected_contents) def assertDirectoriesOk(self, expected_directories): self._assertCountOk('directory', len(expected_directories)) for _dir in self.state('directory'): _dir_id = hashutil.hash_to_hex(_dir['id']) self.assertIn(_dir_id, expected_directories) def assertReleasesOk(self, expected_releases): """Check the loader's releases match the expected releases. Args: releases ([dict]): List of dictionaries representing swh releases. """ self._assertCountOk('release', len(expected_releases)) for i, rel in enumerate(self.state('release')): rel_id = hashutil.hash_to_hex(rel['id']) self.assertEqual(expected_releases[i], rel_id) def assertRevisionsOk(self, expected_revisions): """Check the loader's revisions match the expected revisions. Expects self.loader to be instantiated and ready to be inspected (meaning the loading took place). Args: expected_revisions (dict): Dict with key revision id, value the targeted directory id. """ self._assertCountOk('revision', len(expected_revisions)) for rev in self.state('revision'): rev_id = hashutil.hash_to_hex(rev['id']) directory_id = hashutil.hash_to_hex(rev['directory']) self.assertEqual(expected_revisions[rev_id], directory_id) def assertSnapshotOk(self, expected_snapshot, expected_branches=[]): """Check for snapshot match. Provide the hashes as hexadecimal, the conversion is done within the method. Args: expected_snapshot (str/dict): Either the snapshot identifier or the full snapshot expected_branches (dict): expected branches or nothing is the full snapshot is provided """ if isinstance(expected_snapshot, dict) and not expected_branches: expected_snapshot_id = expected_snapshot['id'] expected_branches = expected_snapshot['branches'] else: expected_snapshot_id = expected_snapshot snapshots = self.state('snapshot') self.assertEqual(len(snapshots), 1) snap = snapshots[0] snap_id = hashutil.hash_to_hex(snap['id']) self.assertEqual(snap_id, expected_snapshot_id) def decode_target(target): if not target: return target target_type = target['target_type'] if target_type == 'alias': decoded_target = target['target'].decode('utf-8') else: decoded_target = hashutil.hash_to_hex(target['target']) return { 'target': decoded_target, 'target_type': target_type } branches = { branch.decode('utf-8'): decode_target(target) for branch, target in snap['branches'].items() } self.assertEqual(expected_branches, branches) class LoaderNoStorage: """Mixin class to inhibit the persistence and keep in memory the data sent for storage (for testing purposes). This overrides the core loader's behavior to store in a dict the swh objects. cf. :class:`HgLoaderNoStorage`, :class:`SvnLoaderNoStorage`, etc... """ def __init__(self, *args, **kwargs): - super().__init__() + super().__init__(*args, **kwargs) self._state = { 'content': [], 'directory': [], 'revision': [], 'release': [], 'snapshot': [], } def state(self, type): return self._state[type] def _add(self, type, l): """Add without duplicates and keeping the insertion order. Args: type (str): Type of objects concerned by the action l ([object]): List of 'type' object """ col = self.state(type) for o in l: if o in col: continue col.append(o) def maybe_load_contents(self, all_contents): self._add('content', all_contents) def maybe_load_directories(self, all_directories): self._add('directory', all_directories) def maybe_load_revisions(self, all_revisions): self._add('revision', all_revisions) def maybe_load_releases(self, all_releases): self._add('release', all_releases) def maybe_load_snapshot(self, snapshot): self._add('snapshot', [snapshot]) def send_batch_contents(self, all_contents): self._add('content', all_contents) def send_batch_directories(self, all_directories): self._add('directory', all_directories) def send_batch_revisions(self, all_revisions): self._add('revision', all_revisions) def send_batch_releases(self, all_releases): self._add('release', all_releases) def send_snapshot(self, snapshot): self._add('snapshot', [snapshot]) def _store_origin_visit(self): pass def open_fetch_history(self): pass def close_fetch_history_success(self, fetch_history_id): pass def close_fetch_history_failure(self, fetch_history_id): pass def update_origin_visit(self, origin_id, visit, status): pass def close_failure(self): pass def close_success(self): pass def pre_cleanup(self): pass diff --git a/swh/loader/core/tests/test_converters.py b/swh/loader/core/tests/test_converters.py index b719f32..bdba7ab 100644 --- a/swh/loader/core/tests/test_converters.py +++ b/swh/loader/core/tests/test_converters.py @@ -1,104 +1,99 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import tempfile import unittest from unittest.mock import Mock -from nose.tools import istest - -from swh.model.from_disk import Content from swh.loader.core import converters +from swh.model.from_disk import Content def tmpfile_with_content(fromdir, contentfile): """Create a temporary file with content contentfile in directory fromdir. """ tmpfilepath = tempfile.mktemp( suffix='.swh', prefix='tmp-file-for-test', dir=fromdir) with open(tmpfilepath, 'wb') as f: f.write(contentfile) return tmpfilepath class TestContentForStorage(unittest.TestCase): maxDiff = None def setUp(self): super().setUpClass() self.tmpdir = tempfile.TemporaryDirectory( prefix='test-swh-loader-core.' ) def tearDown(self): self.tmpdir.cleanup() - @istest - def content_for_storage_path(self): + def test_content_for_storage_path(self): # given data = b'temp file for testing content storage conversion' tmpfile = tmpfile_with_content(self.tmpdir.name, data) obj = Content.from_file(path=os.fsdecode(tmpfile), save_path=True).get_data() expected_content = obj.copy() expected_content['data'] = data expected_content['status'] = 'visible' # when content = converters.content_for_storage(obj) # then self.assertEqual(content, expected_content) - @istest - def content_for_storage_data(self): + def test_content_for_storage_data(self): # given data = b'temp file for testing content storage conversion' obj = Content.from_bytes(data=data, mode=0o100644).get_data() expected_content = obj.copy() expected_content['status'] = 'visible' # when content = converters.content_for_storage(obj) # then self.assertEqual(content, expected_content) - @istest - def content_for_storage_too_long(self): + def test_content_for_storage_too_long(self): # given data = b'temp file for testing content storage conversion' obj = Content.from_bytes(data=data, mode=0o100644).get_data() log = Mock() expected_content = obj.copy() expected_content.pop('data') expected_content['status'] = 'absent' expected_content['origin'] = 42 expected_content['reason'] = 'Content too large' # when content = converters.content_for_storage( obj, log, max_content_size=len(data) - 1, origin_id=expected_content['origin'], ) # then self.assertEqual(content, expected_content) self.assertTrue(log.info.called) self.assertIn('Skipping content', log.info.call_args[0][0]) self.assertIn('too large', log.info.call_args[0][0]) diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py index fa6e2d2..a27e9be 100644 --- a/swh/loader/core/tests/test_loader.py +++ b/swh/loader/core/tests/test_loader.py @@ -1,343 +1,325 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from nose.tools import istest - -from . import BaseLoaderTest, LoaderNoStorage from swh.model.hashutil import hash_to_bytes +from . import BaseLoaderTest, LoaderNoStorage + class DummyBaseLoaderTest(BaseLoaderTest): def setUp(self): # do not call voluntarily super().setUp() self.in_contents = [1, 2, 3] self.in_directories = [4, 5, 6] self.in_revisions = [7, 8, 9] self.in_releases = [10, 11, 12] self.in_snapshot = 13 def tearDown(self): # do not call voluntarily super().tearDown() pass class LoadTest1(DummyBaseLoaderTest): def setUp(self): super().setUp() self.loader = LoaderNoStorage() - @istest - def stateful_loader(self): + def test_stateful_loader(self): """Stateful loader accumulates in place the sent data Note: Those behaviors should be somehow merged but that's another story. """ self.loader.maybe_load_directories(self.in_directories) self.loader.maybe_load_revisions(self.in_revisions) self.loader.maybe_load_releases(self.in_releases) - self.assertEquals(len(self.state('content')), 0) - self.assertEquals( + self.assertEqual(len(self.state('content')), 0) + self.assertEqual( len(self.state('directory')), len(self.in_directories)) - self.assertEquals( + self.assertEqual( len(self.state('revision')), len(self.in_revisions)) - self.assertEquals( + self.assertEqual( len(self.state('release')), len(self.in_releases)) - self.assertEquals(len(self.state('snapshot')), 0) + self.assertEqual(len(self.state('snapshot')), 0) - @istest - def stateless_loader(self): + def test_stateless_loader(self): """Stateless loader accumulates in place the sent data as well Note: Those behaviors should be somehow merged but that's another story. """ self.loader.send_batch_contents(self.in_contents) self.loader.send_snapshot(self.in_snapshot) - self.assertEquals(len(self.state('content')), len(self.in_contents)) - self.assertEquals(len(self.state('directory')), 0) - self.assertEquals(len(self.state('revision')), 0) - self.assertEquals(len(self.state('release')), 0) - self.assertEquals(len(self.state('snapshot')), 1) + self.assertEqual(len(self.state('content')), len(self.in_contents)) + self.assertEqual(len(self.state('directory')), 0) + self.assertEqual(len(self.state('revision')), 0) + self.assertEqual(len(self.state('release')), 0) + self.assertEqual(len(self.state('snapshot')), 1) class LoadTestContent(DummyBaseLoaderTest): def setUp(self): super().setUp() self.loader = LoaderNoStorage() self.content_id0 = '34973274ccef6ab4dfaaf86599792fa9c3fe4689' self.content_id1 = '61c2b3a30496d329e21af70dd2d7e097046d07b7' # trimmed data to the bare necessities self.in_contents = [{ 'sha1': hash_to_bytes(self.content_id0), }, { 'sha1': hash_to_bytes(self.content_id1), }] self.expected_contents = [self.content_id0, self.content_id1] - @istest - def maybe_load_contents(self): + def test_maybe_load_contents(self): """Loading contents should be ok """ self.loader.maybe_load_contents(self.in_contents) self.assertCountContents(len(self.expected_contents)) self.assertContentsOk(self.expected_contents) - @istest - def send_batch_contents(self): + def test_send_batch_contents(self): """Sending contents should be ok 2 """ self.loader.send_batch_contents(self.in_contents) self.assertCountContents(len(self.expected_contents)) self.assertContentsOk(self.expected_contents) - @istest - def failing(self): + def test_failing(self): """Comparing wrong snapshot should fail. """ self.loader.send_batch_contents(self.in_contents) with self.assertRaises(AssertionError): self.assertContentsOk([]) class LoadTestDirectory(DummyBaseLoaderTest): def setUp(self): super().setUp() self.loader = LoaderNoStorage() self.directory_id0 = '44e45d56f88993aae6a0198013efa80716fd8921' self.directory_id1 = '54e45d56f88993aae6a0198013efa80716fd8920' self.directory_id2 = '43e45d56f88993aae6a0198013efa80716fd8920' # trimmed data to the bare necessities self.in_directories = [{ 'id': hash_to_bytes(self.directory_id0), }, { 'id': hash_to_bytes(self.directory_id1), }, { 'id': hash_to_bytes(self.directory_id2), }] self.expected_directories = [ self.directory_id0, self.directory_id1, self.directory_id2] - @istest - def maybe_load_directories(self): + def test_maybe_load_directories(self): """Loading directories should be ok """ self.loader.maybe_load_directories(self.in_directories) self.assertCountDirectories(len(self.expected_directories)) self.assertDirectoriesOk(self.expected_directories) - @istest - def send_batch_directories(self): + def test_send_batch_directories(self): """Sending directories should be ok 2 """ self.loader.send_batch_directories(self.in_directories) self.assertCountDirectories(len(self.expected_directories)) self.assertDirectoriesOk(self.expected_directories) - @istest - def failing(self): + def test_failing(self): """Comparing wrong snapshot should fail. """ self.loader.send_batch_revisions(self.in_revisions) with self.assertRaises(AssertionError): self.assertRevisionsOk([]) class LoadTestRelease(DummyBaseLoaderTest): def setUp(self): super().setUp() self.loader = LoaderNoStorage() self.release_id0 = '44e45d56f88993aae6a0198013efa80716fd8921' self.release_id1 = '54e45d56f88993aae6a0198013efa80716fd8920' self.release_id2 = '43e45d56f88993aae6a0198013efa80716fd8920' # trimmed data to the bare necessities self.in_releases = [{ 'id': hash_to_bytes(self.release_id0), }, { 'id': hash_to_bytes(self.release_id1), }, { 'id': hash_to_bytes(self.release_id2), }] self.expected_releases = [ self.release_id0, self.release_id1, self.release_id2] - @istest - def maybe_load_releases(self): + def test_maybe_load_releases(self): """Loading releases should be ok """ self.loader.maybe_load_releases(self.in_releases) self.assertCountReleases(len(self.expected_releases)) self.assertReleasesOk(self.expected_releases) - @istest - def send_batch_releases(self): + def test_send_batch_releases(self): """Sending releases should be ok 2 """ self.loader.send_batch_releases(self.in_releases) self.assertCountReleases(len(self.expected_releases)) self.assertReleasesOk(self.expected_releases) - @istest - def failing(self): + def test_failing(self): """Comparing wrong snapshot should fail. """ self.loader.send_batch_releases(self.in_releases) with self.assertRaises(AssertionError): self.assertReleasesOk([]) class LoadTestRevision(DummyBaseLoaderTest): def setUp(self): super().setUp() self.loader = LoaderNoStorage() rev_id0 = '44e45d56f88993aae6a0198013efa80716fd8921' dir_id0 = '34973274ccef6ab4dfaaf86599792fa9c3fe4689' rev_id1 = '54e45d56f88993aae6a0198013efa80716fd8920' dir_id1 = '61c2b3a30496d329e21af70dd2d7e097046d07b7' rev_id2 = '43e45d56f88993aae6a0198013efa80716fd8920' dir_id2 = '33e45d56f88993aae6a0198013efa80716fd8921' # data trimmed to bare necessities self.in_revisions = [{ 'id': hash_to_bytes(rev_id0), 'directory': hash_to_bytes(dir_id0), }, { 'id': hash_to_bytes(rev_id1), 'directory': hash_to_bytes(dir_id1), }, { 'id': hash_to_bytes(rev_id2), 'directory': hash_to_bytes(dir_id2), }] self.expected_revisions = { rev_id0: dir_id0, rev_id1: dir_id1, rev_id2: dir_id2, } - @istest - def maybe_load_revisions(self): + def test_maybe_load_revisions(self): """Loading revisions should be ok """ self.loader.maybe_load_revisions(self.in_revisions) self.assertCountRevisions(len(self.expected_revisions)) self.assertRevisionsOk(self.expected_revisions) - @istest - def send_batch_revisions(self): + def test_send_batch_revisions(self): """Sending revisions should be ok 2 """ self.loader.send_batch_revisions(self.in_revisions) self.assertCountRevisions(len(self.expected_revisions)) self.assertRevisionsOk(self.expected_revisions) - @istest - def failing(self): + def test_failing(self): """Comparing wrong snapshot should fail. """ self.loader.send_batch_revisions(self.in_revisions) with self.assertRaises(AssertionError): self.assertRevisionsOk([]) class LoadTestSnapshot(DummyBaseLoaderTest): def setUp(self): super().setUp() self.loader = LoaderNoStorage() snapshot_id = '44e45d56f88993aae6a0198013efa80716fd8921' revision_id = '54e45d56f88993aae6a0198013efa80716fd8920' release_id = '43e45d56f88993aae6a0198013efa80716fd8920' # trimmed data to the bare necessities self.expected_snapshot = { 'id': snapshot_id, 'branches': { 'default': { 'target_type': 'revision', 'target': revision_id, }, 'master': { 'target_type': 'release', 'target': release_id, }, 'HEAD': { 'target_type': 'alias', 'target': 'master', } } } self.in_snapshot = { 'id': hash_to_bytes(snapshot_id), 'branches': { b'default': { 'target_type': 'revision', 'target': hash_to_bytes(revision_id), }, b'master': { 'target_type': 'release', 'target': hash_to_bytes(release_id), }, b'HEAD': { 'target_type': 'alias', 'target': b'master', } } } - @istest - def maybe_load_snapshots(self): + def test_maybe_load_snapshots(self): """Loading snapshot should be ok """ self.loader.maybe_load_snapshot(self.in_snapshot) self.assertCountSnapshots(1) self.assertSnapshotOk(self.expected_snapshot) self.assertSnapshotOk( self.expected_snapshot['id'], expected_branches=self.expected_snapshot['branches']) - @istest - def send_batch_snapshots(self): + def test_send_batch_snapshots(self): """Sending snapshot should be ok 2 """ self.loader.send_snapshot(self.in_snapshot) self.assertCountSnapshots(1) self.assertSnapshotOk(self.expected_snapshot) self.assertSnapshotOk( self.expected_snapshot['id'], expected_branches=self.expected_snapshot['branches']) - @istest - def failing(self): + def test_failing(self): """Comparing wrong snapshot should fail. """ self.loader.send_snapshot(self.in_snapshot) with self.assertRaises(AssertionError): self.assertSnapshotOk( 'wrong', expected_branches=self.expected_snapshot['branches']) diff --git a/swh/loader/core/tests/test_queue.py b/swh/loader/core/tests/test_queue.py index c036868..444d36d 100644 --- a/swh/loader/core/tests/test_queue.py +++ b/swh/loader/core/tests/test_queue.py @@ -1,141 +1,136 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest -from nose.tools import istest - -from swh.loader.core.queue import QueuePerNbElements -from swh.loader.core.queue import QueuePerNbUniqueElements -from swh.loader.core.queue import QueuePerSizeAndNbUniqueElements +from swh.loader.core.queue import (QueuePerNbElements, + QueuePerNbUniqueElements, + QueuePerSizeAndNbUniqueElements) class TestQueuePerNbElements(unittest.TestCase): - @istest - def simple_queue_behavior(self): + def test_simple_queue_behavior(self): max_nb_elements = 10 queue = QueuePerNbElements(max_nb_elements=max_nb_elements) elements = [1, 3, 4, 9, 20, 30, 40] actual_threshold = queue.add(elements) self.assertFalse(actual_threshold, len(elements) > max_nb_elements) # pop returns the content and reset the queue actual_elements = queue.pop() - self.assertEquals(actual_elements, elements) - self.assertEquals(queue.pop(), []) + self.assertEqual(actual_elements, elements) + self.assertEqual(queue.pop(), []) # duplicates can be integrated new_elements = [1, 1, 3, 4, 9, 20, 30, 40, 12, 14, 2] actual_threshold = queue.add(new_elements) self.assertTrue(actual_threshold) - self.assertEquals(queue.pop(), new_elements) + self.assertEqual(queue.pop(), new_elements) # reset is destructive too queue.add(new_elements) queue.reset() - self.assertEquals(queue.pop(), []) + self.assertEqual(queue.pop(), []) def to_some_objects(elements, key): for elt in elements: yield {key: elt} class TestQueuePerNbUniqueElements(unittest.TestCase): - @istest - def queue_with_unique_key_behavior(self): + def test_queue_with_unique_key_behavior(self): max_nb_elements = 5 queue = QueuePerNbUniqueElements(max_nb_elements=max_nb_elements, key='id') # no duplicates elements = list(to_some_objects([1, 1, 3, 4, 9], key='id')) actual_threshold = queue.add(elements) self.assertFalse(actual_threshold, len(elements) > max_nb_elements) # pop returns the content and reset the queue actual_elements = queue.pop() - self.assertEquals(actual_elements, - [{'id': 1}, {'id': 3}, {'id': 4}, {'id': 9}]) - self.assertEquals(queue.pop(), []) + self.assertEqual(actual_elements, + [{'id': 1}, {'id': 3}, {'id': 4}, {'id': 9}]) + self.assertEqual(queue.pop(), []) new_elements = list(to_some_objects( [1, 3, 4, 9, 20], key='id')) actual_threshold = queue.add(new_elements) self.assertTrue(actual_threshold) # reset is destructive too queue.add(new_elements) queue.reset() - self.assertEquals(queue.pop(), []) + self.assertEqual(queue.pop(), []) def to_some_complex_objects(elements, key): for elt, size in elements: yield {key: elt, 'length': size} class TestQueuePerSizeAndNbUniqueElements(unittest.TestCase): - @istest - def queue_with_unique_key_and_size_behavior(self): + def test_queue_with_unique_key_and_size_behavior(self): max_nb_elements = 5 max_size = 100 queue = QueuePerSizeAndNbUniqueElements( max_nb_elements=max_nb_elements, max_size=max_size, key='k') # size total exceeded, nb elements not reached, still the # threshold is deemed reached elements = list(to_some_complex_objects([(1, 10), (2, 20), (3, 30), (4, 100)], key='k')) actual_threshold = queue.add(elements) self.assertTrue(actual_threshold) # pop returns the content and reset the queue actual_elements = queue.pop() - self.assertEquals(actual_elements, - [{'k': 1, 'length': 10}, - {'k': 2, 'length': 20}, - {'k': 3, 'length': 30}, - {'k': 4, 'length': 100}]) - self.assertEquals(queue.pop(), []) + self.assertEqual(actual_elements, + [{'k': 1, 'length': 10}, + {'k': 2, 'length': 20}, + {'k': 3, 'length': 30}, + {'k': 4, 'length': 100}]) + self.assertEqual(queue.pop(), []) # size threshold not reached, nb elements reached, the # threshold is considered reached new_elements = list(to_some_complex_objects( [(1, 10), (3, 5), (4, 2), (9, 1), (20, 0)], key='k')) actual_threshold = queue.add(new_elements) queue.reset() self.assertTrue(actual_threshold) # nb elements threshold not reached, nor the top number of # elements, the threshold is not reached new_elements = list(to_some_complex_objects( [(1, 10)], key='k')) actual_threshold = queue.add(new_elements) self.assertFalse(actual_threshold) # reset is destructive too queue.add(new_elements) queue.reset() - self.assertEquals(queue.pop(), []) + self.assertEqual(queue.pop(), []) diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..4093c4e --- /dev/null +++ b/tox.ini @@ -0,0 +1,16 @@ +[tox] +envlist=flake8,py3 + +[testenv:py3] +deps = + .[testing] + pytest-cov +commands = + pytest --cov=swh --cov-branch {posargs} + +[testenv:flake8] +skip_install = true +deps = + flake8 +commands = + {envpython} -m flake8 diff --git a/version.txt b/version.txt index b1aa332..d32c1c0 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.34-0-gc3eb09c \ No newline at end of file +v0.0.35-0-g913eac9 \ No newline at end of file