Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/storage_tests.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | |||||
import datetime | import datetime | ||||
from datetime import timedelta | |||||
import inspect | import inspect | ||||
import itertools | import itertools | ||||
import math | import math | ||||
import queue | |||||
import random | import random | ||||
import threading | |||||
from collections import defaultdict | |||||
from contextlib import contextmanager | |||||
from datetime import timedelta | |||||
from unittest.mock import Mock | |||||
import attr | import attr | ||||
import pytest | import pytest | ||||
from hypothesis import given, strategies, settings, HealthCheck | from hypothesis import given, strategies, settings, HealthCheck | ||||
from typing import Any, ClassVar, Dict, Iterator, Optional | from typing import Any, ClassVar, Dict, Iterator, Optional | ||||
Show All 16 Lines | |||||
from swh.model.hypothesis_strategies import objects | from swh.model.hypothesis_strategies import objects | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.common import origin_url_to_sha1 as sha1 | from swh.storage.common import origin_url_to_sha1 as sha1 | ||||
from swh.storage.exc import HashCollision, StorageArgumentException | from swh.storage.exc import HashCollision, StorageArgumentException | ||||
from swh.storage.interface import ListOrder, PagedResult, StorageInterface | from swh.storage.interface import ListOrder, PagedResult, StorageInterface | ||||
from swh.storage.utils import content_hex_hashes, now, round_to_milliseconds | from swh.storage.utils import content_hex_hashes, now, round_to_milliseconds | ||||
@contextmanager
def db_transaction(storage):
    """Open a database handle on ``storage`` and a transaction on that handle.

    Yields:
        a ``(db, cur)`` tuple, where ``db`` is what ``storage.db()`` provides
        and ``cur`` is what ``db.transaction()`` provides. Both context
        managers are exited (innermost first) when the caller's ``with``
        block ends.
    """
    with storage.db() as connection, connection.transaction() as cursor:
        yield connection, cursor
def transform_entries( | def transform_entries( | ||||
storage: StorageInterface, dir_: Directory, *, prefix: bytes = b"" | storage: StorageInterface, dir_: Directory, *, prefix: bytes = b"" | ||||
) -> Iterator[Dict[str, Any]]: | ) -> Iterator[Dict[str, Any]]: | ||||
"""Iterate through a directory's entries, and yields the items 'directory_ls' is | """Iterate through a directory's entries, and yields the items 'directory_ls' is | ||||
expected to return; including content metadata for file entries.""" | expected to return; including content metadata for file entries.""" | ||||
for ent in dir_.entries: | for ent in dir_.entries: | ||||
if ent.type == "dir": | if ent.type == "dir": | ||||
Show All 33 Lines | ): | ||||
"""Assert that a given list of contents matches on a given set of keys. | """Assert that a given list of contents matches on a given set of keys. | ||||
""" | """ | ||||
for k in keys_to_check: | for k in keys_to_check: | ||||
expected_list = set([c.get(k) for c in expected_contents]) | expected_list = set([c.get(k) for c in expected_contents]) | ||||
actual_list = set([c.get(k) for c in actual_contents]) | actual_list = set([c.get(k) for c in actual_contents]) | ||||
assert actual_list == expected_list, k | assert actual_list == expected_list, k | ||||
class LazyContent(Content):
    """Content whose data is only supplied when ``with_data`` is called."""

    def with_data(self):
        """Return a ``Content`` built from this object's dict plus fixed data."""
        fields = dict(self.to_dict(), data=b"42\n")
        return Content.from_dict(fields)
class TestStorage: | class TestStorage: | ||||
"""Main class for Storage testing. | """Main class for Storage testing. | ||||
This class is used as-is to test local storage (see TestLocalStorage | This class is used as-is to test local storage (see TestLocalStorage | ||||
below) and remote storage (see TestRemoteStorage in | below) and remote storage (see TestRemoteStorage in | ||||
test_remote_storage.py. | test_remote_storage.py. | ||||
We need to have the two classes inherit from this base class | We need to have the two classes inherit from this base class | ||||
separately to avoid nosetests running the tests from the base | separately to avoid nosetests running the tests from the base | ||||
anlambert: I put that function in `storage.utils` and its test to `storage.tests.test_utils` (as pytest… | |||||
Done Inline Actionsbecause I'm an idiot who can't use git properly vlorentz: because I'm an idiot who can't use git properly | |||||
class twice. | class twice. | ||||
""" | """ | ||||
maxDiff = None # type: ClassVar[Optional[int]] | maxDiff = None # type: ClassVar[Optional[int]] | ||||
def test_types(self, swh_storage_backend_config): | def test_types(self, swh_storage_backend_config): | ||||
"""Checks all methods of StorageInterface are implemented by this | """Checks all methods of StorageInterface are implemented by this | ||||
backend, and that they have the same signature.""" | backend, and that they have the same signature.""" | ||||
▲ Show 20 Lines • Show All 3,737 Lines • ▼ Show 20 Lines | def test_add_arbitrary(self, swh_storage, objects): | ||||
visit = OriginVisit(origin=obj.origin, date=obj.date, type=obj.type,) | visit = OriginVisit(origin=obj.origin, date=obj.date, type=obj.type,) | ||||
swh_storage.origin_visit_add([visit]) | swh_storage.origin_visit_add([visit]) | ||||
else: | else: | ||||
method = getattr(swh_storage, obj_type + "_add") | method = getattr(swh_storage, obj_type + "_add") | ||||
try: | try: | ||||
method([obj]) | method([obj]) | ||||
except HashCollision: | except HashCollision: | ||||
pass | pass | ||||
@pytest.mark.db
class TestLocalStorage:
    """Test the local storage"""

    def test_content_add_objstorage_exception(self, swh_storage, sample_data):
        """A failing objstorage makes ``content_add`` raise and the content
        is still reported as missing afterwards.

        This test is only relevant on the local storage, with an actual
        objstorage raising an exception.
        """
        sample = sample_data.content

        # Replace the objstorage write path with a callable that always fails.
        broken_add = Mock(side_effect=Exception("mocked broken objstorage"))
        swh_storage.objstorage.content_add = broken_add

        with pytest.raises(Exception, match="mocked broken"):
            swh_storage.content_add([sample])

        still_missing = list(swh_storage.content_missing([sample.hashes()]))
        assert still_missing == [sample.sha1]
@pytest.mark.db
class TestStorageRaceConditions:
    """Tests running storage operations concurrently from several threads."""

    @pytest.mark.xfail
    def test_content_add_race(self, swh_storage, sample_data):
        """Add the same content from two threads, each in its own transaction,
        and check that both report data rather than an exception."""
        content = sample_data.content
        outcomes = queue.Queue()

        def worker():
            # Each thread reports exactly one (thread id, kind, payload) tuple.
            try:
                with db_transaction(swh_storage) as (db, cur):
                    added = swh_storage.content_add([content], db=db, cur=cur)
                outcomes.put((threading.get_ident(), "data", added))
            except Exception as exc:
                outcomes.put((threading.get_ident(), "exc", exc))

        first_thread = threading.Thread(target=worker)
        second_thread = threading.Thread(target=worker)

        # The two threads are started back to back on purpose: sleeping
        # (e.g. one second) between the two starts avoids the race condition.
        first_thread.start()
        second_thread.start()

        for running in (first_thread, second_thread):
            running.join()

        first = outcomes.get(block=False)
        second = outcomes.get(block=False)

        # Nothing else may have been reported.
        with pytest.raises(queue.Empty):
            outcomes.get(block=False)

        assert first[0] != second[0]
        assert first[1] == "data", "Got exception %r in Thread%s" % (
            first[2],
            first[0],
        )
        assert second[1] == "data", "Got exception %r in Thread%s" % (
            second[2],
            second[0],
        )
@pytest.mark.db
class TestPgStorage:
    """This class is dedicated for the rare case where the schema needs to
    be altered dynamically.

    Otherwise, the tests could be blocking when ran altogether.

    """

    @staticmethod
    def _fetch_content_row(swh_storage, sha1):
        """Return the (sha1, sha1_git, sha256, length, status) row stored in
        the ``content`` table for ``sha1``, or None when there is no row."""
        with db_transaction(swh_storage) as (_, cur):
            cur.execute(
                "SELECT sha1, sha1_git, sha256, length, status"
                " FROM content WHERE sha1 = %s",
                (sha1,),
            )
            return cur.fetchone()

    @staticmethod
    def _journal_contents(swh_storage):
        """Return the objects of type "content" recorded by the journal writer."""
        return [
            obj
            for (obj_type, obj) in swh_storage.journal_writer.journal.objects
            if obj_type == "content"
        ]

    def test_content_update_with_new_cols(self, swh_storage, sample_data):
        """``content_update`` can write columns added to the table at runtime."""
        content = sample_data.contents[0]
        swh_storage.journal_writer.journal = None  # TODO, not supported

        with db_transaction(swh_storage) as (_, cur):
            cur.execute(
                """alter table content
                   add column test text default null,
                   add column test2 text default null"""
            )

        # The extra columns must be dropped even when an assertion below
        # fails, otherwise the altered schema leaks into the other tests.
        try:
            swh_storage.content_add([content])

            cont = content.to_dict()
            cont["test"] = "value-1"
            cont["test2"] = "value-2"

            swh_storage.content_update([cont], keys=["test", "test2"])

            with db_transaction(swh_storage) as (_, cur):
                cur.execute(
                    """SELECT sha1, sha1_git, sha256, length, status,
                       test, test2
                       FROM content WHERE sha1 = %s""",
                    (cont["sha1"],),
                )
                datum = cur.fetchone()

            assert datum == (
                cont["sha1"],
                cont["sha1_git"],
                cont["sha256"],
                cont["length"],
                "visible",
                cont["test"],
                cont["test2"],
            )
        finally:
            with db_transaction(swh_storage) as (_, cur):
                cur.execute(
                    """alter table content drop column test,
                       drop column test2"""
                )

    def test_content_add_db(self, swh_storage, sample_data):
        """``content_add`` writes the row, the object data and a journal entry."""
        content = sample_data.content

        actual_result = swh_storage.content_add([content])

        assert actual_result == {
            "content:add": 1,
            "content:add:bytes": content.length,
        }

        if hasattr(swh_storage, "objstorage"):
            assert content.sha1 in swh_storage.objstorage.objstorage

        datum = self._fetch_content_row(swh_storage, content.sha1)

        assert datum == (
            content.sha1,
            content.sha1_git,
            content.sha256,
            content.length,
            "visible",
        )

        contents = self._journal_contents(swh_storage)
        assert len(contents) == 1
        # The journal entry is compared against the content without its data.
        assert contents[0] == attr.evolve(content, data=None)

    def test_content_add_metadata_db(self, swh_storage, sample_data):
        """``content_add_metadata`` writes the row and a journal entry, but no
        object data."""
        content = attr.evolve(sample_data.content, data=None, ctime=now())

        actual_result = swh_storage.content_add_metadata([content])

        assert actual_result == {
            "content:add": 1,
        }

        if hasattr(swh_storage, "objstorage"):
            assert content.sha1 not in swh_storage.objstorage.objstorage

        datum = self._fetch_content_row(swh_storage, content.sha1)

        assert datum == (
            content.sha1,
            content.sha1_git,
            content.sha256,
            content.length,
            "visible",
        )

        contents = self._journal_contents(swh_storage)
        assert len(contents) == 1
        assert contents[0] == content

    def test_skipped_content_add_db(self, swh_storage, sample_data):
        """``skipped_content_add`` stores one row per distinct skipped content."""
        content, cont2 = sample_data.skipped_contents[:2]
        content2 = attr.evolve(cont2, blake2s256=None)

        # ``content`` is passed twice on purpose; only two rows are expected.
        actual_result = swh_storage.skipped_content_add([content, content, content2])

        assert 2 <= actual_result.pop("skipped_content:add") <= 3
        assert actual_result == {}

        with db_transaction(swh_storage) as (_, cur):
            cur.execute(
                "SELECT sha1, sha1_git, sha256, blake2s256, "
                "length, status, reason "
                "FROM skipped_content ORDER BY sha1_git"
            )
            dbdata = cur.fetchall()

        assert len(dbdata) == 2
        assert dbdata[0] == (
            content.sha1,
            content.sha1_git,
            content.sha256,
            content.blake2s256,
            content.length,
            "absent",
            "Content too long",
        )

        assert dbdata[1] == (
            content2.sha1,
            content2.sha1_git,
            content2.sha256,
            content2.blake2s256,
            content2.length,
            "absent",
            "Content too long",
        )

    def test_clear_buffers(self, swh_storage):
        """Calling clear buffers on real storage does nothing

        """
        assert swh_storage.clear_buffers() is None

    def test_flush(self, swh_storage):
        """Calling flush on real storage does nothing

        """
        assert swh_storage.flush() == {}
I moved that function to storage.utils and its test to storage.tests.test_utils (as pytest will not try to discover tests in storage_tests.py) in my previous commit.
Why reintroduce them here?