Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9697751
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
17 KB
Subscribers
None
View Options
diff --git a/swh/loader/core/tests/test_loader.py b/swh/loader/core/tests/test_loader.py
index 9e8a66d..a27e9be 100644
--- a/swh/loader/core/tests/test_loader.py
+++ b/swh/loader/core/tests/test_loader.py
@@ -1,325 +1,325 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.model.hashutil import hash_to_bytes
from . import BaseLoaderTest, LoaderNoStorage
class DummyBaseLoaderTest(BaseLoaderTest):
    """Base test case that only installs placeholder input data.

    Neither ``setUp`` nor ``tearDown`` delegates to the parent class.
    """

    def setUp(self):
        # Deliberately does not call super().setUp().
        self.in_contents = list(range(1, 4))
        self.in_directories = list(range(4, 7))
        self.in_revisions = list(range(7, 10))
        self.in_releases = list(range(10, 13))
        self.in_snapshot = 13

    def tearDown(self):
        # Deliberately does not call super().tearDown(); this override
        # intentionally does nothing.
        pass
class LoadTest1(DummyBaseLoaderTest):
def setUp(self):
super().setUp()
self.loader = LoaderNoStorage()
def test_stateful_loader(self):
"""Stateful loader accumulates in place the sent data
Note: Those behaviors should be somehow merged but that's
another story.
"""
self.loader.maybe_load_directories(self.in_directories)
self.loader.maybe_load_revisions(self.in_revisions)
self.loader.maybe_load_releases(self.in_releases)
- self.assertEquals(len(self.state('content')), 0)
- self.assertEquals(
+ self.assertEqual(len(self.state('content')), 0)
+ self.assertEqual(
len(self.state('directory')), len(self.in_directories))
- self.assertEquals(
+ self.assertEqual(
len(self.state('revision')), len(self.in_revisions))
- self.assertEquals(
+ self.assertEqual(
len(self.state('release')), len(self.in_releases))
- self.assertEquals(len(self.state('snapshot')), 0)
+ self.assertEqual(len(self.state('snapshot')), 0)
def test_stateless_loader(self):
"""Stateless loader accumulates in place the sent data as well
Note: Those behaviors should be somehow merged but that's
another story.
"""
self.loader.send_batch_contents(self.in_contents)
self.loader.send_snapshot(self.in_snapshot)
- self.assertEquals(len(self.state('content')), len(self.in_contents))
- self.assertEquals(len(self.state('directory')), 0)
- self.assertEquals(len(self.state('revision')), 0)
- self.assertEquals(len(self.state('release')), 0)
- self.assertEquals(len(self.state('snapshot')), 1)
+ self.assertEqual(len(self.state('content')), len(self.in_contents))
+ self.assertEqual(len(self.state('directory')), 0)
+ self.assertEqual(len(self.state('revision')), 0)
+ self.assertEqual(len(self.state('release')), 0)
+ self.assertEqual(len(self.state('snapshot')), 1)
class LoadTestContent(DummyBaseLoaderTest):
    """Exercise content loading through ``LoaderNoStorage``."""

    def setUp(self):
        """Create the loader and two minimal content entries."""
        super().setUp()
        self.loader = LoaderNoStorage()

        self.content_id0 = '34973274ccef6ab4dfaaf86599792fa9c3fe4689'
        self.content_id1 = '61c2b3a30496d329e21af70dd2d7e097046d07b7'
        # trimmed data to the bare necessities
        self.in_contents = [{
            'sha1': hash_to_bytes(self.content_id0),
        }, {
            'sha1': hash_to_bytes(self.content_id1),
        }]
        self.expected_contents = [self.content_id0, self.content_id1]

    def test_maybe_load_contents(self):
        """Loading contents should be ok

        """
        self.loader.maybe_load_contents(self.in_contents)
        self.assertCountContents(len(self.expected_contents))
        self.assertContentsOk(self.expected_contents)

    def test_send_batch_contents(self):
        """Sending contents should be ok 2

        """
        self.loader.send_batch_contents(self.in_contents)
        self.assertCountContents(len(self.expected_contents))
        self.assertContentsOk(self.expected_contents)

    def test_failing(self):
        """Comparing wrong contents should fail.

        """
        self.loader.send_batch_contents(self.in_contents)
        # Two contents were sent, so comparing against an empty expectation
        # is expected to trip the assertion helper.
        with self.assertRaises(AssertionError):
            self.assertContentsOk([])
class LoadTestDirectory(DummyBaseLoaderTest):
    """Exercise directory loading through ``LoaderNoStorage``."""

    def setUp(self):
        """Create the loader and three minimal directory entries."""
        super().setUp()
        self.loader = LoaderNoStorage()

        self.directory_id0 = '44e45d56f88993aae6a0198013efa80716fd8921'
        self.directory_id1 = '54e45d56f88993aae6a0198013efa80716fd8920'
        self.directory_id2 = '43e45d56f88993aae6a0198013efa80716fd8920'
        # trimmed data to the bare necessities
        self.in_directories = [{
            'id': hash_to_bytes(self.directory_id0),
        }, {
            'id': hash_to_bytes(self.directory_id1),
        }, {
            'id': hash_to_bytes(self.directory_id2),
        }]
        self.expected_directories = [
            self.directory_id0, self.directory_id1, self.directory_id2]

    def test_maybe_load_directories(self):
        """Loading directories should be ok

        """
        self.loader.maybe_load_directories(self.in_directories)
        self.assertCountDirectories(len(self.expected_directories))
        self.assertDirectoriesOk(self.expected_directories)

    def test_send_batch_directories(self):
        """Sending directories should be ok 2

        """
        self.loader.send_batch_directories(self.in_directories)
        self.assertCountDirectories(len(self.expected_directories))
        self.assertDirectoriesOk(self.expected_directories)

    def test_failing(self):
        """Comparing wrong directories should fail.

        """
        # Use the directory helpers so the negative case covers the same
        # object type as the rest of this class (mirrors the content and
        # release test cases).
        self.loader.send_batch_directories(self.in_directories)
        with self.assertRaises(AssertionError):
            self.assertDirectoriesOk([])
class LoadTestRelease(DummyBaseLoaderTest):
    """Exercise release loading through ``LoaderNoStorage``."""

    def setUp(self):
        """Create the loader and three minimal release entries."""
        super().setUp()
        self.loader = LoaderNoStorage()

        self.release_id0 = '44e45d56f88993aae6a0198013efa80716fd8921'
        self.release_id1 = '54e45d56f88993aae6a0198013efa80716fd8920'
        self.release_id2 = '43e45d56f88993aae6a0198013efa80716fd8920'
        # trimmed data to the bare necessities
        self.in_releases = [{
            'id': hash_to_bytes(self.release_id0),
        }, {
            'id': hash_to_bytes(self.release_id1),
        }, {
            'id': hash_to_bytes(self.release_id2),
        }]
        self.expected_releases = [
            self.release_id0, self.release_id1, self.release_id2]

    def test_maybe_load_releases(self):
        """Loading releases should be ok

        """
        self.loader.maybe_load_releases(self.in_releases)
        self.assertCountReleases(len(self.expected_releases))
        self.assertReleasesOk(self.expected_releases)

    def test_send_batch_releases(self):
        """Sending releases should be ok 2

        """
        self.loader.send_batch_releases(self.in_releases)
        self.assertCountReleases(len(self.expected_releases))
        self.assertReleasesOk(self.expected_releases)

    def test_failing(self):
        """Comparing wrong releases should fail.

        """
        self.loader.send_batch_releases(self.in_releases)
        # Three releases were sent, so comparing against an empty
        # expectation is expected to trip the assertion helper.
        with self.assertRaises(AssertionError):
            self.assertReleasesOk([])
class LoadTestRevision(DummyBaseLoaderTest):
    """Exercise revision loading through ``LoaderNoStorage``."""

    def setUp(self):
        """Create the loader and three minimal revision entries.

        ``expected_revisions`` maps each revision id (hex) to the hex id of
        the directory it points to.
        """
        super().setUp()
        self.loader = LoaderNoStorage()

        rev_id0 = '44e45d56f88993aae6a0198013efa80716fd8921'
        dir_id0 = '34973274ccef6ab4dfaaf86599792fa9c3fe4689'
        rev_id1 = '54e45d56f88993aae6a0198013efa80716fd8920'
        dir_id1 = '61c2b3a30496d329e21af70dd2d7e097046d07b7'
        rev_id2 = '43e45d56f88993aae6a0198013efa80716fd8920'
        dir_id2 = '33e45d56f88993aae6a0198013efa80716fd8921'

        # data trimmed to bare necessities
        self.in_revisions = [{
            'id': hash_to_bytes(rev_id0),
            'directory': hash_to_bytes(dir_id0),
        }, {
            'id': hash_to_bytes(rev_id1),
            'directory': hash_to_bytes(dir_id1),
        }, {
            'id': hash_to_bytes(rev_id2),
            'directory': hash_to_bytes(dir_id2),
        }]

        self.expected_revisions = {
            rev_id0: dir_id0,
            rev_id1: dir_id1,
            rev_id2: dir_id2,
        }

    def test_maybe_load_revisions(self):
        """Loading revisions should be ok

        """
        self.loader.maybe_load_revisions(self.in_revisions)
        self.assertCountRevisions(len(self.expected_revisions))
        self.assertRevisionsOk(self.expected_revisions)

    def test_send_batch_revisions(self):
        """Sending revisions should be ok 2

        """
        self.loader.send_batch_revisions(self.in_revisions)
        self.assertCountRevisions(len(self.expected_revisions))
        self.assertRevisionsOk(self.expected_revisions)

    def test_failing(self):
        """Comparing wrong revisions should fail.

        """
        self.loader.send_batch_revisions(self.in_revisions)
        # Three revisions were sent, so comparing against an empty
        # expectation is expected to trip the assertion helper.
        with self.assertRaises(AssertionError):
            self.assertRevisionsOk([])
class LoadTestSnapshot(DummyBaseLoaderTest):
    """Snapshot loading checks run against ``LoaderNoStorage``."""

    def setUp(self):
        """Prepare the loader, the bytes-based input snapshot and the
        hex/str snapshot it is compared against.
        """
        super().setUp()
        self.loader = LoaderNoStorage()

        snp_hex = '44e45d56f88993aae6a0198013efa80716fd8921'
        rev_hex = '54e45d56f88993aae6a0198013efa80716fd8920'
        rel_hex = '43e45d56f88993aae6a0198013efa80716fd8920'

        # only the fields these tests need
        expected_branches = {
            'default': {'target_type': 'revision', 'target': rev_hex},
            'master': {'target_type': 'release', 'target': rel_hex},
            'HEAD': {'target_type': 'alias', 'target': 'master'},
        }
        self.expected_snapshot = {
            'id': snp_hex,
            'branches': expected_branches,
        }

        # converted in the order snapshot, revision, release
        snp_raw, rev_raw, rel_raw = [
            hash_to_bytes(hex_id) for hex_id in (snp_hex, rev_hex, rel_hex)]
        self.in_snapshot = {
            'id': snp_raw,
            'branches': {
                b'default': {'target_type': 'revision', 'target': rev_raw},
                b'master': {'target_type': 'release', 'target': rel_raw},
                b'HEAD': {'target_type': 'alias', 'target': b'master'},
            },
        }

    def _assert_expected_snapshot(self):
        """Run the count and content assertions shared by the loading tests."""
        self.assertCountSnapshots(1)
        self.assertSnapshotOk(self.expected_snapshot)
        self.assertSnapshotOk(
            self.expected_snapshot['id'],
            expected_branches=self.expected_snapshot['branches'])

    def test_maybe_load_snapshots(self):
        """Loading snapshot should be ok

        """
        self.loader.maybe_load_snapshot(self.in_snapshot)
        self._assert_expected_snapshot()

    def test_send_batch_snapshots(self):
        """Sending snapshot should be ok 2

        """
        self.loader.send_snapshot(self.in_snapshot)
        self._assert_expected_snapshot()

    def test_failing(self):
        """Comparing wrong snapshot should fail.

        """
        self.loader.send_snapshot(self.in_snapshot)
        branches = self.expected_snapshot['branches']
        with self.assertRaises(AssertionError):
            self.assertSnapshotOk('wrong', expected_branches=branches)
diff --git a/swh/loader/core/tests/test_queue.py b/swh/loader/core/tests/test_queue.py
index b68f03f..444d36d 100644
--- a/swh/loader/core/tests/test_queue.py
+++ b/swh/loader/core/tests/test_queue.py
@@ -1,136 +1,136 @@
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from swh.loader.core.queue import (QueuePerNbElements,
QueuePerNbUniqueElements,
QueuePerSizeAndNbUniqueElements)
class TestQueuePerNbElements(unittest.TestCase):
def test_simple_queue_behavior(self):
max_nb_elements = 10
queue = QueuePerNbElements(max_nb_elements=max_nb_elements)
elements = [1, 3, 4, 9, 20, 30, 40]
actual_threshold = queue.add(elements)
self.assertFalse(actual_threshold, len(elements) > max_nb_elements)
# pop returns the content and reset the queue
actual_elements = queue.pop()
- self.assertEquals(actual_elements, elements)
- self.assertEquals(queue.pop(), [])
+ self.assertEqual(actual_elements, elements)
+ self.assertEqual(queue.pop(), [])
# duplicates can be integrated
new_elements = [1, 1, 3, 4, 9, 20, 30, 40, 12, 14, 2]
actual_threshold = queue.add(new_elements)
self.assertTrue(actual_threshold)
- self.assertEquals(queue.pop(), new_elements)
+ self.assertEqual(queue.pop(), new_elements)
# reset is destructive too
queue.add(new_elements)
queue.reset()
- self.assertEquals(queue.pop(), [])
+ self.assertEqual(queue.pop(), [])
def to_some_objects(elements, key):
    """Lazily yield one single-entry dict ``{key: item}`` per input item."""
    yield from ({key: item} for item in elements)
class TestQueuePerNbUniqueElements(unittest.TestCase):
def test_queue_with_unique_key_behavior(self):
max_nb_elements = 5
queue = QueuePerNbUniqueElements(max_nb_elements=max_nb_elements,
key='id')
# no duplicates
elements = list(to_some_objects([1, 1, 3, 4, 9], key='id'))
actual_threshold = queue.add(elements)
self.assertFalse(actual_threshold, len(elements) > max_nb_elements)
# pop returns the content and reset the queue
actual_elements = queue.pop()
- self.assertEquals(actual_elements,
- [{'id': 1}, {'id': 3}, {'id': 4}, {'id': 9}])
- self.assertEquals(queue.pop(), [])
+ self.assertEqual(actual_elements,
+ [{'id': 1}, {'id': 3}, {'id': 4}, {'id': 9}])
+ self.assertEqual(queue.pop(), [])
new_elements = list(to_some_objects(
[1, 3, 4, 9, 20],
key='id'))
actual_threshold = queue.add(new_elements)
self.assertTrue(actual_threshold)
# reset is destructive too
queue.add(new_elements)
queue.reset()
- self.assertEquals(queue.pop(), [])
+ self.assertEqual(queue.pop(), [])
def to_some_complex_objects(elements, key):
    """Lazily turn ``(value, size)`` pairs into dicts.

    Each yielded dict stores the value under ``key`` and the size under
    ``'length'``.
    """
    for ident, length in elements:
        record = {key: ident}
        record['length'] = length
        yield record
class TestQueuePerSizeAndNbUniqueElements(unittest.TestCase):
def test_queue_with_unique_key_and_size_behavior(self):
max_nb_elements = 5
max_size = 100
queue = QueuePerSizeAndNbUniqueElements(
max_nb_elements=max_nb_elements,
max_size=max_size,
key='k')
# size total exceeded, nb elements not reached, still the
# threshold is deemed reached
elements = list(to_some_complex_objects([(1, 10),
(2, 20),
(3, 30),
(4, 100)], key='k'))
actual_threshold = queue.add(elements)
self.assertTrue(actual_threshold)
# pop returns the content and reset the queue
actual_elements = queue.pop()
- self.assertEquals(actual_elements,
- [{'k': 1, 'length': 10},
- {'k': 2, 'length': 20},
- {'k': 3, 'length': 30},
- {'k': 4, 'length': 100}])
- self.assertEquals(queue.pop(), [])
+ self.assertEqual(actual_elements,
+ [{'k': 1, 'length': 10},
+ {'k': 2, 'length': 20},
+ {'k': 3, 'length': 30},
+ {'k': 4, 'length': 100}])
+ self.assertEqual(queue.pop(), [])
# size threshold not reached, nb elements reached, the
# threshold is considered reached
new_elements = list(to_some_complex_objects(
[(1, 10), (3, 5), (4, 2), (9, 1), (20, 0)],
key='k'))
actual_threshold = queue.add(new_elements)
queue.reset()
self.assertTrue(actual_threshold)
# nb elements threshold not reached, nor the top number of
# elements, the threshold is not reached
new_elements = list(to_some_complex_objects(
[(1, 10)],
key='k'))
actual_threshold = queue.add(new_elements)
self.assertFalse(actual_threshold)
# reset is destructive too
queue.add(new_elements)
queue.reset()
- self.assertEquals(queue.pop(), [])
+ self.assertEqual(queue.pop(), [])
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Tue, Aug 19, 12:48 AM (3 w, 15 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3274607
Attached To
rDLDBASE Generic VCS/Package Loader
Event Timeline
Log In to Comment