Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
Show All 30 Lines | def setUp(self): | ||||
db = self.test_db[self.TEST_DB_NAME] | db = self.test_db[self.TEST_DB_NAME] | ||||
self.conn = db.conn | self.conn = db.conn | ||||
self.cursor = db.cursor | self.cursor = db.cursor | ||||
self.maxDiff = None | self.maxDiff = None | ||||
def tearDown(self): | def tearDown(self): | ||||
self.reset_storage_tables() | self.reset_storage() | ||||
super().tearDown() | super().tearDown() | ||||
class TestStorageData: | class TestStorageData: | ||||
def setUp(self): | def setUp(self): | ||||
super().setUp() | super().setUp() | ||||
self.cont = { | self.cont = { | ||||
▲ Show 20 Lines • Show All 1,503 Lines • ▼ Show 20 Lines | def test_origin_search(self): | ||||
found_origins = list(self.storage.origin_search('/', offset=1, limit=1)) # noqa | found_origins = list(self.storage.origin_search('/', offset=1, limit=1)) # noqa | ||||
self.assertEqual(len(found_origins), 1) | self.assertEqual(len(found_origins), 1) | ||||
self.assertEqual(found_origins[0], origin2_data) | self.assertEqual(found_origins[0], origin2_data) | ||||
found_origins = list(self.storage.origin_search('.*/.*', offset=1, limit=1, regexp=True)) # noqa | found_origins = list(self.storage.origin_search('.*/.*', offset=1, limit=1, regexp=True)) # noqa | ||||
self.assertEqual(len(found_origins), 1) | self.assertEqual(len(found_origins), 1) | ||||
self.assertEqual(found_origins[0], origin2_data) | self.assertEqual(found_origins[0], origin2_data) | ||||
def test_origin_visit_add(self): | @given(strategies.booleans()) | ||||
# given | def test_origin_visit_add(self, use_url): | ||||
self.assertIsNone(self.storage.origin_get([self.origin2])[0]) | self.reset_storage() | ||||
origin_id = self.storage.origin_add_one(self.origin2) | |||||
self.assertIsNotNone(origin_id) | |||||
# when | |||||
origin_visit1 = self.storage.origin_visit_add( | |||||
origin_id, | |||||
type='git', | |||||
date=self.date_visit2) | |||||
actual_origin_visits = list(self.storage.origin_visit_get(origin_id)) | |||||
self.assertEqual(actual_origin_visits, | |||||
[{ | |||||
'origin': origin_id, | |||||
'date': self.date_visit2, | |||||
'visit': origin_visit1['visit'], | |||||
'type': 'git', | |||||
'status': 'ongoing', | |||||
'metadata': None, | |||||
'snapshot': None, | |||||
}]) | |||||
expected_origin = self.origin2.copy() | |||||
data = { | |||||
'origin': expected_origin, | |||||
'date': self.date_visit2, | |||||
'visit': origin_visit1['visit'], | |||||
'type': 'git', | |||||
'status': 'ongoing', | |||||
'metadata': None, | |||||
'snapshot': None, | |||||
} | |||||
self.assertEqual(list(self.journal_writer.objects), | |||||
[('origin', expected_origin), | |||||
('origin_visit', data)]) | |||||
def test_origin_visit_add_from_url(self): | |||||
# given | # given | ||||
self.assertIsNone(self.storage.origin_get([self.origin2])[0]) | self.assertIsNone(self.storage.origin_get([self.origin2])[0]) | ||||
origin_id = self.storage.origin_add_one(self.origin2) | origin_id = self.storage.origin_add_one(self.origin2) | ||||
origin_url = self.origin2['url'] | |||||
self.assertIsNotNone(origin_id) | self.assertIsNotNone(origin_id) | ||||
origin_id_or_url = self.origin2['url'] if use_url else origin_id | |||||
# when | # when | ||||
origin_visit1 = self.storage.origin_visit_add( | origin_visit1 = self.storage.origin_visit_add( | ||||
origin_url, | origin_id_or_url, | ||||
anlambert: Just a nitpick: I would use an `origin` variable here instead of duplicating the `if else` instruction. | |||||
type='git', | type='git', | ||||
date=self.date_visit2) | date=self.date_visit2) | ||||
actual_origin_visits = list(self.storage.origin_visit_get(origin_id)) | actual_origin_visits = list(self.storage.origin_visit_get( | ||||
origin_id_or_url)) | |||||
self.assertEqual(actual_origin_visits, | self.assertEqual(actual_origin_visits, | ||||
[{ | [{ | ||||
'origin': origin_id, | 'origin': origin_id, | ||||
'date': self.date_visit2, | 'date': self.date_visit2, | ||||
'visit': origin_visit1['visit'], | 'visit': origin_visit1['visit'], | ||||
'type': 'git', | 'type': 'git', | ||||
'status': 'ongoing', | 'status': 'ongoing', | ||||
'metadata': None, | 'metadata': None, | ||||
▲ Show 20 Lines • Show All 1,165 Lines • ▼ Show 20 Lines | def test_snapshot_get_nonexistent(self): | ||||
by_id = self.storage.snapshot_get(bogus_snapshot_id) | by_id = self.storage.snapshot_get(bogus_snapshot_id) | ||||
self.assertIsNone(by_id) | self.assertIsNone(by_id) | ||||
by_ov = self.storage.snapshot_get_by_origin_visit(bogus_origin_id, | by_ov = self.storage.snapshot_get_by_origin_visit(bogus_origin_id, | ||||
bogus_visit_id) | bogus_visit_id) | ||||
self.assertIsNone(by_ov) | self.assertIsNone(by_ov) | ||||
def test_snapshot_get_latest(self): | @given(strategies.booleans()) | ||||
def test_snapshot_get_latest(self, use_url): | |||||
self.reset_storage() | |||||
origin_id = self.storage.origin_add_one(self.origin) | origin_id = self.storage.origin_add_one(self.origin) | ||||
origin_id_or_url = self.origin['url'] if use_url else origin_id | |||||
origin_visit1 = self.storage.origin_visit_add(origin_id, | origin_visit1 = self.storage.origin_visit_add(origin_id, | ||||
self.date_visit1) | self.date_visit1) | ||||
visit1_id = origin_visit1['visit'] | visit1_id = origin_visit1['visit'] | ||||
origin_visit2 = self.storage.origin_visit_add(origin_id, | origin_visit2 = self.storage.origin_visit_add(origin_id, | ||||
self.date_visit2) | self.date_visit2) | ||||
visit2_id = origin_visit2['visit'] | visit2_id = origin_visit2['visit'] | ||||
# Add a visit with the same date as the previous one | # Add a visit with the same date as the previous one | ||||
origin_visit3 = self.storage.origin_visit_add(origin_id, | origin_visit3 = self.storage.origin_visit_add(origin_id, | ||||
self.date_visit2) | self.date_visit2) | ||||
visit3_id = origin_visit3['visit'] | visit3_id = origin_visit3['visit'] | ||||
# Two visits, both with no snapshot: latest snapshot is None | # Two visits, both with no snapshot: latest snapshot is None | ||||
self.assertIsNone(self.storage.snapshot_get_latest(origin_id)) | self.assertIsNone(self.storage.snapshot_get_latest( | ||||
Not Done. Inline Actions. anlambert: Same here. | |||||
origin_id_or_url)) | |||||
# Add snapshot to visit1, latest snapshot = visit 1 snapshot | # Add snapshot to visit1, latest snapshot = visit 1 snapshot | ||||
self.storage.snapshot_add([self.complete_snapshot]) | self.storage.snapshot_add([self.complete_snapshot]) | ||||
self.storage.origin_visit_update( | self.storage.origin_visit_update( | ||||
origin_id, visit1_id, snapshot=self.complete_snapshot['id']) | origin_id, visit1_id, snapshot=self.complete_snapshot['id']) | ||||
self.assertEqual(self.complete_snapshot, | self.assertEqual(self.complete_snapshot, | ||||
self.storage.snapshot_get_latest(origin_id)) | self.storage.snapshot_get_latest( | ||||
origin_id_or_url)) | |||||
# Status filter: all three visits are status=ongoing, so no snapshot | # Status filter: all three visits are status=ongoing, so no snapshot | ||||
# returned | # returned | ||||
self.assertIsNone( | self.assertIsNone( | ||||
self.storage.snapshot_get_latest(origin_id, | self.storage.snapshot_get_latest( | ||||
origin_id_or_url, | |||||
allowed_statuses=['full']) | allowed_statuses=['full']) | ||||
) | ) | ||||
# Mark the first visit as completed and check status filter again | # Mark the first visit as completed and check status filter again | ||||
self.storage.origin_visit_update(origin_id, visit1_id, status='full') | self.storage.origin_visit_update(origin_id, visit1_id, status='full') | ||||
self.assertEqual( | self.assertEqual( | ||||
self.complete_snapshot, | self.complete_snapshot, | ||||
self.storage.snapshot_get_latest(origin_id, | self.storage.snapshot_get_latest( | ||||
origin_id_or_url, | |||||
allowed_statuses=['full']), | allowed_statuses=['full']), | ||||
) | ) | ||||
# Add snapshot to visit2 and check that the new snapshot is returned | # Add snapshot to visit2 and check that the new snapshot is returned | ||||
self.storage.snapshot_add([self.empty_snapshot]) | self.storage.snapshot_add([self.empty_snapshot]) | ||||
self.storage.origin_visit_update( | self.storage.origin_visit_update( | ||||
origin_id, visit2_id, snapshot=self.empty_snapshot['id']) | origin_id, visit2_id, snapshot=self.empty_snapshot['id']) | ||||
self.assertEqual(self.empty_snapshot, | self.assertEqual(self.empty_snapshot, | ||||
self.storage.snapshot_get_latest(origin_id)) | self.storage.snapshot_get_latest(origin_id)) | ||||
# Check that the status filter is still working | # Check that the status filter is still working | ||||
self.assertEqual( | self.assertEqual( | ||||
self.complete_snapshot, | self.complete_snapshot, | ||||
self.storage.snapshot_get_latest(origin_id, | self.storage.snapshot_get_latest( | ||||
origin_id_or_url, | |||||
allowed_statuses=['full']), | allowed_statuses=['full']), | ||||
) | ) | ||||
# Add snapshot to visit3 (same date as visit2) and check that | # Add snapshot to visit3 (same date as visit2) and check that | ||||
# the new snapshot is returned | # the new snapshot is returned | ||||
self.storage.snapshot_add([self.complete_snapshot]) | self.storage.snapshot_add([self.complete_snapshot]) | ||||
self.storage.origin_visit_update( | self.storage.origin_visit_update( | ||||
origin_id, visit3_id, snapshot=self.complete_snapshot['id']) | origin_id, visit3_id, snapshot=self.complete_snapshot['id']) | ||||
self.assertEqual(self.complete_snapshot, | self.assertEqual(self.complete_snapshot, | ||||
self.storage.snapshot_get_latest(origin_id)) | self.storage.snapshot_get_latest( | ||||
origin_id_or_url)) | |||||
def test_snapshot_get_latest_from_url(self): | |||||
self.storage.origin_add_one(self.origin) | |||||
origin_url = self.origin['url'] | |||||
origin_visit1 = self.storage.origin_visit_add(origin_url, | |||||
self.date_visit1) | |||||
visit1_id = origin_visit1['visit'] | |||||
origin_visit2 = self.storage.origin_visit_add(origin_url, | |||||
self.date_visit2) | |||||
visit2_id = origin_visit2['visit'] | |||||
# Add a visit with the same date as the previous one | |||||
origin_visit3 = self.storage.origin_visit_add(origin_url, | |||||
self.date_visit2) | |||||
visit3_id = origin_visit3['visit'] | |||||
# Two visits, both with no snapshot: latest snapshot is None | |||||
self.assertIsNone(self.storage.snapshot_get_latest(origin_url)) | |||||
# Add snapshot to visit1, latest snapshot = visit 1 snapshot | |||||
self.storage.snapshot_add([self.complete_snapshot]) | |||||
self.storage.origin_visit_update( | |||||
origin_url, visit1_id, snapshot=self.complete_snapshot['id']) | |||||
self.assertEqual(self.complete_snapshot, | |||||
self.storage.snapshot_get_latest(origin_url)) | |||||
# Status filter: both visits are status=ongoing, so no snapshot | |||||
# returned | |||||
self.assertIsNone( | |||||
self.storage.snapshot_get_latest(origin_url, | |||||
allowed_statuses=['full']) | |||||
) | |||||
# Mark the first visit as completed and check status filter again | |||||
self.storage.origin_visit_update(origin_url, visit1_id, status='full') | |||||
self.assertEqual( | |||||
self.complete_snapshot, | |||||
self.storage.snapshot_get_latest(origin_url, | |||||
allowed_statuses=['full']), | |||||
) | |||||
# Add snapshot to visit2 and check that the new snapshot is returned | |||||
self.storage.snapshot_add([self.empty_snapshot]) | |||||
self.storage.origin_visit_update( | |||||
origin_url, visit2_id, snapshot=self.empty_snapshot['id']) | |||||
self.assertEqual(self.empty_snapshot, | |||||
self.storage.snapshot_get_latest(origin_url)) | |||||
# Check that the status filter is still working | |||||
self.assertEqual( | |||||
self.complete_snapshot, | |||||
self.storage.snapshot_get_latest(origin_url, | |||||
allowed_statuses=['full']), | |||||
) | |||||
# Add snapshot to visit3 (same date as visit2) and check that | |||||
# the new snapshot is returned | |||||
self.storage.snapshot_add([self.complete_snapshot]) | |||||
self.storage.origin_visit_update( | |||||
origin_url, visit3_id, snapshot=self.complete_snapshot['id']) | |||||
self.assertEqual(self.complete_snapshot, | |||||
self.storage.snapshot_get_latest(origin_url)) | |||||
def test_snapshot_get_latest__missing_snapshot(self): | def test_snapshot_get_latest__missing_snapshot(self): | ||||
origin_id = self.storage.origin_add_one(self.origin) | origin_id = self.storage.origin_add_one(self.origin) | ||||
origin_visit1 = self.storage.origin_visit_add(origin_id, | origin_visit1 = self.storage.origin_visit_add(origin_id, | ||||
self.date_visit1) | self.date_visit1) | ||||
visit1_id = origin_visit1['visit'] | visit1_id = origin_visit1['visit'] | ||||
origin_visit2 = self.storage.origin_visit_add(origin_id, | origin_visit2 = self.storage.origin_visit_add(origin_id, | ||||
self.date_visit2) | self.date_visit2) | ||||
▲ Show 20 Lines • Show All 767 Lines • ▼ Show 20 Lines | def assert_contents_ok(self, expected_contents, actual_contents, | ||||
""" | """ | ||||
for k in keys_to_check: | for k in keys_to_check: | ||||
expected_list = sorted([c[k] for c in expected_contents]) | expected_list = sorted([c[k] for c in expected_contents]) | ||||
actual_list = sorted([c[k] for c in actual_contents]) | actual_list = sorted([c[k] for c in actual_contents]) | ||||
self.assertEqual(actual_list, expected_list) | self.assertEqual(actual_list, expected_list) | ||||
@given(gen_contents(min_size=1, max_size=4)) | @given(gen_contents(min_size=1, max_size=4)) | ||||
def test_generate_content_get(self, contents): | def test_generate_content_get(self, contents): | ||||
self.reset_storage_tables() | self.reset_storage() | ||||
# add contents to storage | # add contents to storage | ||||
self.storage.content_add(contents) | self.storage.content_add(contents) | ||||
# input the list of sha1s we want from storage | # input the list of sha1s we want from storage | ||||
get_sha1s = [c['sha1'] for c in contents] | get_sha1s = [c['sha1'] for c in contents] | ||||
# retrieve contents | # retrieve contents | ||||
actual_contents = list(self.storage.content_get(get_sha1s)) | actual_contents = list(self.storage.content_get(get_sha1s)) | ||||
self.assert_contents_ok(contents, actual_contents) | self.assert_contents_ok(contents, actual_contents) | ||||
@given(gen_contents(min_size=1, max_size=4)) | @given(gen_contents(min_size=1, max_size=4)) | ||||
def test_generate_content_get_metadata(self, contents): | def test_generate_content_get_metadata(self, contents): | ||||
self.reset_storage_tables() | self.reset_storage() | ||||
# add contents to storage | # add contents to storage | ||||
self.storage.content_add(contents) | self.storage.content_add(contents) | ||||
# input the list of sha1s we want from storage | # input the list of sha1s we want from storage | ||||
get_sha1s = [c['sha1'] for c in contents] | get_sha1s = [c['sha1'] for c in contents] | ||||
# retrieve contents | # retrieve contents | ||||
actual_contents = list(self.storage.content_get_metadata(get_sha1s)) | actual_contents = list(self.storage.content_get_metadata(get_sha1s)) | ||||
Show All 38 Lines | def test_generate_content_get_range_limit_none(self): | ||||
self.storage.content_get_range(start=None, end=None, limit=None) | self.storage.content_get_range(start=None, end=None, limit=None) | ||||
self.assertEqual(e.exception.args, ( | self.assertEqual(e.exception.args, ( | ||||
'Development error: limit should not be None',)) | 'Development error: limit should not be None',)) | ||||
@given(gen_contents(min_size=1, max_size=4)) | @given(gen_contents(min_size=1, max_size=4)) | ||||
def test_generate_content_get_range_no_limit(self, contents): | def test_generate_content_get_range_no_limit(self, contents): | ||||
"""content_get_range returns contents within range provided""" | """content_get_range returns contents within range provided""" | ||||
self.reset_storage_tables() | self.reset_storage() | ||||
# add contents to storage | # add contents to storage | ||||
self.storage.content_add(contents) | self.storage.content_add(contents) | ||||
# input the list of sha1s we want from storage | # input the list of sha1s we want from storage | ||||
get_sha1s = sorted([c['sha1'] for c in contents]) | get_sha1s = sorted([c['sha1'] for c in contents]) | ||||
start = get_sha1s[0] | start = get_sha1s[0] | ||||
end = get_sha1s[-1] | end = get_sha1s[-1] | ||||
# retrieve contents | # retrieve contents | ||||
actual_result = self.storage.content_get_range(start, end) | actual_result = self.storage.content_get_range(start, end) | ||||
actual_contents = actual_result['contents'] | actual_contents = actual_result['contents'] | ||||
actual_next = actual_result['next'] | actual_next = actual_result['next'] | ||||
self.assertEqual(len(contents), len(actual_contents)) | self.assertEqual(len(contents), len(actual_contents)) | ||||
self.assertIsNone(actual_next) | self.assertIsNone(actual_next) | ||||
one_content = contents[0] | one_content = contents[0] | ||||
keys_to_check = set(one_content.keys()) - {'data'} | keys_to_check = set(one_content.keys()) - {'data'} | ||||
self.assert_contents_ok(contents, actual_contents, keys_to_check) | self.assert_contents_ok(contents, actual_contents, keys_to_check) | ||||
@given(gen_contents(min_size=4, max_size=4)) | @given(gen_contents(min_size=4, max_size=4)) | ||||
def test_generate_content_get_range_limit(self, contents): | def test_generate_content_get_range_limit(self, contents): | ||||
"""content_get_range paginates results if limit exceeded""" | """content_get_range paginates results if limit exceeded""" | ||||
self.reset_storage_tables() | self.reset_storage() | ||||
contents_map = {c['sha1']: c for c in contents} | contents_map = {c['sha1']: c for c in contents} | ||||
# add contents to storage | # add contents to storage | ||||
self.storage.content_add(contents) | self.storage.content_add(contents) | ||||
# input the list of sha1s we want from storage | # input the list of sha1s we want from storage | ||||
get_sha1s = sorted([c['sha1'] for c in contents]) | get_sha1s = sorted([c['sha1'] for c in contents]) | ||||
start = get_sha1s[0] | start = get_sha1s[0] | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | def test_origin_get_invalid_id(self): | ||||
self.assertEqual(origin_info, [None, None]) | self.assertEqual(origin_info, [None, None]) | ||||
origin_visits = list(self.storage.origin_visit_get(1)) | origin_visits = list(self.storage.origin_visit_get(1)) | ||||
self.assertEqual(origin_visits, []) | self.assertEqual(origin_visits, []) | ||||
@given(strategies.sets(origins().map(lambda x: tuple(x.to_dict().items())), | @given(strategies.sets(origins().map(lambda x: tuple(x.to_dict().items())), | ||||
min_size=6, max_size=15)) | min_size=6, max_size=15)) | ||||
def test_origin_get_range(self, new_origins): | def test_origin_get_range(self, new_origins): | ||||
self.reset_storage_tables() | self.reset_storage() | ||||
new_origins = list(map(dict, new_origins)) | new_origins = list(map(dict, new_origins)) | ||||
nb_origins = len(new_origins) | nb_origins = len(new_origins) | ||||
self.storage.origin_add(new_origins) | self.storage.origin_add(new_origins) | ||||
origin_from = random.randint(1, nb_origins-1) | origin_from = random.randint(1, nb_origins-1) | ||||
origin_count = random.randint(1, nb_origins - origin_from) | origin_count = random.randint(1, nb_origins - origin_from) | ||||
▲ Show 20 Lines • Show All 57 Lines • ▼ Show 20 Lines | def test_origin_count(self): | ||||
self.assertEqual( | self.assertEqual( | ||||
self.storage.origin_count('.*user1.*', regexp=True), 2) | self.storage.origin_count('.*user1.*', regexp=True), 2) | ||||
self.assertEqual( | self.assertEqual( | ||||
self.storage.origin_count('.*user1.*', regexp=False), 0) | self.storage.origin_count('.*user1.*', regexp=False), 0) | ||||
@settings(suppress_health_check=[HealthCheck.too_slow]) | @settings(suppress_health_check=[HealthCheck.too_slow]) | ||||
@given(strategies.lists(objects(), max_size=2)) | @given(strategies.lists(objects(), max_size=2)) | ||||
def test_add_arbitrary(self, objects): | def test_add_arbitrary(self, objects): | ||||
self.reset_storage_tables() | self.reset_storage() | ||||
for (obj_type, obj) in objects: | for (obj_type, obj) in objects: | ||||
obj = obj.to_dict() | obj = obj.to_dict() | ||||
if obj_type == 'origin_visit': | if obj_type == 'origin_visit': | ||||
origin_id = self.storage.origin_add_one(obj.pop('origin')) | origin_id = self.storage.origin_add_one(obj.pop('origin')) | ||||
if 'visit' in obj: | if 'visit' in obj: | ||||
del obj['visit'] | del obj['visit'] | ||||
self.storage.origin_visit_add(origin_id, **obj) | self.storage.origin_visit_add(origin_id, **obj) | ||||
else: | else: | ||||
method = getattr(self.storage, obj_type + '_add') | method = getattr(self.storage, obj_type + '_add') | ||||
try: | try: | ||||
method([obj]) | method([obj]) | ||||
except HashCollision: | except HashCollision: | ||||
pass | pass | ||||
@pytest.mark.db | @pytest.mark.db | ||||
class TestLocalStorage(CommonTestStorage, StorageTestDbFixture, | class TestLocalStorage(CommonTestStorage, StorageTestDbFixture, | ||||
unittest.TestCase): | unittest.TestCase): | ||||
"""Test the local storage""" | """Test the local storage""" | ||||
# Can only be tested with local storage as you can't mock | # Can only be tested with local storage as you can't mock | ||||
# datetimes for the remote server | # datetimes for the remote server | ||||
@given(strategies.booleans()) | @given(strategies.booleans()) | ||||
def test_fetch_history(self, use_url): | def test_fetch_history(self, use_url): | ||||
self.reset_storage() | |||||
origin = self.storage.origin_add_one(self.origin) | origin = self.storage.origin_add_one(self.origin) | ||||
if use_url: | if use_url: | ||||
origin_id = self.origin['url'] | origin_id = self.origin['url'] | ||||
else: | else: | ||||
origin_id = origin | origin_id = origin | ||||
with patch('datetime.datetime'): | with patch('datetime.datetime'): | ||||
datetime.datetime.now.return_value = self.fetch_history_date | datetime.datetime.now.return_value = self.fetch_history_date | ||||
fetch_history_id = self.storage.fetch_history_start(origin_id) | fetch_history_id = self.storage.fetch_history_start(origin_id) | ||||
▲ Show 20 Lines • Show All 102 Lines • Show Last 20 Lines |
Just a nitpick: I would use an `origin` variable here instead of duplicating the `if else` instruction.