Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py
index 9e8a66d..a27e9be 100644
--- a/swh/loader/core/tests/test_loader.py
+++ b/swh/loader/core/tests/test_loader.py
@@ -1,325 +1,325 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.model.hashutil import hash_to_bytes
from . import BaseLoaderTest, LoaderNoStorage
class DummyBaseLoaderTest(BaseLoaderTest):
def setUp(self):
# do not call voluntarily super().setUp()
self.in_contents = [1, 2, 3]
self.in_directories = [4, 5, 6]
self.in_revisions = [7, 8, 9]
self.in_releases = [10, 11, 12]
self.in_snapshot = 13
def tearDown(self):
# do not call voluntarily super().tearDown()
pass
class LoadTest1(DummyBaseLoaderTest):
def setUp(self):
super().setUp()
self.loader = LoaderNoStorage()
def test_stateful_loader(self):
"""Stateful loader accumulates in place the sent data
Note: Those behaviors should be somehow merged but that's
another story.
"""
self.loader.maybe_load_directories(self.in_directories)
self.loader.maybe_load_revisions(self.in_revisions)
self.loader.maybe_load_releases(self.in_releases)
- self.assertEquals(len(self.state('content')), 0)
- self.assertEquals(
+ self.assertEqual(len(self.state('content')), 0)
+ self.assertEqual(
len(self.state('directory')), len(self.in_directories))
- self.assertEquals(
+ self.assertEqual(
len(self.state('revision')), len(self.in_revisions))
- self.assertEquals(
+ self.assertEqual(
len(self.state('release')), len(self.in_releases))
- self.assertEquals(len(self.state('snapshot')), 0)
+ self.assertEqual(len(self.state('snapshot')), 0)
def test_stateless_loader(self):
"""Stateless loader accumulates in place the sent data as well
Note: Those behaviors should be somehow merged but that's
another story.
"""
self.loader.send_batch_contents(self.in_contents)
self.loader.send_snapshot(self.in_snapshot)
- self.assertEquals(len(self.state('content')), len(self.in_contents))
- self.assertEquals(len(self.state('directory')), 0)
- self.assertEquals(len(self.state('revision')), 0)
- self.assertEquals(len(self.state('release')), 0)
- self.assertEquals(len(self.state('snapshot')), 1)
+ self.assertEqual(len(self.state('content')), len(self.in_contents))
+ self.assertEqual(len(self.state('directory')), 0)
+ self.assertEqual(len(self.state('revision')), 0)
+ self.assertEqual(len(self.state('release')), 0)
+ self.assertEqual(len(self.state('snapshot')), 1)
class LoadTestContent(DummyBaseLoaderTest):
def setUp(self):
super().setUp()
self.loader = LoaderNoStorage()
self.content_id0 = '34973274ccef6ab4dfaaf86599792fa9c3fe4689'
self.content_id1 = '61c2b3a30496d329e21af70dd2d7e097046d07b7'
# trimmed data to the bare necessities
self.in_contents = [{
'sha1': hash_to_bytes(self.content_id0),
}, {
'sha1': hash_to_bytes(self.content_id1),
}]
self.expected_contents = [self.content_id0, self.content_id1]
def test_maybe_load_contents(self):
"""Loading contents should be ok
"""
self.loader.maybe_load_contents(self.in_contents)
self.assertCountContents(len(self.expected_contents))
self.assertContentsOk(self.expected_contents)
def test_send_batch_contents(self):
"""Sending contents should be ok 2
"""
self.loader.send_batch_contents(self.in_contents)
self.assertCountContents(len(self.expected_contents))
self.assertContentsOk(self.expected_contents)
def test_failing(self):
"""Comparing wrong snapshot should fail.
"""
self.loader.send_batch_contents(self.in_contents)
with self.assertRaises(AssertionError):
self.assertContentsOk([])
class LoadTestDirectory(DummyBaseLoaderTest):
def setUp(self):
super().setUp()
self.loader = LoaderNoStorage()
self.directory_id0 = '44e45d56f88993aae6a0198013efa80716fd8921'
self.directory_id1 = '54e45d56f88993aae6a0198013efa80716fd8920'
self.directory_id2 = '43e45d56f88993aae6a0198013efa80716fd8920'
# trimmed data to the bare necessities
self.in_directories = [{
'id': hash_to_bytes(self.directory_id0),
}, {
'id': hash_to_bytes(self.directory_id1),
}, {
'id': hash_to_bytes(self.directory_id2),
}]
self.expected_directories = [
self.directory_id0, self.directory_id1, self.directory_id2]
def test_maybe_load_directories(self):
"""Loading directories should be ok
"""
self.loader.maybe_load_directories(self.in_directories)
self.assertCountDirectories(len(self.expected_directories))
self.assertDirectoriesOk(self.expected_directories)
def test_send_batch_directories(self):
"""Sending directories should be ok 2
"""
self.loader.send_batch_directories(self.in_directories)
self.assertCountDirectories(len(self.expected_directories))
self.assertDirectoriesOk(self.expected_directories)
def test_failing(self):
"""Comparing wrong snapshot should fail.
"""
self.loader.send_batch_revisions(self.in_revisions)
with self.assertRaises(AssertionError):
self.assertRevisionsOk([])
class LoadTestRelease(DummyBaseLoaderTest):
def setUp(self):
super().setUp()
self.loader = LoaderNoStorage()
self.release_id0 = '44e45d56f88993aae6a0198013efa80716fd8921'
self.release_id1 = '54e45d56f88993aae6a0198013efa80716fd8920'
self.release_id2 = '43e45d56f88993aae6a0198013efa80716fd8920'
# trimmed data to the bare necessities
self.in_releases = [{
'id': hash_to_bytes(self.release_id0),
}, {
'id': hash_to_bytes(self.release_id1),
}, {
'id': hash_to_bytes(self.release_id2),
}]
self.expected_releases = [
self.release_id0, self.release_id1, self.release_id2]
def test_maybe_load_releases(self):
"""Loading releases should be ok
"""
self.loader.maybe_load_releases(self.in_releases)
self.assertCountReleases(len(self.expected_releases))
self.assertReleasesOk(self.expected_releases)
def test_send_batch_releases(self):
"""Sending releases should be ok 2
"""
self.loader.send_batch_releases(self.in_releases)
self.assertCountReleases(len(self.expected_releases))
self.assertReleasesOk(self.expected_releases)
def test_failing(self):
"""Comparing wrong snapshot should fail.
"""
self.loader.send_batch_releases(self.in_releases)
with self.assertRaises(AssertionError):
self.assertReleasesOk([])
class LoadTestRevision(DummyBaseLoaderTest):
def setUp(self):
super().setUp()
self.loader = LoaderNoStorage()
rev_id0 = '44e45d56f88993aae6a0198013efa80716fd8921'
dir_id0 = '34973274ccef6ab4dfaaf86599792fa9c3fe4689'
rev_id1 = '54e45d56f88993aae6a0198013efa80716fd8920'
dir_id1 = '61c2b3a30496d329e21af70dd2d7e097046d07b7'
rev_id2 = '43e45d56f88993aae6a0198013efa80716fd8920'
dir_id2 = '33e45d56f88993aae6a0198013efa80716fd8921'
# data trimmed to bare necessities
self.in_revisions = [{
'id': hash_to_bytes(rev_id0),
'directory': hash_to_bytes(dir_id0),
}, {
'id': hash_to_bytes(rev_id1),
'directory': hash_to_bytes(dir_id1),
}, {
'id': hash_to_bytes(rev_id2),
'directory': hash_to_bytes(dir_id2),
}]
self.expected_revisions = {
rev_id0: dir_id0,
rev_id1: dir_id1,
rev_id2: dir_id2,
}
def test_maybe_load_revisions(self):
"""Loading revisions should be ok
"""
self.loader.maybe_load_revisions(self.in_revisions)
self.assertCountRevisions(len(self.expected_revisions))
self.assertRevisionsOk(self.expected_revisions)
def test_send_batch_revisions(self):
"""Sending revisions should be ok 2
"""
self.loader.send_batch_revisions(self.in_revisions)
self.assertCountRevisions(len(self.expected_revisions))
self.assertRevisionsOk(self.expected_revisions)
def test_failing(self):
"""Comparing wrong snapshot should fail.
"""
self.loader.send_batch_revisions(self.in_revisions)
with self.assertRaises(AssertionError):
self.assertRevisionsOk([])
class LoadTestSnapshot(DummyBaseLoaderTest):
def setUp(self):
super().setUp()
self.loader = LoaderNoStorage()
snapshot_id = '44e45d56f88993aae6a0198013efa80716fd8921'
revision_id = '54e45d56f88993aae6a0198013efa80716fd8920'
release_id = '43e45d56f88993aae6a0198013efa80716fd8920'
# trimmed data to the bare necessities
self.expected_snapshot = {
'id': snapshot_id,
'branches': {
'default': {
'target_type': 'revision',
'target': revision_id,
},
'master': {
'target_type': 'release',
'target': release_id,
},
'HEAD': {
'target_type': 'alias',
'target': 'master',
}
}
}
self.in_snapshot = {
'id': hash_to_bytes(snapshot_id),
'branches': {
b'default': {
'target_type': 'revision',
'target': hash_to_bytes(revision_id),
},
b'master': {
'target_type': 'release',
'target': hash_to_bytes(release_id),
},
b'HEAD': {
'target_type': 'alias',
'target': b'master',
}
}
}
def test_maybe_load_snapshots(self):
"""Loading snapshot should be ok
"""
self.loader.maybe_load_snapshot(self.in_snapshot)
self.assertCountSnapshots(1)
self.assertSnapshotOk(self.expected_snapshot)
self.assertSnapshotOk(
self.expected_snapshot['id'],
expected_branches=self.expected_snapshot['branches'])
def test_send_batch_snapshots(self):
"""Sending snapshot should be ok 2
"""
self.loader.send_snapshot(self.in_snapshot)
self.assertCountSnapshots(1)
self.assertSnapshotOk(self.expected_snapshot)
self.assertSnapshotOk(
self.expected_snapshot['id'],
expected_branches=self.expected_snapshot['branches'])
def test_failing(self):
"""Comparing wrong snapshot should fail.
"""
self.loader.send_snapshot(self.in_snapshot)
with self.assertRaises(AssertionError):
self.assertSnapshotOk(
'wrong', expected_branches=self.expected_snapshot['branches'])
diff --git a/swh/loader/core/tests/test_queue.py b/swh/loader/core/tests/test_queue.py
index b68f03f..444d36d 100644
--- a/swh/loader/core/tests/test_queue.py
+++ b/swh/loader/core/tests/test_queue.py
@@ -1,136 +1,136 @@
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from swh.loader.core.queue import (QueuePerNbElements,
QueuePerNbUniqueElements,
QueuePerSizeAndNbUniqueElements)
class TestQueuePerNbElements(unittest.TestCase):
def test_simple_queue_behavior(self):
max_nb_elements = 10
queue = QueuePerNbElements(max_nb_elements=max_nb_elements)
elements = [1, 3, 4, 9, 20, 30, 40]
actual_threshold = queue.add(elements)
self.assertFalse(actual_threshold, len(elements) > max_nb_elements)
# pop returns the content and reset the queue
actual_elements = queue.pop()
- self.assertEquals(actual_elements, elements)
- self.assertEquals(queue.pop(), [])
+ self.assertEqual(actual_elements, elements)
+ self.assertEqual(queue.pop(), [])
# duplicates can be integrated
new_elements = [1, 1, 3, 4, 9, 20, 30, 40, 12, 14, 2]
actual_threshold = queue.add(new_elements)
self.assertTrue(actual_threshold)
- self.assertEquals(queue.pop(), new_elements)
+ self.assertEqual(queue.pop(), new_elements)
# reset is destructive too
queue.add(new_elements)
queue.reset()
- self.assertEquals(queue.pop(), [])
+ self.assertEqual(queue.pop(), [])
def to_some_objects(elements, key):
for elt in elements:
yield {key: elt}
class TestQueuePerNbUniqueElements(unittest.TestCase):
def test_queue_with_unique_key_behavior(self):
max_nb_elements = 5
queue = QueuePerNbUniqueElements(max_nb_elements=max_nb_elements,
key='id')
# no duplicates
elements = list(to_some_objects([1, 1, 3, 4, 9], key='id'))
actual_threshold = queue.add(elements)
self.assertFalse(actual_threshold, len(elements) > max_nb_elements)
# pop returns the content and reset the queue
actual_elements = queue.pop()
- self.assertEquals(actual_elements,
- [{'id': 1}, {'id': 3}, {'id': 4}, {'id': 9}])
- self.assertEquals(queue.pop(), [])
+ self.assertEqual(actual_elements,
+ [{'id': 1}, {'id': 3}, {'id': 4}, {'id': 9}])
+ self.assertEqual(queue.pop(), [])
new_elements = list(to_some_objects(
[1, 3, 4, 9, 20],
key='id'))
actual_threshold = queue.add(new_elements)
self.assertTrue(actual_threshold)
# reset is destructive too
queue.add(new_elements)
queue.reset()
- self.assertEquals(queue.pop(), [])
+ self.assertEqual(queue.pop(), [])
def to_some_complex_objects(elements, key):
for elt, size in elements:
yield {key: elt, 'length': size}
class TestQueuePerSizeAndNbUniqueElements(unittest.TestCase):
def test_queue_with_unique_key_and_size_behavior(self):
max_nb_elements = 5
max_size = 100
queue = QueuePerSizeAndNbUniqueElements(
max_nb_elements=max_nb_elements,
max_size=max_size,
key='k')
# size total exceeded, nb elements not reached, still the
# threshold is deemed reached
elements = list(to_some_complex_objects([(1, 10),
(2, 20),
(3, 30),
(4, 100)], key='k'))
actual_threshold = queue.add(elements)
self.assertTrue(actual_threshold)
# pop returns the content and reset the queue
actual_elements = queue.pop()
- self.assertEquals(actual_elements,
- [{'k': 1, 'length': 10},
- {'k': 2, 'length': 20},
- {'k': 3, 'length': 30},
- {'k': 4, 'length': 100}])
- self.assertEquals(queue.pop(), [])
+ self.assertEqual(actual_elements,
+ [{'k': 1, 'length': 10},
+ {'k': 2, 'length': 20},
+ {'k': 3, 'length': 30},
+ {'k': 4, 'length': 100}])
+ self.assertEqual(queue.pop(), [])
# size threshold not reached, nb elements reached, the
# threshold is considered reached
new_elements = list(to_some_complex_objects(
[(1, 10), (3, 5), (4, 2), (9, 1), (20, 0)],
key='k'))
actual_threshold = queue.add(new_elements)
queue.reset()
self.assertTrue(actual_threshold)
# nb elements threshold not reached, nor the top number of
# elements, the threshold is not reached
new_elements = list(to_some_complex_objects(
[(1, 10)],
key='k'))
actual_threshold = queue.add(new_elements)
self.assertFalse(actual_threshold)
# reset is destructive too
queue.add(new_elements)
queue.reset()
- self.assertEquals(queue.pop(), [])
+ self.assertEqual(queue.pop(), [])

File Metadata

Mime Type
text/x-diff
Expires
Tue, Aug 19, 12:48 AM (3 w, 15 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3274607

Event Timeline