Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
Show First 20 Lines • Show All 180 Lines • ▼ Show 20 Lines | def test_content_get_missing(self, swh_storage): | ||||
assert results == [{'sha1': cont['sha1'], 'data': cont['data']}, None] | assert results == [{'sha1': cont['sha1'], 'data': cont['data']}, None] | ||||
# Check content_get does not discard found countent when it finds | # Check content_get does not discard found countent when it finds | ||||
# a missing content. | # a missing content. | ||||
results = list(swh_storage.content_get( | results = list(swh_storage.content_get( | ||||
[data.cont2['sha1'], data.cont['sha1']])) | [data.cont2['sha1'], data.cont['sha1']])) | ||||
assert results == [None, {'sha1': cont['sha1'], 'data': cont['data']}] | assert results == [None, {'sha1': cont['sha1'], 'data': cont['data']}] | ||||
def test_content_add_same_input(self, swh_storage): | |||||
cont = data.cont | |||||
actual_result = swh_storage.content_add([cont, cont]) | |||||
assert actual_result == { | |||||
'content:add': 1, | |||||
'content:add:bytes': cont['length'], | |||||
'skipped_content:add': 0 | |||||
} | |||||
def test_content_add_different_input(self, swh_storage): | def test_content_add_different_input(self, swh_storage): | ||||
cont = data.cont | cont = data.cont | ||||
cont2 = data.cont2 | cont2 = data.cont2 | ||||
actual_result = swh_storage.content_add([cont, cont2]) | actual_result = swh_storage.content_add([cont, cont2]) | ||||
assert actual_result == { | assert actual_result == { | ||||
'content:add': 2, | 'content:add': 2, | ||||
'content:add:bytes': cont['length'] + cont2['length'], | 'content:add:bytes': cont['length'] + cont2['length'], | ||||
▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | def test_content_add_metadata(self, swh_storage): | ||||
expected_cont = cont.copy() | expected_cont = cont.copy() | ||||
del expected_cont['ctime'] | del expected_cont['ctime'] | ||||
assert swh_storage.content_get_metadata([cont['sha1']]) == { | assert swh_storage.content_get_metadata([cont['sha1']]) == { | ||||
cont['sha1']: [expected_cont] | cont['sha1']: [expected_cont] | ||||
} | } | ||||
assert list(swh_storage.journal_writer.objects) == [('content', cont)] | assert list(swh_storage.journal_writer.objects) == [('content', cont)] | ||||
def test_content_add_metadata_same_input(self, swh_storage): | |||||
cont = data.cont | |||||
del cont['data'] | |||||
cont['ctime'] = datetime.datetime.now() | |||||
actual_result = swh_storage.content_add_metadata([cont, cont]) | |||||
assert actual_result == { | |||||
'content:add': 1, | |||||
'skipped_content:add': 0 | |||||
} | |||||
def test_content_add_metadata_different_input(self, swh_storage): | def test_content_add_metadata_different_input(self, swh_storage): | ||||
cont = data.cont | cont = data.cont | ||||
del cont['data'] | del cont['data'] | ||||
cont['ctime'] = datetime.datetime.now() | cont['ctime'] = datetime.datetime.now() | ||||
cont2 = data.cont2 | cont2 = data.cont2 | ||||
del cont2['data'] | del cont2['data'] | ||||
cont2['ctime'] = datetime.datetime.now() | cont2['ctime'] = datetime.datetime.now() | ||||
▲ Show 20 Lines • Show All 3,471 Lines • Show Last 20 Lines |