Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import copy | |||||
import datetime | import datetime | ||||
import inspect | import inspect | ||||
import itertools | import itertools | ||||
import math | import math | ||||
import queue | import queue | ||||
import random | import random | ||||
import threading | import threading | ||||
▲ Show 20 Lines • Show All 4,015 Lines • ▼ Show 20 Lines | ): | ||||
== 1 | == 1 | ||||
) | ) | ||||
assert ( | assert ( | ||||
swh_storage.origin_count("github.*user1", regexp=True, with_visit=True) == 1 | swh_storage.origin_count("github.*user1", regexp=True, with_visit=True) == 1 | ||||
) | ) | ||||
assert swh_storage.origin_count("github", regexp=True, with_visit=True) == 1 | assert swh_storage.origin_count("github", regexp=True, with_visit=True) == 1 | ||||
@settings(suppress_health_check=[HealthCheck.too_slow]) | @settings(suppress_health_check=[HealthCheck.too_slow]) | ||||
@given(strategies.lists(objects(), max_size=2)) | @given(strategies.lists(objects(split_content=True), max_size=2)) | ||||
def test_add_arbitrary(self, swh_storage, objects): | def test_add_arbitrary(self, swh_storage, objects): | ||||
for (obj_type, obj) in objects: | for (obj_type, obj) in objects: | ||||
obj = obj.to_dict() | if obj.object_type == "origin_visit": | ||||
if obj_type == "origin_visit": | swh_storage.origin_add([Origin(url=obj.origin)]) | ||||
origin_url = obj.pop("origin") | visit = OriginVisit(origin=obj.origin, date=obj.date, type=obj.type,) | ||||
swh_storage.origin_add([{"url": origin_url}]) | |||||
if "visit" in obj: | |||||
del obj["visit"] | |||||
visit = OriginVisit( | |||||
origin=origin_url, date=obj["date"], type=obj["type"], | |||||
) | |||||
swh_storage.origin_visit_add([visit]) | swh_storage.origin_visit_add([visit]) | ||||
else: | else: | ||||
if obj_type == "content" and obj["status"] == "absent": | |||||
obj_type = "skipped_content" | |||||
method = getattr(swh_storage, obj_type + "_add") | method = getattr(swh_storage, obj_type + "_add") | ||||
try: | try: | ||||
method([obj]) | method([obj]) | ||||
except HashCollision: | except HashCollision: | ||||
pass | pass | ||||
@pytest.mark.db | @pytest.mark.db | ||||
class TestLocalStorage: | class TestLocalStorage: | ||||
"""Test the local storage""" | """Test the local storage""" | ||||
# This test is only relevant on the local storage, with an actual | # This test is only relevant on the local storage, with an actual | ||||
# objstorage raising an exception | # objstorage raising an exception | ||||
def test_content_add_objstorage_exception(self, swh_storage): | def test_content_add_objstorage_exception(self, swh_storage, sample_data_model): | ||||
content = sample_data_model["content"][0] | |||||
swh_storage.objstorage.content_add = Mock( | swh_storage.objstorage.content_add = Mock( | ||||
side_effect=Exception("mocked broken objstorage") | side_effect=Exception("mocked broken objstorage") | ||||
) | ) | ||||
with pytest.raises(Exception) as e: | with pytest.raises(Exception, match="mocked broken"): | ||||
swh_storage.content_add([data.cont]) | swh_storage.content_add([content]) | ||||
assert e.value.args == ("mocked broken objstorage",) | missing = list(swh_storage.content_missing([content.hashes()])) | ||||
missing = list(swh_storage.content_missing([data.cont])) | assert missing == [content.sha1] | ||||
assert missing == [data.cont["sha1"]] | |||||
@pytest.mark.db | @pytest.mark.db | ||||
class TestStorageRaceConditions: | class TestStorageRaceConditions: | ||||
@pytest.mark.xfail | @pytest.mark.xfail | ||||
def test_content_add_race(self, swh_storage): | def test_content_add_race(self, swh_storage, sample_data_model): | ||||
content = sample_data_model["content"][0] | |||||
results = queue.Queue() | results = queue.Queue() | ||||
def thread(): | def thread(): | ||||
try: | try: | ||||
with db_transaction(swh_storage) as (db, cur): | with db_transaction(swh_storage) as (db, cur): | ||||
ret = swh_storage.content_add([data.cont], db=db, cur=cur) | ret = swh_storage.content_add([content], db=db, cur=cur) | ||||
results.put((threading.get_ident(), "data", ret)) | results.put((threading.get_ident(), "data", ret)) | ||||
except Exception as e: | except Exception as e: | ||||
results.put((threading.get_ident(), "exc", e)) | results.put((threading.get_ident(), "exc", e)) | ||||
t1 = threading.Thread(target=thread) | t1 = threading.Thread(target=thread) | ||||
t2 = threading.Thread(target=thread) | t2 = threading.Thread(target=thread) | ||||
t1.start() | t1.start() | ||||
# this avoids the race condition | # this avoids the race condition | ||||
Show All 17 Lines | |||||
class TestPgStorage: | class TestPgStorage: | ||||
"""This class is dedicated to the rare case where the schema needs to | """This class is dedicated to the rare case where the schema needs to | ||||
be altered dynamically. | be altered dynamically. | ||||
Otherwise, the tests could block when run all together. | Otherwise, the tests could block when run all together. | ||||
""" | """ | ||||
def test_content_update_with_new_cols(self, swh_storage): | def test_content_update_with_new_cols(self, swh_storage, sample_data_model): | ||||
content, content2 = sample_data_model["content"][:2] | |||||
swh_storage.journal_writer.journal = None # TODO, not supported | swh_storage.journal_writer.journal = None # TODO, not supported | ||||
with db_transaction(swh_storage) as (_, cur): | with db_transaction(swh_storage) as (_, cur): | ||||
cur.execute( | cur.execute( | ||||
"""alter table content | """alter table content | ||||
add column test text default null, | add column test text default null, | ||||
add column test2 text default null""" | add column test2 text default null""" | ||||
) | ) | ||||
cont = copy.deepcopy(data.cont2) | swh_storage.content_add([content]) | ||||
swh_storage.content_add([cont]) | |||||
cont = content.to_dict() | |||||
cont["test"] = "value-1" | cont["test"] = "value-1" | ||||
cont["test2"] = "value-2" | cont["test2"] = "value-2" | ||||
swh_storage.content_update([cont], keys=["test", "test2"]) | swh_storage.content_update([cont], keys=["test", "test2"]) | ||||
with db_transaction(swh_storage) as (_, cur): | with db_transaction(swh_storage) as (_, cur): | ||||
cur.execute( | cur.execute( | ||||
"""SELECT sha1, sha1_git, sha256, length, status, | """SELECT sha1, sha1_git, sha256, length, status, | ||||
test, test2 | test, test2 | ||||
Show All 14 Lines | def test_content_update_with_new_cols(self, swh_storage, sample_data_model): | ||||
) | ) | ||||
with db_transaction(swh_storage) as (_, cur): | with db_transaction(swh_storage) as (_, cur): | ||||
cur.execute( | cur.execute( | ||||
"""alter table content drop column test, | """alter table content drop column test, | ||||
drop column test2""" | drop column test2""" | ||||
) | ) | ||||
def test_content_add_db(self, swh_storage): | def test_content_add_db(self, swh_storage, sample_data_model): | ||||
cont = data.cont | content = sample_data_model["content"][0] | ||||
actual_result = swh_storage.content_add([cont]) | actual_result = swh_storage.content_add([content]) | ||||
assert actual_result == { | assert actual_result == { | ||||
"content:add": 1, | "content:add": 1, | ||||
"content:add:bytes": cont["length"], | "content:add:bytes": content.length, | ||||
} | } | ||||
if hasattr(swh_storage, "objstorage"): | if hasattr(swh_storage, "objstorage"): | ||||
assert cont["sha1"] in swh_storage.objstorage.objstorage | assert content.sha1 in swh_storage.objstorage.objstorage | ||||
with db_transaction(swh_storage) as (_, cur): | with db_transaction(swh_storage) as (_, cur): | ||||
cur.execute( | cur.execute( | ||||
"SELECT sha1, sha1_git, sha256, length, status" | "SELECT sha1, sha1_git, sha256, length, status" | ||||
" FROM content WHERE sha1 = %s", | " FROM content WHERE sha1 = %s", | ||||
(cont["sha1"],), | (content.sha1,), | ||||
) | ) | ||||
datum = cur.fetchone() | datum = cur.fetchone() | ||||
assert datum == ( | assert datum == ( | ||||
cont["sha1"], | content.sha1, | ||||
cont["sha1_git"], | content.sha1_git, | ||||
cont["sha256"], | content.sha256, | ||||
cont["length"], | content.length, | ||||
"visible", | "visible", | ||||
) | ) | ||||
expected_cont = cont.copy() | |||||
del expected_cont["data"] | |||||
contents = [ | contents = [ | ||||
obj | obj | ||||
for (obj_type, obj) in swh_storage.journal_writer.journal.objects | for (obj_type, obj) in swh_storage.journal_writer.journal.objects | ||||
if obj_type == "content" | if obj_type == "content" | ||||
] | ] | ||||
assert len(contents) == 1 | assert len(contents) == 1 | ||||
for obj in contents: | assert contents[0] == attr.evolve(content, data=None) | ||||
obj_d = obj.to_dict() | |||||
del obj_d["ctime"] | |||||
assert obj_d == expected_cont | |||||
def test_content_add_metadata_db(self, swh_storage): | |||||
cont = data.cont | |||||
del cont["data"] | |||||
cont["ctime"] = now() | |||||
actual_result = swh_storage.content_add_metadata([cont]) | def test_content_add_metadata_db(self, swh_storage, sample_data_model): | ||||
content = attr.evolve(sample_data_model["content"][0], data=None, ctime=now()) | |||||
actual_result = swh_storage.content_add_metadata([content]) | |||||
assert actual_result == { | assert actual_result == { | ||||
"content:add": 1, | "content:add": 1, | ||||
} | } | ||||
if hasattr(swh_storage, "objstorage"): | if hasattr(swh_storage, "objstorage"): | ||||
assert cont["sha1"] not in swh_storage.objstorage.objstorage | assert content.sha1 not in swh_storage.objstorage.objstorage | ||||
with db_transaction(swh_storage) as (_, cur): | with db_transaction(swh_storage) as (_, cur): | ||||
cur.execute( | cur.execute( | ||||
"SELECT sha1, sha1_git, sha256, length, status" | "SELECT sha1, sha1_git, sha256, length, status" | ||||
" FROM content WHERE sha1 = %s", | " FROM content WHERE sha1 = %s", | ||||
(cont["sha1"],), | (content.sha1,), | ||||
) | ) | ||||
datum = cur.fetchone() | datum = cur.fetchone() | ||||
assert datum == ( | assert datum == ( | ||||
cont["sha1"], | content.sha1, | ||||
cont["sha1_git"], | content.sha1_git, | ||||
cont["sha256"], | content.sha256, | ||||
cont["length"], | content.length, | ||||
"visible", | "visible", | ||||
) | ) | ||||
contents = [ | contents = [ | ||||
obj | obj | ||||
for (obj_type, obj) in swh_storage.journal_writer.journal.objects | for (obj_type, obj) in swh_storage.journal_writer.journal.objects | ||||
if obj_type == "content" | if obj_type == "content" | ||||
] | ] | ||||
assert len(contents) == 1 | assert len(contents) == 1 | ||||
for obj in contents: | assert contents[0] == content | ||||
obj_d = obj.to_dict() | |||||
assert obj_d == cont | |||||
def test_skipped_content_add_db(self, swh_storage): | def test_skipped_content_add_db(self, swh_storage, sample_data_model): | ||||
cont = data.skipped_cont | content, cont2 = sample_data_model["skipped_content"][:2] | ||||
cont2 = data.skipped_cont2 | content2 = attr.evolve(cont2, blake2s256=None) | ||||
cont2["blake2s256"] = None | |||||
actual_result = swh_storage.skipped_content_add([cont, cont, cont2]) | actual_result = swh_storage.skipped_content_add([content, content, content2]) | ||||
assert 2 <= actual_result.pop("skipped_content:add") <= 3 | assert 2 <= actual_result.pop("skipped_content:add") <= 3 | ||||
assert actual_result == {} | assert actual_result == {} | ||||
with db_transaction(swh_storage) as (_, cur): | with db_transaction(swh_storage) as (_, cur): | ||||
cur.execute( | cur.execute( | ||||
"SELECT sha1, sha1_git, sha256, blake2s256, " | "SELECT sha1, sha1_git, sha256, blake2s256, " | ||||
"length, status, reason " | "length, status, reason " | ||||
"FROM skipped_content ORDER BY sha1_git" | "FROM skipped_content ORDER BY sha1_git" | ||||
) | ) | ||||
dbdata = cur.fetchall() | dbdata = cur.fetchall() | ||||
assert len(dbdata) == 2 | assert len(dbdata) == 2 | ||||
assert dbdata[0] == ( | assert dbdata[0] == ( | ||||
cont["sha1"], | content.sha1, | ||||
cont["sha1_git"], | content.sha1_git, | ||||
cont["sha256"], | content.sha256, | ||||
cont["blake2s256"], | content.blake2s256, | ||||
cont["length"], | content.length, | ||||
"absent", | "absent", | ||||
"Content too long", | "Content too long", | ||||
) | ) | ||||
assert dbdata[1] == ( | assert dbdata[1] == ( | ||||
cont2["sha1"], | content2.sha1, | ||||
cont2["sha1_git"], | content2.sha1_git, | ||||
cont2["sha256"], | content2.sha256, | ||||
cont2["blake2s256"], | content2.blake2s256, | ||||
cont2["length"], | content2.length, | ||||
"absent", | "absent", | ||||
"Content too long", | "Content too long", | ||||
) | ) | ||||
def test_clear_buffers(self, swh_storage): | def test_clear_buffers(self, swh_storage): | ||||
"""Calling clear buffers on real storage does nothing | """Calling clear buffers on real storage does nothing | ||||
""" | """ | ||||
assert swh_storage.clear_buffers() is None | assert swh_storage.clear_buffers() is None | ||||
def test_flush(self, swh_storage): | def test_flush(self, swh_storage): | ||||
"""Calling flush on real storage does nothing | """Calling flush on real storage does nothing | ||||
""" | """ | ||||
assert swh_storage.flush() == {} | assert swh_storage.flush() == {} |