Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
Show All 20 Lines | |||||
import pytest | import pytest | ||||
from hypothesis import given, strategies, settings, HealthCheck | from hypothesis import given, strategies, settings, HealthCheck | ||||
from typing import ClassVar, Optional | from typing import ClassVar, Optional | ||||
from swh.model import from_disk, identifiers | from swh.model import from_disk, identifiers | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.model.model import Content, OriginVisit | from swh.model.model import ( | ||||
Content, | |||||
Directory, | |||||
Origin, | |||||
OriginVisit, | |||||
Release, | |||||
Revision, | |||||
Snapshot, | |||||
) | |||||
from swh.model.hypothesis_strategies import objects | from swh.model.hypothesis_strategies import objects | ||||
from swh.model.hashutil import hash_to_hex | from swh.model.hashutil import hash_to_hex | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.converters import origin_url_to_sha1 as sha1 | from swh.storage.converters import origin_url_to_sha1 as sha1 | ||||
from swh.storage.exc import HashCollision, StorageArgumentException | from swh.storage.exc import HashCollision, StorageArgumentException | ||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
from swh.storage.utils import content_hex_hashes | from swh.storage.utils import content_hex_hashes | ||||
▲ Show 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | ): | ||||
""" | """ | ||||
for k in keys_to_check: | for k in keys_to_check: | ||||
expected_list = set([c.get(k) for c in expected_contents]) | expected_list = set([c.get(k) for c in expected_contents]) | ||||
actual_list = set([c.get(k) for c in actual_contents]) | actual_list = set([c.get(k) for c in actual_contents]) | ||||
assert actual_list == expected_list, k | assert actual_list == expected_list, k | ||||
class LazyContent(Content): | |||||
def with_data(self): | |||||
return Content.from_dict({**self.to_dict(), "data": data.cont["data"]}) | |||||
class TestStorage: | class TestStorage: | ||||
"""Main class for Storage testing. | """Main class for Storage testing. | ||||
This class is used as-is to test local storage (see TestLocalStorage | This class is used as-is to test local storage (see TestLocalStorage | ||||
below) and remote storage (see TestRemoteStorage in | below) and remote storage (see TestRemoteStorage in | ||||
test_remote_storage.py. | test_remote_storage.py. | ||||
We need to have the two classes inherit from this base class | We need to have the two classes inherit from this base class | ||||
▲ Show 20 Lines • Show All 51 Lines • ▼ Show 20 Lines | def test_content_add(self, swh_storage): | ||||
} | } | ||||
assert list(swh_storage.content_get([cont["sha1"]])) == [ | assert list(swh_storage.content_get([cont["sha1"]])) == [ | ||||
{"sha1": cont["sha1"], "data": cont["data"]} | {"sha1": cont["sha1"], "data": cont["data"]} | ||||
] | ] | ||||
expected_cont = data.cont | expected_cont = data.cont | ||||
del expected_cont["data"] | del expected_cont["data"] | ||||
journal_objects = list(swh_storage.journal_writer.journal.objects) | contents = [ | ||||
for (obj_type, obj) in journal_objects: | obj | ||||
assert insertion_start_time <= obj["ctime"] | for (obj_type, obj) in swh_storage.journal_writer.journal.objects | ||||
assert obj["ctime"] <= insertion_end_time | if obj_type == "content" | ||||
del obj["ctime"] | ] | ||||
assert journal_objects == [("content", expected_cont)] | assert len(contents) == 1 | ||||
for obj in contents: | |||||
assert insertion_start_time <= obj.ctime | |||||
assert obj.ctime <= insertion_end_time | |||||
obj_d = obj.to_dict() | |||||
del obj_d["ctime"] | |||||
assert obj_d == expected_cont | |||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["content"] == 1 | assert swh_storage.stat_counters()["content"] == 1 | ||||
def test_content_add_from_generator(self, swh_storage): | def test_content_add_from_generator(self, swh_storage): | ||||
def _cnt_gen(): | def _cnt_gen(): | ||||
yield data.cont | yield data.cont | ||||
actual_result = swh_storage.content_add(_cnt_gen()) | actual_result = swh_storage.content_add(_cnt_gen()) | ||||
assert actual_result == { | assert actual_result == { | ||||
"content:add": 1, | "content:add": 1, | ||||
"content:add:bytes": data.cont["length"], | "content:add:bytes": data.cont["length"], | ||||
} | } | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["content"] == 1 | assert swh_storage.stat_counters()["content"] == 1 | ||||
def test_content_add_from_lazy_content(self, swh_storage): | def test_content_add_from_lazy_content(self, swh_storage): | ||||
called = False | |||||
cont = data.cont | |||||
class LazyContent(Content): | |||||
def with_data(self): | |||||
nonlocal called | |||||
called = True | |||||
return Content.from_dict({**self.to_dict(), "data": cont["data"]}) | |||||
lazy_content = LazyContent.from_dict({**cont, "data": b"nope",}) | lazy_content = LazyContent.from_dict({**data.cont, "data": b"nope",}) | ||||
insertion_start_time = datetime.datetime.now(tz=datetime.timezone.utc) | insertion_start_time = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
# bypass the validation proxy for now, to directly put a dict | # bypass the validation proxy for now, to directly put a dict | ||||
actual_result = swh_storage.storage.content_add([lazy_content]) | actual_result = swh_storage.storage.content_add([lazy_content]) | ||||
insertion_end_time = datetime.datetime.now(tz=datetime.timezone.utc) | insertion_end_time = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
assert actual_result == { | assert actual_result == { | ||||
"content:add": 1, | "content:add": 1, | ||||
"content:add:bytes": cont["length"], | "content:add:bytes": data.cont["length"], | ||||
} | } | ||||
assert called | # the fact that we retrieve the content object from the storage with | ||||
# the correct 'data' field ensures it has been 'called' | |||||
assert list(swh_storage.content_get([cont["sha1"]])) == [ | assert list(swh_storage.content_get([data.cont["sha1"]])) == [ | ||||
{"sha1": cont["sha1"], "data": cont["data"]} | {"sha1": data.cont["sha1"], "data": data.cont["data"]} | ||||
] | ] | ||||
expected_cont = data.cont | expected_cont = data.cont | ||||
del expected_cont["data"] | del expected_cont["data"] | ||||
journal_objects = list(swh_storage.journal_writer.journal.objects) | contents = [ | ||||
for (obj_type, obj) in journal_objects: | obj | ||||
assert insertion_start_time <= obj["ctime"] | for (obj_type, obj) in swh_storage.journal_writer.journal.objects | ||||
assert obj["ctime"] <= insertion_end_time | if obj_type == "content" | ||||
del obj["ctime"] | ] | ||||
assert journal_objects == [("content", expected_cont)] | assert len(contents) == 1 | ||||
for obj in contents: | |||||
assert insertion_start_time <= obj.ctime | |||||
assert obj.ctime <= insertion_end_time | |||||
obj_d = obj.to_dict() | |||||
del obj_d["ctime"] | |||||
assert obj_d == expected_cont | |||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["content"] == 1 | assert swh_storage.stat_counters()["content"] == 1 | ||||
def test_content_add_validation(self, swh_storage): | def test_content_add_validation(self, swh_storage): | ||||
cont = data.cont | cont = data.cont | ||||
with pytest.raises(StorageArgumentException, match="status"): | with pytest.raises(StorageArgumentException, match="status"): | ||||
▲ Show 20 Lines • Show All 128 Lines • ▼ Show 20 Lines | def test_content_add_metadata(self, swh_storage): | ||||
"content:add": 1, | "content:add": 1, | ||||
} | } | ||||
expected_cont = cont.copy() | expected_cont = cont.copy() | ||||
del expected_cont["ctime"] | del expected_cont["ctime"] | ||||
assert swh_storage.content_get_metadata([cont["sha1"]]) == { | assert swh_storage.content_get_metadata([cont["sha1"]]) == { | ||||
cont["sha1"]: [expected_cont] | cont["sha1"]: [expected_cont] | ||||
} | } | ||||
contents = [ | |||||
assert list(swh_storage.journal_writer.journal.objects) == [("content", cont)] | obj | ||||
for (obj_type, obj) in swh_storage.journal_writer.journal.objects | |||||
if obj_type == "content" | |||||
] | |||||
assert len(contents) == 1 | |||||
for obj in contents: | |||||
obj_d = obj.to_dict() | |||||
del obj_d["ctime"] | |||||
assert obj_d == expected_cont | |||||
def test_content_add_metadata_different_input(self, swh_storage): | def test_content_add_metadata_different_input(self, swh_storage): | ||||
cont = data.cont | cont = data.cont | ||||
del cont["data"] | del cont["data"] | ||||
cont["ctime"] = datetime.datetime.now() | cont["ctime"] = datetime.datetime.now() | ||||
cont2 = data.cont2 | cont2 = data.cont2 | ||||
del cont2["data"] | del cont2["data"] | ||||
cont2["ctime"] = datetime.datetime.now() | cont2["ctime"] = datetime.datetime.now() | ||||
▲ Show 20 Lines • Show All 307 Lines • ▼ Show 20 Lines | class TestStorage: | ||||
def test_directory_add(self, swh_storage): | def test_directory_add(self, swh_storage): | ||||
init_missing = list(swh_storage.directory_missing([data.dir["id"]])) | init_missing = list(swh_storage.directory_missing([data.dir["id"]])) | ||||
assert [data.dir["id"]] == init_missing | assert [data.dir["id"]] == init_missing | ||||
actual_result = swh_storage.directory_add([data.dir]) | actual_result = swh_storage.directory_add([data.dir]) | ||||
assert actual_result == {"directory:add": 1} | assert actual_result == {"directory:add": 1} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("directory", data.dir) | ("directory", Directory.from_dict(data.dir)) | ||||
] | ] | ||||
actual_data = list(swh_storage.directory_ls(data.dir["id"])) | actual_data = list(swh_storage.directory_ls(data.dir["id"])) | ||||
expected_data = list(transform_entries(data.dir)) | expected_data = list(transform_entries(data.dir)) | ||||
assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | ||||
after_missing = list(swh_storage.directory_missing([data.dir["id"]])) | after_missing = list(swh_storage.directory_missing([data.dir["id"]])) | ||||
assert after_missing == [] | assert after_missing == [] | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["directory"] == 1 | assert swh_storage.stat_counters()["directory"] == 1 | ||||
def test_directory_add_from_generator(self, swh_storage): | def test_directory_add_from_generator(self, swh_storage): | ||||
def _dir_gen(): | def _dir_gen(): | ||||
yield data.dir | yield data.dir | ||||
actual_result = swh_storage.directory_add(directories=_dir_gen()) | actual_result = swh_storage.directory_add(directories=_dir_gen()) | ||||
assert actual_result == {"directory:add": 1} | assert actual_result == {"directory:add": 1} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("directory", data.dir) | ("directory", Directory.from_dict(data.dir)) | ||||
] | ] | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["directory"] == 1 | assert swh_storage.stat_counters()["directory"] == 1 | ||||
def test_directory_add_validation(self, swh_storage): | def test_directory_add_validation(self, swh_storage): | ||||
dir_ = copy.deepcopy(data.dir) | dir_ = copy.deepcopy(data.dir) | ||||
dir_["entries"][0]["type"] = "foobar" | dir_["entries"][0]["type"] = "foobar" | ||||
Show All 10 Lines | def test_directory_add_validation(self, swh_storage): | ||||
if type(cm.value) == psycopg2.IntegrityError: | if type(cm.value) == psycopg2.IntegrityError: | ||||
assert cm.value.pgcode == psycopg2.errorcodes.NOT_NULL_VIOLATION | assert cm.value.pgcode == psycopg2.errorcodes.NOT_NULL_VIOLATION | ||||
def test_directory_add_twice(self, swh_storage): | def test_directory_add_twice(self, swh_storage): | ||||
actual_result = swh_storage.directory_add([data.dir]) | actual_result = swh_storage.directory_add([data.dir]) | ||||
assert actual_result == {"directory:add": 1} | assert actual_result == {"directory:add": 1} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("directory", data.dir) | ("directory", Directory.from_dict(data.dir)) | ||||
] | ] | ||||
actual_result = swh_storage.directory_add([data.dir]) | actual_result = swh_storage.directory_add([data.dir]) | ||||
assert actual_result == {"directory:add": 0} | assert actual_result == {"directory:add": 0} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("directory", data.dir) | ("directory", Directory.from_dict(data.dir)) | ||||
] | ] | ||||
def test_directory_get_recursive(self, swh_storage): | def test_directory_get_recursive(self, swh_storage): | ||||
init_missing = list(swh_storage.directory_missing([data.dir["id"]])) | init_missing = list(swh_storage.directory_missing([data.dir["id"]])) | ||||
assert init_missing == [data.dir["id"]] | assert init_missing == [data.dir["id"]] | ||||
actual_result = swh_storage.directory_add([data.dir, data.dir2, data.dir3]) | actual_result = swh_storage.directory_add([data.dir, data.dir2, data.dir3]) | ||||
assert actual_result == {"directory:add": 3} | assert actual_result == {"directory:add": 3} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("directory", data.dir), | ("directory", Directory.from_dict(data.dir)), | ||||
("directory", data.dir2), | ("directory", Directory.from_dict(data.dir2)), | ||||
("directory", data.dir3), | ("directory", Directory.from_dict(data.dir3)), | ||||
] | ] | ||||
# List directory containing a file and an unknown subdirectory | # List directory containing a file and an unknown subdirectory | ||||
actual_data = list(swh_storage.directory_ls(data.dir["id"], recursive=True)) | actual_data = list(swh_storage.directory_ls(data.dir["id"], recursive=True)) | ||||
expected_data = list(transform_entries(data.dir)) | expected_data = list(transform_entries(data.dir)) | ||||
assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | ||||
# List directory containing a file and an unknown subdirectory | # List directory containing a file and an unknown subdirectory | ||||
Show All 15 Lines | class TestStorage: | ||||
def test_directory_get_non_recursive(self, swh_storage): | def test_directory_get_non_recursive(self, swh_storage): | ||||
init_missing = list(swh_storage.directory_missing([data.dir["id"]])) | init_missing = list(swh_storage.directory_missing([data.dir["id"]])) | ||||
assert init_missing == [data.dir["id"]] | assert init_missing == [data.dir["id"]] | ||||
actual_result = swh_storage.directory_add([data.dir, data.dir2, data.dir3]) | actual_result = swh_storage.directory_add([data.dir, data.dir2, data.dir3]) | ||||
assert actual_result == {"directory:add": 3} | assert actual_result == {"directory:add": 3} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("directory", data.dir), | ("directory", Directory.from_dict(data.dir)), | ||||
("directory", data.dir2), | ("directory", Directory.from_dict(data.dir2)), | ||||
("directory", data.dir3), | ("directory", Directory.from_dict(data.dir3)), | ||||
] | ] | ||||
# List directory containing a file and an unknown subdirectory | # List directory containing a file and an unknown subdirectory | ||||
actual_data = list(swh_storage.directory_ls(data.dir["id"])) | actual_data = list(swh_storage.directory_ls(data.dir["id"])) | ||||
expected_data = list(transform_entries(data.dir)) | expected_data = list(transform_entries(data.dir)) | ||||
assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | ||||
# List directory contaiining a single file | # List directory contaiining a single file | ||||
▲ Show 20 Lines • Show All 92 Lines • ▼ Show 20 Lines | def test_revision_add(self, swh_storage): | ||||
actual_result = swh_storage.revision_add([data.revision]) | actual_result = swh_storage.revision_add([data.revision]) | ||||
assert actual_result == {"revision:add": 1} | assert actual_result == {"revision:add": 1} | ||||
end_missing = swh_storage.revision_missing([data.revision["id"]]) | end_missing = swh_storage.revision_missing([data.revision["id"]]) | ||||
assert list(end_missing) == [] | assert list(end_missing) == [] | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("revision", data.revision) | ("revision", Revision.from_dict(data.revision)) | ||||
] | ] | ||||
# already there so nothing added | # already there so nothing added | ||||
actual_result = swh_storage.revision_add([data.revision]) | actual_result = swh_storage.revision_add([data.revision]) | ||||
assert actual_result == {"revision:add": 0} | assert actual_result == {"revision:add": 0} | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["revision"] == 1 | assert swh_storage.stat_counters()["revision"] == 1 | ||||
Show All 36 Lines | def test_revision_add_validation(self, swh_storage): | ||||
if type(cm.value) == psycopg2.DataError: | if type(cm.value) == psycopg2.DataError: | ||||
assert cm.value.pgcode == psycopg2.errorcodes.INVALID_TEXT_REPRESENTATION | assert cm.value.pgcode == psycopg2.errorcodes.INVALID_TEXT_REPRESENTATION | ||||
def test_revision_add_twice(self, swh_storage): | def test_revision_add_twice(self, swh_storage): | ||||
actual_result = swh_storage.revision_add([data.revision]) | actual_result = swh_storage.revision_add([data.revision]) | ||||
assert actual_result == {"revision:add": 1} | assert actual_result == {"revision:add": 1} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("revision", data.revision) | ("revision", Revision.from_dict(data.revision)) | ||||
] | ] | ||||
actual_result = swh_storage.revision_add([data.revision, data.revision2]) | actual_result = swh_storage.revision_add([data.revision, data.revision2]) | ||||
assert actual_result == {"revision:add": 1} | assert actual_result == {"revision:add": 1} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("revision", data.revision), | ("revision", Revision.from_dict(data.revision)), | ||||
("revision", data.revision2), | ("revision", Revision.from_dict(data.revision2)), | ||||
] | ] | ||||
def test_revision_add_name_clash(self, swh_storage): | def test_revision_add_name_clash(self, swh_storage): | ||||
revision1 = data.revision | revision1 = data.revision | ||||
revision2 = data.revision2 | revision2 = data.revision2 | ||||
revision1["author"] = { | revision1["author"] = { | ||||
"fullname": b"John Doe <john.doe@example.com>", | "fullname": b"John Doe <john.doe@example.com>", | ||||
Show All 35 Lines | def test_revision_log(self, swh_storage): | ||||
if "id" in actual_result["committer"]: | if "id" in actual_result["committer"]: | ||||
del actual_result["committer"]["id"] | del actual_result["committer"]["id"] | ||||
assert len(actual_results) == 2 # rev4 -child-> rev3 | assert len(actual_results) == 2 # rev4 -child-> rev3 | ||||
assert actual_results[0] == normalize_entity(data.revision4) | assert actual_results[0] == normalize_entity(data.revision4) | ||||
assert actual_results[1] == normalize_entity(data.revision3) | assert actual_results[1] == normalize_entity(data.revision3) | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("revision", data.revision3), | ("revision", Revision.from_dict(data.revision3)), | ||||
("revision", data.revision4), | ("revision", Revision.from_dict(data.revision4)), | ||||
] | ] | ||||
def test_revision_log_with_limit(self, swh_storage): | def test_revision_log_with_limit(self, swh_storage): | ||||
# given | # given | ||||
# data.revision4 -is-child-of-> data.revision3 | # data.revision4 -is-child-of-> data.revision3 | ||||
swh_storage.revision_add([data.revision3, data.revision4]) | swh_storage.revision_add([data.revision3, data.revision4]) | ||||
actual_results = list(swh_storage.revision_log([data.revision4["id"]], 1)) | actual_results = list(swh_storage.revision_log([data.revision4["id"]], 1)) | ||||
▲ Show 20 Lines • Show All 76 Lines • ▼ Show 20 Lines | def test_release_add(self, swh_storage): | ||||
assert actual_result == {"release:add": 2} | assert actual_result == {"release:add": 2} | ||||
end_missing = swh_storage.release_missing( | end_missing = swh_storage.release_missing( | ||||
[data.release["id"], data.release2["id"]] | [data.release["id"], data.release2["id"]] | ||||
) | ) | ||||
assert list(end_missing) == [] | assert list(end_missing) == [] | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("release", data.release), | ("release", Release.from_dict(data.release)), | ||||
("release", data.release2), | ("release", Release.from_dict(data.release2)), | ||||
] | ] | ||||
# already present so nothing added | # already present so nothing added | ||||
actual_result = swh_storage.release_add([data.release, data.release2]) | actual_result = swh_storage.release_add([data.release, data.release2]) | ||||
assert actual_result == {"release:add": 0} | assert actual_result == {"release:add": 0} | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["release"] == 2 | assert swh_storage.stat_counters()["release"] == 2 | ||||
def test_release_add_from_generator(self, swh_storage): | def test_release_add_from_generator(self, swh_storage): | ||||
def _rel_gen(): | def _rel_gen(): | ||||
yield data.release | yield data.release | ||||
yield data.release2 | yield data.release2 | ||||
actual_result = swh_storage.release_add(_rel_gen()) | actual_result = swh_storage.release_add(_rel_gen()) | ||||
assert actual_result == {"release:add": 2} | assert actual_result == {"release:add": 2} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("release", data.release), | ("release", Release.from_dict(data.release)), | ||||
("release", data.release2), | ("release", Release.from_dict(data.release2)), | ||||
] | ] | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["release"] == 2 | assert swh_storage.stat_counters()["release"] == 2 | ||||
def test_release_add_no_author_date(self, swh_storage): | def test_release_add_no_author_date(self, swh_storage): | ||||
release = data.release | release = data.release | ||||
release["author"] = None | release["author"] = None | ||||
release["date"] = None | release["date"] = None | ||||
actual_result = swh_storage.release_add([release]) | actual_result = swh_storage.release_add([release]) | ||||
assert actual_result == {"release:add": 1} | assert actual_result == {"release:add": 1} | ||||
end_missing = swh_storage.release_missing([data.release["id"]]) | end_missing = swh_storage.release_missing([data.release["id"]]) | ||||
assert list(end_missing) == [] | assert list(end_missing) == [] | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("release", release) | ("release", Release.from_dict(release)) | ||||
] | ] | ||||
def test_release_add_validation(self, swh_storage): | def test_release_add_validation(self, swh_storage): | ||||
rel = copy.deepcopy(data.release) | rel = copy.deepcopy(data.release) | ||||
rel["date"]["offset"] = 2 ** 16 | rel["date"]["offset"] = 2 ** 16 | ||||
with pytest.raises(StorageArgumentException, match="offset") as cm: | with pytest.raises(StorageArgumentException, match="offset") as cm: | ||||
swh_storage.release_add([rel]) | swh_storage.release_add([rel]) | ||||
Show All 17 Lines | def test_release_add_validation_type(self, swh_storage): | ||||
with pytest.raises(StorageArgumentException): | with pytest.raises(StorageArgumentException): | ||||
swh_storage.release_add([rel]) | swh_storage.release_add([rel]) | ||||
def test_release_add_twice(self, swh_storage): | def test_release_add_twice(self, swh_storage): | ||||
actual_result = swh_storage.release_add([data.release]) | actual_result = swh_storage.release_add([data.release]) | ||||
assert actual_result == {"release:add": 1} | assert actual_result == {"release:add": 1} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("release", data.release) | ("release", Release.from_dict(data.release)) | ||||
] | ] | ||||
actual_result = swh_storage.release_add([data.release, data.release2]) | actual_result = swh_storage.release_add([data.release, data.release2]) | ||||
assert actual_result == {"release:add": 1} | assert actual_result == {"release:add": 1} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("release", data.release), | ("release", Release.from_dict(data.release)), | ||||
("release", data.release2), | ("release", Release.from_dict(data.release2)), | ||||
] | ] | ||||
def test_release_add_name_clash(self, swh_storage): | def test_release_add_name_clash(self, swh_storage): | ||||
release1 = data.release.copy() | release1 = data.release.copy() | ||||
release2 = data.release2.copy() | release2 = data.release2.copy() | ||||
release1["author"] = { | release1["author"] = { | ||||
"fullname": b"John Doe <john.doe@example.com>", | "fullname": b"John Doe <john.doe@example.com>", | ||||
▲ Show 20 Lines • Show All 77 Lines • ▼ Show 20 Lines | def test_origin_add(self, swh_storage): | ||||
actual_origin2 = swh_storage.origin_get([{"url": data.origin2["url"],}])[0] | actual_origin2 = swh_storage.origin_get([{"url": data.origin2["url"],}])[0] | ||||
assert actual_origin2["url"] == origin2["url"] | assert actual_origin2["url"] == origin2["url"] | ||||
if "id" in actual_origin: | if "id" in actual_origin: | ||||
del actual_origin["id"] | del actual_origin["id"] | ||||
del actual_origin2["id"] | del actual_origin2["id"] | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("origin", actual_origin), | ("origin", Origin.from_dict(actual_origin)), | ||||
("origin", actual_origin2), | ("origin", Origin.from_dict(actual_origin2)), | ||||
] | ] | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["origin"] == 2 | assert swh_storage.stat_counters()["origin"] == 2 | ||||
def test_origin_add_from_generator(self, swh_storage): | def test_origin_add_from_generator(self, swh_storage): | ||||
def _ori_gen(): | def _ori_gen(): | ||||
yield data.origin | yield data.origin | ||||
yield data.origin2 | yield data.origin2 | ||||
origin1, origin2 = swh_storage.origin_add(_ori_gen()) | origin1, origin2 = swh_storage.origin_add(_ori_gen()) | ||||
actual_origin = swh_storage.origin_get([{"url": data.origin["url"],}])[0] | actual_origin = swh_storage.origin_get([{"url": data.origin["url"],}])[0] | ||||
assert actual_origin["url"] == origin1["url"] | assert actual_origin["url"] == origin1["url"] | ||||
actual_origin2 = swh_storage.origin_get([{"url": data.origin2["url"],}])[0] | actual_origin2 = swh_storage.origin_get([{"url": data.origin2["url"],}])[0] | ||||
assert actual_origin2["url"] == origin2["url"] | assert actual_origin2["url"] == origin2["url"] | ||||
if "id" in actual_origin: | if "id" in actual_origin: | ||||
del actual_origin["id"] | del actual_origin["id"] | ||||
del actual_origin2["id"] | del actual_origin2["id"] | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("origin", actual_origin), | ("origin", Origin.from_dict(actual_origin)), | ||||
("origin", actual_origin2), | ("origin", Origin.from_dict(actual_origin2)), | ||||
] | ] | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["origin"] == 2 | assert swh_storage.stat_counters()["origin"] == 2 | ||||
def test_origin_add_twice(self, swh_storage): | def test_origin_add_twice(self, swh_storage): | ||||
add1 = swh_storage.origin_add([data.origin, data.origin2]) | add1 = swh_storage.origin_add([data.origin, data.origin2]) | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("origin", data.origin), | ("origin", Origin.from_dict(data.origin)), | ||||
("origin", data.origin2), | ("origin", Origin.from_dict(data.origin2)), | ||||
] | ] | ||||
add2 = swh_storage.origin_add([data.origin, data.origin2]) | add2 = swh_storage.origin_add([data.origin, data.origin2]) | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("origin", data.origin), | ("origin", Origin.from_dict(data.origin)), | ||||
("origin", data.origin2), | ("origin", Origin.from_dict(data.origin2)), | ||||
] | ] | ||||
assert add1 == add2 | assert add1 == add2 | ||||
def test_origin_add_validation(self, swh_storage): | def test_origin_add_validation(self, swh_storage): | ||||
"""Incorrect formatted origin should fail the validation | """Incorrect formatted origin should fail the validation | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 243 Lines • ▼ Show 20 Lines | def test_origin_visit_add(self, swh_storage): | ||||
"snapshot": None, | "snapshot": None, | ||||
} | } | ||||
assert origin_visit == OriginVisit.from_dict(expected_origin_visit) | assert origin_visit == OriginVisit.from_dict(expected_origin_visit) | ||||
actual_origin_visits = list(swh_storage.origin_visit_get(origin_url)) | actual_origin_visits = list(swh_storage.origin_visit_get(origin_url)) | ||||
assert expected_origin_visit in actual_origin_visits | assert expected_origin_visit in actual_origin_visits | ||||
objects = list(swh_storage.journal_writer.journal.objects) | objects = list(swh_storage.journal_writer.journal.objects) | ||||
assert ("origin", data.origin2) in objects | assert ("origin", Origin.from_dict(data.origin2)) in objects | ||||
assert ("origin_visit", expected_origin_visit) in objects | assert ("origin_visit", OriginVisit.from_dict(expected_origin_visit)) in objects | ||||
def test_origin_visit_get__unknown_origin(self, swh_storage): | def test_origin_visit_get__unknown_origin(self, swh_storage): | ||||
assert [] == list(swh_storage.origin_visit_get("foo")) | assert [] == list(swh_storage.origin_visit_get("foo")) | ||||
def test_origin_visit_add_default_type(self, swh_storage): | def test_origin_visit_add_default_type(self, swh_storage): | ||||
# given | # given | ||||
origin_url = swh_storage.origin_add_one(data.origin2) | origin_url = swh_storage.origin_add_one(data.origin2) | ||||
Show All 40 Lines | def test_origin_visit_add_default_type(self, swh_storage): | ||||
"snapshot": None, | "snapshot": None, | ||||
}, | }, | ||||
] | ] | ||||
assert len(expected_visits) == len(actual_origin_visits) | assert len(expected_visits) == len(actual_origin_visits) | ||||
for visit in expected_visits: | for visit in expected_visits: | ||||
assert visit in actual_origin_visits | assert visit in actual_origin_visits | ||||
objects = list(swh_storage.journal_writer.journal.objects) | objects = list(swh_storage.journal_writer.journal.objects) | ||||
assert ("origin", data.origin2) in objects | assert ("origin", Origin.from_dict(data.origin2)) in objects | ||||
for visit in expected_visits: | for visit in expected_visits: | ||||
assert ("origin_visit", visit) in objects | assert ("origin_visit", OriginVisit.from_dict(visit)) in objects | ||||
def test_origin_visit_add_validation(self, swh_storage): | def test_origin_visit_add_validation(self, swh_storage): | ||||
origin_url = swh_storage.origin_add_one(data.origin2) | origin_url = swh_storage.origin_add_one(data.origin2) | ||||
with pytest.raises(StorageArgumentException) as cm: | with pytest.raises(StorageArgumentException) as cm: | ||||
swh_storage.origin_visit_add(origin_url, date=[b"foo"], type="git") | swh_storage.origin_visit_add(origin_url, date=[b"foo"], type="git") | ||||
if type(cm.value) == psycopg2.ProgrammingError: | if type(cm.value) == psycopg2.ProgrammingError: | ||||
assert cm.value.pgcode == psycopg2.errorcodes.UNDEFINED_FUNCTION | assert cm.value.pgcode == psycopg2.errorcodes.UNDEFINED_FUNCTION | ||||
▲ Show 20 Lines • Show All 144 Lines • ▼ Show 20 Lines | def test_origin_visit_update(self, swh_storage): | ||||
"date": date_visit2, | "date": date_visit2, | ||||
"visit": origin_visit3.visit, | "visit": origin_visit3.visit, | ||||
"type": data.type_visit3, | "type": data.type_visit3, | ||||
"status": "partial", | "status": "partial", | ||||
"metadata": None, | "metadata": None, | ||||
"snapshot": None, | "snapshot": None, | ||||
} | } | ||||
objects = list(swh_storage.journal_writer.journal.objects) | objects = list(swh_storage.journal_writer.journal.objects) | ||||
assert ("origin", data.origin) in objects | assert ("origin", Origin.from_dict(data.origin)) in objects | ||||
assert ("origin", data.origin2) in objects | assert ("origin", Origin.from_dict(data.origin2)) in objects | ||||
assert ("origin_visit", data1) in objects | assert ("origin_visit", OriginVisit.from_dict(data1)) in objects | ||||
assert ("origin_visit", data2) in objects | assert ("origin_visit", OriginVisit.from_dict(data2)) in objects | ||||
assert ("origin_visit", data3) in objects | assert ("origin_visit", OriginVisit.from_dict(data3)) in objects | ||||
assert ("origin_visit", data4) in objects | assert ("origin_visit", OriginVisit.from_dict(data4)) in objects | ||||
assert ("origin_visit", data5) in objects | assert ("origin_visit", OriginVisit.from_dict(data5)) in objects | ||||
def test_origin_visit_update_validation(self, swh_storage): | def test_origin_visit_update_validation(self, swh_storage): | ||||
origin_url = swh_storage.origin_add_one(data.origin) | origin_url = swh_storage.origin_add_one(data.origin) | ||||
visit = swh_storage.origin_visit_add( | visit = swh_storage.origin_visit_add( | ||||
origin_url, date=data.date_visit2, type=data.type_visit2 | origin_url, date=data.date_visit2, type=data.type_visit2 | ||||
) | ) | ||||
with pytest.raises(StorageArgumentException, match="status") as cm: | with pytest.raises( | ||||
(StorageArgumentException, ValueError), match="status" | |||||
) as cm: | |||||
swh_storage.origin_visit_update(origin_url, visit.visit, status="foobar") | swh_storage.origin_visit_update(origin_url, visit.visit, status="foobar") | ||||
if type(cm.value) == psycopg2.DataError: | if type(cm.value) == psycopg2.DataError: | ||||
assert cm.value.pgcode == psycopg2.errorcodes.INVALID_TEXT_REPRESENTATION | assert cm.value.pgcode == psycopg2.errorcodes.INVALID_TEXT_REPRESENTATION | ||||
def test_origin_visit_find_by_date(self, swh_storage): | def test_origin_visit_find_by_date(self, swh_storage): | ||||
# given | # given | ||||
origin_url = swh_storage.origin_add_one(data.origin) | origin_url = swh_storage.origin_add_one(data.origin) | ||||
▲ Show 20 Lines • Show All 170 Lines • ▼ Show 20 Lines | def test_origin_visit_upsert_new(self, swh_storage): | ||||
"date": data.date_visit3, | "date": data.date_visit3, | ||||
"visit": 1234, | "visit": 1234, | ||||
"type": data.type_visit2, | "type": data.type_visit2, | ||||
"status": "full", | "status": "full", | ||||
"metadata": None, | "metadata": None, | ||||
"snapshot": None, | "snapshot": None, | ||||
} | } | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("origin", data.origin2), | ("origin", Origin.from_dict(data.origin2)), | ||||
("origin_visit", data1), | ("origin_visit", OriginVisit.from_dict(data1)), | ||||
("origin_visit", data2), | ("origin_visit", OriginVisit.from_dict(data2)), | ||||
] | ] | ||||
def test_origin_visit_upsert_existing(self, swh_storage): | def test_origin_visit_upsert_existing(self, swh_storage): | ||||
# given | # given | ||||
origin_url = swh_storage.origin_add_one(data.origin2) | origin_url = swh_storage.origin_add_one(data.origin2) | ||||
# when | # when | ||||
origin_visit1 = swh_storage.origin_visit_add( | origin_visit1 = swh_storage.origin_visit_add( | ||||
▲ Show 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | def test_origin_visit_upsert_existing(self, swh_storage): | ||||
"date": data.date_visit2, | "date": data.date_visit2, | ||||
"visit": origin_visit1.visit, | "visit": origin_visit1.visit, | ||||
"type": data.type_visit1, | "type": data.type_visit1, | ||||
"status": "full", | "status": "full", | ||||
"metadata": None, | "metadata": None, | ||||
"snapshot": None, | "snapshot": None, | ||||
} | } | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("origin", data.origin2), | ("origin", Origin.from_dict(data.origin2)), | ||||
("origin_visit", data1), | ("origin_visit", OriginVisit.from_dict(data1)), | ||||
("origin_visit", data2), | ("origin_visit", OriginVisit.from_dict(data2)), | ||||
] | ] | ||||
def test_origin_visit_upsert_missing_visit_id(self, swh_storage): | def test_origin_visit_upsert_missing_visit_id(self, swh_storage): | ||||
# given | # given | ||||
origin_url = swh_storage.origin_add_one(data.origin2) | origin_url = swh_storage.origin_add_one(data.origin2) | ||||
# then | # then | ||||
with pytest.raises(StorageArgumentException, match="Missing visit id"): | with pytest.raises(StorageArgumentException, match="Missing visit id"): | ||||
Show All 9 Lines | def test_origin_visit_upsert_missing_visit_id(self, swh_storage): | ||||
"metadata": None, | "metadata": None, | ||||
"snapshot": None, | "snapshot": None, | ||||
} | } | ||||
) | ) | ||||
] | ] | ||||
) | ) | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("origin", data.origin2) | ("origin", Origin.from_dict(data.origin2)) | ||||
] | ] | ||||
def test_origin_visit_get_by_no_result(self, swh_storage): | def test_origin_visit_get_by_no_result(self, swh_storage): | ||||
swh_storage.origin_add([data.origin]) | swh_storage.origin_add([data.origin]) | ||||
actual_origin_visit = swh_storage.origin_visit_get_by(data.origin["url"], 999) | actual_origin_visit = swh_storage.origin_visit_get_by(data.origin["url"], 999) | ||||
assert actual_origin_visit is None | assert actual_origin_visit is None | ||||
def test_origin_visit_get_latest(self, swh_storage): | def test_origin_visit_get_latest(self, swh_storage): | ||||
▲ Show 20 Lines • Show All 158 Lines • ▼ Show 20 Lines | def test_snapshot_add_get_empty(self, swh_storage): | ||||
"date": data.date_visit1, | "date": data.date_visit1, | ||||
"visit": origin_visit1.visit, | "visit": origin_visit1.visit, | ||||
"type": data.type_visit1, | "type": data.type_visit1, | ||||
"status": "ongoing", | "status": "ongoing", | ||||
"metadata": None, | "metadata": None, | ||||
"snapshot": data.empty_snapshot["id"], | "snapshot": data.empty_snapshot["id"], | ||||
} | } | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("origin", data.origin), | ("origin", Origin.from_dict(data.origin)), | ||||
("origin_visit", data1), | ("origin_visit", OriginVisit.from_dict(data1)), | ||||
("snapshot", data.empty_snapshot), | ("snapshot", Snapshot.from_dict(data.empty_snapshot)), | ||||
("origin_visit", data2), | ("origin_visit", OriginVisit.from_dict(data2)), | ||||
] | ] | ||||
def test_snapshot_add_get_complete(self, swh_storage): | def test_snapshot_add_get_complete(self, swh_storage): | ||||
origin_url = data.origin["url"] | origin_url = data.origin["url"] | ||||
origin_url = swh_storage.origin_add_one(data.origin) | origin_url = swh_storage.origin_add_one(data.origin) | ||||
origin_visit1 = swh_storage.origin_visit_add( | origin_visit1 = swh_storage.origin_visit_add( | ||||
origin_url, date=data.date_visit1, type=data.type_visit1 | origin_url, date=data.date_visit1, type=data.type_visit1 | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | def test_snapshot_add_many_incremental(self, swh_storage): | ||||
data.snapshot["id"] | data.snapshot["id"] | ||||
) | ) | ||||
def test_snapshot_add_twice(self, swh_storage): | def test_snapshot_add_twice(self, swh_storage): | ||||
actual_result = swh_storage.snapshot_add([data.empty_snapshot]) | actual_result = swh_storage.snapshot_add([data.empty_snapshot]) | ||||
assert actual_result == {"snapshot:add": 1} | assert actual_result == {"snapshot:add": 1} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("snapshot", data.empty_snapshot) | ("snapshot", Snapshot.from_dict(data.empty_snapshot)) | ||||
] | ] | ||||
actual_result = swh_storage.snapshot_add([data.snapshot]) | actual_result = swh_storage.snapshot_add([data.snapshot]) | ||||
assert actual_result == {"snapshot:add": 1} | assert actual_result == {"snapshot:add": 1} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("snapshot", data.empty_snapshot), | ("snapshot", Snapshot.from_dict(data.empty_snapshot)), | ||||
("snapshot", data.snapshot), | ("snapshot", Snapshot.from_dict(data.snapshot)), | ||||
] | ] | ||||
def test_snapshot_add_validation(self, swh_storage): | def test_snapshot_add_validation(self, swh_storage): | ||||
snap = copy.deepcopy(data.snapshot) | snap = copy.deepcopy(data.snapshot) | ||||
snap["branches"][b"foo"] = {"target_type": "revision"} | snap["branches"][b"foo"] = {"target_type": "revision"} | ||||
with pytest.raises(StorageArgumentException, match="target"): | with pytest.raises(StorageArgumentException, match="target"): | ||||
swh_storage.snapshot_add([snap]) | swh_storage.snapshot_add([snap]) | ||||
▲ Show 20 Lines • Show All 215 Lines • ▼ Show 20 Lines | def test_snapshot_add_nonexistent_visit(self, swh_storage): | ||||
swh_storage.snapshot_add([data.snapshot]) | swh_storage.snapshot_add([data.snapshot]) | ||||
with pytest.raises(StorageArgumentException): | with pytest.raises(StorageArgumentException): | ||||
swh_storage.origin_visit_update( | swh_storage.origin_visit_update( | ||||
origin_url, visit_id, status="ongoing", snapshot=data.snapshot["id"] | origin_url, visit_id, status="ongoing", snapshot=data.snapshot["id"] | ||||
) | ) | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("snapshot", data.snapshot) | ("snapshot", Snapshot.from_dict(data.snapshot)) | ||||
] | ] | ||||
def test_snapshot_add_twice__by_origin_visit(self, swh_storage): | def test_snapshot_add_twice__by_origin_visit(self, swh_storage): | ||||
origin_url = swh_storage.origin_add_one(data.origin) | origin_url = swh_storage.origin_add_one(data.origin) | ||||
origin_visit1 = swh_storage.origin_visit_add( | origin_visit1 = swh_storage.origin_visit_add( | ||||
origin_url, date=data.date_visit1, type=data.type_visit1 | origin_url, date=data.date_visit1, type=data.type_visit1 | ||||
) | ) | ||||
visit1_id = origin_visit1.visit | visit1_id = origin_visit1.visit | ||||
▲ Show 20 Lines • Show All 56 Lines • ▼ Show 20 Lines | def test_snapshot_add_twice__by_origin_visit(self, swh_storage): | ||||
"date": data.date_visit2, | "date": data.date_visit2, | ||||
"visit": origin_visit2.visit, | "visit": origin_visit2.visit, | ||||
"type": data.type_visit2, | "type": data.type_visit2, | ||||
"status": "ongoing", | "status": "ongoing", | ||||
"metadata": None, | "metadata": None, | ||||
"snapshot": data.snapshot["id"], | "snapshot": data.snapshot["id"], | ||||
} | } | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("origin", data.origin), | ("origin", Origin.from_dict(data.origin)), | ||||
("origin_visit", data1), | ("origin_visit", OriginVisit.from_dict(data1)), | ||||
("snapshot", data.snapshot), | ("snapshot", Snapshot.from_dict(data.snapshot)), | ||||
("origin_visit", data2), | ("origin_visit", OriginVisit.from_dict(data2)), | ||||
("origin_visit", data3), | ("origin_visit", OriginVisit.from_dict(data3)), | ||||
("origin_visit", data4), | ("origin_visit", OriginVisit.from_dict(data4)), | ||||
] | ] | ||||
def test_snapshot_get_latest(self, swh_storage): | def test_snapshot_get_latest(self, swh_storage): | ||||
origin_url = swh_storage.origin_add_one(data.origin) | origin_url = swh_storage.origin_add_one(data.origin) | ||||
origin_visit1 = swh_storage.origin_visit_add( | origin_visit1 = swh_storage.origin_visit_add( | ||||
origin_url, date=data.date_visit1, type=data.type_visit1 | origin_url, date=data.date_visit1, type=data.type_visit1 | ||||
) | ) | ||||
origin_visit2 = swh_storage.origin_visit_add( | origin_visit2 = swh_storage.origin_visit_add( | ||||
▲ Show 20 Lines • Show All 1,225 Lines • ▼ Show 20 Lines | def test_content_add_db(self, swh_storage): | ||||
cont["sha1_git"], | cont["sha1_git"], | ||||
cont["sha256"], | cont["sha256"], | ||||
cont["length"], | cont["length"], | ||||
"visible", | "visible", | ||||
) | ) | ||||
expected_cont = cont.copy() | expected_cont = cont.copy() | ||||
del expected_cont["data"] | del expected_cont["data"] | ||||
journal_objects = list(swh_storage.journal_writer.journal.objects) | contents = [ | ||||
for (obj_type, obj) in journal_objects: | obj | ||||
del obj["ctime"] | for (obj_type, obj) in swh_storage.journal_writer.journal.objects | ||||
assert journal_objects == [("content", expected_cont)] | if obj_type == "content" | ||||
] | |||||
assert len(contents) == 1 | |||||
for obj in contents: | |||||
obj_d = obj.to_dict() | |||||
del obj_d["ctime"] | |||||
assert obj_d == expected_cont | |||||
def test_content_add_metadata_db(self, swh_storage): | def test_content_add_metadata_db(self, swh_storage): | ||||
cont = data.cont | cont = data.cont | ||||
del cont["data"] | del cont["data"] | ||||
cont["ctime"] = datetime.datetime.now() | cont["ctime"] = datetime.datetime.now() | ||||
actual_result = swh_storage.content_add_metadata([cont]) | actual_result = swh_storage.content_add_metadata([cont]) | ||||
Show All 13 Lines | def test_content_add_metadata_db(self, swh_storage): | ||||
assert datum == ( | assert datum == ( | ||||
cont["sha1"], | cont["sha1"], | ||||
cont["sha1_git"], | cont["sha1_git"], | ||||
cont["sha256"], | cont["sha256"], | ||||
cont["length"], | cont["length"], | ||||
"visible", | "visible", | ||||
) | ) | ||||
assert list(swh_storage.journal_writer.journal.objects) == [("content", cont)] | contents = [ | ||||
obj | |||||
for (obj_type, obj) in swh_storage.journal_writer.journal.objects | |||||
if obj_type == "content" | |||||
] | |||||
assert len(contents) == 1 | |||||
for obj in contents: | |||||
obj_d = obj.to_dict() | |||||
assert obj_d == cont | |||||
def test_skipped_content_add_db(self, swh_storage): | def test_skipped_content_add_db(self, swh_storage): | ||||
cont = data.skipped_cont | cont = data.skipped_cont | ||||
cont2 = data.skipped_cont2 | cont2 = data.skipped_cont2 | ||||
cont2["blake2s256"] = None | cont2["blake2s256"] = None | ||||
actual_result = swh_storage.skipped_content_add([cont, cont, cont2]) | actual_result = swh_storage.skipped_content_add([cont, cont, cont2]) | ||||
▲ Show 20 Lines • Show All 44 Lines • Show Last 20 Lines |