Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
# Copyright (C) 2015-2019 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import copy | import copy | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
import datetime | import datetime | ||||
import itertools | import itertools | ||||
▲ Show 20 Lines • Show All 110 Lines • ▼ Show 20 Lines | def test_content_add(self, swh_storage): | ||||
del expected_cont['data'] | del expected_cont['data'] | ||||
journal_objects = list(swh_storage.journal_writer.objects) | journal_objects = list(swh_storage.journal_writer.objects) | ||||
for (obj_type, obj) in journal_objects: | for (obj_type, obj) in journal_objects: | ||||
assert insertion_start_time <= obj['ctime'] | assert insertion_start_time <= obj['ctime'] | ||||
assert obj['ctime'] <= insertion_end_time | assert obj['ctime'] <= insertion_end_time | ||||
del obj['ctime'] | del obj['ctime'] | ||||
assert journal_objects == [('content', expected_cont)] | assert journal_objects == [('content', expected_cont)] | ||||
swh_storage.refresh_stat_counters() | |||||
assert swh_storage.stat_counters()['content'] == 1 | |||||
def test_content_add_from_generator(self, swh_storage): | |||||
def _cnt_gen(): | |||||
yield data.cont | |||||
actual_result = swh_storage.content_add(_cnt_gen()) | |||||
assert actual_result == { | |||||
'content:add': 1, | |||||
'content:add:bytes': data.cont['length'], | |||||
'skipped_content:add': 0 | |||||
} | |||||
swh_storage.refresh_stat_counters() | |||||
assert swh_storage.stat_counters()['content'] == 1 | |||||
def test_content_add_validation(self, swh_storage): | def test_content_add_validation(self, swh_storage): | ||||
cont = data.cont | cont = data.cont | ||||
with pytest.raises(ValueError, match='status'): | with pytest.raises(ValueError, match='status'): | ||||
swh_storage.content_add([{**cont, 'status': 'foobar'}]) | swh_storage.content_add([{**cont, 'status': 'foobar'}]) | ||||
with pytest.raises(ValueError, match="(?i)length"): | with pytest.raises(ValueError, match="(?i)length"): | ||||
swh_storage.content_add([{**cont, 'length': -2}]) | swh_storage.content_add([{**cont, 'length': -2}]) | ||||
▲ Show 20 Lines • Show All 355 Lines • ▼ Show 20 Lines | def test_directory_add(self, swh_storage): | ||||
expected_data = list(transform_entries(data.dir)) | expected_data = list(transform_entries(data.dir)) | ||||
assert sorted(expected_data, key=cmpdir) \ | assert sorted(expected_data, key=cmpdir) \ | ||||
== sorted(actual_data, key=cmpdir) | == sorted(actual_data, key=cmpdir) | ||||
after_missing = list(swh_storage.directory_missing([data.dir['id']])) | after_missing = list(swh_storage.directory_missing([data.dir['id']])) | ||||
assert after_missing == [] | assert after_missing == [] | ||||
swh_storage.refresh_stat_counters() | |||||
assert swh_storage.stat_counters()['directory'] == 1 | |||||
def test_directory_add_from_generator(self, swh_storage): | |||||
def _dir_gen(): | |||||
yield data.dir | |||||
actual_result = swh_storage.directory_add(directories=_dir_gen()) | |||||
assert actual_result == {'directory:add': 1} | |||||
assert list(swh_storage.journal_writer.objects) == \ | |||||
[('directory', data.dir)] | |||||
swh_storage.refresh_stat_counters() | |||||
assert swh_storage.stat_counters()['directory'] == 1 | |||||
def test_directory_add_validation(self, swh_storage): | def test_directory_add_validation(self, swh_storage): | ||||
dir_ = copy.deepcopy(data.dir) | dir_ = copy.deepcopy(data.dir) | ||||
dir_['entries'][0]['type'] = 'foobar' | dir_['entries'][0]['type'] = 'foobar' | ||||
with pytest.raises(ValueError, match='type.*foobar'): | with pytest.raises(ValueError, match='type.*foobar'): | ||||
swh_storage.directory_add([dir_]) | swh_storage.directory_add([dir_]) | ||||
dir_ = copy.deepcopy(data.dir) | dir_ = copy.deepcopy(data.dir) | ||||
▲ Show 20 Lines • Show All 178 Lines • ▼ Show 20 Lines | def test_revision_add(self, swh_storage): | ||||
assert list(swh_storage.journal_writer.objects) \ | assert list(swh_storage.journal_writer.objects) \ | ||||
== [('revision', data.revision)] | == [('revision', data.revision)] | ||||
# already there so nothing added | # already there so nothing added | ||||
actual_result = swh_storage.revision_add([data.revision]) | actual_result = swh_storage.revision_add([data.revision]) | ||||
assert actual_result == {'revision:add': 0} | assert actual_result == {'revision:add': 0} | ||||
swh_storage.refresh_stat_counters() | |||||
assert swh_storage.stat_counters()['revision'] == 1 | |||||
def test_revision_add_from_generator(self, swh_storage): | |||||
def _rev_gen(): | |||||
yield data.revision | |||||
actual_result = swh_storage.revision_add(_rev_gen()) | |||||
assert actual_result == {'revision:add': 1} | |||||
swh_storage.refresh_stat_counters() | |||||
assert swh_storage.stat_counters()['revision'] == 1 | |||||
def test_revision_add_validation(self, swh_storage): | def test_revision_add_validation(self, swh_storage): | ||||
rev = copy.deepcopy(data.revision) | rev = copy.deepcopy(data.revision) | ||||
rev['date']['offset'] = 2**16 | rev['date']['offset'] = 2**16 | ||||
with pytest.raises((ValueError, psycopg2.DataError), | with pytest.raises((ValueError, psycopg2.DataError), | ||||
match='offset') as cm: | match='offset') as cm: | ||||
swh_storage.revision_add([rev]) | swh_storage.revision_add([rev]) | ||||
▲ Show 20 Lines • Show All 173 Lines • ▼ Show 20 Lines | def test_release_add(self, swh_storage): | ||||
assert list(swh_storage.journal_writer.objects) == [ | assert list(swh_storage.journal_writer.objects) == [ | ||||
('release', data.release), | ('release', data.release), | ||||
('release', data.release2)] | ('release', data.release2)] | ||||
# already present so nothing added | # already present so nothing added | ||||
actual_result = swh_storage.release_add([data.release, data.release2]) | actual_result = swh_storage.release_add([data.release, data.release2]) | ||||
assert actual_result == {'release:add': 0} | assert actual_result == {'release:add': 0} | ||||
swh_storage.refresh_stat_counters() | |||||
assert swh_storage.stat_counters()['release'] == 2 | |||||
def test_release_add_from_generator(self, swh_storage): | |||||
def _rel_gen(): | |||||
yield data.release | |||||
yield data.release2 | |||||
actual_result = swh_storage.release_add(_rel_gen()) | |||||
assert actual_result == {'release:add': 2} | |||||
assert list(swh_storage.journal_writer.objects) == [ | |||||
('release', data.release), | |||||
('release', data.release2)] | |||||
swh_storage.refresh_stat_counters() | |||||
assert swh_storage.stat_counters()['release'] == 2 | |||||
def test_release_add_no_author_date(self, swh_storage): | def test_release_add_no_author_date(self, swh_storage): | ||||
release = data.release | release = data.release | ||||
release['author'] = None | release['author'] = None | ||||
release['date'] = None | release['date'] = None | ||||
actual_result = swh_storage.release_add([release]) | actual_result = swh_storage.release_add([release]) | ||||
assert actual_result == {'release:add': 1} | assert actual_result == {'release:add': 1} | ||||
▲ Show 20 Lines • Show All 117 Lines • ▼ Show 20 Lines | def test_origin_add(self, swh_storage): | ||||
if 'id' in actual_origin: | if 'id' in actual_origin: | ||||
del actual_origin['id'] | del actual_origin['id'] | ||||
del actual_origin2['id'] | del actual_origin2['id'] | ||||
assert list(swh_storage.journal_writer.objects) \ | assert list(swh_storage.journal_writer.objects) \ | ||||
== [('origin', actual_origin), | == [('origin', actual_origin), | ||||
('origin', actual_origin2)] | ('origin', actual_origin2)] | ||||
swh_storage.refresh_stat_counters() | |||||
assert swh_storage.stat_counters()['origin'] == 2 | |||||
def test_origin_add_from_generator(self, swh_storage): | |||||
def _ori_gen(): | |||||
yield data.origin | |||||
yield data.origin2 | |||||
origin1, origin2 = swh_storage.origin_add(_ori_gen()) | |||||
actual_origin = swh_storage.origin_get([{ | |||||
'url': data.origin['url'], | |||||
}])[0] | |||||
assert actual_origin['url'] == origin1['url'] | |||||
actual_origin2 = swh_storage.origin_get([{ | |||||
'url': data.origin2['url'], | |||||
}])[0] | |||||
assert actual_origin2['url'] == origin2['url'] | |||||
if 'id' in actual_origin: | |||||
del actual_origin['id'] | |||||
del actual_origin2['id'] | |||||
assert list(swh_storage.journal_writer.objects) \ | |||||
== [('origin', actual_origin), | |||||
('origin', actual_origin2)] | |||||
swh_storage.refresh_stat_counters() | |||||
assert swh_storage.stat_counters()['origin'] == 2 | |||||
def test_origin_add_twice(self, swh_storage): | def test_origin_add_twice(self, swh_storage): | ||||
add1 = swh_storage.origin_add([data.origin, data.origin2]) | add1 = swh_storage.origin_add([data.origin, data.origin2]) | ||||
assert list(swh_storage.journal_writer.objects) \ | assert list(swh_storage.journal_writer.objects) \ | ||||
== [('origin', data.origin), | == [('origin', data.origin), | ||||
('origin', data.origin2)] | ('origin', data.origin2)] | ||||
add2 = swh_storage.origin_add([data.origin, data.origin2]) | add2 = swh_storage.origin_add([data.origin, data.origin2]) | ||||
assert list(swh_storage.journal_writer.objects) \ | assert list(swh_storage.journal_writer.objects) \ | ||||
▲ Show 20 Lines • Show All 960 Lines • ▼ Show 20 Lines | def test_snapshot_add_many(self, swh_storage): | ||||
assert actual_result == {'snapshot:add': 2} | assert actual_result == {'snapshot:add': 2} | ||||
assert {**data.complete_snapshot, 'next_branch': None} \ | assert {**data.complete_snapshot, 'next_branch': None} \ | ||||
== swh_storage.snapshot_get(data.complete_snapshot['id']) | == swh_storage.snapshot_get(data.complete_snapshot['id']) | ||||
assert {**data.snapshot, 'next_branch': None} \ | assert {**data.snapshot, 'next_branch': None} \ | ||||
== swh_storage.snapshot_get(data.snapshot['id']) | == swh_storage.snapshot_get(data.snapshot['id']) | ||||
swh_storage.refresh_stat_counters() | |||||
assert swh_storage.stat_counters()['snapshot'] == 2 | |||||
def test_snapshot_add_many_from_generator(self, swh_storage): | |||||
def _snp_gen(): | |||||
yield data.snapshot | |||||
yield data.complete_snapshot | |||||
actual_result = swh_storage.snapshot_add(_snp_gen()) | |||||
assert actual_result == {'snapshot:add': 2} | |||||
swh_storage.refresh_stat_counters() | |||||
assert swh_storage.stat_counters()['snapshot'] == 2 | |||||
def test_snapshot_add_many_incremental(self, swh_storage): | def test_snapshot_add_many_incremental(self, swh_storage): | ||||
actual_result = swh_storage.snapshot_add([data.complete_snapshot]) | actual_result = swh_storage.snapshot_add([data.complete_snapshot]) | ||||
assert actual_result == {'snapshot:add': 1} | assert actual_result == {'snapshot:add': 1} | ||||
actual_result2 = swh_storage.snapshot_add( | actual_result2 = swh_storage.snapshot_add( | ||||
[data.snapshot, data.complete_snapshot]) | [data.snapshot, data.complete_snapshot]) | ||||
assert actual_result2 == {'snapshot:add': 1} | assert actual_result2 == {'snapshot:add': 1} | ||||
▲ Show 20 Lines • Show All 1,644 Lines • Show Last 20 Lines |