Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
Show First 20 Lines • Show All 3,181 Lines • ▼ Show 20 Lines | def test_content_find_bad_input(self, swh_storage): | ||||
# 1. with bad input | # 1. with bad input | ||||
with pytest.raises(StorageArgumentException): | with pytest.raises(StorageArgumentException): | ||||
swh_storage.content_find({}) # empty is bad | swh_storage.content_find({}) # empty is bad | ||||
# 2. with bad input | # 2. with bad input | ||||
with pytest.raises(StorageArgumentException): | with pytest.raises(StorageArgumentException): | ||||
swh_storage.content_find({"unknown-sha1": "something"}) # not the right key | swh_storage.content_find({"unknown-sha1": "something"}) # not the right key | ||||
def test_object_find_by_sha1_git(self, swh_storage): | |||||
sha1_gits = [b"00000000000000000000"] | |||||
expected = { | |||||
b"00000000000000000000": [], | |||||
} | |||||
swh_storage.content_add([data.cont]) | |||||
sha1_gits.append(data.cont["sha1_git"]) | |||||
expected[data.cont["sha1_git"]] = [ | |||||
{"sha1_git": data.cont["sha1_git"], "type": "content",} | |||||
] | |||||
swh_storage.directory_add([data.dir]) | |||||
sha1_gits.append(data.dir["id"]) | |||||
expected[data.dir["id"]] = [{"sha1_git": data.dir["id"], "type": "directory",}] | |||||
swh_storage.revision_add([data.revision]) | |||||
sha1_gits.append(data.revision["id"]) | |||||
expected[data.revision["id"]] = [ | |||||
{"sha1_git": data.revision["id"], "type": "revision",} | |||||
] | |||||
swh_storage.release_add([data.release]) | |||||
sha1_gits.append(data.release["id"]) | |||||
expected[data.release["id"]] = [ | |||||
{"sha1_git": data.release["id"], "type": "release",} | |||||
] | |||||
ret = swh_storage.object_find_by_sha1_git(sha1_gits) | |||||
assert expected == ret | |||||
def test_metadata_fetcher_add_get(self, swh_storage): | |||||
actual_fetcher = swh_storage.metadata_fetcher_get( | |||||
data.metadata_fetcher["name"], data.metadata_fetcher["version"] | |||||
) | |||||
assert actual_fetcher is None # does not exist | |||||
swh_storage.metadata_fetcher_add(**data.metadata_fetcher) | |||||
res = swh_storage.metadata_fetcher_get( | |||||
data.metadata_fetcher["name"], data.metadata_fetcher["version"] | |||||
) | |||||
assert res is not data.metadata_fetcher | |||||
assert res == data.metadata_fetcher | |||||
def test_metadata_authority_add_get(self, swh_storage): | |||||
actual_authority = swh_storage.metadata_authority_get( | |||||
data.metadata_authority["type"], data.metadata_authority["url"] | |||||
) | |||||
assert actual_authority is None # does not exist | |||||
swh_storage.metadata_authority_add(**data.metadata_authority) | |||||
res = swh_storage.metadata_authority_get( | |||||
data.metadata_authority["type"], data.metadata_authority["url"] | |||||
) | |||||
assert res is not data.metadata_authority | |||||
assert res == data.metadata_authority | |||||
def test_content_metadata_add(self, swh_storage): | def test_content_metadata_add(self, swh_storage): | ||||
content = data.cont | content = data.cont | ||||
fetcher = data.metadata_fetcher | fetcher = data.metadata_fetcher | ||||
authority = data.metadata_authority | authority = data.metadata_authority | ||||
content_swhid = f"swh:1:cnt:{content['sha1_git']}" | content_swhid = f"swh:1:cnt:{content['sha1_git']}" | ||||
swh_storage.metadata_fetcher_add(**fetcher) | swh_storage.metadata_fetcher_add(**fetcher) | ||||
swh_storage.metadata_authority_add(**authority) | swh_storage.metadata_authority_add(**authority) | ||||
▲ Show 20 Lines • Show All 173 Lines • ▼ Show 20 Lines | def test_content_metadata_get_paginate_same_date(self, swh_storage): | ||||
assert [data.content_metadata] == result["results"] | assert [data.content_metadata] == result["results"] | ||||
result = swh_storage.content_metadata_get( | result = swh_storage.content_metadata_get( | ||||
content_swhid, authority, limit=1, page_token=result["next_page_token"] | content_swhid, authority, limit=1, page_token=result["next_page_token"] | ||||
) | ) | ||||
assert result["next_page_token"] is None | assert result["next_page_token"] is None | ||||
assert [content_metadata2] == result["results"] | assert [content_metadata2] == result["results"] | ||||
def test_object_find_by_sha1_git(self, swh_storage): | |||||
sha1_gits = [b"00000000000000000000"] | |||||
expected = { | |||||
b"00000000000000000000": [], | |||||
} | |||||
swh_storage.content_add([data.cont]) | |||||
sha1_gits.append(data.cont["sha1_git"]) | |||||
expected[data.cont["sha1_git"]] = [ | |||||
{"sha1_git": data.cont["sha1_git"], "type": "content",} | |||||
] | |||||
swh_storage.directory_add([data.dir]) | |||||
sha1_gits.append(data.dir["id"]) | |||||
expected[data.dir["id"]] = [{"sha1_git": data.dir["id"], "type": "directory",}] | |||||
swh_storage.revision_add([data.revision]) | |||||
sha1_gits.append(data.revision["id"]) | |||||
expected[data.revision["id"]] = [ | |||||
{"sha1_git": data.revision["id"], "type": "revision",} | |||||
] | |||||
swh_storage.release_add([data.release]) | |||||
sha1_gits.append(data.release["id"]) | |||||
expected[data.release["id"]] = [ | |||||
{"sha1_git": data.release["id"], "type": "release",} | |||||
] | |||||
ret = swh_storage.object_find_by_sha1_git(sha1_gits) | |||||
assert expected == ret | |||||
def test_metadata_fetcher_add_get(self, swh_storage): | |||||
actual_fetcher = swh_storage.metadata_fetcher_get( | |||||
data.metadata_fetcher["name"], data.metadata_fetcher["version"] | |||||
) | |||||
assert actual_fetcher is None # does not exist | |||||
swh_storage.metadata_fetcher_add(**data.metadata_fetcher) | |||||
res = swh_storage.metadata_fetcher_get( | |||||
data.metadata_fetcher["name"], data.metadata_fetcher["version"] | |||||
) | |||||
assert res is not data.metadata_fetcher | |||||
assert res == data.metadata_fetcher | |||||
def test_metadata_authority_add_get(self, swh_storage): | |||||
actual_authority = swh_storage.metadata_authority_get( | |||||
data.metadata_authority["type"], data.metadata_authority["url"] | |||||
) | |||||
assert actual_authority is None # does not exist | |||||
swh_storage.metadata_authority_add(**data.metadata_authority) | |||||
res = swh_storage.metadata_authority_get( | |||||
data.metadata_authority["type"], data.metadata_authority["url"] | |||||
) | |||||
assert res is not data.metadata_authority | |||||
assert res == data.metadata_authority | |||||
def test_origin_metadata_add(self, swh_storage): | def test_origin_metadata_add(self, swh_storage): | ||||
origin = data.origin | origin = data.origin | ||||
fetcher = data.metadata_fetcher | fetcher = data.metadata_fetcher | ||||
authority = data.metadata_authority | authority = data.metadata_authority | ||||
assert swh_storage.origin_add([origin]) == {"origin:add": 1} | assert swh_storage.origin_add([origin]) == {"origin:add": 1} | ||||
swh_storage.metadata_fetcher_add(**fetcher) | swh_storage.metadata_fetcher_add(**fetcher) | ||||
swh_storage.metadata_authority_add(**authority) | swh_storage.metadata_authority_add(**authority) | ||||
▲ Show 20 Lines • Show All 717 Lines • Show Last 20 Lines |