Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
Show First 20 Lines • Show All 230 Lines • ▼ Show 20 Lines | def test_content_add_metadata(self, swh_storage): | ||||
actual_result = swh_storage.content_add_metadata([cont]) | actual_result = swh_storage.content_add_metadata([cont]) | ||||
assert actual_result == { | assert actual_result == { | ||||
'content:add': 1, | 'content:add': 1, | ||||
'skipped_content:add': 0 | 'skipped_content:add': 0 | ||||
} | } | ||||
expected_cont = cont.copy() | expected_cont = cont.copy() | ||||
del expected_cont['ctime'] | del expected_cont['ctime'] | ||||
assert list(swh_storage.content_get_metadata([cont['sha1']])) == \ | assert swh_storage.content_get_metadata([cont['sha1']]) == { | ||||
[expected_cont] | cont['sha1']: [expected_cont] | ||||
} | |||||
assert list(swh_storage.journal_writer.objects) == [('content', cont)] | assert list(swh_storage.journal_writer.objects) == [('content', cont)] | ||||
def test_content_add_metadata_same_input(self, swh_storage): | def test_content_add_metadata_same_input(self, swh_storage): | ||||
cont = data.cont | cont = data.cont | ||||
del cont['data'] | del cont['data'] | ||||
cont['ctime'] = datetime.datetime.now() | cont['ctime'] = datetime.datetime.now() | ||||
▲ Show 20 Lines • Show All 192 Lines • ▼ Show 20 Lines | def test_generate_content_get_partition_pagination( | ||||
expected_contents, actual_contents, ['sha1']) | expected_contents, actual_contents, ['sha1']) | ||||
def test_content_get_metadata(self, swh_storage): | def test_content_get_metadata(self, swh_storage): | ||||
cont1 = data.cont | cont1 = data.cont | ||||
cont2 = data.cont2 | cont2 = data.cont2 | ||||
swh_storage.content_add([cont1, cont2]) | swh_storage.content_add([cont1, cont2]) | ||||
actual_md = list(swh_storage.content_get_metadata( | actual_md = swh_storage.content_get_metadata( | ||||
[cont1['sha1'], cont2['sha1']])) | [cont1['sha1'], cont2['sha1']]) | ||||
# we only retrieve the metadata | # we only retrieve the metadata | ||||
cont1.pop('data') | cont1.pop('data') | ||||
cont2.pop('data') | cont2.pop('data') | ||||
assert actual_md in ([cont1, cont2], [cont2, cont1]) | assert actual_md[cont1['sha1']] == [cont1] | ||||
assert actual_md[cont2['sha1']] == [cont2] | |||||
assert len(actual_md.keys()) == 2 | |||||
def test_content_get_metadata_missing_sha1(self, swh_storage): | def test_content_get_metadata_missing_sha1(self, swh_storage): | ||||
cont1 = data.cont | cont1 = data.cont | ||||
cont2 = data.cont2 | cont2 = data.cont2 | ||||
missing_cont = data.missing_cont | missing_cont = data.missing_cont | ||||
swh_storage.content_add([cont1, cont2]) | swh_storage.content_add([cont1, cont2]) | ||||
gen = swh_storage.content_get_metadata([missing_cont['sha1']]) | actual_contents = swh_storage.content_get_metadata( | ||||
[missing_cont['sha1']]) | |||||
# All the metadata keys are None | |||||
missing_cont.pop('data') | |||||
for key in missing_cont: | |||||
if key != 'sha1': | |||||
missing_cont[key] = None | |||||
assert list(gen) == [missing_cont] | assert actual_contents == {missing_cont['sha1']: []} | ||||
def test_content_get_random(self, swh_storage): | def test_content_get_random(self, swh_storage): | ||||
swh_storage.content_add([data.cont, data.cont2, data.cont3]) | swh_storage.content_add([data.cont, data.cont2, data.cont3]) | ||||
assert swh_storage.content_get_random() in { | assert swh_storage.content_get_random() in { | ||||
data.cont['sha1_git'], data.cont2['sha1_git'], | data.cont['sha1_git'], data.cont2['sha1_git'], | ||||
data.cont3['sha1_git']} | data.cont3['sha1_git']} | ||||
▲ Show 20 Lines • Show All 2,676 Lines • ▼ Show 20 Lines | class TestStorageGeneratedData: | ||||
def test_generate_content_get_metadata(self, swh_storage, swh_contents): | def test_generate_content_get_metadata(self, swh_storage, swh_contents): | ||||
# input the list of sha1s we want from storage | # input the list of sha1s we want from storage | ||||
expected_contents = [c for c in swh_contents | expected_contents = [c for c in swh_contents | ||||
if c['status'] != 'absent'] | if c['status'] != 'absent'] | ||||
get_sha1s = [c['sha1'] for c in expected_contents] | get_sha1s = [c['sha1'] for c in expected_contents] | ||||
# retrieve contents | # retrieve contents | ||||
actual_contents = list(swh_storage.content_get_metadata(get_sha1s)) | meta_contents = swh_storage.content_get_metadata(get_sha1s) | ||||
assert len(actual_contents) == len(get_sha1s) | assert len(list(meta_contents)) == len(get_sha1s) | ||||
actual_contents = [] | |||||
for contents in meta_contents.values(): | |||||
actual_contents.extend(contents) | |||||
keys_to_check = {'length', 'status', | keys_to_check = {'length', 'status', | ||||
'sha1', 'sha1_git', 'sha256', 'blake2s256'} | 'sha1', 'sha1_git', 'sha256', 'blake2s256'} | ||||
assert_contents_ok(expected_contents, actual_contents, | assert_contents_ok(expected_contents, actual_contents, | ||||
keys_to_check=keys_to_check) | keys_to_check=keys_to_check) | ||||
def test_generate_content_get_range(self, swh_storage, swh_contents): | def test_generate_content_get_range(self, swh_storage, swh_contents): | ||||
"""content_get_range returns complete range""" | """content_get_range returns complete range""" | ||||
present_contents = [c for c in swh_contents | present_contents = [c for c in swh_contents | ||||
if c['status'] != 'absent'] | if c['status'] != 'absent'] | ||||
▲ Show 20 Lines • Show All 460 Lines • Show Last 20 Lines |