Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
Show All 38 Lines | |||||
) | ) | ||||
from swh.model.hypothesis_strategies import objects | from swh.model.hypothesis_strategies import objects | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.converters import origin_url_to_sha1 as sha1 | from swh.storage.converters import origin_url_to_sha1 as sha1 | ||||
from swh.storage.exc import HashCollision, StorageArgumentException | from swh.storage.exc import HashCollision, StorageArgumentException | ||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
from swh.storage.utils import content_hex_hashes, now | from swh.storage.utils import content_hex_hashes, now | ||||
from .storage_data import data | |||||
@contextmanager | @contextmanager | ||||
def db_transaction(storage): | def db_transaction(storage): | ||||
with storage.db() as db: | with storage.db() as db: | ||||
with db.transaction() as cur: | with db.transaction() as cur: | ||||
yield db, cur | yield db, cur | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | def test_round_to_milliseconds(): | ||||
for (ms, expected_ms) in [(0, 0), (1000, 1000), (555555, 555000), (999500, 999000)]: | for (ms, expected_ms) in [(0, 0), (1000, 1000), (555555, 555000), (999500, 999000)]: | ||||
date = date.replace(microsecond=ms) | date = date.replace(microsecond=ms) | ||||
actual_date = round_to_milliseconds(date) | actual_date = round_to_milliseconds(date) | ||||
assert actual_date.microsecond == expected_ms | assert actual_date.microsecond == expected_ms | ||||
class LazyContent(Content): | class LazyContent(Content): | ||||
def with_data(self): | def with_data(self): | ||||
raw_data = data.content.data | return Content.from_dict({**self.to_dict(), "data": b"42\n"}) | ||||
return Content.from_dict({**self.to_dict(), "data": raw_data}) | |||||
class TestStorage: | class TestStorage: | ||||
"""Main class for Storage testing. | """Main class for Storage testing. | ||||
This class is used as-is to test local storage (see TestLocalStorage | This class is used as-is to test local storage (see TestLocalStorage | ||||
below) and remote storage (see TestRemoteStorage in | below) and remote storage (see TestRemoteStorage in | ||||
test_remote_storage.py. | test_remote_storage.py. | ||||
Show All 36 Lines | def test_types(self, swh_storage_backend_config): | ||||
assert missing_methods == [] | assert missing_methods == [] | ||||
def test_check_config(self, swh_storage): | def test_check_config(self, swh_storage): | ||||
assert swh_storage.check_config(check_write=True) | assert swh_storage.check_config(check_write=True) | ||||
assert swh_storage.check_config(check_write=False) | assert swh_storage.check_config(check_write=False) | ||||
def test_content_add(self, swh_storage, sample_data): | def test_content_add(self, swh_storage, sample_data): | ||||
cont = sample_data["content"][0] | cont = sample_data.content | ||||
insertion_start_time = now() | insertion_start_time = now() | ||||
actual_result = swh_storage.content_add([cont]) | actual_result = swh_storage.content_add([cont]) | ||||
insertion_end_time = now() | insertion_end_time = now() | ||||
assert actual_result == { | assert actual_result == { | ||||
"content:add": 1, | "content:add": 1, | ||||
"content:add:bytes": cont.length, | "content:add:bytes": cont.length, | ||||
Show All 15 Lines | def test_content_add(self, swh_storage, sample_data): | ||||
assert insertion_start_time <= obj.ctime | assert insertion_start_time <= obj.ctime | ||||
assert obj.ctime <= insertion_end_time | assert obj.ctime <= insertion_end_time | ||||
assert obj == expected_cont | assert obj == expected_cont | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["content"] == 1 | assert swh_storage.stat_counters()["content"] == 1 | ||||
def test_content_add_from_generator(self, swh_storage, sample_data): | def test_content_add_from_generator(self, swh_storage, sample_data): | ||||
cont = sample_data["content"][0] | cont = sample_data.content | ||||
def _cnt_gen(): | def _cnt_gen(): | ||||
yield cont | yield cont | ||||
actual_result = swh_storage.content_add(_cnt_gen()) | actual_result = swh_storage.content_add(_cnt_gen()) | ||||
assert actual_result == { | assert actual_result == { | ||||
"content:add": 1, | "content:add": 1, | ||||
"content:add:bytes": cont.length, | "content:add:bytes": cont.length, | ||||
} | } | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["content"] == 1 | assert swh_storage.stat_counters()["content"] == 1 | ||||
def test_content_add_from_lazy_content(self, swh_storage, sample_data): | def test_content_add_from_lazy_content(self, swh_storage, sample_data): | ||||
cont = sample_data["content"][0] | cont = sample_data.content | ||||
lazy_content = LazyContent.from_dict(cont.to_dict()) | lazy_content = LazyContent.from_dict(cont.to_dict()) | ||||
insertion_start_time = now() | insertion_start_time = now() | ||||
actual_result = swh_storage.content_add([lazy_content]) | actual_result = swh_storage.content_add([lazy_content]) | ||||
insertion_end_time = now() | insertion_end_time = now() | ||||
Show All 19 Lines | def test_content_add_from_lazy_content(self, swh_storage, sample_data): | ||||
assert insertion_start_time <= obj.ctime | assert insertion_start_time <= obj.ctime | ||||
assert obj.ctime <= insertion_end_time | assert obj.ctime <= insertion_end_time | ||||
assert attr.evolve(obj, ctime=None).to_dict() == expected_cont.to_dict() | assert attr.evolve(obj, ctime=None).to_dict() == expected_cont.to_dict() | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["content"] == 1 | assert swh_storage.stat_counters()["content"] == 1 | ||||
def test_content_get_missing(self, swh_storage, sample_data): | def test_content_get_missing(self, swh_storage, sample_data): | ||||
cont, cont2 = sample_data["content"][:2] | cont, cont2 = sample_data.contents[:2] | ||||
swh_storage.content_add([cont]) | swh_storage.content_add([cont]) | ||||
# Query a single missing content | # Query a single missing content | ||||
results = list(swh_storage.content_get([cont2.sha1])) | results = list(swh_storage.content_get([cont2.sha1])) | ||||
assert results == [None] | assert results == [None] | ||||
# Check content_get does not abort after finding a missing content | # Check content_get does not abort after finding a missing content | ||||
results = list(swh_storage.content_get([cont.sha1, cont2.sha1])) | results = list(swh_storage.content_get([cont.sha1, cont2.sha1])) | ||||
assert results == [{"sha1": cont.sha1, "data": cont.data}, None] | assert results == [{"sha1": cont.sha1, "data": cont.data}, None] | ||||
# Check content_get does not discard found countent when it finds | # Check content_get does not discard found countent when it finds | ||||
# a missing content. | # a missing content. | ||||
results = list(swh_storage.content_get([cont2.sha1, cont.sha1])) | results = list(swh_storage.content_get([cont2.sha1, cont.sha1])) | ||||
assert results == [None, {"sha1": cont.sha1, "data": cont.data}] | assert results == [None, {"sha1": cont.sha1, "data": cont.data}] | ||||
def test_content_add_different_input(self, swh_storage, sample_data): | def test_content_add_different_input(self, swh_storage, sample_data): | ||||
cont, cont2 = sample_data["content"][:2] | cont, cont2 = sample_data.contents[:2] | ||||
actual_result = swh_storage.content_add([cont, cont2]) | actual_result = swh_storage.content_add([cont, cont2]) | ||||
assert actual_result == { | assert actual_result == { | ||||
"content:add": 2, | "content:add": 2, | ||||
"content:add:bytes": cont.length + cont2.length, | "content:add:bytes": cont.length + cont2.length, | ||||
} | } | ||||
def test_content_add_twice(self, swh_storage, sample_data): | def test_content_add_twice(self, swh_storage, sample_data): | ||||
cont, cont2 = sample_data["content"][:2] | cont, cont2 = sample_data.contents[:2] | ||||
actual_result = swh_storage.content_add([cont]) | actual_result = swh_storage.content_add([cont]) | ||||
assert actual_result == { | assert actual_result == { | ||||
"content:add": 1, | "content:add": 1, | ||||
"content:add:bytes": cont.length, | "content:add:bytes": cont.length, | ||||
} | } | ||||
assert len(swh_storage.journal_writer.journal.objects) == 1 | assert len(swh_storage.journal_writer.journal.objects) == 1 | ||||
actual_result = swh_storage.content_add([cont, cont2]) | actual_result = swh_storage.content_add([cont, cont2]) | ||||
assert actual_result == { | assert actual_result == { | ||||
"content:add": 1, | "content:add": 1, | ||||
"content:add:bytes": cont2.length, | "content:add:bytes": cont2.length, | ||||
} | } | ||||
assert 2 <= len(swh_storage.journal_writer.journal.objects) <= 3 | assert 2 <= len(swh_storage.journal_writer.journal.objects) <= 3 | ||||
assert len(swh_storage.content_find(cont.to_dict())) == 1 | assert len(swh_storage.content_find(cont.to_dict())) == 1 | ||||
assert len(swh_storage.content_find(cont2.to_dict())) == 1 | assert len(swh_storage.content_find(cont2.to_dict())) == 1 | ||||
def test_content_add_collision(self, swh_storage, sample_data): | def test_content_add_collision(self, swh_storage, sample_data): | ||||
cont1 = sample_data["content"][0] | cont1 = sample_data.content | ||||
# create (corrupted) content with same sha1{,_git} but != sha256 | # create (corrupted) content with same sha1{,_git} but != sha256 | ||||
sha256_array = bytearray(cont1.sha256) | sha256_array = bytearray(cont1.sha256) | ||||
sha256_array[0] += 1 | sha256_array[0] += 1 | ||||
cont1b = attr.evolve(cont1, sha256=bytes(sha256_array)) | cont1b = attr.evolve(cont1, sha256=bytes(sha256_array)) | ||||
with pytest.raises(HashCollision) as cm: | with pytest.raises(HashCollision) as cm: | ||||
swh_storage.content_add([cont1, cont1b]) | swh_storage.content_add([cont1, cont1b]) | ||||
Show All 10 Lines | def test_content_add_collision(self, swh_storage, sample_data): | ||||
content_hex_hashes(cont1b.hashes()), | content_hex_hashes(cont1b.hashes()), | ||||
] | ] | ||||
assert exc.colliding_content_hashes() == [ | assert exc.colliding_content_hashes() == [ | ||||
cont1.hashes(), | cont1.hashes(), | ||||
cont1b.hashes(), | cont1b.hashes(), | ||||
] | ] | ||||
def test_content_add_duplicate(self, swh_storage, sample_data): | def test_content_add_duplicate(self, swh_storage, sample_data): | ||||
cont = sample_data["content"][0] | cont = sample_data.content | ||||
swh_storage.content_add([cont, cont]) | swh_storage.content_add([cont, cont]) | ||||
assert list(swh_storage.content_get([cont.sha1])) == [ | assert list(swh_storage.content_get([cont.sha1])) == [ | ||||
{"sha1": cont.sha1, "data": cont.data} | {"sha1": cont.sha1, "data": cont.data} | ||||
] | ] | ||||
def test_content_update(self, swh_storage, sample_data): | def test_content_update(self, swh_storage, sample_data): | ||||
cont1 = sample_data["content"][0] | cont1 = sample_data.content | ||||
if hasattr(swh_storage, "journal_writer"): | if hasattr(swh_storage, "journal_writer"): | ||||
swh_storage.journal_writer.journal = None # TODO, not supported | swh_storage.journal_writer.journal = None # TODO, not supported | ||||
swh_storage.content_add([cont1]) | swh_storage.content_add([cont1]) | ||||
# alter the sha1_git for example | # alter the sha1_git for example | ||||
cont1b = attr.evolve( | cont1b = attr.evolve( | ||||
cont1, sha1_git=hash_to_bytes("3a60a5275d0333bf13468e8b3dcab90f4046e654") | cont1, sha1_git=hash_to_bytes("3a60a5275d0333bf13468e8b3dcab90f4046e654") | ||||
) | ) | ||||
swh_storage.content_update([cont1b.to_dict()], keys=["sha1_git"]) | swh_storage.content_update([cont1b.to_dict()], keys=["sha1_git"]) | ||||
results = swh_storage.content_get_metadata([cont1.sha1]) | results = swh_storage.content_get_metadata([cont1.sha1]) | ||||
expected_content = attr.evolve(cont1b, data=None).to_dict() | expected_content = attr.evolve(cont1b, data=None).to_dict() | ||||
del expected_content["ctime"] | del expected_content["ctime"] | ||||
assert tuple(results[cont1.sha1]) == (expected_content,) | assert tuple(results[cont1.sha1]) == (expected_content,) | ||||
def test_content_add_metadata(self, swh_storage, sample_data): | def test_content_add_metadata(self, swh_storage, sample_data): | ||||
cont = attr.evolve(sample_data["content"][0], data=None, ctime=now()) | cont = attr.evolve(sample_data.content, data=None, ctime=now()) | ||||
actual_result = swh_storage.content_add_metadata([cont]) | actual_result = swh_storage.content_add_metadata([cont]) | ||||
assert actual_result == { | assert actual_result == { | ||||
"content:add": 1, | "content:add": 1, | ||||
} | } | ||||
expected_cont = cont.to_dict() | expected_cont = cont.to_dict() | ||||
del expected_cont["ctime"] | del expected_cont["ctime"] | ||||
assert tuple(swh_storage.content_get_metadata([cont.sha1])[cont.sha1]) == ( | assert tuple(swh_storage.content_get_metadata([cont.sha1])[cont.sha1]) == ( | ||||
expected_cont, | expected_cont, | ||||
) | ) | ||||
contents = [ | contents = [ | ||||
obj | obj | ||||
for (obj_type, obj) in swh_storage.journal_writer.journal.objects | for (obj_type, obj) in swh_storage.journal_writer.journal.objects | ||||
if obj_type == "content" | if obj_type == "content" | ||||
] | ] | ||||
assert len(contents) == 1 | assert len(contents) == 1 | ||||
for obj in contents: | for obj in contents: | ||||
obj = attr.evolve(obj, ctime=None) | obj = attr.evolve(obj, ctime=None) | ||||
assert obj == cont | assert obj == cont | ||||
def test_content_add_metadata_different_input(self, swh_storage, sample_data): | def test_content_add_metadata_different_input(self, swh_storage, sample_data): | ||||
contents = sample_data["content"][:2] | contents = sample_data.contents[:2] | ||||
cont = attr.evolve(contents[0], data=None, ctime=now()) | cont = attr.evolve(contents[0], data=None, ctime=now()) | ||||
cont2 = attr.evolve(contents[1], data=None, ctime=now()) | cont2 = attr.evolve(contents[1], data=None, ctime=now()) | ||||
actual_result = swh_storage.content_add_metadata([cont, cont2]) | actual_result = swh_storage.content_add_metadata([cont, cont2]) | ||||
assert actual_result == { | assert actual_result == { | ||||
"content:add": 2, | "content:add": 2, | ||||
} | } | ||||
def test_content_add_metadata_collision(self, swh_storage, sample_data): | def test_content_add_metadata_collision(self, swh_storage, sample_data): | ||||
cont1 = attr.evolve(sample_data["content"][0], data=None, ctime=now()) | cont1 = attr.evolve(sample_data.content, data=None, ctime=now()) | ||||
# create (corrupted) content with same sha1{,_git} but != sha256 | # create (corrupted) content with same sha1{,_git} but != sha256 | ||||
sha1_git_array = bytearray(cont1.sha256) | sha1_git_array = bytearray(cont1.sha256) | ||||
sha1_git_array[0] += 1 | sha1_git_array[0] += 1 | ||||
cont1b = attr.evolve(cont1, sha256=bytes(sha1_git_array)) | cont1b = attr.evolve(cont1, sha256=bytes(sha1_git_array)) | ||||
with pytest.raises(HashCollision) as cm: | with pytest.raises(HashCollision) as cm: | ||||
swh_storage.content_add_metadata([cont1, cont1b]) | swh_storage.content_add_metadata([cont1, cont1b]) | ||||
Show All 10 Lines | def test_content_add_metadata_collision(self, swh_storage, sample_data): | ||||
content_hex_hashes(cont1b.hashes()), | content_hex_hashes(cont1b.hashes()), | ||||
] | ] | ||||
assert exc.colliding_content_hashes() == [ | assert exc.colliding_content_hashes() == [ | ||||
cont1.hashes(), | cont1.hashes(), | ||||
cont1b.hashes(), | cont1b.hashes(), | ||||
] | ] | ||||
def test_skipped_content_add(self, swh_storage, sample_data): | def test_skipped_content_add(self, swh_storage, sample_data): | ||||
contents = sample_data["skipped_content"][:2] | contents = sample_data.skipped_contents[:2] | ||||
cont = contents[0] | cont = contents[0] | ||||
cont2 = attr.evolve(contents[1], blake2s256=None) | cont2 = attr.evolve(contents[1], blake2s256=None) | ||||
contents_dict = [c.to_dict() for c in [cont, cont2]] | contents_dict = [c.to_dict() for c in [cont, cont2]] | ||||
missing = list(swh_storage.skipped_content_missing(contents_dict)) | missing = list(swh_storage.skipped_content_missing(contents_dict)) | ||||
assert missing == [cont.hashes(), cont2.hashes()] | assert missing == [cont.hashes(), cont2.hashes()] | ||||
actual_result = swh_storage.skipped_content_add([cont, cont, cont2]) | actual_result = swh_storage.skipped_content_add([cont, cont, cont2]) | ||||
assert 2 <= actual_result.pop("skipped_content:add") <= 3 | assert 2 <= actual_result.pop("skipped_content:add") <= 3 | ||||
assert actual_result == {} | assert actual_result == {} | ||||
missing = list(swh_storage.skipped_content_missing(contents_dict)) | missing = list(swh_storage.skipped_content_missing(contents_dict)) | ||||
assert missing == [] | assert missing == [] | ||||
def test_skipped_content_add_missing_hashes(self, swh_storage, sample_data): | def test_skipped_content_add_missing_hashes(self, swh_storage, sample_data): | ||||
cont, cont2 = [ | cont, cont2 = [ | ||||
attr.evolve(c, sha1_git=None) for c in sample_data["skipped_content"][:2] | attr.evolve(c, sha1_git=None) for c in sample_data.skipped_contents[:2] | ||||
] | ] | ||||
contents_dict = [c.to_dict() for c in [cont, cont2]] | contents_dict = [c.to_dict() for c in [cont, cont2]] | ||||
missing = list(swh_storage.skipped_content_missing(contents_dict)) | missing = list(swh_storage.skipped_content_missing(contents_dict)) | ||||
assert len(missing) == 2 | assert len(missing) == 2 | ||||
actual_result = swh_storage.skipped_content_add([cont, cont, cont2]) | actual_result = swh_storage.skipped_content_add([cont, cont, cont2]) | ||||
assert 2 <= actual_result.pop("skipped_content:add") <= 3 | assert 2 <= actual_result.pop("skipped_content:add") <= 3 | ||||
assert actual_result == {} | assert actual_result == {} | ||||
missing = list(swh_storage.skipped_content_missing(contents_dict)) | missing = list(swh_storage.skipped_content_missing(contents_dict)) | ||||
assert missing == [] | assert missing == [] | ||||
def test_skipped_content_missing_partial_hash(self, swh_storage, sample_data): | def test_skipped_content_missing_partial_hash(self, swh_storage, sample_data): | ||||
cont = sample_data["skipped_content"][0] | cont = sample_data.skipped_content | ||||
cont2 = attr.evolve(cont, sha1_git=None) | cont2 = attr.evolve(cont, sha1_git=None) | ||||
contents_dict = [c.to_dict() for c in [cont, cont2]] | contents_dict = [c.to_dict() for c in [cont, cont2]] | ||||
missing = list(swh_storage.skipped_content_missing(contents_dict)) | missing = list(swh_storage.skipped_content_missing(contents_dict)) | ||||
assert len(missing) == 2 | assert len(missing) == 2 | ||||
actual_result = swh_storage.skipped_content_add([cont]) | actual_result = swh_storage.skipped_content_add([cont]) | ||||
assert actual_result.pop("skipped_content:add") == 1 | assert actual_result.pop("skipped_content:add") == 1 | ||||
assert actual_result == {} | assert actual_result == {} | ||||
missing = list(swh_storage.skipped_content_missing(contents_dict)) | missing = list(swh_storage.skipped_content_missing(contents_dict)) | ||||
assert missing == [cont2.hashes()] | assert missing == [cont2.hashes()] | ||||
@pytest.mark.property_based | @pytest.mark.property_based | ||||
@settings(deadline=None) # this test is very slow | @settings(deadline=None) # this test is very slow | ||||
@given( | @given( | ||||
strategies.sets( | strategies.sets( | ||||
elements=strategies.sampled_from(["sha256", "sha1_git", "blake2s256"]), | elements=strategies.sampled_from(["sha256", "sha1_git", "blake2s256"]), | ||||
min_size=0, | min_size=0, | ||||
) | ) | ||||
) | ) | ||||
def test_content_missing(self, swh_storage, algos): | def test_content_missing(self, swh_storage, sample_data, algos): | ||||
algos |= {"sha1"} | algos |= {"sha1"} | ||||
content, missing_content = [data.content2, data.skipped_content] | content, missing_content = [sample_data.content2, sample_data.skipped_content] | ||||
swh_storage.content_add([content]) | swh_storage.content_add([content]) | ||||
test_contents = [content.to_dict()] | test_contents = [content.to_dict()] | ||||
missing_per_hash = defaultdict(list) | missing_per_hash = defaultdict(list) | ||||
for i in range(256): | for i in range(256): | ||||
test_content = missing_content.to_dict() | test_content = missing_content.to_dict() | ||||
for hash in algos: | for hash in algos: | ||||
test_content[hash] = bytes([i]) + test_content[hash][1:] | test_content[hash] = bytes([i]) + test_content[hash][1:] | ||||
Show All 11 Lines | class TestStorage: | ||||
@pytest.mark.property_based | @pytest.mark.property_based | ||||
@given( | @given( | ||||
strategies.sets( | strategies.sets( | ||||
elements=strategies.sampled_from(["sha256", "sha1_git", "blake2s256"]), | elements=strategies.sampled_from(["sha256", "sha1_git", "blake2s256"]), | ||||
min_size=0, | min_size=0, | ||||
) | ) | ||||
) | ) | ||||
def test_content_missing_unknown_algo(self, swh_storage, algos): | def test_content_missing_unknown_algo(self, swh_storage, sample_data, algos): | ||||
algos |= {"sha1"} | algos |= {"sha1"} | ||||
content, missing_content = [data.content2, data.skipped_content] | content, missing_content = [sample_data.content2, sample_data.skipped_content] | ||||
swh_storage.content_add([content]) | swh_storage.content_add([content]) | ||||
test_contents = [content.to_dict()] | test_contents = [content.to_dict()] | ||||
missing_per_hash = defaultdict(list) | missing_per_hash = defaultdict(list) | ||||
for i in range(16): | for i in range(16): | ||||
test_content = missing_content.to_dict() | test_content = missing_content.to_dict() | ||||
for hash in algos: | for hash in algos: | ||||
test_content[hash] = bytes([i]) + test_content[hash][1:] | test_content[hash] = bytes([i]) + test_content[hash][1:] | ||||
missing_per_hash[hash].append(test_content[hash]) | missing_per_hash[hash].append(test_content[hash]) | ||||
test_content["nonexisting_algo"] = b"\x00" | test_content["nonexisting_algo"] = b"\x00" | ||||
test_contents.append(test_content) | test_contents.append(test_content) | ||||
assert set(swh_storage.content_missing(test_contents)) == set( | assert set(swh_storage.content_missing(test_contents)) == set( | ||||
missing_per_hash["sha1"] | missing_per_hash["sha1"] | ||||
) | ) | ||||
for hash in algos: | for hash in algos: | ||||
assert set( | assert set( | ||||
swh_storage.content_missing(test_contents, key_hash=hash) | swh_storage.content_missing(test_contents, key_hash=hash) | ||||
) == set(missing_per_hash[hash]) | ) == set(missing_per_hash[hash]) | ||||
def test_content_missing_per_sha1(self, swh_storage, sample_data): | def test_content_missing_per_sha1(self, swh_storage, sample_data): | ||||
# given | # given | ||||
cont = sample_data["content"][0] | cont = sample_data.content | ||||
missing_cont = sample_data["skipped_content"][0] | missing_cont = sample_data.skipped_content | ||||
swh_storage.content_add([cont]) | swh_storage.content_add([cont]) | ||||
# when | # when | ||||
gen = swh_storage.content_missing_per_sha1([cont.sha1, missing_cont.sha1]) | gen = swh_storage.content_missing_per_sha1([cont.sha1, missing_cont.sha1]) | ||||
# then | # then | ||||
assert list(gen) == [missing_cont.sha1] | assert list(gen) == [missing_cont.sha1] | ||||
def test_content_missing_per_sha1_git(self, swh_storage, sample_data): | def test_content_missing_per_sha1_git(self, swh_storage, sample_data): | ||||
cont, cont2 = sample_data["content"][:2] | cont, cont2 = sample_data.contents[:2] | ||||
missing_cont = sample_data["skipped_content"][0] | missing_cont = sample_data.skipped_content | ||||
swh_storage.content_add([cont, cont2]) | swh_storage.content_add([cont, cont2]) | ||||
contents = [cont.sha1_git, cont2.sha1_git, missing_cont.sha1_git] | contents = [cont.sha1_git, cont2.sha1_git, missing_cont.sha1_git] | ||||
missing_contents = swh_storage.content_missing_per_sha1_git(contents) | missing_contents = swh_storage.content_missing_per_sha1_git(contents) | ||||
assert list(missing_contents) == [missing_cont.sha1_git] | assert list(missing_contents) == [missing_cont.sha1_git] | ||||
▲ Show 20 Lines • Show All 68 Lines • ▼ Show 20 Lines | def test_generate_content_get_partition_pagination(self, swh_storage, swh_contents): | ||||
page_token = actual_result["next_page_token"] | page_token = actual_result["next_page_token"] | ||||
if page_token is None: | if page_token is None: | ||||
break | break | ||||
assert_contents_ok(expected_contents, actual_contents, ["sha1"]) | assert_contents_ok(expected_contents, actual_contents, ["sha1"]) | ||||
def test_content_get_metadata(self, swh_storage, sample_data): | def test_content_get_metadata(self, swh_storage, sample_data): | ||||
cont1, cont2 = sample_data["content"][:2] | cont1, cont2 = sample_data.contents[:2] | ||||
swh_storage.content_add([cont1, cont2]) | swh_storage.content_add([cont1, cont2]) | ||||
actual_md = swh_storage.content_get_metadata([cont1.sha1, cont2.sha1]) | actual_md = swh_storage.content_get_metadata([cont1.sha1, cont2.sha1]) | ||||
# we only retrieve the metadata so no data nor ctime within | # we only retrieve the metadata so no data nor ctime within | ||||
expected_cont1, expected_cont2 = [ | expected_cont1, expected_cont2 = [ | ||||
attr.evolve(c, data=None).to_dict() for c in [cont1, cont2] | attr.evolve(c, data=None).to_dict() for c in [cont1, cont2] | ||||
] | ] | ||||
expected_cont1.pop("ctime") | expected_cont1.pop("ctime") | ||||
expected_cont2.pop("ctime") | expected_cont2.pop("ctime") | ||||
assert tuple(actual_md[cont1.sha1]) == (expected_cont1,) | assert tuple(actual_md[cont1.sha1]) == (expected_cont1,) | ||||
assert tuple(actual_md[cont2.sha1]) == (expected_cont2,) | assert tuple(actual_md[cont2.sha1]) == (expected_cont2,) | ||||
assert len(actual_md.keys()) == 2 | assert len(actual_md.keys()) == 2 | ||||
def test_content_get_metadata_missing_sha1(self, swh_storage, sample_data): | def test_content_get_metadata_missing_sha1(self, swh_storage, sample_data): | ||||
cont1, cont2 = sample_data["content"][:2] | cont1, cont2 = sample_data.contents[:2] | ||||
missing_cont = sample_data["skipped_content"][0] | missing_cont = sample_data.skipped_content | ||||
swh_storage.content_add([cont1, cont2]) | swh_storage.content_add([cont1, cont2]) | ||||
actual_contents = swh_storage.content_get_metadata([missing_cont.sha1]) | actual_contents = swh_storage.content_get_metadata([missing_cont.sha1]) | ||||
assert len(actual_contents) == 1 | assert len(actual_contents) == 1 | ||||
assert tuple(actual_contents[missing_cont.sha1]) == () | assert tuple(actual_contents[missing_cont.sha1]) == () | ||||
def test_content_get_random(self, swh_storage, sample_data): | def test_content_get_random(self, swh_storage, sample_data): | ||||
cont, cont2, cont3 = sample_data["content"][:3] | cont, cont2, cont3 = sample_data.contents[:3] | ||||
swh_storage.content_add([cont, cont2, cont3]) | swh_storage.content_add([cont, cont2, cont3]) | ||||
assert swh_storage.content_get_random() in { | assert swh_storage.content_get_random() in { | ||||
cont.sha1_git, | cont.sha1_git, | ||||
cont2.sha1_git, | cont2.sha1_git, | ||||
cont3.sha1_git, | cont3.sha1_git, | ||||
} | } | ||||
def test_directory_add(self, swh_storage, sample_data): | def test_directory_add(self, swh_storage, sample_data): | ||||
directory = sample_data["directory"][1] | directory = sample_data.directories[1] | ||||
init_missing = list(swh_storage.directory_missing([directory.id])) | init_missing = list(swh_storage.directory_missing([directory.id])) | ||||
assert [directory.id] == init_missing | assert [directory.id] == init_missing | ||||
actual_result = swh_storage.directory_add([directory]) | actual_result = swh_storage.directory_add([directory]) | ||||
assert actual_result == {"directory:add": 1} | assert actual_result == {"directory:add": 1} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("directory", directory) | ("directory", directory) | ||||
] | ] | ||||
actual_data = list(swh_storage.directory_ls(directory.id)) | actual_data = list(swh_storage.directory_ls(directory.id)) | ||||
expected_data = list(transform_entries(directory)) | expected_data = list(transform_entries(directory)) | ||||
assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | ||||
after_missing = list(swh_storage.directory_missing([directory.id])) | after_missing = list(swh_storage.directory_missing([directory.id])) | ||||
assert after_missing == [] | assert after_missing == [] | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["directory"] == 1 | assert swh_storage.stat_counters()["directory"] == 1 | ||||
def test_directory_add_from_generator(self, swh_storage, sample_data): | def test_directory_add_from_generator(self, swh_storage, sample_data): | ||||
directory = sample_data["directory"][1] | directory = sample_data.directories[1] | ||||
def _dir_gen(): | def _dir_gen(): | ||||
yield directory | yield directory | ||||
actual_result = swh_storage.directory_add(directories=_dir_gen()) | actual_result = swh_storage.directory_add(directories=_dir_gen()) | ||||
assert actual_result == {"directory:add": 1} | assert actual_result == {"directory:add": 1} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("directory", directory) | ("directory", directory) | ||||
] | ] | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["directory"] == 1 | assert swh_storage.stat_counters()["directory"] == 1 | ||||
def test_directory_add_twice(self, swh_storage, sample_data): | def test_directory_add_twice(self, swh_storage, sample_data): | ||||
directory = sample_data["directory"][1] | directory = sample_data.directories[1] | ||||
actual_result = swh_storage.directory_add([directory]) | actual_result = swh_storage.directory_add([directory]) | ||||
assert actual_result == {"directory:add": 1} | assert actual_result == {"directory:add": 1} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("directory", directory) | ("directory", directory) | ||||
] | ] | ||||
actual_result = swh_storage.directory_add([directory]) | actual_result = swh_storage.directory_add([directory]) | ||||
assert actual_result == {"directory:add": 0} | assert actual_result == {"directory:add": 0} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("directory", directory) | ("directory", directory) | ||||
] | ] | ||||
def test_directory_get_recursive(self, swh_storage, sample_data): | def test_directory_get_recursive(self, swh_storage, sample_data): | ||||
dir1, dir2, dir3 = sample_data["directory"][:3] | dir1, dir2, dir3 = sample_data.directories[:3] | ||||
init_missing = list(swh_storage.directory_missing([dir1.id])) | init_missing = list(swh_storage.directory_missing([dir1.id])) | ||||
assert init_missing == [dir1.id] | assert init_missing == [dir1.id] | ||||
actual_result = swh_storage.directory_add([dir1, dir2, dir3]) | actual_result = swh_storage.directory_add([dir1, dir2, dir3]) | ||||
assert actual_result == {"directory:add": 3} | assert actual_result == {"directory:add": 3} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
Show All 18 Lines | def test_directory_get_recursive(self, swh_storage, sample_data): | ||||
expected_data = list( | expected_data = list( | ||||
itertools.chain( | itertools.chain( | ||||
transform_entries(dir3), transform_entries(dir2, prefix=b"subdir/"), | transform_entries(dir3), transform_entries(dir2, prefix=b"subdir/"), | ||||
) | ) | ||||
) | ) | ||||
assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | ||||
def test_directory_get_non_recursive(self, swh_storage, sample_data): | def test_directory_get_non_recursive(self, swh_storage, sample_data): | ||||
dir1, dir2, dir3 = sample_data["directory"][:3] | dir1, dir2, dir3 = sample_data.directories[:3] | ||||
init_missing = list(swh_storage.directory_missing([dir1.id])) | init_missing = list(swh_storage.directory_missing([dir1.id])) | ||||
assert init_missing == [dir1.id] | assert init_missing == [dir1.id] | ||||
actual_result = swh_storage.directory_add([dir1, dir2, dir3]) | actual_result = swh_storage.directory_add([dir1, dir2, dir3]) | ||||
assert actual_result == {"directory:add": 3} | assert actual_result == {"directory:add": 3} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
Show All 14 Lines | def test_directory_get_non_recursive(self, swh_storage, sample_data): | ||||
# List directory containing a known subdirectory, entries should | # List directory containing a known subdirectory, entries should | ||||
# only be those of the parent directory, not of the subdir | # only be those of the parent directory, not of the subdir | ||||
actual_data = list(swh_storage.directory_ls(dir3.id)) | actual_data = list(swh_storage.directory_ls(dir3.id)) | ||||
expected_data = list(transform_entries(dir3)) | expected_data = list(transform_entries(dir3)) | ||||
assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | ||||
def test_directory_entry_get_by_path(self, swh_storage, sample_data): | def test_directory_entry_get_by_path(self, swh_storage, sample_data): | ||||
cont = sample_data["content"][0] | cont = sample_data.content | ||||
dir1, dir2, dir3, dir4, dir5 = sample_data["directory"][:5] | dir1, dir2, dir3, dir4, dir5 = sample_data.directories[:5] | ||||
# given | # given | ||||
init_missing = list(swh_storage.directory_missing([dir3.id])) | init_missing = list(swh_storage.directory_missing([dir3.id])) | ||||
assert init_missing == [dir3.id] | assert init_missing == [dir3.id] | ||||
actual_result = swh_storage.directory_add([dir3, dir4]) | actual_result = swh_storage.directory_add([dir3, dir4]) | ||||
assert actual_result == {"directory:add": 2} | assert actual_result == {"directory:add": 2} | ||||
▲ Show 20 Lines • Show All 55 Lines • ▼ Show 20 Lines | def test_directory_entry_get_by_path(self, swh_storage, sample_data): | ||||
# when (nothing should be found here since `dir` is not persisted.) | # when (nothing should be found here since `dir` is not persisted.) | ||||
for entry in dir2.entries: | for entry in dir2.entries: | ||||
actual_entry = swh_storage.directory_entry_get_by_path( | actual_entry = swh_storage.directory_entry_get_by_path( | ||||
dir2.id, [entry.name] | dir2.id, [entry.name] | ||||
) | ) | ||||
assert actual_entry is None | assert actual_entry is None | ||||
def test_directory_get_random(self, swh_storage, sample_data): | def test_directory_get_random(self, swh_storage, sample_data): | ||||
dir1, dir2, dir3 = sample_data["directory"][:3] | dir1, dir2, dir3 = sample_data.directories[:3] | ||||
swh_storage.directory_add([dir1, dir2, dir3]) | swh_storage.directory_add([dir1, dir2, dir3]) | ||||
assert swh_storage.directory_get_random() in { | assert swh_storage.directory_get_random() in { | ||||
dir1.id, | dir1.id, | ||||
dir2.id, | dir2.id, | ||||
dir3.id, | dir3.id, | ||||
} | } | ||||
def test_revision_add(self, swh_storage, sample_data): | def test_revision_add(self, swh_storage, sample_data): | ||||
revision = sample_data["revision"][0] | revision = sample_data.revision | ||||
init_missing = swh_storage.revision_missing([revision.id]) | init_missing = swh_storage.revision_missing([revision.id]) | ||||
assert list(init_missing) == [revision.id] | assert list(init_missing) == [revision.id] | ||||
actual_result = swh_storage.revision_add([revision]) | actual_result = swh_storage.revision_add([revision]) | ||||
assert actual_result == {"revision:add": 1} | assert actual_result == {"revision:add": 1} | ||||
end_missing = swh_storage.revision_missing([revision.id]) | end_missing = swh_storage.revision_missing([revision.id]) | ||||
assert list(end_missing) == [] | assert list(end_missing) == [] | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("revision", revision) | ("revision", revision) | ||||
] | ] | ||||
# already there so nothing added | # already there so nothing added | ||||
actual_result = swh_storage.revision_add([revision]) | actual_result = swh_storage.revision_add([revision]) | ||||
assert actual_result == {"revision:add": 0} | assert actual_result == {"revision:add": 0} | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["revision"] == 1 | assert swh_storage.stat_counters()["revision"] == 1 | ||||
def test_revision_add_from_generator(self, swh_storage, sample_data): | def test_revision_add_from_generator(self, swh_storage, sample_data): | ||||
revision = sample_data["revision"][0] | revision = sample_data.revision | ||||
def _rev_gen(): | def _rev_gen(): | ||||
yield revision | yield revision | ||||
actual_result = swh_storage.revision_add(_rev_gen()) | actual_result = swh_storage.revision_add(_rev_gen()) | ||||
assert actual_result == {"revision:add": 1} | assert actual_result == {"revision:add": 1} | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["revision"] == 1 | assert swh_storage.stat_counters()["revision"] == 1 | ||||
def test_revision_add_twice(self, swh_storage, sample_data): | def test_revision_add_twice(self, swh_storage, sample_data): | ||||
revision, revision2 = sample_data["revision"][:2] | revision, revision2 = sample_data.revisions[:2] | ||||
actual_result = swh_storage.revision_add([revision]) | actual_result = swh_storage.revision_add([revision]) | ||||
assert actual_result == {"revision:add": 1} | assert actual_result == {"revision:add": 1} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("revision", revision) | ("revision", revision) | ||||
] | ] | ||||
actual_result = swh_storage.revision_add([revision, revision2]) | actual_result = swh_storage.revision_add([revision, revision2]) | ||||
assert actual_result == {"revision:add": 1} | assert actual_result == {"revision:add": 1} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("revision", revision), | ("revision", revision), | ||||
("revision", revision2), | ("revision", revision2), | ||||
] | ] | ||||
def test_revision_add_name_clash(self, swh_storage, sample_data): | def test_revision_add_name_clash(self, swh_storage, sample_data): | ||||
revision, revision2 = sample_data["revision"][:2] | revision, revision2 = sample_data.revisions[:2] | ||||
revision1 = attr.evolve( | revision1 = attr.evolve( | ||||
revision, | revision, | ||||
author=Person( | author=Person( | ||||
fullname=b"John Doe <john.doe@example.com>", | fullname=b"John Doe <john.doe@example.com>", | ||||
name=b"John Doe", | name=b"John Doe", | ||||
email=b"john.doe@example.com", | email=b"john.doe@example.com", | ||||
), | ), | ||||
) | ) | ||||
revision2 = attr.evolve( | revision2 = attr.evolve( | ||||
revision2, | revision2, | ||||
author=Person( | author=Person( | ||||
fullname=b"John Doe <john.doe@example.com>", | fullname=b"John Doe <john.doe@example.com>", | ||||
name=b"John Doe ", | name=b"John Doe ", | ||||
email=b"john.doe@example.com ", | email=b"john.doe@example.com ", | ||||
), | ), | ||||
) | ) | ||||
actual_result = swh_storage.revision_add([revision1, revision2]) | actual_result = swh_storage.revision_add([revision1, revision2]) | ||||
assert actual_result == {"revision:add": 2} | assert actual_result == {"revision:add": 2} | ||||
def test_revision_get_order(self, swh_storage, sample_data): | def test_revision_get_order(self, swh_storage, sample_data): | ||||
revision, revision2 = sample_data["revision"][:2] | revision, revision2 = sample_data.revisions[:2] | ||||
add_result = swh_storage.revision_add([revision, revision2]) | add_result = swh_storage.revision_add([revision, revision2]) | ||||
assert add_result == {"revision:add": 2} | assert add_result == {"revision:add": 2} | ||||
# order 1 | # order 1 | ||||
res1 = swh_storage.revision_get([revision.id, revision2.id]) | res1 = swh_storage.revision_get([revision.id, revision2.id]) | ||||
assert [Revision.from_dict(r) for r in res1] == [revision, revision2] | assert [Revision.from_dict(r) for r in res1] == [revision, revision2] | ||||
# order 2 | # order 2 | ||||
res2 = swh_storage.revision_get([revision2.id, revision.id]) | res2 = swh_storage.revision_get([revision2.id, revision.id]) | ||||
assert [Revision.from_dict(r) for r in res2] == [revision2, revision] | assert [Revision.from_dict(r) for r in res2] == [revision2, revision] | ||||
def test_revision_log(self, swh_storage, sample_data): | def test_revision_log(self, swh_storage, sample_data): | ||||
revision1, revision2, revision3, revision4 = sample_data["revision"][:4] | revision1, revision2, revision3, revision4 = sample_data.revisions[:4] | ||||
# rev4 -is-child-of-> rev3 -> rev1, (rev2 -> rev1) | # rev4 -is-child-of-> rev3 -> rev1, (rev2 -> rev1) | ||||
swh_storage.revision_add([revision1, revision2, revision3, revision4]) | swh_storage.revision_add([revision1, revision2, revision3, revision4]) | ||||
# when | # when | ||||
results = list(swh_storage.revision_log([revision4.id])) | results = list(swh_storage.revision_log([revision4.id])) | ||||
# for comparison purposes | # for comparison purposes | ||||
actual_results = [Revision.from_dict(r) for r in results] | actual_results = [Revision.from_dict(r) for r in results] | ||||
assert len(actual_results) == 4 # rev4 -child-> rev3 -> rev1, (rev2 -> rev1) | assert len(actual_results) == 4 # rev4 -child-> rev3 -> rev1, (rev2 -> rev1) | ||||
assert actual_results == [revision4, revision3, revision1, revision2] | assert actual_results == [revision4, revision3, revision1, revision2] | ||||
def test_revision_log_with_limit(self, swh_storage, sample_data): | def test_revision_log_with_limit(self, swh_storage, sample_data): | ||||
revision1, revision2, revision3, revision4 = sample_data["revision"][:4] | revision1, revision2, revision3, revision4 = sample_data.revisions[:4] | ||||
# revision4 -is-child-of-> revision3 | # revision4 -is-child-of-> revision3 | ||||
swh_storage.revision_add([revision3, revision4]) | swh_storage.revision_add([revision3, revision4]) | ||||
results = list(swh_storage.revision_log([revision4.id], 1)) | results = list(swh_storage.revision_log([revision4.id], 1)) | ||||
actual_results = [Revision.from_dict(r) for r in results] | actual_results = [Revision.from_dict(r) for r in results] | ||||
assert len(actual_results) == 1 | assert len(actual_results) == 1 | ||||
assert actual_results[0] == revision4 | assert actual_results[0] == revision4 | ||||
def test_revision_log_unknown_revision(self, swh_storage, sample_data): | def test_revision_log_unknown_revision(self, swh_storage, sample_data): | ||||
revision = sample_data["revision"][0] | revision = sample_data.revision | ||||
rev_log = list(swh_storage.revision_log([revision.id])) | rev_log = list(swh_storage.revision_log([revision.id])) | ||||
assert rev_log == [] | assert rev_log == [] | ||||
def test_revision_shortlog(self, swh_storage, sample_data): | def test_revision_shortlog(self, swh_storage, sample_data): | ||||
revision1, revision2, revision3, revision4 = sample_data["revision"][:4] | revision1, revision2, revision3, revision4 = sample_data.revisions[:4] | ||||
# rev4 -is-child-of-> rev3 -> (rev1, rev2); rev2 -> rev1 | # rev4 -is-child-of-> rev3 -> (rev1, rev2); rev2 -> rev1 | ||||
swh_storage.revision_add([revision1, revision2, revision3, revision4]) | swh_storage.revision_add([revision1, revision2, revision3, revision4]) | ||||
results = list(swh_storage.revision_shortlog([revision4.id])) | results = list(swh_storage.revision_shortlog([revision4.id])) | ||||
actual_results = [[id, tuple(parents)] for (id, parents) in results] | actual_results = [[id, tuple(parents)] for (id, parents) in results] | ||||
assert len(actual_results) == 4 | assert len(actual_results) == 4 | ||||
assert actual_results == [ | assert actual_results == [ | ||||
[revision4.id, revision4.parents], | [revision4.id, revision4.parents], | ||||
[revision3.id, revision3.parents], | [revision3.id, revision3.parents], | ||||
[revision1.id, revision1.parents], | [revision1.id, revision1.parents], | ||||
[revision2.id, revision2.parents], | [revision2.id, revision2.parents], | ||||
] | ] | ||||
def test_revision_shortlog_with_limit(self, swh_storage, sample_data): | def test_revision_shortlog_with_limit(self, swh_storage, sample_data): | ||||
revision1, revision2, revision3, revision4 = sample_data["revision"][:4] | revision1, revision2, revision3, revision4 = sample_data.revisions[:4] | ||||
# revision4 -is-child-of-> revision3 | # revision4 -is-child-of-> revision3 | ||||
swh_storage.revision_add([revision1, revision2, revision3, revision4]) | swh_storage.revision_add([revision1, revision2, revision3, revision4]) | ||||
results = list(swh_storage.revision_shortlog([revision4.id], 1)) | results = list(swh_storage.revision_shortlog([revision4.id], 1)) | ||||
actual_results = [[id, tuple(parents)] for (id, parents) in results] | actual_results = [[id, tuple(parents)] for (id, parents) in results] | ||||
assert len(actual_results) == 1 | assert len(actual_results) == 1 | ||||
assert list(actual_results[0]) == [revision4.id, revision4.parents] | assert list(actual_results[0]) == [revision4.id, revision4.parents] | ||||
def test_revision_get(self, swh_storage, sample_data): | def test_revision_get(self, swh_storage, sample_data): | ||||
revision, revision2 = sample_data["revision"][:2] | revision, revision2 = sample_data.revisions[:2] | ||||
swh_storage.revision_add([revision]) | swh_storage.revision_add([revision]) | ||||
actual_revisions = list(swh_storage.revision_get([revision.id, revision2.id])) | actual_revisions = list(swh_storage.revision_get([revision.id, revision2.id])) | ||||
assert len(actual_revisions) == 2 | assert len(actual_revisions) == 2 | ||||
assert Revision.from_dict(actual_revisions[0]) == revision | assert Revision.from_dict(actual_revisions[0]) == revision | ||||
assert actual_revisions[1] is None | assert actual_revisions[1] is None | ||||
def test_revision_get_no_parents(self, swh_storage, sample_data): | def test_revision_get_no_parents(self, swh_storage, sample_data): | ||||
revision = sample_data["revision"][0] | revision = sample_data.revision | ||||
swh_storage.revision_add([revision]) | swh_storage.revision_add([revision]) | ||||
get = list(swh_storage.revision_get([revision.id])) | get = list(swh_storage.revision_get([revision.id])) | ||||
assert len(get) == 1 | assert len(get) == 1 | ||||
assert revision.parents == () | assert revision.parents == () | ||||
assert tuple(get[0]["parents"]) == () # no parents on this one | assert tuple(get[0]["parents"]) == () # no parents on this one | ||||
def test_revision_get_random(self, swh_storage, sample_data): | def test_revision_get_random(self, swh_storage, sample_data): | ||||
revision1, revision2, revision3 = sample_data["revision"][:3] | revision1, revision2, revision3 = sample_data.revisions[:3] | ||||
swh_storage.revision_add([revision1, revision2, revision3]) | swh_storage.revision_add([revision1, revision2, revision3]) | ||||
assert swh_storage.revision_get_random() in { | assert swh_storage.revision_get_random() in { | ||||
revision1.id, | revision1.id, | ||||
revision2.id, | revision2.id, | ||||
revision3.id, | revision3.id, | ||||
} | } | ||||
def test_release_add(self, swh_storage, sample_data): | def test_release_add(self, swh_storage, sample_data): | ||||
release, release2 = sample_data["release"][:2] | release, release2 = sample_data.releases[:2] | ||||
init_missing = swh_storage.release_missing([release.id, release2.id]) | init_missing = swh_storage.release_missing([release.id, release2.id]) | ||||
assert list(init_missing) == [release.id, release2.id] | assert list(init_missing) == [release.id, release2.id] | ||||
actual_result = swh_storage.release_add([release, release2]) | actual_result = swh_storage.release_add([release, release2]) | ||||
assert actual_result == {"release:add": 2} | assert actual_result == {"release:add": 2} | ||||
end_missing = swh_storage.release_missing([release.id, release2.id]) | end_missing = swh_storage.release_missing([release.id, release2.id]) | ||||
assert list(end_missing) == [] | assert list(end_missing) == [] | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("release", release), | ("release", release), | ||||
("release", release2), | ("release", release2), | ||||
] | ] | ||||
# already present so nothing added | # already present so nothing added | ||||
actual_result = swh_storage.release_add([release, release2]) | actual_result = swh_storage.release_add([release, release2]) | ||||
assert actual_result == {"release:add": 0} | assert actual_result == {"release:add": 0} | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["release"] == 2 | assert swh_storage.stat_counters()["release"] == 2 | ||||
def test_release_add_from_generator(self, swh_storage, sample_data): | def test_release_add_from_generator(self, swh_storage, sample_data): | ||||
release, release2 = sample_data["release"][:2] | release, release2 = sample_data.releases[:2] | ||||
def _rel_gen(): | def _rel_gen(): | ||||
yield release | yield release | ||||
yield release2 | yield release2 | ||||
actual_result = swh_storage.release_add(_rel_gen()) | actual_result = swh_storage.release_add(_rel_gen()) | ||||
assert actual_result == {"release:add": 2} | assert actual_result == {"release:add": 2} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("release", release), | ("release", release), | ||||
("release", release2), | ("release", release2), | ||||
] | ] | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["release"] == 2 | assert swh_storage.stat_counters()["release"] == 2 | ||||
def test_release_add_no_author_date(self, swh_storage, sample_data): | def test_release_add_no_author_date(self, swh_storage, sample_data): | ||||
full_release = sample_data["release"][0] | full_release = sample_data.release | ||||
release = attr.evolve(full_release, author=None, date=None) | release = attr.evolve(full_release, author=None, date=None) | ||||
actual_result = swh_storage.release_add([release]) | actual_result = swh_storage.release_add([release]) | ||||
assert actual_result == {"release:add": 1} | assert actual_result == {"release:add": 1} | ||||
end_missing = swh_storage.release_missing([release.id]) | end_missing = swh_storage.release_missing([release.id]) | ||||
assert list(end_missing) == [] | assert list(end_missing) == [] | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("release", release) | ("release", release) | ||||
] | ] | ||||
def test_release_add_twice(self, swh_storage, sample_data): | def test_release_add_twice(self, swh_storage, sample_data): | ||||
release, release2 = sample_data["release"][:2] | release, release2 = sample_data.releases[:2] | ||||
actual_result = swh_storage.release_add([release]) | actual_result = swh_storage.release_add([release]) | ||||
assert actual_result == {"release:add": 1} | assert actual_result == {"release:add": 1} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("release", release) | ("release", release) | ||||
] | ] | ||||
Show All 9 Lines | def test_release_add_name_clash(self, swh_storage, sample_data): | ||||
attr.evolve( | attr.evolve( | ||||
c, | c, | ||||
author=Person( | author=Person( | ||||
fullname=b"John Doe <john.doe@example.com>", | fullname=b"John Doe <john.doe@example.com>", | ||||
name=b"John Doe", | name=b"John Doe", | ||||
email=b"john.doe@example.com", | email=b"john.doe@example.com", | ||||
), | ), | ||||
) | ) | ||||
for c in sample_data["release"][:2] | for c in sample_data.releases[:2] | ||||
] | ] | ||||
actual_result = swh_storage.release_add([release, release2]) | actual_result = swh_storage.release_add([release, release2]) | ||||
assert actual_result == {"release:add": 2} | assert actual_result == {"release:add": 2} | ||||
def test_release_get(self, swh_storage, sample_data): | def test_release_get(self, swh_storage, sample_data): | ||||
release, release2, release3 = sample_data["release"][:3] | release, release2, release3 = sample_data.releases[:3] | ||||
# given | # given | ||||
swh_storage.release_add([release, release2]) | swh_storage.release_add([release, release2]) | ||||
# when | # when | ||||
releases = list(swh_storage.release_get([release.id, release2.id])) | releases = list(swh_storage.release_get([release.id, release2.id])) | ||||
actual_releases = [Release.from_dict(r) for r in releases] | actual_releases = [Release.from_dict(r) for r in releases] | ||||
# then | # then | ||||
assert actual_releases == [release, release2] | assert actual_releases == [release, release2] | ||||
unknown_releases = list(swh_storage.release_get([release3.id])) | unknown_releases = list(swh_storage.release_get([release3.id])) | ||||
assert unknown_releases[0] is None | assert unknown_releases[0] is None | ||||
def test_release_get_order(self, swh_storage, sample_data): | def test_release_get_order(self, swh_storage, sample_data): | ||||
release, release2 = sample_data["release"][:2] | release, release2 = sample_data.releases[:2] | ||||
add_result = swh_storage.release_add([release, release2]) | add_result = swh_storage.release_add([release, release2]) | ||||
assert add_result == {"release:add": 2} | assert add_result == {"release:add": 2} | ||||
# order 1 | # order 1 | ||||
res1 = swh_storage.release_get([release.id, release2.id]) | res1 = swh_storage.release_get([release.id, release2.id]) | ||||
assert list(res1) == [release.to_dict(), release2.to_dict()] | assert list(res1) == [release.to_dict(), release2.to_dict()] | ||||
# order 2 | # order 2 | ||||
res2 = swh_storage.release_get([release2.id, release.id]) | res2 = swh_storage.release_get([release2.id, release.id]) | ||||
assert list(res2) == [release2.to_dict(), release.to_dict()] | assert list(res2) == [release2.to_dict(), release.to_dict()] | ||||
def test_release_get_random(self, swh_storage, sample_data): | def test_release_get_random(self, swh_storage, sample_data): | ||||
release, release2, release3 = sample_data["release"][:3] | release, release2, release3 = sample_data.releases[:3] | ||||
swh_storage.release_add([release, release2, release3]) | swh_storage.release_add([release, release2, release3]) | ||||
assert swh_storage.release_get_random() in { | assert swh_storage.release_get_random() in { | ||||
release.id, | release.id, | ||||
release2.id, | release2.id, | ||||
release3.id, | release3.id, | ||||
} | } | ||||
def test_origin_add(self, swh_storage, sample_data): | def test_origin_add(self, swh_storage, sample_data): | ||||
origin, origin2 = sample_data["origin"][:2] | origin, origin2 = sample_data.origins[:2] | ||||
origin_dict, origin2_dict = [o.to_dict() for o in [origin, origin2]] | origin_dict, origin2_dict = [o.to_dict() for o in [origin, origin2]] | ||||
assert swh_storage.origin_get([origin_dict])[0] is None | assert swh_storage.origin_get([origin_dict])[0] is None | ||||
stats = swh_storage.origin_add([origin, origin2]) | stats = swh_storage.origin_add([origin, origin2]) | ||||
assert stats == {"origin:add": 2} | assert stats == {"origin:add": 2} | ||||
actual_origin = swh_storage.origin_get([origin_dict])[0] | actual_origin = swh_storage.origin_get([origin_dict])[0] | ||||
assert actual_origin["url"] == origin.url | assert actual_origin["url"] == origin.url | ||||
actual_origin2 = swh_storage.origin_get([origin2_dict])[0] | actual_origin2 = swh_storage.origin_get([origin2_dict])[0] | ||||
assert actual_origin2["url"] == origin2.url | assert actual_origin2["url"] == origin2.url | ||||
assert set(swh_storage.journal_writer.journal.objects) == set( | assert set(swh_storage.journal_writer.journal.objects) == set( | ||||
[("origin", origin), ("origin", origin2),] | [("origin", origin), ("origin", origin2),] | ||||
) | ) | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["origin"] == 2 | assert swh_storage.stat_counters()["origin"] == 2 | ||||
def test_origin_add_from_generator(self, swh_storage, sample_data): | def test_origin_add_from_generator(self, swh_storage, sample_data): | ||||
origin, origin2 = sample_data["origin"][:2] | origin, origin2 = sample_data.origins[:2] | ||||
origin_dict, origin2_dict = [o.to_dict() for o in [origin, origin2]] | origin_dict, origin2_dict = [o.to_dict() for o in [origin, origin2]] | ||||
def _ori_gen(): | def _ori_gen(): | ||||
yield origin | yield origin | ||||
yield origin2 | yield origin2 | ||||
stats = swh_storage.origin_add(_ori_gen()) | stats = swh_storage.origin_add(_ori_gen()) | ||||
assert stats == {"origin:add": 2} | assert stats == {"origin:add": 2} | ||||
actual_origin = swh_storage.origin_get([origin_dict])[0] | actual_origin = swh_storage.origin_get([origin_dict])[0] | ||||
assert actual_origin["url"] == origin.url | assert actual_origin["url"] == origin.url | ||||
actual_origin2 = swh_storage.origin_get([origin2_dict])[0] | actual_origin2 = swh_storage.origin_get([origin2_dict])[0] | ||||
assert actual_origin2["url"] == origin2.url | assert actual_origin2["url"] == origin2.url | ||||
assert set(swh_storage.journal_writer.journal.objects) == set( | assert set(swh_storage.journal_writer.journal.objects) == set( | ||||
[("origin", origin), ("origin", origin2),] | [("origin", origin), ("origin", origin2),] | ||||
) | ) | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["origin"] == 2 | assert swh_storage.stat_counters()["origin"] == 2 | ||||
def test_origin_add_twice(self, swh_storage, sample_data): | def test_origin_add_twice(self, swh_storage, sample_data): | ||||
origin, origin2 = sample_data["origin"][:2] | origin, origin2 = sample_data.origins[:2] | ||||
origin_dict, origin2_dict = [o.to_dict() for o in [origin, origin2]] | origin_dict, origin2_dict = [o.to_dict() for o in [origin, origin2]] | ||||
add1 = swh_storage.origin_add([origin, origin2]) | add1 = swh_storage.origin_add([origin, origin2]) | ||||
assert set(swh_storage.journal_writer.journal.objects) == set( | assert set(swh_storage.journal_writer.journal.objects) == set( | ||||
[("origin", origin), ("origin", origin2),] | [("origin", origin), ("origin", origin2),] | ||||
) | ) | ||||
assert add1 == {"origin:add": 2} | assert add1 == {"origin:add": 2} | ||||
add2 = swh_storage.origin_add([origin, origin2]) | add2 = swh_storage.origin_add([origin, origin2]) | ||||
assert set(swh_storage.journal_writer.journal.objects) == set( | assert set(swh_storage.journal_writer.journal.objects) == set( | ||||
[("origin", origin), ("origin", origin2),] | [("origin", origin), ("origin", origin2),] | ||||
) | ) | ||||
assert add2 == {"origin:add": 0} | assert add2 == {"origin:add": 0} | ||||
def test_origin_get_legacy(self, swh_storage, sample_data): | def test_origin_get_legacy(self, swh_storage, sample_data): | ||||
origin, origin2 = sample_data["origin"][:2] | origin, origin2 = sample_data.origins[:2] | ||||
origin_dict, origin2_dict = [o.to_dict() for o in [origin, origin2]] | origin_dict, origin2_dict = [o.to_dict() for o in [origin, origin2]] | ||||
assert swh_storage.origin_get(origin_dict) is None | assert swh_storage.origin_get(origin_dict) is None | ||||
swh_storage.origin_add([origin]) | swh_storage.origin_add([origin]) | ||||
actual_origin0 = swh_storage.origin_get(origin_dict) | actual_origin0 = swh_storage.origin_get(origin_dict) | ||||
assert actual_origin0["url"] == origin.url | assert actual_origin0["url"] == origin.url | ||||
def test_origin_get(self, swh_storage, sample_data): | def test_origin_get(self, swh_storage, sample_data): | ||||
origin, origin2 = sample_data["origin"][:2] | origin, origin2 = sample_data.origins[:2] | ||||
origin_dict, origin2_dict = [o.to_dict() for o in [origin, origin2]] | origin_dict, origin2_dict = [o.to_dict() for o in [origin, origin2]] | ||||
assert swh_storage.origin_get(origin_dict) is None | assert swh_storage.origin_get(origin_dict) is None | ||||
assert swh_storage.origin_get([origin_dict]) == [None] | assert swh_storage.origin_get([origin_dict]) == [None] | ||||
swh_storage.origin_add([origin]) | swh_storage.origin_add([origin]) | ||||
actual_origins = swh_storage.origin_get([origin_dict]) | actual_origins = swh_storage.origin_get([origin_dict]) | ||||
assert len(actual_origins) == 1 | assert len(actual_origins) == 1 | ||||
Show All 20 Lines | def _generate_random_visits(self, nb_visits=100, start=0, end=7): | ||||
weeks = random.randint(start, end) | weeks = random.randint(start, end) | ||||
date_visit = today - timedelta( | date_visit = today - timedelta( | ||||
weeks=weeks, hours=hours, minutes=minutes, seconds=seconds, days=days | weeks=weeks, hours=hours, minutes=minutes, seconds=seconds, days=days | ||||
) | ) | ||||
visits.append(date_visit) | visits.append(date_visit) | ||||
return visits | return visits | ||||
def test_origin_visit_get_all(self, swh_storage, sample_data): | def test_origin_visit_get_all(self, swh_storage, sample_data): | ||||
origin = sample_data["origin"][0] | origin = sample_data.origin | ||||
swh_storage.origin_add([origin]) | swh_storage.origin_add([origin]) | ||||
visits = swh_storage.origin_visit_add( | visits = swh_storage.origin_visit_add( | ||||
[ | [ | ||||
OriginVisit( | OriginVisit( | ||||
origin=origin.url, date=data.date_visit1, type=data.type_visit1, | origin=origin.url, | ||||
date=sample_data.date_visit1, | |||||
type=sample_data.type_visit1, | |||||
), | ), | ||||
OriginVisit( | OriginVisit( | ||||
origin=origin.url, date=data.date_visit2, type=data.type_visit2, | origin=origin.url, | ||||
date=sample_data.date_visit2, | |||||
type=sample_data.type_visit2, | |||||
), | ), | ||||
OriginVisit( | OriginVisit( | ||||
origin=origin.url, date=data.date_visit2, type=data.type_visit2, | origin=origin.url, | ||||
date=sample_data.date_visit2, | |||||
type=sample_data.type_visit2, | |||||
), | ), | ||||
] | ] | ||||
) | ) | ||||
ov1, ov2, ov3 = [ | ov1, ov2, ov3 = [ | ||||
{**v.to_dict(), "status": "created", "snapshot": None, "metadata": None,} | {**v.to_dict(), "status": "created", "snapshot": None, "metadata": None,} | ||||
for v in visits | for v in visits | ||||
] | ] | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | def test_origin_visit_get_all(self, swh_storage, sample_data): | ||||
) | ) | ||||
) | ) | ||||
assert all_visits8 == [ov2] | assert all_visits8 == [ov2] | ||||
def test_origin_visit_get__unknown_origin(self, swh_storage):
    """Listing the visits of a never-added origin yields an empty sequence."""
    assert list(swh_storage.origin_visit_get("foo")) == []
def test_origin_visit_get_random(self, swh_storage, sample_data): | def test_origin_visit_get_random(self, swh_storage, sample_data): | ||||
origins = sample_data["origin"][:2] | origins = sample_data.origins[:2] | ||||
swh_storage.origin_add(origins) | swh_storage.origin_add(origins) | ||||
# Add some random visits within the selection range | # Add some random visits within the selection range | ||||
visits = self._generate_random_visits() | visits = self._generate_random_visits() | ||||
visit_type = "git" | visit_type = "git" | ||||
# Add visits to those origins | # Add visits to those origins | ||||
for origin in origins: | for origin in origins: | ||||
Show All 20 Lines | def test_origin_visit_get_random(self, swh_storage, sample_data): | ||||
assert stats["origin_visit"] == len(origins) * len(visits) | assert stats["origin_visit"] == len(origins) * len(visits) | ||||
random_origin_visit = swh_storage.origin_visit_get_random(visit_type) | random_origin_visit = swh_storage.origin_visit_get_random(visit_type) | ||||
assert random_origin_visit | assert random_origin_visit | ||||
assert random_origin_visit["origin"] is not None | assert random_origin_visit["origin"] is not None | ||||
assert random_origin_visit["origin"] in [o.url for o in origins] | assert random_origin_visit["origin"] in [o.url for o in origins] | ||||
def test_origin_visit_get_random_nothing_found(self, swh_storage, sample_data): | def test_origin_visit_get_random_nothing_found(self, swh_storage, sample_data): | ||||
origins = sample_data["origin"] | origins = sample_data.origins | ||||
swh_storage.origin_add(origins) | swh_storage.origin_add(origins) | ||||
visit_type = "hg" | visit_type = "hg" | ||||
# Add some visits outside of the random generation selection so nothing | # Add some visits outside of the random generation selection so nothing | ||||
# will be found by the random selection | # will be found by the random selection | ||||
visits = self._generate_random_visits(nb_visits=3, start=13, end=24) | visits = self._generate_random_visits(nb_visits=3, start=13, end=24) | ||||
for origin in origins: | for origin in origins: | ||||
for date_visit in visits: | for date_visit in visits: | ||||
visit = swh_storage.origin_visit_add( | visit = swh_storage.origin_visit_add( | ||||
Show All 10 Lines | def test_origin_visit_get_random_nothing_found(self, swh_storage, sample_data): | ||||
) | ) | ||||
] | ] | ||||
) | ) | ||||
random_origin_visit = swh_storage.origin_visit_get_random(visit_type) | random_origin_visit = swh_storage.origin_visit_get_random(visit_type) | ||||
assert random_origin_visit is None | assert random_origin_visit is None | ||||
def test_origin_get_by_sha1(self, swh_storage, sample_data):
    """An added origin can be looked up by the sha1 of its URL."""
    origin = sample_data.origin
    # not added yet -> no result
    assert swh_storage.origin_get(origin.to_dict()) is None

    swh_storage.origin_add([origin])
    results = list(swh_storage.origin_get_by_sha1([sha1(origin.url)]))
    assert len(results) == 1
    assert results[0]["url"] == origin.url
def test_origin_get_by_sha1_not_found(self, swh_storage, sample_data):
    """Looking up an unknown origin by sha1 yields one None entry."""
    origin = sample_data.origin
    assert swh_storage.origin_get(origin.to_dict()) is None

    results = list(swh_storage.origin_get_by_sha1([sha1(origin.url)]))
    assert len(results) == 1
    assert results[0] is None
def test_origin_search_single_result(self, swh_storage, sample_data): | def test_origin_search_single_result(self, swh_storage, sample_data): | ||||
origin, origin2 = sample_data["origin"][:2] | origin, origin2 = sample_data.origins[:2] | ||||
found_origins = list(swh_storage.origin_search(origin.url)) | found_origins = list(swh_storage.origin_search(origin.url)) | ||||
assert len(found_origins) == 0 | assert len(found_origins) == 0 | ||||
found_origins = list(swh_storage.origin_search(origin.url, regexp=True)) | found_origins = list(swh_storage.origin_search(origin.url, regexp=True)) | ||||
assert len(found_origins) == 0 | assert len(found_origins) == 0 | ||||
swh_storage.origin_add([origin]) | swh_storage.origin_add([origin]) | ||||
Show All 17 Lines | def test_origin_search_single_result(self, swh_storage, sample_data): | ||||
found_origins = list( | found_origins = list( | ||||
swh_storage.origin_search(f".{origin2.url[1:-1]}.", regexp=True) | swh_storage.origin_search(f".{origin2.url[1:-1]}.", regexp=True) | ||||
) | ) | ||||
assert len(found_origins) == 1 | assert len(found_origins) == 1 | ||||
assert found_origins[0] == origin2_data | assert found_origins[0] == origin2_data | ||||
def test_origin_search_no_regexp(self, swh_storage, sample_data):
    """Plain-text origin search supports offset/limit pagination."""
    origin, origin2 = sample_data.origins[:2]
    origin_dicts = [o.to_dict() for o in (origin, origin2)]
    swh_storage.origin_add([origin, origin2])

    # no pagination: both origins match "/"
    assert len(list(swh_storage.origin_search("/"))) == 2

    # offset=0 -> first page holds one of the two origins
    page0 = list(swh_storage.origin_search("/", offset=0, limit=1))
    assert len(page0) == 1
    assert page0[0] in origin_dicts

    # offset=1 -> second page holds one of the two origins
    page1 = list(swh_storage.origin_search("/", offset=1, limit=1))
    assert len(page1) == 1
    assert page1[0] in origin_dicts

    # the two pages cover distinct origins
    assert page0 != page1
def test_origin_search_regexp_substring(self, swh_storage, sample_data): | def test_origin_search_regexp_substring(self, swh_storage, sample_data): | ||||
origin, origin2 = sample_data["origin"][:2] | origin, origin2 = sample_data.origins[:2] | ||||
origin_dicts = [o.to_dict() for o in [origin, origin2]] | origin_dicts = [o.to_dict() for o in [origin, origin2]] | ||||
swh_storage.origin_add([origin, origin2]) | swh_storage.origin_add([origin, origin2]) | ||||
# no pagination | # no pagination | ||||
found_origins = list(swh_storage.origin_search("/", regexp=True)) | found_origins = list(swh_storage.origin_search("/", regexp=True)) | ||||
assert len(found_origins) == 2 | assert len(found_origins) == 2 | ||||
Show All 10 Lines | def test_origin_search_regexp_substring(self, swh_storage, sample_data): | ||||
) | ) | ||||
assert len(found_origins1) == 1 | assert len(found_origins1) == 1 | ||||
assert found_origins1[0] in origin_dicts | assert found_origins1[0] in origin_dicts | ||||
# check both origins were returned | # check both origins were returned | ||||
assert found_origins0 != found_origins1 | assert found_origins0 != found_origins1 | ||||
def test_origin_search_regexp_fullstring(self, swh_storage, sample_data): | def test_origin_search_regexp_fullstring(self, swh_storage, sample_data): | ||||
origin, origin2 = sample_data["origin"][:2] | origin, origin2 = sample_data.origins[:2] | ||||
origin_dicts = [o.to_dict() for o in [origin, origin2]] | origin_dicts = [o.to_dict() for o in [origin, origin2]] | ||||
swh_storage.origin_add([origin, origin2]) | swh_storage.origin_add([origin, origin2]) | ||||
# no pagination | # no pagination | ||||
found_origins = list(swh_storage.origin_search(".*/.*", regexp=True)) | found_origins = list(swh_storage.origin_search(".*/.*", regexp=True)) | ||||
assert len(found_origins) == 2 | assert len(found_origins) == 2 | ||||
Show All 10 Lines | def test_origin_search_regexp_fullstring(self, swh_storage, sample_data): | ||||
) | ) | ||||
assert len(found_origins1) == 1 | assert len(found_origins1) == 1 | ||||
assert found_origins1[0] in origin_dicts | assert found_origins1[0] in origin_dicts | ||||
# check both origins were returned | # check both origins were returned | ||||
assert found_origins0 != found_origins1 | assert found_origins0 != found_origins1 | ||||
def test_origin_visit_add(self, swh_storage, sample_data): | def test_origin_visit_add(self, swh_storage, sample_data): | ||||
origin1 = sample_data["origin"][1] | origin1 = sample_data.origins[1] | ||||
swh_storage.origin_add([origin1]) | swh_storage.origin_add([origin1]) | ||||
date_visit = now() | date_visit = now() | ||||
date_visit2 = date_visit + datetime.timedelta(minutes=1) | date_visit2 = date_visit + datetime.timedelta(minutes=1) | ||||
date_visit = round_to_milliseconds(date_visit) | date_visit = round_to_milliseconds(date_visit) | ||||
date_visit2 = round_to_milliseconds(date_visit2) | date_visit2 = round_to_milliseconds(date_visit2) | ||||
visit1 = OriginVisit( | visit1 = OriginVisit( | ||||
origin=origin1.url, date=date_visit, type=data.type_visit1, | origin=origin1.url, date=date_visit, type=sample_data.type_visit1, | ||||
) | ) | ||||
visit2 = OriginVisit( | visit2 = OriginVisit( | ||||
origin=origin1.url, date=date_visit2, type=data.type_visit2, | origin=origin1.url, date=date_visit2, type=sample_data.type_visit2, | ||||
) | ) | ||||
# add once | # add once | ||||
ov1, ov2 = swh_storage.origin_visit_add([visit1, visit2]) | ov1, ov2 = swh_storage.origin_visit_add([visit1, visit2]) | ||||
# then again (will be ignored as they already exist) | # then again (will be ignored as they already exist) | ||||
origin_visit1, origin_visit2 = swh_storage.origin_visit_add([ov1, ov2]) | origin_visit1, origin_visit2 = swh_storage.origin_visit_add([ov1, ov2]) | ||||
assert ov1 == origin_visit1 | assert ov1 == origin_visit1 | ||||
assert ov2 == origin_visit2 | assert ov2 == origin_visit2 | ||||
Show All 29 Lines | def test_origin_visit_add(self, swh_storage, sample_data): | ||||
[("origin", origin1)] | [("origin", origin1)] | ||||
+ [("origin_visit", visit) for visit in [ov1, ov2]] * 2 | + [("origin_visit", visit) for visit in [ov1, ov2]] * 2 | ||||
+ [("origin_visit_status", ovs) for ovs in [ovs1, ovs2]] | + [("origin_visit_status", ovs) for ovs in [ovs1, ovs2]] | ||||
) | ) | ||||
for obj in expected_objects: | for obj in expected_objects: | ||||
assert obj in actual_objects | assert obj in actual_objects | ||||
def test_origin_visit_add_validation(self, swh_storage, sample_data):
    """Adding a visit for an unknown origin should raise, and nothing
    must be written to the journal.
    """
    # Fix typo in the probe URL ("something-unknonw" -> "something-unknown");
    # any unknown URL triggers the error, but the fixture value should not
    # carry a misleading misspelling.
    visit = attr.evolve(sample_data.origin_visit, origin="something-unknown")
    with pytest.raises(StorageArgumentException, match="Unknown origin"):
        swh_storage.origin_visit_add([visit])

    # the failed insertion must leave the journal untouched
    objects = list(swh_storage.journal_writer.journal.objects)
    assert not objects
def test_origin_visit_status_add_validation(self, swh_storage): | def test_origin_visit_status_add_validation(self, swh_storage): | ||||
"""Wrong origin_visit_status input should raise storage argument error""" | """Wrong origin_visit_status input should raise storage argument error""" | ||||
Show All 10 Lines | def test_origin_visit_status_add_validation(self, swh_storage): | ||||
objects = list(swh_storage.journal_writer.journal.objects) | objects = list(swh_storage.journal_writer.journal.objects) | ||||
assert not objects | assert not objects | ||||
def test_origin_visit_status_add(self, swh_storage, sample_data): | def test_origin_visit_status_add(self, swh_storage, sample_data): | ||||
"""Correct origin visit statuses should add a new visit status | """Correct origin visit statuses should add a new visit status | ||||
""" | """ | ||||
snapshot = sample_data["snapshot"][0] | snapshot = sample_data.snapshot | ||||
origin1 = sample_data["origin"][1] | origin1 = sample_data.origins[1] | ||||
origin2 = Origin(url="new-origin") | origin2 = Origin(url="new-origin") | ||||
swh_storage.origin_add([origin1, origin2]) | swh_storage.origin_add([origin1, origin2]) | ||||
ov1, ov2 = swh_storage.origin_visit_add( | ov1, ov2 = swh_storage.origin_visit_add( | ||||
[ | [ | ||||
OriginVisit( | OriginVisit( | ||||
origin=origin1.url, date=data.date_visit1, type=data.type_visit1, | origin=origin1.url, | ||||
date=sample_data.date_visit1, | |||||
type=sample_data.type_visit1, | |||||
), | ), | ||||
OriginVisit( | OriginVisit( | ||||
origin=origin2.url, date=data.date_visit2, type=data.type_visit2, | origin=origin2.url, | ||||
date=sample_data.date_visit2, | |||||
type=sample_data.type_visit2, | |||||
), | ), | ||||
] | ] | ||||
) | ) | ||||
ovs1 = OriginVisitStatus( | ovs1 = OriginVisitStatus( | ||||
origin=origin1.url, | origin=origin1.url, | ||||
visit=ov1.visit, | visit=ov1.visit, | ||||
date=data.date_visit1, | date=sample_data.date_visit1, | ||||
status="created", | status="created", | ||||
snapshot=None, | snapshot=None, | ||||
) | ) | ||||
ovs2 = OriginVisitStatus( | ovs2 = OriginVisitStatus( | ||||
origin=origin2.url, | origin=origin2.url, | ||||
visit=ov2.visit, | visit=ov2.visit, | ||||
date=data.date_visit2, | date=sample_data.date_visit2, | ||||
status="created", | status="created", | ||||
snapshot=None, | snapshot=None, | ||||
) | ) | ||||
date_visit_now = now() | date_visit_now = now() | ||||
visit_status1 = OriginVisitStatus( | visit_status1 = OriginVisitStatus( | ||||
origin=ov1.origin, | origin=ov1.origin, | ||||
visit=ov1.visit, | visit=ov1.visit, | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | def test_origin_visit_status_add(self, swh_storage, sample_data): | ||||
for obj in expected_objects: | for obj in expected_objects: | ||||
assert obj in actual_objects | assert obj in actual_objects | ||||
def test_origin_visit_status_add_twice(self, swh_storage, sample_data): | def test_origin_visit_status_add_twice(self, swh_storage, sample_data): | ||||
"""Correct origin visit statuses should add a new visit status | """Correct origin visit statuses should add a new visit status | ||||
""" | """ | ||||
snapshot = sample_data["snapshot"][0] | snapshot = sample_data.snapshot | ||||
origin1 = sample_data["origin"][1] | origin1 = sample_data.origins[1] | ||||
swh_storage.origin_add([origin1]) | swh_storage.origin_add([origin1]) | ||||
ov1 = swh_storage.origin_visit_add( | ov1 = swh_storage.origin_visit_add( | ||||
[ | [ | ||||
OriginVisit( | OriginVisit( | ||||
origin=origin1.url, date=data.date_visit1, type=data.type_visit1, | origin=origin1.url, | ||||
date=sample_data.date_visit1, | |||||
type=sample_data.type_visit1, | |||||
), | ), | ||||
] | ] | ||||
)[0] | )[0] | ||||
ovs1 = OriginVisitStatus( | ovs1 = OriginVisitStatus( | ||||
origin=origin1.url, | origin=origin1.url, | ||||
visit=ov1.visit, | visit=ov1.visit, | ||||
date=data.date_visit1, | date=sample_data.date_visit1, | ||||
status="created", | status="created", | ||||
snapshot=None, | snapshot=None, | ||||
) | ) | ||||
date_visit_now = now() | date_visit_now = now() | ||||
visit_status1 = OriginVisitStatus( | visit_status1 = OriginVisitStatus( | ||||
origin=ov1.origin, | origin=ov1.origin, | ||||
visit=ov1.visit, | visit=ov1.visit, | ||||
date=date_visit_now, | date=date_visit_now, | ||||
Show All 25 Lines | def test_origin_visit_status_add_twice(self, swh_storage, sample_data): | ||||
+ [("origin_visit", v) for v in expected_visits] | + [("origin_visit", v) for v in expected_visits] | ||||
+ [("origin_visit_status", ovs) for ovs in expected_visit_statuses] | + [("origin_visit_status", ovs) for ovs in expected_visit_statuses] | ||||
) | ) | ||||
for obj in expected_objects: | for obj in expected_objects: | ||||
assert obj in actual_objects | assert obj in actual_objects | ||||
def test_origin_visit_find_by_date(self, swh_storage, sample_data):
    """Lookup by date returns the matching visit; when two visits share
    the same date, the latest one must be returned."""
    origin = sample_data.origin
    swh_storage.origin_add([origin])

    # three visits, the first and last sharing the same date
    visit_specs = [
        (sample_data.date_visit2, sample_data.type_visit1),
        (sample_data.date_visit3, sample_data.type_visit2),
        (sample_data.date_visit2, sample_data.type_visit3),
    ]
    ov1, ov2, ov3 = swh_storage.origin_visit_add(
        [
            OriginVisit(origin=origin.url, date=date, type=visit_type)
            for (date, visit_type) in visit_specs
        ]
    )

    # one "ongoing" status per visit, dated like the visit itself
    swh_storage.origin_visit_status_add(
        [
            OriginVisitStatus(
                origin=origin.url,
                visit=ov.visit,
                date=date,
                status="ongoing",
                snapshot=None,
            )
            for ov, (date, _) in zip((ov1, ov2, ov3), visit_specs)
        ]
    )

    # Simple case
    visit = swh_storage.origin_visit_find_by_date(
        origin.url, sample_data.date_visit3
    )
    assert visit["visit"] == ov2.visit

    # There are two visits at the same date, the latest must be returned
    visit = swh_storage.origin_visit_find_by_date(
        origin.url, sample_data.date_visit2
    )
    assert visit["visit"] == ov3.visit
def test_origin_visit_find_by_date__unknown_origin(self, swh_storage, sample_data):
    """Looking up a visit by date on an unknown origin must not raise."""
    swh_storage.origin_visit_find_by_date("foo", sample_data.date_visit2)
def test_origin_visit_get_by(self, swh_storage, sample_data): | def test_origin_visit_get_by(self, swh_storage, sample_data): | ||||
snapshot = sample_data["snapshot"][0] | snapshot = sample_data.snapshot | ||||
origins = sample_data["origin"][:2] | origins = sample_data.origins[:2] | ||||
swh_storage.origin_add(origins) | swh_storage.origin_add(origins) | ||||
origin_url, origin_url2 = [o.url for o in origins] | origin_url, origin_url2 = [o.url for o in origins] | ||||
visit = OriginVisit( | visit = OriginVisit( | ||||
origin=origin_url, date=data.date_visit2, type=data.type_visit2, | origin=origin_url, | ||||
date=sample_data.date_visit2, | |||||
type=sample_data.type_visit2, | |||||
) | ) | ||||
origin_visit1 = swh_storage.origin_visit_add([visit])[0] | origin_visit1 = swh_storage.origin_visit_add([visit])[0] | ||||
swh_storage.snapshot_add([snapshot]) | swh_storage.snapshot_add([snapshot]) | ||||
swh_storage.origin_visit_status_add( | swh_storage.origin_visit_status_add( | ||||
[ | [ | ||||
OriginVisitStatus( | OriginVisitStatus( | ||||
origin=origin_url, | origin=origin_url, | ||||
visit=origin_visit1.visit, | visit=origin_visit1.visit, | ||||
date=now(), | date=now(), | ||||
status="ongoing", | status="ongoing", | ||||
snapshot=snapshot.id, | snapshot=snapshot.id, | ||||
) | ) | ||||
] | ] | ||||
) | ) | ||||
# Add some other {origin, visit} entries | # Add some other {origin, visit} entries | ||||
visit2 = OriginVisit( | visit2 = OriginVisit( | ||||
origin=origin_url, date=data.date_visit3, type=data.type_visit3, | origin=origin_url, | ||||
date=sample_data.date_visit3, | |||||
type=sample_data.type_visit3, | |||||
) | ) | ||||
visit3 = OriginVisit( | visit3 = OriginVisit( | ||||
origin=origin_url2, date=data.date_visit3, type=data.type_visit3, | origin=origin_url2, | ||||
date=sample_data.date_visit3, | |||||
type=sample_data.type_visit3, | |||||
) | ) | ||||
swh_storage.origin_visit_add([visit2, visit3]) | swh_storage.origin_visit_add([visit2, visit3]) | ||||
# when | # when | ||||
visit1_metadata = { | visit1_metadata = { | ||||
"contents": 42, | "contents": 42, | ||||
"directories": 22, | "directories": 22, | ||||
} | } | ||||
Show All 11 Lines | def test_origin_visit_get_by(self, swh_storage, sample_data): | ||||
] | ] | ||||
) | ) | ||||
expected_origin_visit = origin_visit1.to_dict() | expected_origin_visit = origin_visit1.to_dict() | ||||
expected_origin_visit.update( | expected_origin_visit.update( | ||||
{ | { | ||||
"origin": origin_url, | "origin": origin_url, | ||||
"visit": origin_visit1.visit, | "visit": origin_visit1.visit, | ||||
"date": data.date_visit2, | "date": sample_data.date_visit2, | ||||
"type": data.type_visit2, | "type": sample_data.type_visit2, | ||||
"metadata": visit1_metadata, | "metadata": visit1_metadata, | ||||
"status": "full", | "status": "full", | ||||
"snapshot": snapshot.id, | "snapshot": snapshot.id, | ||||
} | } | ||||
) | ) | ||||
# when | # when | ||||
actual_origin_visit1 = swh_storage.origin_visit_get_by( | actual_origin_visit1 = swh_storage.origin_visit_get_by( | ||||
origin_url, origin_visit1.visit | origin_url, origin_visit1.visit | ||||
) | ) | ||||
# then | # then | ||||
assert actual_origin_visit1 == expected_origin_visit | assert actual_origin_visit1 == expected_origin_visit | ||||
def test_origin_visit_get_by__unknown_origin(self, swh_storage):
    """Fetching a visit of an unknown origin returns None."""
    assert swh_storage.origin_visit_get_by("foo", 10) is None
def test_origin_visit_get_by_no_result(self, swh_storage, sample_data):
    """Fetching a nonexistent visit id of a known origin returns None."""
    origin = sample_data.origin
    swh_storage.origin_add([origin])

    # visit id 999 was never created for this origin
    assert swh_storage.origin_visit_get_by(origin.url, 999) is None
def test_origin_visit_get_latest_none(self, swh_storage, sample_data):
    """Origin visit get latest on unknown objects should return nothing"""
    # unknown origin -> no result
    assert swh_storage.origin_visit_get_latest("unknown-origin") is None

    # known origin, but no visit of the requested type -> no result
    origin = sample_data.origin
    swh_storage.origin_add([origin])
    assert swh_storage.origin_visit_get_latest(origin.url, type="unknown") is None
def test_origin_visit_get_latest_filter_type(self, swh_storage, sample_data):
    """origin_visit_get_latest honors its ``type`` filter.

    Fix: the sanity check ``type_visit1 != type_visit2`` was asserted
    twice in the original; it is now asserted once, up front.
    """
    origin = sample_data.origin
    swh_storage.origin_add([origin])

    # sanity checks on the sample data the scenario relies on
    assert sample_data.type_visit1 != sample_data.type_visit2
    assert sample_data.date_visit1 < sample_data.date_visit2

    visit1 = OriginVisit(
        origin=origin.url,
        date=sample_data.date_visit1,
        type=sample_data.type_visit1,
    )
    visit2 = OriginVisit(
        origin=origin.url,
        date=sample_data.date_visit2,
        type=sample_data.type_visit2,
    )
    # Add a visit with the same date (and type) as the previous one
    visit3 = OriginVisit(
        origin=origin.url,
        date=sample_data.date_visit2,
        type=sample_data.type_visit2,
    )

    ov1, ov2, ov3 = swh_storage.origin_visit_add([visit1, visit2, visit3])
    origin_visit1 = swh_storage.origin_visit_get_by(origin.url, ov1.visit)
    origin_visit3 = swh_storage.origin_visit_get_by(origin.url, ov3.visit)

    # Check type filter is ok: latest visit of each type is returned
    actual_ov1 = swh_storage.origin_visit_get_latest(
        origin.url, type=sample_data.type_visit1,
    )
    assert actual_ov1 == origin_visit1

    actual_ov3 = swh_storage.origin_visit_get_latest(
        origin.url, type=sample_data.type_visit2,
    )
    assert actual_ov3 == origin_visit3

    # a type with no visit yields no result
    new_type = "npm"
    assert new_type not in [sample_data.type_visit1, sample_data.type_visit2]
    assert (
        swh_storage.origin_visit_get_latest(origin.url, type=new_type) is None
    )
def test_origin_visit_get_latest(self, swh_storage, sample_data): | def test_origin_visit_get_latest(self, swh_storage, sample_data): | ||||
empty_snapshot, complete_snapshot = sample_data["snapshot"][1:3] | empty_snapshot, complete_snapshot = sample_data.snapshots[1:3] | ||||
origin = sample_data["origin"][0] | origin = sample_data.origin | ||||
swh_storage.origin_add([origin]) | swh_storage.origin_add([origin]) | ||||
visit1 = OriginVisit( | visit1 = OriginVisit( | ||||
origin=origin.url, date=data.date_visit1, type=data.type_visit1, | origin=origin.url, | ||||
date=sample_data.date_visit1, | |||||
type=sample_data.type_visit1, | |||||
) | ) | ||||
visit2 = OriginVisit( | visit2 = OriginVisit( | ||||
origin=origin.url, date=data.date_visit2, type=data.type_visit2, | origin=origin.url, | ||||
date=sample_data.date_visit2, | |||||
type=sample_data.type_visit2, | |||||
) | ) | ||||
# Add a visit with the same date as the previous one | # Add a visit with the same date as the previous one | ||||
visit3 = OriginVisit( | visit3 = OriginVisit( | ||||
origin=origin.url, date=data.date_visit2, type=data.type_visit2, | origin=origin.url, | ||||
date=sample_data.date_visit2, | |||||
type=sample_data.type_visit2, | |||||
) | ) | ||||
ov1, ov2, ov3 = swh_storage.origin_visit_add([visit1, visit2, visit3]) | ov1, ov2, ov3 = swh_storage.origin_visit_add([visit1, visit2, visit3]) | ||||
origin_visit1 = swh_storage.origin_visit_get_by(origin.url, ov1.visit) | origin_visit1 = swh_storage.origin_visit_get_by(origin.url, ov1.visit) | ||||
origin_visit2 = swh_storage.origin_visit_get_by(origin.url, ov2.visit) | origin_visit2 = swh_storage.origin_visit_get_by(origin.url, ov2.visit) | ||||
origin_visit3 = swh_storage.origin_visit_get_by(origin.url, ov3.visit) | origin_visit3 = swh_storage.origin_visit_get_by(origin.url, ov3.visit) | ||||
# Two visits, both with no snapshot | # Two visits, both with no snapshot | ||||
▲ Show 20 Lines • Show All 118 Lines • ▼ Show 20 Lines | def test_origin_visit_get_latest(self, swh_storage, sample_data): | ||||
assert { | assert { | ||||
**origin_visit3, | **origin_visit3, | ||||
"snapshot": complete_snapshot.id, | "snapshot": complete_snapshot.id, | ||||
"status": "ongoing", | "status": "ongoing", | ||||
} == swh_storage.origin_visit_get_latest(origin.url, require_snapshot=True) | } == swh_storage.origin_visit_get_latest(origin.url, require_snapshot=True) | ||||
def test_origin_visit_status_get_latest(self, swh_storage, sample_data): | def test_origin_visit_status_get_latest(self, swh_storage, sample_data): | ||||
snapshot = sample_data["snapshot"][2] | snapshot = sample_data.snapshots[2] | ||||
origin1 = sample_data["origin"][0] | origin1 = sample_data.origin | ||||
swh_storage.origin_add([origin1]) | swh_storage.origin_add([origin1]) | ||||
# to have some reference visits | # to have some reference visits | ||||
ov1, ov2 = swh_storage.origin_visit_add( | ov1, ov2 = swh_storage.origin_visit_add( | ||||
[ | [ | ||||
OriginVisit( | OriginVisit( | ||||
origin=origin1.url, date=data.date_visit1, type=data.type_visit1, | origin=origin1.url, | ||||
date=sample_data.date_visit1, | |||||
type=sample_data.type_visit1, | |||||
), | ), | ||||
OriginVisit( | OriginVisit( | ||||
origin=origin1.url, date=data.date_visit2, type=data.type_visit2, | origin=origin1.url, | ||||
date=sample_data.date_visit2, | |||||
type=sample_data.type_visit2, | |||||
), | ), | ||||
] | ] | ||||
) | ) | ||||
swh_storage.snapshot_add([snapshot]) | swh_storage.snapshot_add([snapshot]) | ||||
date_now = now() | date_now = now() | ||||
date_now = round_to_milliseconds(date_now) | date_now = round_to_milliseconds(date_now) | ||||
assert data.date_visit1 < data.date_visit2 | assert sample_data.date_visit1 < sample_data.date_visit2 | ||||
assert data.date_visit2 < date_now | assert sample_data.date_visit2 < date_now | ||||
ovs1 = OriginVisitStatus( | ovs1 = OriginVisitStatus( | ||||
origin=origin1.url, | origin=origin1.url, | ||||
visit=ov1.visit, | visit=ov1.visit, | ||||
date=data.date_visit1, | date=sample_data.date_visit1, | ||||
status="partial", | status="partial", | ||||
snapshot=None, | snapshot=None, | ||||
) | ) | ||||
ovs2 = OriginVisitStatus( | ovs2 = OriginVisitStatus( | ||||
origin=origin1.url, | origin=origin1.url, | ||||
visit=ov1.visit, | visit=ov1.visit, | ||||
date=data.date_visit2, | date=sample_data.date_visit2, | ||||
status="ongoing", | status="ongoing", | ||||
snapshot=None, | snapshot=None, | ||||
) | ) | ||||
ovs3 = OriginVisitStatus( | ovs3 = OriginVisitStatus( | ||||
origin=origin1.url, | origin=origin1.url, | ||||
visit=ov2.visit, | visit=ov2.visit, | ||||
date=data.date_visit2 + datetime.timedelta(minutes=1), # to not be ignored | date=sample_data.date_visit2 | ||||
+ datetime.timedelta(minutes=1), # to not be ignored | |||||
status="ongoing", | status="ongoing", | ||||
snapshot=None, | snapshot=None, | ||||
) | ) | ||||
ovs4 = OriginVisitStatus( | ovs4 = OriginVisitStatus( | ||||
origin=origin1.url, | origin=origin1.url, | ||||
visit=ov2.visit, | visit=ov2.visit, | ||||
date=date_now, | date=date_now, | ||||
status="full", | status="full", | ||||
▲ Show 20 Lines • Show All 50 Lines • ▼ Show 20 Lines | def test_origin_visit_status_get_latest(self, swh_storage, sample_data): | ||||
assert actual_origin_visit is None # there is no visit with status full | assert actual_origin_visit is None # there is no visit with status full | ||||
actual_origin_visit3 = swh_storage.origin_visit_status_get_latest( | actual_origin_visit3 = swh_storage.origin_visit_status_get_latest( | ||||
origin1.url, ov2.visit, allowed_statuses=["ongoing"] | origin1.url, ov2.visit, allowed_statuses=["ongoing"] | ||||
) | ) | ||||
assert actual_origin_visit3 == ovs3 | assert actual_origin_visit3 == ovs3 | ||||
def test_person_fullname_unicity(self, swh_storage, sample_data):
    """Two revisions whose committers share a fullname — one of them
    carrying neither name nor email — resolve to the same stored committer."""
    revision, rev2 = sample_data.revisions[0:2]
    # second revision: identical committer fullname, but name/email stripped
    anonymous_committer = Person(
        fullname=revision.committer.fullname, name=None, email=None
    )
    revision2 = attr.evolve(rev2, committer=anonymous_committer)
    swh_storage.revision_add([revision, revision2])
    # fetch both revisions back and compare their committers
    stored = list(swh_storage.revision_get([revision.id, revision2.id]))
    assert stored[0]["committer"] == stored[1]["committer"]
def test_snapshot_add_get_empty(self, swh_storage, sample_data): | def test_snapshot_add_get_empty(self, swh_storage, sample_data): | ||||
empty_snapshot = sample_data["snapshot"][1] | empty_snapshot = sample_data.snapshots[1] | ||||
empty_snapshot_dict = empty_snapshot.to_dict() | empty_snapshot_dict = empty_snapshot.to_dict() | ||||
origin = sample_data["origin"][0] | origin = sample_data.origin | ||||
swh_storage.origin_add([origin]) | swh_storage.origin_add([origin]) | ||||
ov1 = swh_storage.origin_visit_add( | ov1 = swh_storage.origin_visit_add( | ||||
[ | [ | ||||
OriginVisit( | OriginVisit( | ||||
origin=origin.url, date=data.date_visit1, type=data.type_visit1, | origin=origin.url, | ||||
date=sample_data.date_visit1, | |||||
type=sample_data.type_visit1, | |||||
) | ) | ||||
] | ] | ||||
)[0] | )[0] | ||||
actual_result = swh_storage.snapshot_add([empty_snapshot]) | actual_result = swh_storage.snapshot_add([empty_snapshot]) | ||||
assert actual_result == {"snapshot:add": 1} | assert actual_result == {"snapshot:add": 1} | ||||
date_now = now() | date_now = now() | ||||
Show All 14 Lines | def test_snapshot_add_get_empty(self, swh_storage, sample_data): | ||||
assert by_id == {**empty_snapshot_dict, "next_branch": None} | assert by_id == {**empty_snapshot_dict, "next_branch": None} | ||||
by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, ov1.visit) | by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, ov1.visit) | ||||
assert by_ov == {**empty_snapshot_dict, "next_branch": None} | assert by_ov == {**empty_snapshot_dict, "next_branch": None} | ||||
ovs1 = OriginVisitStatus.from_dict( | ovs1 = OriginVisitStatus.from_dict( | ||||
{ | { | ||||
"origin": origin.url, | "origin": origin.url, | ||||
"date": data.date_visit1, | "date": sample_data.date_visit1, | ||||
"visit": ov1.visit, | "visit": ov1.visit, | ||||
"status": "created", | "status": "created", | ||||
"snapshot": None, | "snapshot": None, | ||||
"metadata": None, | "metadata": None, | ||||
} | } | ||||
) | ) | ||||
ovs2 = OriginVisitStatus.from_dict( | ovs2 = OriginVisitStatus.from_dict( | ||||
{ | { | ||||
Show All 13 Lines | def test_snapshot_add_get_empty(self, swh_storage, sample_data): | ||||
("origin_visit_status", ovs1,), | ("origin_visit_status", ovs1,), | ||||
("snapshot", empty_snapshot), | ("snapshot", empty_snapshot), | ||||
("origin_visit_status", ovs2,), | ("origin_visit_status", ovs2,), | ||||
] | ] | ||||
for obj in expected_objects: | for obj in expected_objects: | ||||
assert obj in actual_objects | assert obj in actual_objects | ||||
def test_snapshot_add_get_complete(self, swh_storage, sample_data): | def test_snapshot_add_get_complete(self, swh_storage, sample_data): | ||||
complete_snapshot = sample_data["snapshot"][2] | complete_snapshot = sample_data.snapshots[2] | ||||
complete_snapshot_dict = complete_snapshot.to_dict() | complete_snapshot_dict = complete_snapshot.to_dict() | ||||
origin = sample_data["origin"][0] | origin = sample_data.origin | ||||
swh_storage.origin_add([origin]) | swh_storage.origin_add([origin]) | ||||
visit = OriginVisit( | visit = OriginVisit( | ||||
origin=origin.url, date=data.date_visit1, type=data.type_visit1, | origin=origin.url, | ||||
date=sample_data.date_visit1, | |||||
type=sample_data.type_visit1, | |||||
) | ) | ||||
origin_visit1 = swh_storage.origin_visit_add([visit])[0] | origin_visit1 = swh_storage.origin_visit_add([visit])[0] | ||||
visit_id = origin_visit1.visit | visit_id = origin_visit1.visit | ||||
actual_result = swh_storage.snapshot_add([complete_snapshot]) | actual_result = swh_storage.snapshot_add([complete_snapshot]) | ||||
swh_storage.origin_visit_status_add( | swh_storage.origin_visit_status_add( | ||||
[ | [ | ||||
OriginVisitStatus( | OriginVisitStatus( | ||||
Show All 9 Lines | def test_snapshot_add_get_complete(self, swh_storage, sample_data): | ||||
by_id = swh_storage.snapshot_get(complete_snapshot.id) | by_id = swh_storage.snapshot_get(complete_snapshot.id) | ||||
assert by_id == {**complete_snapshot_dict, "next_branch": None} | assert by_id == {**complete_snapshot_dict, "next_branch": None} | ||||
by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, visit_id) | by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, visit_id) | ||||
assert by_ov == {**complete_snapshot_dict, "next_branch": None} | assert by_ov == {**complete_snapshot_dict, "next_branch": None} | ||||
def test_snapshot_add_many(self, swh_storage, sample_data):
    """Adding two distinct snapshots in one call stores and counts both."""
    snapshot, _, complete_snapshot = sample_data.snapshots[:3]

    actual_result = swh_storage.snapshot_add([snapshot, complete_snapshot])
    assert actual_result == {"snapshot:add": 2}

    # both snapshots are retrievable, with no further branch page to fetch
    for snp in (complete_snapshot, snapshot):
        assert swh_storage.snapshot_get(snp.id) == {
            **snp.to_dict(),
            "next_branch": None,
        }

    swh_storage.refresh_stat_counters()
    assert swh_storage.stat_counters()["snapshot"] == 2
def test_snapshot_add_many_from_generator(self, swh_storage, sample_data):
    """snapshot_add accepts a generator of snapshots, not just a list."""
    snapshot, _, complete_snapshot = sample_data.snapshots[:3]

    def _snp_gen():
        # lazily produce the two snapshots
        for snp in (snapshot, complete_snapshot):
            yield snp

    assert swh_storage.snapshot_add(_snp_gen()) == {"snapshot:add": 2}

    swh_storage.refresh_stat_counters()
    assert swh_storage.stat_counters()["snapshot"] == 2
def test_snapshot_add_many_incremental(self, swh_storage, sample_data):
    """Re-adding an already stored snapshot only counts the new one."""
    snapshot, _, complete_snapshot = sample_data.snapshots[:3]

    # first insertion: one new snapshot
    assert swh_storage.snapshot_add([complete_snapshot]) == {"snapshot:add": 1}
    # second insertion: complete_snapshot is a duplicate, only snapshot is new
    assert swh_storage.snapshot_add([snapshot, complete_snapshot]) == {
        "snapshot:add": 1
    }

    # both snapshots are nonetheless fully retrievable
    for snp in (complete_snapshot, snapshot):
        assert swh_storage.snapshot_get(snp.id) == {
            **snp.to_dict(),
            "next_branch": None,
        }
def test_snapshot_add_twice(self, swh_storage, sample_data):
    """Each distinct snapshot is written to the journal exactly once,
    in insertion order."""
    snapshot, empty_snapshot = sample_data.snapshots[:2]
    journal = swh_storage.journal_writer.journal

    assert swh_storage.snapshot_add([empty_snapshot]) == {"snapshot:add": 1}
    assert list(journal.objects) == [("snapshot", empty_snapshot)]

    assert swh_storage.snapshot_add([snapshot]) == {"snapshot:add": 1}
    assert list(journal.objects) == [
        ("snapshot", empty_snapshot),
        ("snapshot", snapshot),
    ]
def test_snapshot_add_count_branches(self, swh_storage, sample_data):
    """snapshot_count_branches tallies branches grouped by target type."""
    complete_snapshot = sample_data.snapshots[2]

    assert swh_storage.snapshot_add([complete_snapshot]) == {"snapshot:add": 1}

    snp_size = swh_storage.snapshot_count_branches(complete_snapshot.id)
    # the "complete" sample snapshot holds one branch of every target type,
    # an alias, two directories, and one dangling (None-targeted) branch
    assert snp_size == {
        "alias": 1,
        "content": 1,
        "directory": 2,
        "release": 1,
        "revision": 1,
        "snapshot": 1,
        None: 1,
    }
def test_snapshot_add_get_paginated(self, swh_storage, sample_data): | def test_snapshot_add_get_paginated(self, swh_storage, sample_data): | ||||
complete_snapshot = sample_data["snapshot"][2] | complete_snapshot = sample_data.snapshots[2] | ||||
swh_storage.snapshot_add([complete_snapshot]) | swh_storage.snapshot_add([complete_snapshot]) | ||||
snp_id = complete_snapshot.id | snp_id = complete_snapshot.id | ||||
branches = complete_snapshot.to_dict()["branches"] | branches = complete_snapshot.to_dict()["branches"] | ||||
branch_names = list(sorted(branches)) | branch_names = list(sorted(branches)) | ||||
# Test branch_from | # Test branch_from | ||||
Show All 31 Lines | def test_snapshot_add_get_paginated(self, swh_storage, sample_data): | ||||
name: branches[name] for name in branch_names[dir_idx : dir_idx + 3] | name: branches[name] for name in branch_names[dir_idx : dir_idx + 3] | ||||
}, | }, | ||||
"next_branch": branch_names[dir_idx + 3], | "next_branch": branch_names[dir_idx + 3], | ||||
} | } | ||||
assert snapshot == expected_snapshot | assert snapshot == expected_snapshot | ||||
def test_snapshot_add_get_filtered(self, swh_storage, sample_data): | def test_snapshot_add_get_filtered(self, swh_storage, sample_data): | ||||
origin = sample_data["origin"][0] | origin = sample_data.origin | ||||
complete_snapshot = sample_data["snapshot"][2] | complete_snapshot = sample_data.snapshots[2] | ||||
swh_storage.origin_add([origin]) | swh_storage.origin_add([origin]) | ||||
visit = OriginVisit( | visit = OriginVisit( | ||||
origin=origin.url, date=data.date_visit1, type=data.type_visit1, | origin=origin.url, | ||||
date=sample_data.date_visit1, | |||||
type=sample_data.type_visit1, | |||||
) | ) | ||||
origin_visit1 = swh_storage.origin_visit_add([visit])[0] | origin_visit1 = swh_storage.origin_visit_add([visit])[0] | ||||
swh_storage.snapshot_add([complete_snapshot]) | swh_storage.snapshot_add([complete_snapshot]) | ||||
swh_storage.origin_visit_status_add( | swh_storage.origin_visit_status_add( | ||||
[ | [ | ||||
OriginVisitStatus( | OriginVisitStatus( | ||||
origin=origin.url, | origin=origin.url, | ||||
Show All 34 Lines | def test_snapshot_add_get_filtered(self, swh_storage, sample_data): | ||||
if tgt and tgt["target_type"] == "alias" | if tgt and tgt["target_type"] == "alias" | ||||
}, | }, | ||||
"next_branch": None, | "next_branch": None, | ||||
} | } | ||||
assert snapshot == expected_snapshot | assert snapshot == expected_snapshot | ||||
def test_snapshot_add_get_filtered_and_paginated(self, swh_storage, sample_data): | def test_snapshot_add_get_filtered_and_paginated(self, swh_storage, sample_data): | ||||
complete_snapshot = sample_data["snapshot"][2] | complete_snapshot = sample_data.snapshots[2] | ||||
swh_storage.snapshot_add([complete_snapshot]) | swh_storage.snapshot_add([complete_snapshot]) | ||||
snp_id = complete_snapshot.id | snp_id = complete_snapshot.id | ||||
branches = complete_snapshot.to_dict()["branches"] | branches = complete_snapshot.to_dict()["branches"] | ||||
branch_names = list(sorted(branches)) | branch_names = list(sorted(branches)) | ||||
# Test branch_from | # Test branch_from | ||||
▲ Show 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | def test_snapshot_add_get_filtered_and_paginated(self, swh_storage, sample_data): | ||||
"id": snp_id, | "id": snp_id, | ||||
"branches": {branch_names[dir_idx]: branches[branch_names[dir_idx]],}, | "branches": {branch_names[dir_idx]: branches[branch_names[dir_idx]],}, | ||||
"next_branch": b"release", | "next_branch": b"release", | ||||
} | } | ||||
assert snapshot == expected_snapshot | assert snapshot == expected_snapshot | ||||
def test_snapshot_add_get_branch_by_type(self, swh_storage, sample_data): | def test_snapshot_add_get_branch_by_type(self, swh_storage, sample_data): | ||||
complete_snapshot = sample_data["snapshot"][2] | complete_snapshot = sample_data.snapshots[2] | ||||
snapshot = complete_snapshot.to_dict() | snapshot = complete_snapshot.to_dict() | ||||
alias1 = b"alias1" | alias1 = b"alias1" | ||||
alias2 = b"alias2" | alias2 = b"alias2" | ||||
target1 = random.choice(list(snapshot["branches"].keys())) | target1 = random.choice(list(snapshot["branches"].keys())) | ||||
target2 = random.choice(list(snapshot["branches"].keys())) | target2 = random.choice(list(snapshot["branches"].keys())) | ||||
snapshot["branches"][alias2] = { | snapshot["branches"][alias2] = { | ||||
Show All 15 Lines | def test_snapshot_add_get_branch_by_type(self, swh_storage, sample_data): | ||||
branches_from=alias1, | branches_from=alias1, | ||||
branches_count=1, | branches_count=1, | ||||
)["branches"] | )["branches"] | ||||
assert len(branches) == 1 | assert len(branches) == 1 | ||||
assert alias1 in branches | assert alias1 in branches | ||||
def test_snapshot_add_get(self, swh_storage, sample_data): | def test_snapshot_add_get(self, swh_storage, sample_data): | ||||
snapshot = sample_data["snapshot"][0] | snapshot = sample_data.snapshot | ||||
origin = sample_data["origin"][0] | origin = sample_data.origin | ||||
swh_storage.origin_add([origin]) | swh_storage.origin_add([origin]) | ||||
visit = OriginVisit( | visit = OriginVisit( | ||||
origin=origin.url, date=data.date_visit1, type=data.type_visit1, | origin=origin.url, | ||||
date=sample_data.date_visit1, | |||||
type=sample_data.type_visit1, | |||||
) | ) | ||||
origin_visit1 = swh_storage.origin_visit_add([visit])[0] | origin_visit1 = swh_storage.origin_visit_add([visit])[0] | ||||
visit_id = origin_visit1.visit | visit_id = origin_visit1.visit | ||||
swh_storage.snapshot_add([snapshot]) | swh_storage.snapshot_add([snapshot]) | ||||
swh_storage.origin_visit_status_add( | swh_storage.origin_visit_status_add( | ||||
[ | [ | ||||
OriginVisitStatus( | OriginVisitStatus( | ||||
Show All 13 Lines | def test_snapshot_add_get(self, swh_storage, sample_data): | ||||
by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, visit_id) | by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, visit_id) | ||||
assert by_ov == expected_snapshot | assert by_ov == expected_snapshot | ||||
origin_visit_info = swh_storage.origin_visit_get_by(origin.url, visit_id) | origin_visit_info = swh_storage.origin_visit_get_by(origin.url, visit_id) | ||||
assert origin_visit_info["snapshot"] == snapshot.id | assert origin_visit_info["snapshot"] == snapshot.id | ||||
def test_snapshot_add_twice__by_origin_visit(self, swh_storage, sample_data): | def test_snapshot_add_twice__by_origin_visit(self, swh_storage, sample_data): | ||||
snapshot = sample_data["snapshot"][0] | snapshot = sample_data.snapshot | ||||
origin = sample_data["origin"][0] | origin = sample_data.origin | ||||
swh_storage.origin_add([origin]) | swh_storage.origin_add([origin]) | ||||
ov1 = swh_storage.origin_visit_add( | ov1 = swh_storage.origin_visit_add( | ||||
[ | [ | ||||
OriginVisit( | OriginVisit( | ||||
origin=origin.url, date=data.date_visit1, type=data.type_visit1, | origin=origin.url, | ||||
date=sample_data.date_visit1, | |||||
type=sample_data.type_visit1, | |||||
) | ) | ||||
] | ] | ||||
)[0] | )[0] | ||||
swh_storage.snapshot_add([snapshot]) | swh_storage.snapshot_add([snapshot]) | ||||
date_now2 = now() | date_now2 = now() | ||||
swh_storage.origin_visit_status_add( | swh_storage.origin_visit_status_add( | ||||
[ | [ | ||||
Show All 10 Lines | def test_snapshot_add_twice__by_origin_visit(self, swh_storage, sample_data): | ||||
expected_snapshot = {**snapshot.to_dict(), "next_branch": None} | expected_snapshot = {**snapshot.to_dict(), "next_branch": None} | ||||
by_ov1 = swh_storage.snapshot_get_by_origin_visit(origin.url, ov1.visit) | by_ov1 = swh_storage.snapshot_get_by_origin_visit(origin.url, ov1.visit) | ||||
assert by_ov1 == expected_snapshot | assert by_ov1 == expected_snapshot | ||||
ov2 = swh_storage.origin_visit_add( | ov2 = swh_storage.origin_visit_add( | ||||
[ | [ | ||||
OriginVisit( | OriginVisit( | ||||
origin=origin.url, date=data.date_visit2, type=data.type_visit2, | origin=origin.url, | ||||
date=sample_data.date_visit2, | |||||
type=sample_data.type_visit2, | |||||
) | ) | ||||
] | ] | ||||
)[0] | )[0] | ||||
date_now4 = now() | date_now4 = now() | ||||
swh_storage.origin_visit_status_add( | swh_storage.origin_visit_status_add( | ||||
[ | [ | ||||
OriginVisitStatus( | OriginVisitStatus( | ||||
origin=origin.url, | origin=origin.url, | ||||
visit=ov2.visit, | visit=ov2.visit, | ||||
date=date_now4, | date=date_now4, | ||||
status="ongoing", | status="ongoing", | ||||
snapshot=snapshot.id, | snapshot=snapshot.id, | ||||
) | ) | ||||
] | ] | ||||
) | ) | ||||
by_ov2 = swh_storage.snapshot_get_by_origin_visit(origin.url, ov2.visit) | by_ov2 = swh_storage.snapshot_get_by_origin_visit(origin.url, ov2.visit) | ||||
assert by_ov2 == expected_snapshot | assert by_ov2 == expected_snapshot | ||||
ovs1 = OriginVisitStatus.from_dict( | ovs1 = OriginVisitStatus.from_dict( | ||||
{ | { | ||||
"origin": origin.url, | "origin": origin.url, | ||||
"date": data.date_visit1, | "date": sample_data.date_visit1, | ||||
"visit": ov1.visit, | "visit": ov1.visit, | ||||
"status": "created", | "status": "created", | ||||
"metadata": None, | "metadata": None, | ||||
"snapshot": None, | "snapshot": None, | ||||
} | } | ||||
) | ) | ||||
ovs2 = OriginVisitStatus.from_dict( | ovs2 = OriginVisitStatus.from_dict( | ||||
{ | { | ||||
"origin": origin.url, | "origin": origin.url, | ||||
"date": date_now2, | "date": date_now2, | ||||
"visit": ov1.visit, | "visit": ov1.visit, | ||||
"status": "ongoing", | "status": "ongoing", | ||||
"metadata": None, | "metadata": None, | ||||
"snapshot": snapshot.id, | "snapshot": snapshot.id, | ||||
} | } | ||||
) | ) | ||||
ovs3 = OriginVisitStatus.from_dict( | ovs3 = OriginVisitStatus.from_dict( | ||||
{ | { | ||||
"origin": origin.url, | "origin": origin.url, | ||||
"date": data.date_visit2, | "date": sample_data.date_visit2, | ||||
"visit": ov2.visit, | "visit": ov2.visit, | ||||
"status": "created", | "status": "created", | ||||
"metadata": None, | "metadata": None, | ||||
"snapshot": None, | "snapshot": None, | ||||
} | } | ||||
) | ) | ||||
ovs4 = OriginVisitStatus.from_dict( | ovs4 = OriginVisitStatus.from_dict( | ||||
{ | { | ||||
Show All 16 Lines | def test_snapshot_add_twice__by_origin_visit(self, swh_storage, sample_data): | ||||
("origin_visit_status", ovs3), | ("origin_visit_status", ovs3), | ||||
("origin_visit_status", ovs4), | ("origin_visit_status", ovs4), | ||||
] | ] | ||||
for obj in expected_objects: | for obj in expected_objects: | ||||
assert obj in actual_objects | assert obj in actual_objects | ||||
def test_snapshot_get_random(self, swh_storage, sample_data):
    """snapshot_get_random returns the id of one of the stored snapshots."""
    snapshot, empty_snapshot, complete_snapshot = sample_data.snapshots[:3]
    stored = [snapshot, empty_snapshot, complete_snapshot]
    swh_storage.snapshot_add(stored)

    # whichever snapshot is picked, it must be one we just inserted
    expected_ids = {snp.id for snp in stored}
    assert swh_storage.snapshot_get_random() in expected_ids
def test_snapshot_missing(self, swh_storage, sample_data):
    """snapshot_missing reports only the ids that were never stored."""
    snapshot, missing_snapshot = sample_data.snapshots[:2]
    swh_storage.snapshot_add([snapshot])

    # query for both the stored and the absent snapshot at once
    missing = swh_storage.snapshot_missing([snapshot.id, missing_snapshot.id])
    assert list(missing) == [missing_snapshot.id]
def test_stat_counters(self, swh_storage, sample_data): | def test_stat_counters(self, swh_storage, sample_data): | ||||
origin = sample_data["origin"][0] | origin = sample_data.origin | ||||
snapshot = sample_data["snapshot"][0] | snapshot = sample_data.snapshot | ||||
revision = sample_data["revision"][0] | revision = sample_data.revision | ||||
release = sample_data["release"][0] | release = sample_data.release | ||||
directory = sample_data["directory"][0] | directory = sample_data.directory | ||||
content = sample_data["content"][0] | content = sample_data.content | ||||
expected_keys = ["content", "directory", "origin", "revision"] | expected_keys = ["content", "directory", "origin", "revision"] | ||||
# Initially, all counters are 0 | # Initially, all counters are 0 | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
counters = swh_storage.stat_counters() | counters = swh_storage.stat_counters() | ||||
assert set(expected_keys) <= set(counters) | assert set(expected_keys) <= set(counters) | ||||
Show All 12 Lines | def test_stat_counters(self, swh_storage, sample_data): | ||||
if key != "content": | if key != "content": | ||||
assert counters[key] == 0 | assert counters[key] == 0 | ||||
assert counters["content"] == 1 | assert counters["content"] == 1 | ||||
# Add other objects. Check their counter increased as well. | # Add other objects. Check their counter increased as well. | ||||
swh_storage.origin_add([origin]) | swh_storage.origin_add([origin]) | ||||
visit = OriginVisit( | visit = OriginVisit( | ||||
origin=origin.url, date=data.date_visit2, type=data.type_visit2, | origin=origin.url, | ||||
date=sample_data.date_visit2, | |||||
type=sample_data.type_visit2, | |||||
) | ) | ||||
origin_visit1 = swh_storage.origin_visit_add([visit])[0] | origin_visit1 = swh_storage.origin_visit_add([visit])[0] | ||||
swh_storage.snapshot_add([snapshot]) | swh_storage.snapshot_add([snapshot]) | ||||
swh_storage.origin_visit_status_add( | swh_storage.origin_visit_status_add( | ||||
[ | [ | ||||
OriginVisitStatus( | OriginVisitStatus( | ||||
origin=origin.url, | origin=origin.url, | ||||
Show All 17 Lines | def test_stat_counters(self, swh_storage, sample_data): | ||||
assert counters["origin_visit"] == 1 | assert counters["origin_visit"] == 1 | ||||
assert counters["revision"] == 1 | assert counters["revision"] == 1 | ||||
assert counters["release"] == 1 | assert counters["release"] == 1 | ||||
assert counters["snapshot"] == 1 | assert counters["snapshot"] == 1 | ||||
if "person" in counters: | if "person" in counters: | ||||
assert counters["person"] == 3 | assert counters["person"] == 3 | ||||
def test_content_find_ctime(self, swh_storage, sample_data):
    """content_find returns the ctime the content was stored with."""
    origin_content = sample_data.content
    # the backend keeps timestamps at millisecond granularity, so round
    # first to make the later equality comparison exact
    ctime = round_to_milliseconds(now())
    content = attr.evolve(origin_content, data=None, ctime=ctime)
    swh_storage.content_add_metadata([content])

    results = swh_storage.content_find({"sha1": content.sha1})
    assert results[0] == content.to_dict()
def test_content_find_with_present_content(self, swh_storage, sample_data): | def test_content_find_with_present_content(self, swh_storage, sample_data): | ||||
content = sample_data["content"][0] | content = sample_data.content | ||||
expected_content = content.to_dict() | expected_content = content.to_dict() | ||||
del expected_content["data"] | del expected_content["data"] | ||||
del expected_content["ctime"] | del expected_content["ctime"] | ||||
# 1. with something to find | # 1. with something to find | ||||
swh_storage.content_add([content]) | swh_storage.content_add([content]) | ||||
actually_present = swh_storage.content_find({"sha1": content.sha1}) | actually_present = swh_storage.content_find({"sha1": content.sha1}) | ||||
Show All 15 Lines | def test_content_find_with_present_content(self, swh_storage, sample_data): | ||||
# 4. with something to find | # 4. with something to find | ||||
actually_present = swh_storage.content_find(content.hashes()) | actually_present = swh_storage.content_find(content.hashes()) | ||||
assert 1 == len(actually_present) | assert 1 == len(actually_present) | ||||
actually_present[0].pop("ctime") | actually_present[0].pop("ctime") | ||||
assert actually_present[0] == expected_content | assert actually_present[0] == expected_content | ||||
def test_content_find_with_non_present_content(self, swh_storage, sample_data):
    """Looking up a content that was never added yields no result,
    whichever hash algorithm is used as the lookup key."""
    missing_content = sample_data.skipped_content
    # try each supported hash in turn; nothing was stored, so all must miss
    for algo in ("sha1", "sha1_git", "sha256"):
        lookup = {algo: getattr(missing_content, algo)}
        assert swh_storage.content_find(lookup) == []
def test_content_find_with_duplicate_input(self, swh_storage, sample_data): | def test_content_find_with_duplicate_input(self, swh_storage, sample_data): | ||||
content = sample_data["content"][0] | content = sample_data.content | ||||
# Create fake data with colliding sha256 and blake2s256 | # Create fake data with colliding sha256 and blake2s256 | ||||
sha1_array = bytearray(content.sha1) | sha1_array = bytearray(content.sha1) | ||||
sha1_array[0] += 1 | sha1_array[0] += 1 | ||||
sha1git_array = bytearray(content.sha1_git) | sha1git_array = bytearray(content.sha1_git) | ||||
sha1git_array[0] += 1 | sha1git_array[0] += 1 | ||||
duplicated_content = attr.evolve( | duplicated_content = attr.evolve( | ||||
content, sha1=bytes(sha1_array), sha1_git=bytes(sha1git_array) | content, sha1=bytes(sha1_array), sha1_git=bytes(sha1git_array) | ||||
Show All 23 Lines | def test_content_find_with_duplicate_input(self, swh_storage, sample_data): | ||||
]: | ]: | ||||
dict_.pop(key, None) | dict_.pop(key, None) | ||||
expected_result = [expected_content, expected_duplicated_content] | expected_result = [expected_content, expected_duplicated_content] | ||||
for result in expected_result: | for result in expected_result: | ||||
assert result in actual_result | assert result in actual_result | ||||
def test_content_find_with_duplicate_sha256(self, swh_storage, sample_data): | def test_content_find_with_duplicate_sha256(self, swh_storage, sample_data): | ||||
content = sample_data["content"][0] | content = sample_data.content | ||||
hashes = {} | hashes = {} | ||||
# Create fake data with colliding sha256 | # Create fake data with colliding sha256 | ||||
for hashalgo in ("sha1", "sha1_git", "blake2s256"): | for hashalgo in ("sha1", "sha1_git", "blake2s256"): | ||||
value = bytearray(getattr(content, hashalgo)) | value = bytearray(getattr(content, hashalgo)) | ||||
value[0] += 1 | value[0] += 1 | ||||
hashes[hashalgo] = bytes(value) | hashes[hashalgo] = bytes(value) | ||||
Show All 39 Lines | def test_content_find_with_duplicate_sha256(self, swh_storage, sample_data): | ||||
) | ) | ||||
assert len(actual_result) == 1 | assert len(actual_result) == 1 | ||||
actual_result[0].pop("ctime") | actual_result[0].pop("ctime") | ||||
assert actual_result == [expected_duplicated_content] | assert actual_result == [expected_duplicated_content] | ||||
def test_content_find_with_duplicate_blake2s256(self, swh_storage, sample_data): | def test_content_find_with_duplicate_blake2s256(self, swh_storage, sample_data): | ||||
content = sample_data["content"][0] | content = sample_data.content | ||||
# Create fake data with colliding sha256 and blake2s256 | # Create fake data with colliding sha256 and blake2s256 | ||||
sha1_array = bytearray(content.sha1) | sha1_array = bytearray(content.sha1) | ||||
sha1_array[0] += 1 | sha1_array[0] += 1 | ||||
sha1git_array = bytearray(content.sha1_git) | sha1git_array = bytearray(content.sha1_git) | ||||
sha1git_array[0] += 1 | sha1git_array[0] += 1 | ||||
sha256_array = bytearray(content.sha256) | sha256_array = bytearray(content.sha256) | ||||
sha256_array[0] += 1 | sha256_array[0] += 1 | ||||
▲ Show 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | def test_content_find_bad_input(self, swh_storage): | ||||
with pytest.raises(StorageArgumentException): | with pytest.raises(StorageArgumentException): | ||||
swh_storage.content_find({}) # empty is bad | swh_storage.content_find({}) # empty is bad | ||||
# 2. with bad input | # 2. with bad input | ||||
with pytest.raises(StorageArgumentException): | with pytest.raises(StorageArgumentException): | ||||
swh_storage.content_find({"unknown-sha1": "something"}) # not the right key | swh_storage.content_find({"unknown-sha1": "something"}) # not the right key | ||||
def test_object_find_by_sha1_git(self, swh_storage, sample_data): | def test_object_find_by_sha1_git(self, swh_storage, sample_data): | ||||
content = sample_data["content"][0] | content = sample_data.content | ||||
directory = sample_data["directory"][0] | directory = sample_data.directory | ||||
revision = sample_data["revision"][0] | revision = sample_data.revision | ||||
release = sample_data["release"][0] | release = sample_data.release | ||||
sha1_gits = [b"00000000000000000000"] | sha1_gits = [b"00000000000000000000"] | ||||
expected = { | expected = { | ||||
b"00000000000000000000": [], | b"00000000000000000000": [], | ||||
} | } | ||||
swh_storage.content_add([content]) | swh_storage.content_add([content]) | ||||
sha1_gits.append(content.sha1_git) | sha1_gits.append(content.sha1_git) | ||||
Show All 14 Lines | def test_object_find_by_sha1_git(self, swh_storage, sample_data): | ||||
sha1_gits.append(release.id) | sha1_gits.append(release.id) | ||||
expected[release.id] = [{"sha1_git": release.id, "type": "release",}] | expected[release.id] = [{"sha1_git": release.id, "type": "release",}] | ||||
ret = swh_storage.object_find_by_sha1_git(sha1_gits) | ret = swh_storage.object_find_by_sha1_git(sha1_gits) | ||||
assert expected == ret | assert expected == ret | ||||
def test_metadata_fetcher_add_get(self, swh_storage, sample_data): | def test_metadata_fetcher_add_get(self, swh_storage, sample_data): | ||||
fetcher = sample_data["fetcher"][0] | fetcher = sample_data.metadata_fetcher | ||||
actual_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version) | actual_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version) | ||||
assert actual_fetcher is None # does not exist | assert actual_fetcher is None # does not exist | ||||
swh_storage.metadata_fetcher_add([fetcher]) | swh_storage.metadata_fetcher_add([fetcher]) | ||||
res = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version) | res = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version) | ||||
assert res == fetcher | assert res == fetcher | ||||
def test_metadata_authority_add_get(self, swh_storage, sample_data): | def test_metadata_authority_add_get(self, swh_storage, sample_data): | ||||
authority = sample_data["authority"][0] | authority = sample_data.metadata_authority | ||||
actual_authority = swh_storage.metadata_authority_get( | actual_authority = swh_storage.metadata_authority_get( | ||||
authority.type, authority.url | authority.type, authority.url | ||||
) | ) | ||||
assert actual_authority is None # does not exist | assert actual_authority is None # does not exist | ||||
swh_storage.metadata_authority_add([authority]) | swh_storage.metadata_authority_add([authority]) | ||||
res = swh_storage.metadata_authority_get(authority.type, authority.url) | res = swh_storage.metadata_authority_get(authority.type, authority.url) | ||||
assert res == authority | assert res == authority | ||||
def test_content_metadata_add(self, swh_storage, sample_data): | def test_content_metadata_add(self, swh_storage, sample_data): | ||||
content = sample_data["content"][0] | content = sample_data.content | ||||
fetcher = sample_data["fetcher"][0] | fetcher = sample_data.metadata_fetcher | ||||
authority = sample_data["authority"][0] | authority = sample_data.metadata_authority | ||||
content_metadata = sample_data["content_metadata"][:2] | content_metadata = sample_data.content_metadata[:2] | ||||
content_swhid = SWHID( | content_swhid = SWHID( | ||||
object_type="content", object_id=hash_to_bytes(content.sha1_git) | object_type="content", object_id=hash_to_bytes(content.sha1_git) | ||||
) | ) | ||||
swh_storage.metadata_fetcher_add([fetcher]) | swh_storage.metadata_fetcher_add([fetcher]) | ||||
swh_storage.metadata_authority_add([authority]) | swh_storage.metadata_authority_add([authority]) | ||||
swh_storage.object_metadata_add(content_metadata) | swh_storage.object_metadata_add(content_metadata) | ||||
result = swh_storage.object_metadata_get( | result = swh_storage.object_metadata_get( | ||||
MetadataTargetType.CONTENT, content_swhid, authority | MetadataTargetType.CONTENT, content_swhid, authority | ||||
) | ) | ||||
assert result["next_page_token"] is None | assert result["next_page_token"] is None | ||||
assert list(sorted(result["results"], key=lambda x: x.discovery_date,)) == list( | assert list(sorted(result["results"], key=lambda x: x.discovery_date,)) == list( | ||||
content_metadata | content_metadata | ||||
) | ) | ||||
def test_content_metadata_add_duplicate(self, swh_storage, sample_data): | def test_content_metadata_add_duplicate(self, swh_storage, sample_data): | ||||
"""Duplicates should be silently updated.""" | """Duplicates should be silently updated.""" | ||||
content = sample_data["content"][0] | content = sample_data.content | ||||
fetcher = sample_data["fetcher"][0] | fetcher = sample_data.metadata_fetcher | ||||
authority = sample_data["authority"][0] | authority = sample_data.metadata_authority | ||||
content_metadata, content_metadata2 = sample_data["content_metadata"][:2] | content_metadata, content_metadata2 = sample_data.content_metadata[:2] | ||||
content_swhid = SWHID( | content_swhid = SWHID( | ||||
object_type="content", object_id=hash_to_bytes(content.sha1_git) | object_type="content", object_id=hash_to_bytes(content.sha1_git) | ||||
) | ) | ||||
new_content_metadata2 = attr.evolve( | new_content_metadata2 = attr.evolve( | ||||
content_metadata2, format="new-format", metadata=b"new-metadata", | content_metadata2, format="new-format", metadata=b"new-metadata", | ||||
) | ) | ||||
Show All 12 Lines | def test_content_metadata_add_duplicate(self, swh_storage, sample_data): | ||||
expected_results2 = (content_metadata, content_metadata2) | expected_results2 = (content_metadata, content_metadata2) | ||||
assert tuple(sorted(result["results"], key=lambda x: x.discovery_date,)) in ( | assert tuple(sorted(result["results"], key=lambda x: x.discovery_date,)) in ( | ||||
expected_results1, # cassandra | expected_results1, # cassandra | ||||
expected_results2, # postgresql | expected_results2, # postgresql | ||||
) | ) | ||||
def test_content_metadata_get(self, swh_storage, sample_data): | def test_content_metadata_get(self, swh_storage, sample_data): | ||||
content, content2 = sample_data["content"][:2] | content, content2 = sample_data.contents[:2] | ||||
fetcher, fetcher2 = sample_data["fetcher"][:2] | fetcher, fetcher2 = sample_data.fetchers[:2] | ||||
authority, authority2 = sample_data["authority"][:2] | authority, authority2 = sample_data.authorities[:2] | ||||
content1_metadata1, content1_metadata2, content1_metadata3 = sample_data[ | ( | ||||
"content_metadata" | content1_metadata1, | ||||
][:3] | content1_metadata2, | ||||
content1_metadata3, | |||||
) = sample_data.content_metadata[:3] | |||||
content1_swhid = SWHID(object_type="content", object_id=content.sha1_git) | content1_swhid = SWHID(object_type="content", object_id=content.sha1_git) | ||||
content2_swhid = SWHID(object_type="content", object_id=content2.sha1_git) | content2_swhid = SWHID(object_type="content", object_id=content2.sha1_git) | ||||
content2_metadata = attr.evolve(content1_metadata2, id=content2_swhid) | content2_metadata = attr.evolve(content1_metadata2, id=content2_swhid) | ||||
swh_storage.metadata_authority_add([authority, authority2]) | swh_storage.metadata_authority_add([authority, authority2]) | ||||
swh_storage.metadata_fetcher_add([fetcher, fetcher2]) | swh_storage.metadata_fetcher_add([fetcher, fetcher2]) | ||||
Show All 24 Lines | def test_content_metadata_get(self, swh_storage, sample_data): | ||||
result = swh_storage.object_metadata_get( | result = swh_storage.object_metadata_get( | ||||
MetadataTargetType.CONTENT, content2_swhid, authority | MetadataTargetType.CONTENT, content2_swhid, authority | ||||
) | ) | ||||
assert result["next_page_token"] is None | assert result["next_page_token"] is None | ||||
assert [content2_metadata] == list(result["results"],) | assert [content2_metadata] == list(result["results"],) | ||||
def test_content_metadata_get_after(self, swh_storage, sample_data): | def test_content_metadata_get_after(self, swh_storage, sample_data): | ||||
content = sample_data["content"][0] | content = sample_data.content | ||||
fetcher = sample_data["fetcher"][0] | fetcher = sample_data.metadata_fetcher | ||||
authority = sample_data["authority"][0] | authority = sample_data.metadata_authority | ||||
content_metadata, content_metadata2 = sample_data["content_metadata"][:2] | content_metadata, content_metadata2 = sample_data.content_metadata[:2] | ||||
content_swhid = SWHID(object_type="content", object_id=content.sha1_git) | content_swhid = SWHID(object_type="content", object_id=content.sha1_git) | ||||
swh_storage.metadata_fetcher_add([fetcher]) | swh_storage.metadata_fetcher_add([fetcher]) | ||||
swh_storage.metadata_authority_add([authority]) | swh_storage.metadata_authority_add([authority]) | ||||
swh_storage.object_metadata_add([content_metadata, content_metadata2]) | swh_storage.object_metadata_add([content_metadata, content_metadata2]) | ||||
Show All 22 Lines | def test_content_metadata_get_after(self, swh_storage, sample_data): | ||||
content_swhid, | content_swhid, | ||||
authority, | authority, | ||||
after=content_metadata2.discovery_date, | after=content_metadata2.discovery_date, | ||||
) | ) | ||||
assert result["next_page_token"] is None | assert result["next_page_token"] is None | ||||
assert result["results"] == [] | assert result["results"] == [] | ||||
def test_content_metadata_get_paginate(self, swh_storage, sample_data): | def test_content_metadata_get_paginate(self, swh_storage, sample_data): | ||||
content = sample_data["content"][0] | content = sample_data.content | ||||
fetcher = sample_data["fetcher"][0] | fetcher = sample_data.metadata_fetcher | ||||
authority = sample_data["authority"][0] | authority = sample_data.metadata_authority | ||||
content_metadata, content_metadata2 = sample_data["content_metadata"][:2] | content_metadata, content_metadata2 = sample_data.content_metadata[:2] | ||||
content_swhid = SWHID(object_type="content", object_id=content.sha1_git) | content_swhid = SWHID(object_type="content", object_id=content.sha1_git) | ||||
swh_storage.metadata_fetcher_add([fetcher]) | swh_storage.metadata_fetcher_add([fetcher]) | ||||
swh_storage.metadata_authority_add([authority]) | swh_storage.metadata_authority_add([authority]) | ||||
swh_storage.object_metadata_add([content_metadata, content_metadata2]) | swh_storage.object_metadata_add([content_metadata, content_metadata2]) | ||||
swh_storage.object_metadata_get( | swh_storage.object_metadata_get( | ||||
MetadataTargetType.CONTENT, content_swhid, authority | MetadataTargetType.CONTENT, content_swhid, authority | ||||
Show All 11 Lines | def test_content_metadata_get_paginate(self, swh_storage, sample_data): | ||||
authority, | authority, | ||||
limit=1, | limit=1, | ||||
page_token=result["next_page_token"], | page_token=result["next_page_token"], | ||||
) | ) | ||||
assert result["next_page_token"] is None | assert result["next_page_token"] is None | ||||
assert result["results"] == [content_metadata2] | assert result["results"] == [content_metadata2] | ||||
def test_content_metadata_get_paginate_same_date(self, swh_storage, sample_data): | def test_content_metadata_get_paginate_same_date(self, swh_storage, sample_data): | ||||
content = sample_data["content"][0] | content = sample_data.content | ||||
fetcher1, fetcher2 = sample_data["fetcher"][:2] | fetcher1, fetcher2 = sample_data.fetchers[:2] | ||||
authority = sample_data["authority"][0] | authority = sample_data.metadata_authority | ||||
content_metadata, content_metadata2 = sample_data["content_metadata"][:2] | content_metadata, content_metadata2 = sample_data.content_metadata[:2] | ||||
content_swhid = SWHID(object_type="content", object_id=content.sha1_git) | content_swhid = SWHID(object_type="content", object_id=content.sha1_git) | ||||
swh_storage.metadata_fetcher_add([fetcher1, fetcher2]) | swh_storage.metadata_fetcher_add([fetcher1, fetcher2]) | ||||
swh_storage.metadata_authority_add([authority]) | swh_storage.metadata_authority_add([authority]) | ||||
new_content_metadata2 = attr.evolve( | new_content_metadata2 = attr.evolve( | ||||
content_metadata2, | content_metadata2, | ||||
Show All 15 Lines | def test_content_metadata_get_paginate_same_date(self, swh_storage, sample_data): | ||||
authority, | authority, | ||||
limit=1, | limit=1, | ||||
page_token=result["next_page_token"], | page_token=result["next_page_token"], | ||||
) | ) | ||||
assert result["next_page_token"] is None | assert result["next_page_token"] is None | ||||
assert result["results"] == [new_content_metadata2] | assert result["results"] == [new_content_metadata2] | ||||
def test_content_metadata_get__invalid_id(self, swh_storage, sample_data): | def test_content_metadata_get__invalid_id(self, swh_storage, sample_data): | ||||
origin = sample_data["origin"][0] | origin = sample_data.origin | ||||
fetcher = sample_data["fetcher"][0] | fetcher = sample_data.metadata_fetcher | ||||
authority = sample_data["authority"][0] | authority = sample_data.metadata_authority | ||||
content_metadata, content_metadata2 = sample_data["content_metadata"][:2] | content_metadata, content_metadata2 = sample_data.content_metadata[:2] | ||||
swh_storage.metadata_fetcher_add([fetcher]) | swh_storage.metadata_fetcher_add([fetcher]) | ||||
swh_storage.metadata_authority_add([authority]) | swh_storage.metadata_authority_add([authority]) | ||||
swh_storage.object_metadata_add([content_metadata, content_metadata2]) | swh_storage.object_metadata_add([content_metadata, content_metadata2]) | ||||
with pytest.raises(StorageArgumentException, match="SWHID"): | with pytest.raises(StorageArgumentException, match="SWHID"): | ||||
swh_storage.object_metadata_get( | swh_storage.object_metadata_get( | ||||
MetadataTargetType.CONTENT, origin.url, authority | MetadataTargetType.CONTENT, origin.url, authority | ||||
) | ) | ||||
def test_origin_metadata_add(self, swh_storage, sample_data): | def test_origin_metadata_add(self, swh_storage, sample_data): | ||||
origin = sample_data["origin"][0] | origin = sample_data.origin | ||||
fetcher = sample_data["fetcher"][0] | fetcher = sample_data.metadata_fetcher | ||||
authority = sample_data["authority"][0] | authority = sample_data.metadata_authority | ||||
origin_metadata, origin_metadata2 = sample_data["origin_metadata"][:2] | origin_metadata, origin_metadata2 = sample_data.origin_metadata[:2] | ||||
assert swh_storage.origin_add([origin]) == {"origin:add": 1} | assert swh_storage.origin_add([origin]) == {"origin:add": 1} | ||||
swh_storage.metadata_fetcher_add([fetcher]) | swh_storage.metadata_fetcher_add([fetcher]) | ||||
swh_storage.metadata_authority_add([authority]) | swh_storage.metadata_authority_add([authority]) | ||||
swh_storage.object_metadata_add([origin_metadata, origin_metadata2]) | swh_storage.object_metadata_add([origin_metadata, origin_metadata2]) | ||||
result = swh_storage.object_metadata_get( | result = swh_storage.object_metadata_get( | ||||
MetadataTargetType.ORIGIN, origin.url, authority | MetadataTargetType.ORIGIN, origin.url, authority | ||||
) | ) | ||||
assert result["next_page_token"] is None | assert result["next_page_token"] is None | ||||
assert list(sorted(result["results"], key=lambda x: x.discovery_date)) == [ | assert list(sorted(result["results"], key=lambda x: x.discovery_date)) == [ | ||||
origin_metadata, | origin_metadata, | ||||
origin_metadata2, | origin_metadata2, | ||||
] | ] | ||||
def test_origin_metadata_add_duplicate(self, swh_storage, sample_data): | def test_origin_metadata_add_duplicate(self, swh_storage, sample_data): | ||||
"""Duplicates should be silently updated.""" | """Duplicates should be silently updated.""" | ||||
origin = sample_data["origin"][0] | origin = sample_data.origin | ||||
fetcher = sample_data["fetcher"][0] | fetcher = sample_data.metadata_fetcher | ||||
authority = sample_data["authority"][0] | authority = sample_data.metadata_authority | ||||
origin_metadata, origin_metadata2 = sample_data["origin_metadata"][:2] | origin_metadata, origin_metadata2 = sample_data.origin_metadata[:2] | ||||
assert swh_storage.origin_add([origin]) == {"origin:add": 1} | assert swh_storage.origin_add([origin]) == {"origin:add": 1} | ||||
new_origin_metadata2 = attr.evolve( | new_origin_metadata2 = attr.evolve( | ||||
origin_metadata2, format="new-format", metadata=b"new-metadata", | origin_metadata2, format="new-format", metadata=b"new-metadata", | ||||
) | ) | ||||
swh_storage.metadata_fetcher_add([fetcher]) | swh_storage.metadata_fetcher_add([fetcher]) | ||||
swh_storage.metadata_authority_add([authority]) | swh_storage.metadata_authority_add([authority]) | ||||
Show All 11 Lines | def test_origin_metadata_add_duplicate(self, swh_storage, sample_data): | ||||
expected_results2 = (origin_metadata, origin_metadata2) | expected_results2 = (origin_metadata, origin_metadata2) | ||||
assert tuple(sorted(result["results"], key=lambda x: x.discovery_date,)) in ( | assert tuple(sorted(result["results"], key=lambda x: x.discovery_date,)) in ( | ||||
expected_results1, # cassandra | expected_results1, # cassandra | ||||
expected_results2, # postgresql | expected_results2, # postgresql | ||||
) | ) | ||||
def test_origin_metadata_get(self, swh_storage, sample_data): | def test_origin_metadata_get(self, swh_storage, sample_data): | ||||
origin, origin2 = sample_data["origin"][:2] | origin, origin2 = sample_data.origins[:2] | ||||
fetcher, fetcher2 = sample_data["fetcher"][:2] | fetcher, fetcher2 = sample_data.fetchers[:2] | ||||
authority, authority2 = sample_data["authority"][:2] | authority, authority2 = sample_data.authorities[:2] | ||||
origin1_metadata1, origin1_metadata2, origin1_metadata3 = sample_data[ | ( | ||||
"origin_metadata" | origin1_metadata1, | ||||
][:3] | origin1_metadata2, | ||||
origin1_metadata3, | |||||
) = sample_data.origin_metadata[:3] | |||||
assert swh_storage.origin_add([origin, origin2]) == {"origin:add": 2} | assert swh_storage.origin_add([origin, origin2]) == {"origin:add": 2} | ||||
origin2_metadata = attr.evolve(origin1_metadata2, id=origin2.url) | origin2_metadata = attr.evolve(origin1_metadata2, id=origin2.url) | ||||
swh_storage.metadata_authority_add([authority, authority2]) | swh_storage.metadata_authority_add([authority, authority2]) | ||||
swh_storage.metadata_fetcher_add([fetcher, fetcher2]) | swh_storage.metadata_fetcher_add([fetcher, fetcher2]) | ||||
Show All 19 Lines | def test_origin_metadata_get(self, swh_storage, sample_data): | ||||
result = swh_storage.object_metadata_get( | result = swh_storage.object_metadata_get( | ||||
MetadataTargetType.ORIGIN, origin2.url, authority | MetadataTargetType.ORIGIN, origin2.url, authority | ||||
) | ) | ||||
assert result["next_page_token"] is None | assert result["next_page_token"] is None | ||||
assert [origin2_metadata] == list(result["results"],) | assert [origin2_metadata] == list(result["results"],) | ||||
def test_origin_metadata_get_after(self, swh_storage, sample_data): | def test_origin_metadata_get_after(self, swh_storage, sample_data): | ||||
origin = sample_data["origin"][0] | origin = sample_data.origin | ||||
fetcher = sample_data["fetcher"][0] | fetcher = sample_data.metadata_fetcher | ||||
authority = sample_data["authority"][0] | authority = sample_data.metadata_authority | ||||
origin_metadata, origin_metadata2 = sample_data["origin_metadata"][:2] | origin_metadata, origin_metadata2 = sample_data.origin_metadata[:2] | ||||
assert swh_storage.origin_add([origin]) == {"origin:add": 1} | assert swh_storage.origin_add([origin]) == {"origin:add": 1} | ||||
swh_storage.metadata_fetcher_add([fetcher]) | swh_storage.metadata_fetcher_add([fetcher]) | ||||
swh_storage.metadata_authority_add([authority]) | swh_storage.metadata_authority_add([authority]) | ||||
swh_storage.object_metadata_add([origin_metadata, origin_metadata2]) | swh_storage.object_metadata_add([origin_metadata, origin_metadata2]) | ||||
result = swh_storage.object_metadata_get( | result = swh_storage.object_metadata_get( | ||||
Show All 22 Lines | def test_origin_metadata_get_after(self, swh_storage, sample_data): | ||||
origin.url, | origin.url, | ||||
authority, | authority, | ||||
after=origin_metadata2.discovery_date, | after=origin_metadata2.discovery_date, | ||||
) | ) | ||||
assert result["next_page_token"] is None | assert result["next_page_token"] is None | ||||
assert result["results"] == [] | assert result["results"] == [] | ||||
def test_origin_metadata_get_paginate(self, swh_storage, sample_data): | def test_origin_metadata_get_paginate(self, swh_storage, sample_data): | ||||
origin = sample_data["origin"][0] | origin = sample_data.origin | ||||
fetcher = sample_data["fetcher"][0] | fetcher = sample_data.metadata_fetcher | ||||
authority = sample_data["authority"][0] | authority = sample_data.metadata_authority | ||||
origin_metadata, origin_metadata2 = sample_data["origin_metadata"][:2] | origin_metadata, origin_metadata2 = sample_data.origin_metadata[:2] | ||||
assert swh_storage.origin_add([origin]) == {"origin:add": 1} | assert swh_storage.origin_add([origin]) == {"origin:add": 1} | ||||
swh_storage.metadata_fetcher_add([fetcher]) | swh_storage.metadata_fetcher_add([fetcher]) | ||||
swh_storage.metadata_authority_add([authority]) | swh_storage.metadata_authority_add([authority]) | ||||
swh_storage.object_metadata_add([origin_metadata, origin_metadata2]) | swh_storage.object_metadata_add([origin_metadata, origin_metadata2]) | ||||
swh_storage.object_metadata_get( | swh_storage.object_metadata_get( | ||||
Show All 12 Lines | def test_origin_metadata_get_paginate(self, swh_storage, sample_data): | ||||
authority, | authority, | ||||
limit=1, | limit=1, | ||||
page_token=result["next_page_token"], | page_token=result["next_page_token"], | ||||
) | ) | ||||
assert result["next_page_token"] is None | assert result["next_page_token"] is None | ||||
assert result["results"] == [origin_metadata2] | assert result["results"] == [origin_metadata2] | ||||
def test_origin_metadata_get_paginate_same_date(self, swh_storage, sample_data): | def test_origin_metadata_get_paginate_same_date(self, swh_storage, sample_data): | ||||
origin = sample_data["origin"][0] | origin = sample_data.origin | ||||
fetcher1, fetcher2 = sample_data["fetcher"][:2] | fetcher1, fetcher2 = sample_data.fetchers[:2] | ||||
authority = sample_data["authority"][0] | authority = sample_data.metadata_authority | ||||
origin_metadata, origin_metadata2 = sample_data["origin_metadata"][:2] | origin_metadata, origin_metadata2 = sample_data.origin_metadata[:2] | ||||
assert swh_storage.origin_add([origin]) == {"origin:add": 1} | assert swh_storage.origin_add([origin]) == {"origin:add": 1} | ||||
swh_storage.metadata_fetcher_add([fetcher1, fetcher2]) | swh_storage.metadata_fetcher_add([fetcher1, fetcher2]) | ||||
swh_storage.metadata_authority_add([authority]) | swh_storage.metadata_authority_add([authority]) | ||||
new_origin_metadata2 = attr.evolve( | new_origin_metadata2 = attr.evolve( | ||||
origin_metadata2, | origin_metadata2, | ||||
discovery_date=origin_metadata2.discovery_date, | discovery_date=origin_metadata2.discovery_date, | ||||
Show All 14 Lines | def test_origin_metadata_get_paginate_same_date(self, swh_storage, sample_data): | ||||
authority, | authority, | ||||
limit=1, | limit=1, | ||||
page_token=result["next_page_token"], | page_token=result["next_page_token"], | ||||
) | ) | ||||
assert result["next_page_token"] is None | assert result["next_page_token"] is None | ||||
assert result["results"] == [new_origin_metadata2] | assert result["results"] == [new_origin_metadata2] | ||||
def test_origin_metadata_add_missing_authority(self, swh_storage, sample_data): | def test_origin_metadata_add_missing_authority(self, swh_storage, sample_data): | ||||
origin = sample_data["origin"][0] | origin = sample_data.origin | ||||
fetcher = sample_data["fetcher"][0] | fetcher = sample_data.metadata_fetcher | ||||
origin_metadata, origin_metadata2 = sample_data["origin_metadata"][:2] | origin_metadata, origin_metadata2 = sample_data.origin_metadata[:2] | ||||
assert swh_storage.origin_add([origin]) == {"origin:add": 1} | assert swh_storage.origin_add([origin]) == {"origin:add": 1} | ||||
swh_storage.metadata_fetcher_add([fetcher]) | swh_storage.metadata_fetcher_add([fetcher]) | ||||
with pytest.raises(StorageArgumentException, match="authority"): | with pytest.raises(StorageArgumentException, match="authority"): | ||||
swh_storage.object_metadata_add([origin_metadata, origin_metadata2]) | swh_storage.object_metadata_add([origin_metadata, origin_metadata2]) | ||||
def test_origin_metadata_add_missing_fetcher(self, swh_storage, sample_data): | def test_origin_metadata_add_missing_fetcher(self, swh_storage, sample_data): | ||||
origin = sample_data["origin"][0] | origin = sample_data.origin | ||||
authority = sample_data["authority"][0] | authority = sample_data.metadata_authority | ||||
origin_metadata, origin_metadata2 = sample_data["origin_metadata"][:2] | origin_metadata, origin_metadata2 = sample_data.origin_metadata[:2] | ||||
assert swh_storage.origin_add([origin]) == {"origin:add": 1} | assert swh_storage.origin_add([origin]) == {"origin:add": 1} | ||||
swh_storage.metadata_authority_add([authority]) | swh_storage.metadata_authority_add([authority]) | ||||
with pytest.raises(StorageArgumentException, match="fetcher"): | with pytest.raises(StorageArgumentException, match="fetcher"): | ||||
swh_storage.object_metadata_add([origin_metadata, origin_metadata2]) | swh_storage.object_metadata_add([origin_metadata, origin_metadata2]) | ||||
def test_origin_metadata_get__invalid_id_type(self, swh_storage, sample_data): | def test_origin_metadata_get__invalid_id_type(self, swh_storage, sample_data): | ||||
origin = sample_data["origin"][0] | origin = sample_data.origin | ||||
authority = sample_data["authority"][0] | authority = sample_data.metadata_authority | ||||
fetcher = sample_data["fetcher"][0] | fetcher = sample_data.metadata_fetcher | ||||
origin_metadata, origin_metadata2 = sample_data["origin_metadata"][:2] | origin_metadata, origin_metadata2 = sample_data.origin_metadata[:2] | ||||
content_metadata = sample_data["content_metadata"][0] | content_metadata = sample_data.content_metadata[0] | ||||
assert swh_storage.origin_add([origin]) == {"origin:add": 1} | assert swh_storage.origin_add([origin]) == {"origin:add": 1} | ||||
swh_storage.metadata_fetcher_add([fetcher]) | swh_storage.metadata_fetcher_add([fetcher]) | ||||
swh_storage.metadata_authority_add([authority]) | swh_storage.metadata_authority_add([authority]) | ||||
swh_storage.object_metadata_add([origin_metadata, origin_metadata2]) | swh_storage.object_metadata_add([origin_metadata, origin_metadata2]) | ||||
with pytest.raises(StorageArgumentException, match="SWHID"): | with pytest.raises(StorageArgumentException, match="SWHID"): | ||||
▲ Show 20 Lines • Show All 183 Lines • ▼ Show 20 Lines | def test_origin_list(self, swh_storage, swh_origins, limit): | ||||
break | break | ||||
else: | else: | ||||
assert len(result["origins"]) == limit | assert len(result["origins"]) == limit | ||||
expected_origins = [origin.url for origin in swh_origins] | expected_origins = [origin.url for origin in swh_origins] | ||||
assert sorted(returned_origins) == sorted(expected_origins) | assert sorted(returned_origins) == sorted(expected_origins) | ||||
def test_origin_count(self, swh_storage, sample_data): | def test_origin_count(self, swh_storage, sample_data): | ||||
swh_storage.origin_add(sample_data["origin"]) | swh_storage.origin_add(sample_data.origins) | ||||
assert swh_storage.origin_count("github") == 3 | assert swh_storage.origin_count("github") == 3 | ||||
assert swh_storage.origin_count("gitlab") == 2 | assert swh_storage.origin_count("gitlab") == 2 | ||||
assert swh_storage.origin_count(".*user.*", regexp=True) == 5 | assert swh_storage.origin_count(".*user.*", regexp=True) == 5 | ||||
assert swh_storage.origin_count(".*user.*", regexp=False) == 0 | assert swh_storage.origin_count(".*user.*", regexp=False) == 0 | ||||
assert swh_storage.origin_count(".*user1.*", regexp=True) == 2 | assert swh_storage.origin_count(".*user1.*", regexp=True) == 2 | ||||
assert swh_storage.origin_count(".*user1.*", regexp=False) == 0 | assert swh_storage.origin_count(".*user1.*", regexp=False) == 0 | ||||
def test_origin_count_with_visit_no_visits(self, swh_storage, sample_data): | def test_origin_count_with_visit_no_visits(self, swh_storage, sample_data): | ||||
swh_storage.origin_add(sample_data["origin"]) | swh_storage.origin_add(sample_data.origins) | ||||
# none of them have visits, so with_visit=True => 0 | # none of them have visits, so with_visit=True => 0 | ||||
assert swh_storage.origin_count("github", with_visit=True) == 0 | assert swh_storage.origin_count("github", with_visit=True) == 0 | ||||
assert swh_storage.origin_count("gitlab", with_visit=True) == 0 | assert swh_storage.origin_count("gitlab", with_visit=True) == 0 | ||||
assert swh_storage.origin_count(".*user.*", regexp=True, with_visit=True) == 0 | assert swh_storage.origin_count(".*user.*", regexp=True, with_visit=True) == 0 | ||||
assert swh_storage.origin_count(".*user.*", regexp=False, with_visit=True) == 0 | assert swh_storage.origin_count(".*user.*", regexp=False, with_visit=True) == 0 | ||||
assert swh_storage.origin_count(".*user1.*", regexp=True, with_visit=True) == 0 | assert swh_storage.origin_count(".*user1.*", regexp=True, with_visit=True) == 0 | ||||
assert swh_storage.origin_count(".*user1.*", regexp=False, with_visit=True) == 0 | assert swh_storage.origin_count(".*user1.*", regexp=False, with_visit=True) == 0 | ||||
def test_origin_count_with_visit_with_visits_no_snapshot( | def test_origin_count_with_visit_with_visits_no_snapshot( | ||||
self, swh_storage, sample_data | self, swh_storage, sample_data | ||||
): | ): | ||||
swh_storage.origin_add(sample_data["origin"]) | swh_storage.origin_add(sample_data.origins) | ||||
origin_url = "https://github.com/user1/repo1" | origin_url = "https://github.com/user1/repo1" | ||||
visit = OriginVisit(origin=origin_url, date=now(), type="git",) | visit = OriginVisit(origin=origin_url, date=now(), type="git",) | ||||
swh_storage.origin_visit_add([visit]) | swh_storage.origin_visit_add([visit]) | ||||
assert swh_storage.origin_count("github", with_visit=False) == 3 | assert swh_storage.origin_count("github", with_visit=False) == 3 | ||||
# it has a visit, but no snapshot, so with_visit=True => 0 | # it has a visit, but no snapshot, so with_visit=True => 0 | ||||
assert swh_storage.origin_count("github", with_visit=True) == 0 | assert swh_storage.origin_count("github", with_visit=True) == 0 | ||||
Show All 9 Lines | ): | ||||
assert ( | assert ( | ||||
swh_storage.origin_count("github.*user1", regexp=True, with_visit=True) == 0 | swh_storage.origin_count("github.*user1", regexp=True, with_visit=True) == 0 | ||||
) | ) | ||||
assert swh_storage.origin_count("github", regexp=True, with_visit=True) == 0 | assert swh_storage.origin_count("github", regexp=True, with_visit=True) == 0 | ||||
def test_origin_count_with_visit_with_visits_and_snapshot( | def test_origin_count_with_visit_with_visits_and_snapshot( | ||||
self, swh_storage, sample_data | self, swh_storage, sample_data | ||||
): | ): | ||||
snapshot = sample_data["snapshot"][0] | snapshot = sample_data.snapshot | ||||
swh_storage.origin_add(sample_data["origin"]) | swh_storage.origin_add(sample_data.origins) | ||||
swh_storage.snapshot_add([snapshot]) | swh_storage.snapshot_add([snapshot]) | ||||
origin_url = "https://github.com/user1/repo1" | origin_url = "https://github.com/user1/repo1" | ||||
visit = OriginVisit(origin=origin_url, date=now(), type="git",) | visit = OriginVisit(origin=origin_url, date=now(), type="git",) | ||||
visit = swh_storage.origin_visit_add([visit])[0] | visit = swh_storage.origin_visit_add([visit])[0] | ||||
swh_storage.origin_visit_status_add( | swh_storage.origin_visit_status_add( | ||||
[ | [ | ||||
OriginVisitStatus( | OriginVisitStatus( | ||||
Show All 37 Lines | |||||
@pytest.mark.db
class TestLocalStorage:
    """Test the local storage"""

    # This test only makes sense on the local storage, where an actual
    # objstorage backend sits behind content_add and can raise.
    def test_content_add_objstorage_exception(self, swh_storage, sample_data):
        """A broken objstorage aborts content_add without recording the hashes."""
        sample_content = sample_data.content

        # Make every objstorage write blow up.
        swh_storage.objstorage.content_add = Mock(
            side_effect=Exception("mocked broken objstorage")
        )

        with pytest.raises(Exception, match="mocked broken"):
            swh_storage.content_add([sample_content])

        # The failed write must not have registered the content hashes.
        not_found = list(swh_storage.content_missing([sample_content.hashes()]))
        assert not_found == [sample_content.sha1]
@pytest.mark.db | @pytest.mark.db | ||||
class TestStorageRaceConditions: | class TestStorageRaceConditions: | ||||
@pytest.mark.xfail | @pytest.mark.xfail | ||||
def test_content_add_race(self, swh_storage, sample_data): | def test_content_add_race(self, swh_storage, sample_data): | ||||
content = sample_data["content"][0] | content = sample_data.content | ||||
results = queue.Queue() | results = queue.Queue() | ||||
def thread(): | def thread(): | ||||
try: | try: | ||||
with db_transaction(swh_storage) as (db, cur): | with db_transaction(swh_storage) as (db, cur): | ||||
ret = swh_storage.content_add([content], db=db, cur=cur) | ret = swh_storage.content_add([content], db=db, cur=cur) | ||||
results.put((threading.get_ident(), "data", ret)) | results.put((threading.get_ident(), "data", ret)) | ||||
Show All 25 Lines | class TestPgStorage: | ||||
"""This class is dedicated for the rare case where the schema needs to | """This class is dedicated for the rare case where the schema needs to | ||||
be altered dynamically. | be altered dynamically. | ||||
Otherwise, the tests could be blocking when ran altogether. | Otherwise, the tests could be blocking when ran altogether. | ||||
""" | """ | ||||
def test_content_update_with_new_cols(self, swh_storage, sample_data): | def test_content_update_with_new_cols(self, swh_storage, sample_data): | ||||
content, content2 = sample_data["content"][:2] | content, content2 = sample_data.contents[:2] | ||||
swh_storage.journal_writer.journal = None # TODO, not supported | swh_storage.journal_writer.journal = None # TODO, not supported | ||||
with db_transaction(swh_storage) as (_, cur): | with db_transaction(swh_storage) as (_, cur): | ||||
cur.execute( | cur.execute( | ||||
"""alter table content | """alter table content | ||||
add column test text default null, | add column test text default null, | ||||
add column test2 text default null""" | add column test2 text default null""" | ||||
Show All 28 Lines | def test_content_update_with_new_cols(self, swh_storage, sample_data): | ||||
with db_transaction(swh_storage) as (_, cur): | with db_transaction(swh_storage) as (_, cur): | ||||
cur.execute( | cur.execute( | ||||
"""alter table content drop column test, | """alter table content drop column test, | ||||
drop column test2""" | drop column test2""" | ||||
) | ) | ||||
def test_content_add_db(self, swh_storage, sample_data): | def test_content_add_db(self, swh_storage, sample_data): | ||||
content = sample_data["content"][0] | content = sample_data.content | ||||
actual_result = swh_storage.content_add([content]) | actual_result = swh_storage.content_add([content]) | ||||
assert actual_result == { | assert actual_result == { | ||||
"content:add": 1, | "content:add": 1, | ||||
"content:add:bytes": content.length, | "content:add:bytes": content.length, | ||||
} | } | ||||
Show All 20 Lines | def test_content_add_db(self, swh_storage, sample_data): | ||||
obj | obj | ||||
for (obj_type, obj) in swh_storage.journal_writer.journal.objects | for (obj_type, obj) in swh_storage.journal_writer.journal.objects | ||||
if obj_type == "content" | if obj_type == "content" | ||||
] | ] | ||||
assert len(contents) == 1 | assert len(contents) == 1 | ||||
assert contents[0] == attr.evolve(content, data=None) | assert contents[0] == attr.evolve(content, data=None) | ||||
def test_content_add_metadata_db(self, swh_storage, sample_data): | def test_content_add_metadata_db(self, swh_storage, sample_data): | ||||
content = attr.evolve(sample_data["content"][0], data=None, ctime=now()) | content = attr.evolve(sample_data.content, data=None, ctime=now()) | ||||
actual_result = swh_storage.content_add_metadata([content]) | actual_result = swh_storage.content_add_metadata([content]) | ||||
assert actual_result == { | assert actual_result == { | ||||
"content:add": 1, | "content:add": 1, | ||||
} | } | ||||
if hasattr(swh_storage, "objstorage"): | if hasattr(swh_storage, "objstorage"): | ||||
Show All 17 Lines | def test_content_add_metadata_db(self, swh_storage, sample_data): | ||||
obj | obj | ||||
for (obj_type, obj) in swh_storage.journal_writer.journal.objects | for (obj_type, obj) in swh_storage.journal_writer.journal.objects | ||||
if obj_type == "content" | if obj_type == "content" | ||||
] | ] | ||||
assert len(contents) == 1 | assert len(contents) == 1 | ||||
assert contents[0] == content | assert contents[0] == content | ||||
def test_skipped_content_add_db(self, swh_storage, sample_data): | def test_skipped_content_add_db(self, swh_storage, sample_data): | ||||
content, cont2 = sample_data["skipped_content"][:2] | content, cont2 = sample_data.skipped_contents[:2] | ||||
content2 = attr.evolve(cont2, blake2s256=None) | content2 = attr.evolve(cont2, blake2s256=None) | ||||
actual_result = swh_storage.skipped_content_add([content, content, content2]) | actual_result = swh_storage.skipped_content_add([content, content, content2]) | ||||
assert 2 <= actual_result.pop("skipped_content:add") <= 3 | assert 2 <= actual_result.pop("skipped_content:add") <= 3 | ||||
assert actual_result == {} | assert actual_result == {} | ||||
with db_transaction(swh_storage) as (_, cur): | with db_transaction(swh_storage) as (_, cur): | ||||
Show All 40 Lines |