Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_filter.py
Show All 12 Lines | def test_filtering_proxy_storage_content(sample_data): | ||||
content = next(storage.content_get([sample_content['sha1']])) | content = next(storage.content_get([sample_content['sha1']])) | ||||
assert not content | assert not content | ||||
s = storage.content_add([sample_content]) | s = storage.content_add([sample_content]) | ||||
assert s == { | assert s == { | ||||
'content:add': 1, | 'content:add': 1, | ||||
'content:add:bytes': sample_content['length'], | 'content:add:bytes': sample_content['length'], | ||||
'skipped_content:add': 0 | |||||
} | } | ||||
content = next(storage.content_get([sample_content['sha1']])) | content = next(storage.content_get([sample_content['sha1']])) | ||||
assert content is not None | assert content is not None | ||||
s = storage.content_add([sample_content]) | s = storage.content_add([sample_content]) | ||||
assert s == { | assert s == { | ||||
'content:add': 0, | 'content:add': 0, | ||||
'content:add:bytes': 0, | 'content:add:bytes': 0, | ||||
'skipped_content:add': 0 | } | ||||
def test_filtering_proxy_storage_skipped_content(sample_data): | |||||
sample_content = sample_data['skipped_content'][0] | |||||
storage = FilteringProxyStorage(storage={'cls': 'memory'}) | |||||
content = next(storage.skipped_content_missing([sample_content])) | |||||
assert content['sha1'] == sample_content['sha1'] | |||||
s = storage.skipped_content_add([sample_content]) | |||||
assert s == { | |||||
'skipped_content:add': 1, | |||||
} | |||||
content = list(storage.skipped_content_missing([sample_content])) | |||||
assert content == [] | |||||
s = storage.skipped_content_add([sample_content]) | |||||
assert s == { | |||||
'skipped_content:add': 0, | |||||
} | } | ||||
def test_filtering_proxy_storage_revision(sample_data): | def test_filtering_proxy_storage_revision(sample_data): | ||||
sample_revision = sample_data['revision'][0] | sample_revision = sample_data['revision'][0] | ||||
storage = FilteringProxyStorage(storage={'cls': 'memory'}) | storage = FilteringProxyStorage(storage={'cls': 'memory'}) | ||||
revision = next(storage.revision_get([sample_revision['id']])) | revision = next(storage.revision_get([sample_revision['id']])) | ||||
Show All 35 Lines |