diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py index 9e8a66d..a27e9be 100644 --- a/swh/loader/core/tests/test_loader.py +++ b/swh/loader/core/tests/test_loader.py @@ -1,325 +1,325 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.model.hashutil import hash_to_bytes from . import BaseLoaderTest, LoaderNoStorage class DummyBaseLoaderTest(BaseLoaderTest): def setUp(self): # do not call voluntarily super().setUp() self.in_contents = [1, 2, 3] self.in_directories = [4, 5, 6] self.in_revisions = [7, 8, 9] self.in_releases = [10, 11, 12] self.in_snapshot = 13 def tearDown(self): # do not call voluntarily super().tearDown() pass class LoadTest1(DummyBaseLoaderTest): def setUp(self): super().setUp() self.loader = LoaderNoStorage() def test_stateful_loader(self): """Stateful loader accumulates in place the sent data Note: Those behaviors should be somehow merged but that's another story. """ self.loader.maybe_load_directories(self.in_directories) self.loader.maybe_load_revisions(self.in_revisions) self.loader.maybe_load_releases(self.in_releases) - self.assertEquals(len(self.state('content')), 0) - self.assertEquals( + self.assertEqual(len(self.state('content')), 0) + self.assertEqual( len(self.state('directory')), len(self.in_directories)) - self.assertEquals( + self.assertEqual( len(self.state('revision')), len(self.in_revisions)) - self.assertEquals( + self.assertEqual( len(self.state('release')), len(self.in_releases)) - self.assertEquals(len(self.state('snapshot')), 0) + self.assertEqual(len(self.state('snapshot')), 0) def test_stateless_loader(self): """Stateless loader accumulates in place the sent data as well Note: Those behaviors should be somehow merged but that's another story. """ self.loader.send_batch_contents(self.in_contents) self.loader.send_snapshot(self.in_snapshot) - self.assertEquals(len(self.state('content')), len(self.in_contents)) - self.assertEquals(len(self.state('directory')), 0) - self.assertEquals(len(self.state('revision')), 0) - self.assertEquals(len(self.state('release')), 0) - self.assertEquals(len(self.state('snapshot')), 1) + self.assertEqual(len(self.state('content')), len(self.in_contents)) + self.assertEqual(len(self.state('directory')), 0) + self.assertEqual(len(self.state('revision')), 0) + self.assertEqual(len(self.state('release')), 0) + self.assertEqual(len(self.state('snapshot')), 1) class LoadTestContent(DummyBaseLoaderTest): def setUp(self): super().setUp() self.loader = LoaderNoStorage() self.content_id0 = '34973274ccef6ab4dfaaf86599792fa9c3fe4689' self.content_id1 = '61c2b3a30496d329e21af70dd2d7e097046d07b7' # trimmed data to the bare necessities self.in_contents = [{ 'sha1': hash_to_bytes(self.content_id0), }, { 'sha1': hash_to_bytes(self.content_id1), }] self.expected_contents = [self.content_id0, self.content_id1] def test_maybe_load_contents(self): """Loading contents should be ok """ self.loader.maybe_load_contents(self.in_contents) self.assertCountContents(len(self.expected_contents)) self.assertContentsOk(self.expected_contents) def test_send_batch_contents(self): """Sending contents should be ok 2 """ self.loader.send_batch_contents(self.in_contents) self.assertCountContents(len(self.expected_contents)) self.assertContentsOk(self.expected_contents) def test_failing(self): """Comparing wrong snapshot should fail. """ self.loader.send_batch_contents(self.in_contents) with self.assertRaises(AssertionError): self.assertContentsOk([]) class LoadTestDirectory(DummyBaseLoaderTest): def setUp(self): super().setUp() self.loader = LoaderNoStorage() self.directory_id0 = '44e45d56f88993aae6a0198013efa80716fd8921' self.directory_id1 = '54e45d56f88993aae6a0198013efa80716fd8920' self.directory_id2 = '43e45d56f88993aae6a0198013efa80716fd8920' # trimmed data to the bare necessities self.in_directories = [{ 'id': hash_to_bytes(self.directory_id0), }, { 'id': hash_to_bytes(self.directory_id1), }, { 'id': hash_to_bytes(self.directory_id2), }] self.expected_directories = [ self.directory_id0, self.directory_id1, self.directory_id2] def test_maybe_load_directories(self): """Loading directories should be ok """ self.loader.maybe_load_directories(self.in_directories) self.assertCountDirectories(len(self.expected_directories)) self.assertDirectoriesOk(self.expected_directories) def test_send_batch_directories(self): """Sending directories should be ok 2 """ self.loader.send_batch_directories(self.in_directories) self.assertCountDirectories(len(self.expected_directories)) self.assertDirectoriesOk(self.expected_directories) def test_failing(self): """Comparing wrong snapshot should fail. """ self.loader.send_batch_revisions(self.in_revisions) with self.assertRaises(AssertionError): self.assertRevisionsOk([]) class LoadTestRelease(DummyBaseLoaderTest): def setUp(self): super().setUp() self.loader = LoaderNoStorage() self.release_id0 = '44e45d56f88993aae6a0198013efa80716fd8921' self.release_id1 = '54e45d56f88993aae6a0198013efa80716fd8920' self.release_id2 = '43e45d56f88993aae6a0198013efa80716fd8920' # trimmed data to the bare necessities self.in_releases = [{ 'id': hash_to_bytes(self.release_id0), }, { 'id': hash_to_bytes(self.release_id1), }, { 'id': hash_to_bytes(self.release_id2), }] self.expected_releases = [ self.release_id0, self.release_id1, self.release_id2] def test_maybe_load_releases(self): """Loading releases should be ok """ self.loader.maybe_load_releases(self.in_releases) self.assertCountReleases(len(self.expected_releases)) self.assertReleasesOk(self.expected_releases) def test_send_batch_releases(self): """Sending releases should be ok 2 """ self.loader.send_batch_releases(self.in_releases) self.assertCountReleases(len(self.expected_releases)) self.assertReleasesOk(self.expected_releases) def test_failing(self): """Comparing wrong snapshot should fail. """ self.loader.send_batch_releases(self.in_releases) with self.assertRaises(AssertionError): self.assertReleasesOk([]) class LoadTestRevision(DummyBaseLoaderTest): def setUp(self): super().setUp() self.loader = LoaderNoStorage() rev_id0 = '44e45d56f88993aae6a0198013efa80716fd8921' dir_id0 = '34973274ccef6ab4dfaaf86599792fa9c3fe4689' rev_id1 = '54e45d56f88993aae6a0198013efa80716fd8920' dir_id1 = '61c2b3a30496d329e21af70dd2d7e097046d07b7' rev_id2 = '43e45d56f88993aae6a0198013efa80716fd8920' dir_id2 = '33e45d56f88993aae6a0198013efa80716fd8921' # data trimmed to bare necessities self.in_revisions = [{ 'id': hash_to_bytes(rev_id0), 'directory': hash_to_bytes(dir_id0), }, { 'id': hash_to_bytes(rev_id1), 'directory': hash_to_bytes(dir_id1), }, { 'id': hash_to_bytes(rev_id2), 'directory': hash_to_bytes(dir_id2), }] self.expected_revisions = { rev_id0: dir_id0, rev_id1: dir_id1, rev_id2: dir_id2, } def test_maybe_load_revisions(self): """Loading revisions should be ok """ self.loader.maybe_load_revisions(self.in_revisions) self.assertCountRevisions(len(self.expected_revisions)) self.assertRevisionsOk(self.expected_revisions) def test_send_batch_revisions(self): """Sending revisions should be ok 2 """ self.loader.send_batch_revisions(self.in_revisions) self.assertCountRevisions(len(self.expected_revisions)) self.assertRevisionsOk(self.expected_revisions) def test_failing(self): """Comparing wrong snapshot should fail. """ self.loader.send_batch_revisions(self.in_revisions) with self.assertRaises(AssertionError): self.assertRevisionsOk([]) class LoadTestSnapshot(DummyBaseLoaderTest): def setUp(self): super().setUp() self.loader = LoaderNoStorage() snapshot_id = '44e45d56f88993aae6a0198013efa80716fd8921' revision_id = '54e45d56f88993aae6a0198013efa80716fd8920' release_id = '43e45d56f88993aae6a0198013efa80716fd8920' # trimmed data to the bare necessities self.expected_snapshot = { 'id': snapshot_id, 'branches': { 'default': { 'target_type': 'revision', 'target': revision_id, }, 'master': { 'target_type': 'release', 'target': release_id, }, 'HEAD': { 'target_type': 'alias', 'target': 'master', } } } self.in_snapshot = { 'id': hash_to_bytes(snapshot_id), 'branches': { b'default': { 'target_type': 'revision', 'target': hash_to_bytes(revision_id), }, b'master': { 'target_type': 'release', 'target': hash_to_bytes(release_id), }, b'HEAD': { 'target_type': 'alias', 'target': b'master', } } } def test_maybe_load_snapshots(self): """Loading snapshot should be ok """ self.loader.maybe_load_snapshot(self.in_snapshot) self.assertCountSnapshots(1) self.assertSnapshotOk(self.expected_snapshot) self.assertSnapshotOk( self.expected_snapshot['id'], expected_branches=self.expected_snapshot['branches']) def test_send_batch_snapshots(self): """Sending snapshot should be ok 2 """ self.loader.send_snapshot(self.in_snapshot) self.assertCountSnapshots(1) self.assertSnapshotOk(self.expected_snapshot) self.assertSnapshotOk( self.expected_snapshot['id'], expected_branches=self.expected_snapshot['branches']) def test_failing(self): """Comparing wrong snapshot should fail. """ self.loader.send_snapshot(self.in_snapshot) with self.assertRaises(AssertionError): self.assertSnapshotOk( 'wrong', expected_branches=self.expected_snapshot['branches']) diff --git a/swh/loader/core/tests/test_queue.py b/swh/loader/core/tests/test_queue.py index b68f03f..444d36d 100644 --- a/swh/loader/core/tests/test_queue.py +++ b/swh/loader/core/tests/test_queue.py @@ -1,136 +1,136 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from swh.loader.core.queue import (QueuePerNbElements, QueuePerNbUniqueElements, QueuePerSizeAndNbUniqueElements) class TestQueuePerNbElements(unittest.TestCase): def test_simple_queue_behavior(self): max_nb_elements = 10 queue = QueuePerNbElements(max_nb_elements=max_nb_elements) elements = [1, 3, 4, 9, 20, 30, 40] actual_threshold = queue.add(elements) self.assertFalse(actual_threshold, len(elements) > max_nb_elements) # pop returns the content and reset the queue actual_elements = queue.pop() - self.assertEquals(actual_elements, elements) - self.assertEquals(queue.pop(), []) + self.assertEqual(actual_elements, elements) + self.assertEqual(queue.pop(), []) # duplicates can be integrated new_elements = [1, 1, 3, 4, 9, 20, 30, 40, 12, 14, 2] actual_threshold = queue.add(new_elements) self.assertTrue(actual_threshold) - self.assertEquals(queue.pop(), new_elements) + self.assertEqual(queue.pop(), new_elements) # reset is destructive too queue.add(new_elements) queue.reset() - self.assertEquals(queue.pop(), []) + self.assertEqual(queue.pop(), []) def to_some_objects(elements, key): for elt in elements: yield {key: elt} class TestQueuePerNbUniqueElements(unittest.TestCase): def test_queue_with_unique_key_behavior(self): max_nb_elements = 5 queue = QueuePerNbUniqueElements(max_nb_elements=max_nb_elements, key='id') # no duplicates elements = list(to_some_objects([1, 1, 3, 4, 9], key='id')) actual_threshold = queue.add(elements) self.assertFalse(actual_threshold, len(elements) > max_nb_elements) # pop returns the content and reset the queue actual_elements = queue.pop() - self.assertEquals(actual_elements, - [{'id': 1}, {'id': 3}, {'id': 4}, {'id': 9}]) - self.assertEquals(queue.pop(), []) + self.assertEqual(actual_elements, + [{'id': 1}, {'id': 3}, {'id': 4}, {'id': 9}]) + self.assertEqual(queue.pop(), []) new_elements = list(to_some_objects( [1, 3, 4, 9, 20], key='id')) actual_threshold = queue.add(new_elements) self.assertTrue(actual_threshold) # reset is destructive too queue.add(new_elements) queue.reset() - self.assertEquals(queue.pop(), []) + self.assertEqual(queue.pop(), []) def to_some_complex_objects(elements, key): for elt, size in elements: yield {key: elt, 'length': size} class TestQueuePerSizeAndNbUniqueElements(unittest.TestCase): def test_queue_with_unique_key_and_size_behavior(self): max_nb_elements = 5 max_size = 100 queue = QueuePerSizeAndNbUniqueElements( max_nb_elements=max_nb_elements, max_size=max_size, key='k') # size total exceeded, nb elements not reached, still the # threshold is deemed reached elements = list(to_some_complex_objects([(1, 10), (2, 20), (3, 30), (4, 100)], key='k')) actual_threshold = queue.add(elements) self.assertTrue(actual_threshold) # pop returns the content and reset the queue actual_elements = queue.pop() - self.assertEquals(actual_elements, - [{'k': 1, 'length': 10}, - {'k': 2, 'length': 20}, - {'k': 3, 'length': 30}, - {'k': 4, 'length': 100}]) - self.assertEquals(queue.pop(), []) + self.assertEqual(actual_elements, + [{'k': 1, 'length': 10}, + {'k': 2, 'length': 20}, + {'k': 3, 'length': 30}, + {'k': 4, 'length': 100}]) + self.assertEqual(queue.pop(), []) # size threshold not reached, nb elements reached, the # threshold is considered reached new_elements = list(to_some_complex_objects( [(1, 10), (3, 5), (4, 2), (9, 1), (20, 0)], key='k')) actual_threshold = queue.add(new_elements) queue.reset() self.assertTrue(actual_threshold) # nb elements threshold not reached, nor the top number of # elements, the threshold is not reached new_elements = list(to_some_complex_objects( [(1, 10)], key='k')) actual_threshold = queue.add(new_elements) self.assertFalse(actual_threshold) # reset is destructive too queue.add(new_elements) queue.reset() - self.assertEquals(queue.pop(), []) + self.assertEqual(queue.pop(), [])