Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
Show All 23 Lines | |||||
from typing import ClassVar, Optional | from typing import ClassVar, Optional | ||||
from swh.model import from_disk, identifiers | from swh.model import from_disk, identifiers | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.model.hypothesis_strategies import objects | from swh.model.hypothesis_strategies import objects | ||||
from swh.storage import HashCollision | from swh.storage import HashCollision | ||||
from swh.storage.converters import origin_url_to_sha1 as sha1 | from swh.storage.converters import origin_url_to_sha1 as sha1 | ||||
from swh.storage.exc import StorageArgumentException | |||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
from .storage_data import data | from .storage_data import data | ||||
@contextmanager | @contextmanager | ||||
def db_transaction(storage): | def db_transaction(storage): | ||||
with storage.db() as db: | with storage.db() as db: | ||||
▲ Show 20 Lines • Show All 130 Lines • ▼ Show 20 Lines | def test_content_add_from_generator(self, swh_storage): | ||||
} | } | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()['content'] == 1 | assert swh_storage.stat_counters()['content'] == 1 | ||||
def test_content_add_validation(self, swh_storage): | def test_content_add_validation(self, swh_storage): | ||||
cont = data.cont | cont = data.cont | ||||
with pytest.raises(ValueError, match='status'): | with pytest.raises(StorageArgumentException, match='status'): | ||||
swh_storage.content_add([{**cont, 'status': 'absent'}]) | swh_storage.content_add([{**cont, 'status': 'absent'}]) | ||||
with pytest.raises(ValueError, match='status'): | with pytest.raises(StorageArgumentException, match='status'): | ||||
swh_storage.content_add([{**cont, 'status': 'foobar'}]) | swh_storage.content_add([{**cont, 'status': 'foobar'}]) | ||||
with pytest.raises(ValueError, match="(?i)length"): | with pytest.raises(StorageArgumentException, match="(?i)length"): | ||||
swh_storage.content_add([{**cont, 'length': -2}]) | swh_storage.content_add([{**cont, 'length': -2}]) | ||||
with pytest.raises( | with pytest.raises(StorageArgumentException, match="reason"): | ||||
(ValueError, TypeError), | |||||
match="reason"): | |||||
swh_storage.content_add([{**cont, 'reason': 'foobar'}]) | swh_storage.content_add([{**cont, 'reason': 'foobar'}]) | ||||
def test_skipped_content_add_validation(self, swh_storage): | def test_skipped_content_add_validation(self, swh_storage): | ||||
cont = data.cont.copy() | cont = data.cont.copy() | ||||
del cont['data'] | del cont['data'] | ||||
with pytest.raises(ValueError, match='status'): | with pytest.raises(StorageArgumentException, match='status'): | ||||
swh_storage.skipped_content_add([{**cont, 'status': 'visible'}]) | swh_storage.skipped_content_add([{**cont, 'status': 'visible'}]) | ||||
with pytest.raises((ValueError, psycopg2.IntegrityError), | with pytest.raises(StorageArgumentException, match='reason') as cm: | ||||
match='reason') as cm: | |||||
swh_storage.skipped_content_add([{**cont, 'status': 'absent'}]) | swh_storage.skipped_content_add([{**cont, 'status': 'absent'}]) | ||||
if type(cm.value) == psycopg2.IntegrityError: | if type(cm.value) == psycopg2.IntegrityError: | ||||
assert cm.exception.pgcode == \ | assert cm.exception.pgcode == \ | ||||
psycopg2.errorcodes.NOT_NULL_VIOLATION | psycopg2.errorcodes.NOT_NULL_VIOLATION | ||||
def test_content_get_missing(self, swh_storage): | def test_content_get_missing(self, swh_storage): | ||||
cont = data.cont | cont = data.cont | ||||
▲ Show 20 Lines • Show All 266 Lines • ▼ Show 20 Lines | def test_content_get_partition_empty(self, swh_storage, swh_contents): | ||||
# Limit is higher than the max number of results | # Limit is higher than the max number of results | ||||
assert actual_result['next_page_token'] is None | assert actual_result['next_page_token'] is None | ||||
assert set(seen_sha1s) == expected_contents | assert set(seen_sha1s) == expected_contents | ||||
def test_content_get_partition_limit_none(self, swh_storage): | def test_content_get_partition_limit_none(self, swh_storage): | ||||
"""content_get_partition call with wrong limit input should fail""" | """content_get_partition call with wrong limit input should fail""" | ||||
with pytest.raises(ValueError) as e: | with pytest.raises(StorageArgumentException) as e: | ||||
swh_storage.content_get_partition(1, 16, limit=None) | swh_storage.content_get_partition(1, 16, limit=None) | ||||
assert e.value.args == ('Development error: limit should not be None',) | assert e.value.args == ('limit should not be None',) | ||||
def test_generate_content_get_partition_pagination( | def test_generate_content_get_partition_pagination( | ||||
self, swh_storage, swh_contents): | self, swh_storage, swh_contents): | ||||
"""content_get_partition returns contents within range provided""" | """content_get_partition returns contents within range provided""" | ||||
expected_contents = [c for c in swh_contents | expected_contents = [c for c in swh_contents | ||||
if c['status'] != 'absent'] | if c['status'] != 'absent'] | ||||
# retrieve contents | # retrieve contents | ||||
▲ Show 20 Lines • Show All 82 Lines • ▼ Show 20 Lines | def test_directory_add_from_generator(self, swh_storage): | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()['directory'] == 1 | assert swh_storage.stat_counters()['directory'] == 1 | ||||
def test_directory_add_validation(self, swh_storage): | def test_directory_add_validation(self, swh_storage): | ||||
dir_ = copy.deepcopy(data.dir) | dir_ = copy.deepcopy(data.dir) | ||||
dir_['entries'][0]['type'] = 'foobar' | dir_['entries'][0]['type'] = 'foobar' | ||||
with pytest.raises(ValueError, match='type.*foobar'): | with pytest.raises(StorageArgumentException, match='type.*foobar'): | ||||
swh_storage.directory_add([dir_]) | swh_storage.directory_add([dir_]) | ||||
dir_ = copy.deepcopy(data.dir) | dir_ = copy.deepcopy(data.dir) | ||||
del dir_['entries'][0]['target'] | del dir_['entries'][0]['target'] | ||||
with pytest.raises((TypeError, psycopg2.IntegrityError), | with pytest.raises(StorageArgumentException, match='target') as cm: | ||||
match='target') as cm: | |||||
swh_storage.directory_add([dir_]) | swh_storage.directory_add([dir_]) | ||||
if type(cm.value) == psycopg2.IntegrityError: | if type(cm.value) == psycopg2.IntegrityError: | ||||
assert cm.value.pgcode == psycopg2.errorcodes.NOT_NULL_VIOLATION | assert cm.value.pgcode == psycopg2.errorcodes.NOT_NULL_VIOLATION | ||||
def test_directory_add_twice(self, swh_storage): | def test_directory_add_twice(self, swh_storage): | ||||
actual_result = swh_storage.directory_add([data.dir]) | actual_result = swh_storage.directory_add([data.dir]) | ||||
assert actual_result == {'directory:add': 1} | assert actual_result == {'directory:add': 1} | ||||
▲ Show 20 Lines • Show All 183 Lines • ▼ Show 20 Lines | def test_revision_add_from_generator(self, swh_storage): | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()['revision'] == 1 | assert swh_storage.stat_counters()['revision'] == 1 | ||||
def test_revision_add_validation(self, swh_storage): | def test_revision_add_validation(self, swh_storage): | ||||
rev = copy.deepcopy(data.revision) | rev = copy.deepcopy(data.revision) | ||||
rev['date']['offset'] = 2**16 | rev['date']['offset'] = 2**16 | ||||
with pytest.raises((ValueError, psycopg2.DataError), | with pytest.raises(StorageArgumentException, match='offset') as cm: | ||||
match='offset') as cm: | |||||
swh_storage.revision_add([rev]) | swh_storage.revision_add([rev]) | ||||
if type(cm.value) == psycopg2.DataError: | if type(cm.value) == psycopg2.DataError: | ||||
assert cm.value.pgcode \ | assert cm.value.pgcode \ | ||||
== psycopg2.errorcodes.NUMERIC_VALUE_OUT_OF_RANGE | == psycopg2.errorcodes.NUMERIC_VALUE_OUT_OF_RANGE | ||||
rev = copy.deepcopy(data.revision) | rev = copy.deepcopy(data.revision) | ||||
rev['committer_date']['offset'] = 2**16 | rev['committer_date']['offset'] = 2**16 | ||||
with pytest.raises((ValueError, psycopg2.DataError), | with pytest.raises(StorageArgumentException, match='offset') as cm: | ||||
match='offset') as cm: | |||||
swh_storage.revision_add([rev]) | swh_storage.revision_add([rev]) | ||||
if type(cm.value) == psycopg2.DataError: | if type(cm.value) == psycopg2.DataError: | ||||
assert cm.value.pgcode \ | assert cm.value.pgcode \ | ||||
== psycopg2.errorcodes.NUMERIC_VALUE_OUT_OF_RANGE | == psycopg2.errorcodes.NUMERIC_VALUE_OUT_OF_RANGE | ||||
rev = copy.deepcopy(data.revision) | rev = copy.deepcopy(data.revision) | ||||
rev['type'] = 'foobar' | rev['type'] = 'foobar' | ||||
with pytest.raises((ValueError, psycopg2.DataError), | with pytest.raises(StorageArgumentException, match='(?i)type') as cm: | ||||
match='(?i)type') as cm: | |||||
swh_storage.revision_add([rev]) | swh_storage.revision_add([rev]) | ||||
if type(cm.value) == psycopg2.DataError: | if type(cm.value) == psycopg2.DataError: | ||||
assert cm.value.pgcode == \ | assert cm.value.pgcode == \ | ||||
psycopg2.errorcodes.INVALID_TEXT_REPRESENTATION | psycopg2.errorcodes.INVALID_TEXT_REPRESENTATION | ||||
def test_revision_add_twice(self, swh_storage): | def test_revision_add_twice(self, swh_storage): | ||||
actual_result = swh_storage.revision_add([data.revision]) | actual_result = swh_storage.revision_add([data.revision]) | ||||
▲ Show 20 Lines • Show All 182 Lines • ▼ Show 20 Lines | def test_release_add_no_author_date(self, swh_storage): | ||||
assert list(swh_storage.journal_writer.objects) \ | assert list(swh_storage.journal_writer.objects) \ | ||||
== [('release', release)] | == [('release', release)] | ||||
def test_release_add_validation(self, swh_storage): | def test_release_add_validation(self, swh_storage): | ||||
rel = copy.deepcopy(data.release) | rel = copy.deepcopy(data.release) | ||||
rel['date']['offset'] = 2**16 | rel['date']['offset'] = 2**16 | ||||
with pytest.raises((ValueError, psycopg2.DataError), | with pytest.raises(StorageArgumentException, match='offset') as cm: | ||||
match='offset') as cm: | |||||
swh_storage.release_add([rel]) | swh_storage.release_add([rel]) | ||||
if type(cm.value) == psycopg2.DataError: | if type(cm.value) == psycopg2.DataError: | ||||
assert cm.value.pgcode \ | assert cm.value.pgcode \ | ||||
== psycopg2.errorcodes.NUMERIC_VALUE_OUT_OF_RANGE | == psycopg2.errorcodes.NUMERIC_VALUE_OUT_OF_RANGE | ||||
rel = copy.deepcopy(data.release) | rel = copy.deepcopy(data.release) | ||||
rel['author'] = None | rel['author'] = None | ||||
with pytest.raises((ValueError, psycopg2.IntegrityError), | with pytest.raises(StorageArgumentException, match='date') as cm: | ||||
match='date') as cm: | |||||
swh_storage.release_add([rel]) | swh_storage.release_add([rel]) | ||||
if type(cm.value) == psycopg2.IntegrityError: | if type(cm.value) == psycopg2.IntegrityError: | ||||
assert cm.value.pgcode == psycopg2.errorcodes.CHECK_VIOLATION | assert cm.value.pgcode == psycopg2.errorcodes.CHECK_VIOLATION | ||||
def test_release_add_twice(self, swh_storage): | def test_release_add_twice(self, swh_storage): | ||||
actual_result = swh_storage.release_add([data.release]) | actual_result = swh_storage.release_add([data.release]) | ||||
assert actual_result == {'release:add': 1} | assert actual_result == {'release:add': 1} | ||||
▲ Show 20 Lines • Show All 130 Lines • ▼ Show 20 Lines | def test_origin_add_twice(self, swh_storage): | ||||
add2 = swh_storage.origin_add([data.origin, data.origin2]) | add2 = swh_storage.origin_add([data.origin, data.origin2]) | ||||
assert list(swh_storage.journal_writer.objects) \ | assert list(swh_storage.journal_writer.objects) \ | ||||
== [('origin', data.origin), | == [('origin', data.origin), | ||||
('origin', data.origin2)] | ('origin', data.origin2)] | ||||
assert add1 == add2 | assert add1 == add2 | ||||
def test_origin_add_validation(self, swh_storage): | def test_origin_add_validation(self, swh_storage): | ||||
with pytest.raises((TypeError, KeyError), match='url'): | with pytest.raises(StorageArgumentException, match='url'): | ||||
swh_storage.origin_add([{'type': 'git'}]) | swh_storage.origin_add([{'type': 'git'}]) | ||||
def test_origin_get_legacy(self, swh_storage): | def test_origin_get_legacy(self, swh_storage): | ||||
assert swh_storage.origin_get(data.origin) is None | assert swh_storage.origin_get(data.origin) is None | ||||
swh_storage.origin_add_one(data.origin) | swh_storage.origin_add_one(data.origin) | ||||
actual_origin0 = swh_storage.origin_get( | actual_origin0 = swh_storage.origin_get( | ||||
{'url': data.origin['url']}) | {'url': data.origin['url']}) | ||||
▲ Show 20 Lines • Show All 304 Lines • ▼ Show 20 Lines | def test_origin_visit_add_default_type(self, swh_storage): | ||||
assert ('origin', data.origin2) in objects | assert ('origin', data.origin2) in objects | ||||
for visit in expected_visits: | for visit in expected_visits: | ||||
assert ('origin_visit', visit) in objects | assert ('origin_visit', visit) in objects | ||||
def test_origin_visit_add_validation(self, swh_storage): | def test_origin_visit_add_validation(self, swh_storage): | ||||
origin_url = swh_storage.origin_add_one(data.origin2) | origin_url = swh_storage.origin_add_one(data.origin2) | ||||
with pytest.raises((TypeError, psycopg2.ProgrammingError)) as cm: | with pytest.raises(StorageArgumentException) as cm: | ||||
swh_storage.origin_visit_add(origin_url, date=[b'foo']) | swh_storage.origin_visit_add(origin_url, date=[b'foo'], type='git') | ||||
if type(cm.value) == psycopg2.ProgrammingError: | if type(cm.value) == psycopg2.ProgrammingError: | ||||
assert cm.value.pgcode \ | assert cm.value.pgcode \ | ||||
== psycopg2.errorcodes.UNDEFINED_FUNCTION | == psycopg2.errorcodes.UNDEFINED_FUNCTION | ||||
def test_origin_visit_update(self, swh_storage): | def test_origin_visit_update(self, swh_storage): | ||||
# given | # given | ||||
swh_storage.origin_add_one(data.origin) | swh_storage.origin_add_one(data.origin) | ||||
▲ Show 20 Lines • Show All 164 Lines • ▼ Show 20 Lines | def test_origin_visit_update_validation(self, swh_storage): | ||||
origin_url = data.origin['url'] | origin_url = data.origin['url'] | ||||
swh_storage.origin_add_one(data.origin) | swh_storage.origin_add_one(data.origin) | ||||
visit = swh_storage.origin_visit_add( | visit = swh_storage.origin_visit_add( | ||||
origin_url, | origin_url, | ||||
date=data.date_visit2, | date=data.date_visit2, | ||||
type=data.type_visit2, | type=data.type_visit2, | ||||
) | ) | ||||
with pytest.raises((ValueError, psycopg2.DataError), | with pytest.raises(StorageArgumentException, match='status') as cm: | ||||
match='status') as cm: | |||||
swh_storage.origin_visit_update( | swh_storage.origin_visit_update( | ||||
origin_url, visit['visit'], status='foobar') | origin_url, visit['visit'], status='foobar') | ||||
if type(cm.value) == psycopg2.DataError: | if type(cm.value) == psycopg2.DataError: | ||||
assert cm.value.pgcode == \ | assert cm.value.pgcode == \ | ||||
psycopg2.errorcodes.INVALID_TEXT_REPRESENTATION | psycopg2.errorcodes.INVALID_TEXT_REPRESENTATION | ||||
def test_origin_visit_find_by_date(self, swh_storage): | def test_origin_visit_find_by_date(self, swh_storage): | ||||
▲ Show 20 Lines • Show All 508 Lines • ▼ Show 20 Lines | def test_snapshot_add_twice(self, swh_storage): | ||||
assert list(swh_storage.journal_writer.objects) \ | assert list(swh_storage.journal_writer.objects) \ | ||||
== [('snapshot', data.empty_snapshot), | == [('snapshot', data.empty_snapshot), | ||||
('snapshot', data.snapshot)] | ('snapshot', data.snapshot)] | ||||
def test_snapshot_add_validation(self, swh_storage): | def test_snapshot_add_validation(self, swh_storage): | ||||
snap = copy.deepcopy(data.snapshot) | snap = copy.deepcopy(data.snapshot) | ||||
snap['branches'][b'foo'] = {'target_type': 'revision'} | snap['branches'][b'foo'] = {'target_type': 'revision'} | ||||
with pytest.raises(KeyError, match='target'): | with pytest.raises(StorageArgumentException, match='target'): | ||||
swh_storage.snapshot_add([snap]) | swh_storage.snapshot_add([snap]) | ||||
snap = copy.deepcopy(data.snapshot) | snap = copy.deepcopy(data.snapshot) | ||||
snap['branches'][b'foo'] = {'target': b'\x42'*20} | snap['branches'][b'foo'] = {'target': b'\x42'*20} | ||||
with pytest.raises(KeyError, match='target_type'): | with pytest.raises(StorageArgumentException, match='target_type'): | ||||
swh_storage.snapshot_add([snap]) | swh_storage.snapshot_add([snap]) | ||||
def test_snapshot_add_count_branches(self, swh_storage): | def test_snapshot_add_count_branches(self, swh_storage): | ||||
actual_result = swh_storage.snapshot_add([data.complete_snapshot]) | actual_result = swh_storage.snapshot_add([data.complete_snapshot]) | ||||
assert actual_result == {'snapshot:add': 1} | assert actual_result == {'snapshot:add': 1} | ||||
snp_id = data.complete_snapshot['id'] | snp_id = data.complete_snapshot['id'] | ||||
snp_size = swh_storage.snapshot_count_branches(snp_id) | snp_size = swh_storage.snapshot_count_branches(snp_id) | ||||
▲ Show 20 Lines • Show All 209 Lines • ▼ Show 20 Lines | def test_snapshot_add_nonexistent_visit(self, swh_storage): | ||||
origin_url = data.origin['url'] | origin_url = data.origin['url'] | ||||
swh_storage.origin_add_one(data.origin) | swh_storage.origin_add_one(data.origin) | ||||
visit_id = 54164461156 | visit_id = 54164461156 | ||||
swh_storage.journal_writer.objects[:] = [] | swh_storage.journal_writer.objects[:] = [] | ||||
swh_storage.snapshot_add([data.snapshot]) | swh_storage.snapshot_add([data.snapshot]) | ||||
with pytest.raises(ValueError): | with pytest.raises(StorageArgumentException): | ||||
swh_storage.origin_visit_update( | swh_storage.origin_visit_update( | ||||
origin_url, visit_id, snapshot=data.snapshot['id']) | origin_url, visit_id, snapshot=data.snapshot['id']) | ||||
assert list(swh_storage.journal_writer.objects) == [ | assert list(swh_storage.journal_writer.objects) == [ | ||||
('snapshot', data.snapshot)] | ('snapshot', data.snapshot)] | ||||
def test_snapshot_add_twice__by_origin_visit(self, swh_storage): | def test_snapshot_add_twice__by_origin_visit(self, swh_storage): | ||||
origin_url = data.origin['url'] | origin_url = data.origin['url'] | ||||
▲ Show 20 Lines • Show All 162 Lines • ▼ Show 20 Lines | def test_snapshot_get_latest__missing_snapshot(self, swh_storage): | ||||
# Two visits, both with no snapshot: latest snapshot is None | # Two visits, both with no snapshot: latest snapshot is None | ||||
assert swh_storage.snapshot_get_latest(origin_url) is None | assert swh_storage.snapshot_get_latest(origin_url) is None | ||||
# Add unknown snapshot to visit1, check that the inconsistency is | # Add unknown snapshot to visit1, check that the inconsistency is | ||||
# detected | # detected | ||||
swh_storage.origin_visit_update( | swh_storage.origin_visit_update( | ||||
origin_url, | origin_url, | ||||
visit1_id, snapshot=data.complete_snapshot['id']) | visit1_id, snapshot=data.complete_snapshot['id']) | ||||
with pytest.raises(ValueError): | with pytest.raises(Exception): | ||||
swh_storage.snapshot_get_latest( | # XXX: should the exception be more specific than this? | ||||
origin_url) | swh_storage.snapshot_get_latest(origin_url) | ||||
# Status filter: both visits are status=ongoing, so no snapshot | # Status filter: both visits are status=ongoing, so no snapshot | ||||
# returned | # returned | ||||
assert swh_storage.snapshot_get_latest( | assert swh_storage.snapshot_get_latest( | ||||
origin_url, | origin_url, | ||||
allowed_statuses=['full']) is None | allowed_statuses=['full']) is None | ||||
# Mark the first visit as completed and check status filter again | # Mark the first visit as completed and check status filter again | ||||
swh_storage.origin_visit_update( | swh_storage.origin_visit_update( | ||||
origin_url, | origin_url, | ||||
visit1_id, status='full') | visit1_id, status='full') | ||||
with pytest.raises(ValueError): | with pytest.raises(Exception): | ||||
# XXX: should the exception be more specific than this? | |||||
swh_storage.snapshot_get_latest( | swh_storage.snapshot_get_latest( | ||||
origin_url, | origin_url, | ||||
allowed_statuses=['full']), | allowed_statuses=['full']), | ||||
# Actually add the snapshot and check status filter again | # Actually add the snapshot and check status filter again | ||||
swh_storage.snapshot_add([data.complete_snapshot]) | swh_storage.snapshot_add([data.complete_snapshot]) | ||||
assert {**data.complete_snapshot, 'next_branch': None} \ | assert {**data.complete_snapshot, 'next_branch': None} \ | ||||
== swh_storage.snapshot_get_latest(origin_url) | == swh_storage.snapshot_get_latest(origin_url) | ||||
# Add unknown snapshot to visit2 and check that the inconsistency | # Add unknown snapshot to visit2 and check that the inconsistency | ||||
# is detected | # is detected | ||||
swh_storage.origin_visit_update( | swh_storage.origin_visit_update( | ||||
origin_url, | origin_url, | ||||
visit2_id, snapshot=data.snapshot['id']) | visit2_id, snapshot=data.snapshot['id']) | ||||
with pytest.raises(ValueError): | with pytest.raises(Exception): | ||||
# XXX: should the exception be more specific than this? | |||||
swh_storage.snapshot_get_latest( | swh_storage.snapshot_get_latest( | ||||
origin_url) | origin_url) | ||||
# Actually add that snapshot and check that the new one is returned | # Actually add that snapshot and check that the new one is returned | ||||
swh_storage.snapshot_add([data.snapshot]) | swh_storage.snapshot_add([data.snapshot]) | ||||
assert{**data.snapshot, 'next_branch': None} \ | assert{**data.snapshot, 'next_branch': None} \ | ||||
== swh_storage.snapshot_get_latest(origin_url) | == swh_storage.snapshot_get_latest(origin_url) | ||||
▲ Show 20 Lines • Show All 290 Lines • ▼ Show 20 Lines | def test_content_find_with_duplicate_blake2s256(self, swh_storage): | ||||
expected_result = [ | expected_result = [ | ||||
duplicate_cont | duplicate_cont | ||||
] | ] | ||||
assert expected_result == actual_result | assert expected_result == actual_result | ||||
def test_content_find_bad_input(self, swh_storage): | def test_content_find_bad_input(self, swh_storage): | ||||
# 1. with bad input | # 1. with bad input | ||||
with pytest.raises(ValueError): | with pytest.raises(StorageArgumentException): | ||||
swh_storage.content_find({}) # empty is bad | swh_storage.content_find({}) # empty is bad | ||||
# 2. with bad input | # 2. with bad input | ||||
with pytest.raises(ValueError): | with pytest.raises(StorageArgumentException): | ||||
swh_storage.content_find( | swh_storage.content_find( | ||||
{'unknown-sha1': 'something'}) # not the right key | {'unknown-sha1': 'something'}) # not the right key | ||||
def test_object_find_by_sha1_git(self, swh_storage): | def test_object_find_by_sha1_git(self, swh_storage): | ||||
sha1_gits = [b'00000000000000000000'] | sha1_gits = [b'00000000000000000000'] | ||||
expected = { | expected = { | ||||
b'00000000000000000000': [], | b'00000000000000000000': [], | ||||
} | } | ||||
▲ Show 20 Lines • Show All 440 Lines • ▼ Show 20 Lines | def test_generate_content_get_range_empty(self, swh_storage, swh_contents): | ||||
start = b'0' * 40 | start = b'0' * 40 | ||||
end = b'f' * 40 | end = b'f' * 40 | ||||
actual_result = swh_storage.content_get_range(end, start) | actual_result = swh_storage.content_get_range(end, start) | ||||
assert actual_result['next'] is None | assert actual_result['next'] is None | ||||
assert len(actual_result['contents']) == 0 | assert len(actual_result['contents']) == 0 | ||||
def test_generate_content_get_range_limit_none(self, swh_storage): | def test_generate_content_get_range_limit_none(self, swh_storage): | ||||
"""content_get_range call with wrong limit input should fail""" | """content_get_range call with wrong limit input should fail""" | ||||
with pytest.raises(ValueError) as e: | with pytest.raises(StorageArgumentException) as e: | ||||
swh_storage.content_get_range(start=None, end=None, limit=None) | swh_storage.content_get_range(start=None, end=None, limit=None) | ||||
assert e.value.args == ('Development error: limit should not be None',) | assert e.value.args == ('limit should not be None',) | ||||
def test_generate_content_get_range_no_limit( | def test_generate_content_get_range_no_limit( | ||||
self, swh_storage, swh_contents): | self, swh_storage, swh_contents): | ||||
"""content_get_range returns contents within range provided""" | """content_get_range returns contents within range provided""" | ||||
# input the list of sha1s we want from storage | # input the list of sha1s we want from storage | ||||
get_sha1s = sorted([c['sha1'] for c in swh_contents | get_sha1s = sorted([c['sha1'] for c in swh_contents | ||||
if c['status'] != 'absent']) | if c['status'] != 'absent']) | ||||
start = get_sha1s[0] | start = get_sha1s[0] | ||||
▲ Show 20 Lines • Show All 379 Lines • Show Last 20 Lines |