diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py --- a/swh/storage/__init__.py +++ b/swh/storage/__init__.py @@ -11,7 +11,7 @@ STORAGE_IMPLEMENTATIONS = { - "local": ".storage.Storage", + "local": ".postgresql.storage.Storage", "remote": ".api.client.RemoteStorage", "memory": ".in_memory.InMemoryStorage", "filter": ".filter.FilteringProxyStorage", diff --git a/swh/storage/algos/diff.py b/swh/storage/algos/diff.py --- a/swh/storage/algos/diff.py +++ b/swh/storage/algos/diff.py @@ -314,7 +314,7 @@ between them. Args: - storage (swh.storage.storage.Storage): instance of a swh + storage (swh.storage.interface.StorageInterface): instance of a swh storage (either local or remote, for optimal performance the use of a local storage is recommended) from_dir (bytes): the swh identifier of the directory to compare from @@ -367,7 +367,7 @@ i.e. the list of file changes between the two associated directories. Args: - storage (swh.storage.storage.Storage): instance of a swh + storage (swh.storage.interface.StorageInterface): instance of a swh storage (either local or remote, for optimal performance the use of a local storage is recommended) from_rev (bytes): the identifier of the revision to compare from @@ -395,7 +395,7 @@ specific revision. 
Args: - storage (swh.storage.storage.Storage): instance of a swh + storage (swh.storage.interface.StorageInterface): instance of a swh storage (either local or remote, for optimal performance the use of a local storage is recommended) revision (bytes): the identifier of the revision from which to diff --git a/swh/storage/algos/dir_iterators.py b/swh/storage/algos/dir_iterators.py --- a/swh/storage/algos/dir_iterators.py +++ b/swh/storage/algos/dir_iterators.py @@ -41,8 +41,8 @@ def __init__(self, storage, dir_id, base_path=b""): """ Args: - storage (swh.storage.storage.Storage): instance of swh storage - (either local or remote) + storage (swh.storage.interface.StorageInterface): instance of + swh storage (either local or remote) dir_id (bytes): identifier of a root directory base_path (bytes): optional base path used when traversing a sub-directory diff --git a/swh/storage/algos/revisions_walker.py b/swh/storage/algos/revisions_walker.py --- a/swh/storage/algos/revisions_walker.py +++ b/swh/storage/algos/revisions_walker.py @@ -49,7 +49,7 @@ :func:`get_revisions_walker`. 
Args: - storage (swh.storage.storage.Storage): instance of swh storage + storage (swh.storage.interface.StorageInterface): instance of swh storage (either local or remote) rev_start (bytes): a revision identifier max_revs (Optional[int]): maximum number of revisions to return @@ -97,7 +97,7 @@ Returns: dict: A dict describing a revision as returned by - :meth:`swh.storage.storage.Storage.revision_get` + :meth:`swh.storage.interface.StorageInterface.revision_get` """ pass @@ -109,7 +109,7 @@ Args: rev (dict): A dict describing a revision as returned by - :meth:`swh.storage.storage.Storage.revision_get` + :meth:`swh.storage.interface.StorageInterface.revision_get` """ for parent_id in rev["parents"]: self.process_rev(parent_id) @@ -121,7 +121,7 @@ Args: rev (dict): A dict describing a revision as returned by - :meth:`swh.storage.storage.Storage.revision_get` + :meth:`swh.storage.interface.StorageInterface.revision_get` Returns: bool: Whether to return the revision in the iteration @@ -246,7 +246,7 @@ Returns: dict: A dict describing a revision as returned by - :meth:`swh.storage.storage.Storage.revision_get` + :meth:`swh.storage.interface.StorageInterface.revision_get` """ _, rev_id = heapq.heappop(self._revs_to_visit) return rev_id @@ -281,7 +281,7 @@ Returns: dict: A dict describing a revision as returned by - :meth:`swh.storage.storage.Storage.revision_get` + :meth:`swh.storage.interface.StorageInterface.revision_get` """ return self._revs_to_visit.popleft() @@ -313,7 +313,7 @@ Returns: dict: A dict describing a revision as returned by - :meth:`swh.storage.storage.Storage.revision_get` + :meth:`swh.storage.interface.StorageInterface.revision_get` """ return self._revs_to_visit.pop() @@ -336,7 +336,7 @@ Args: rev (dict): A dict describing a revision as returned by - :meth:`swh.storage.storage.Storage.revision_get` + :meth:`swh.storage.interface.StorageInterface.revision_get` """ for parent_id in reversed(rev["parents"]): self.process_rev(parent_id) @@ -363,7 +363,7 
@@ revisions does not exceed a couple of thousands. Args: - storage (swh.storage.storage.Storage): instance of swh storage + storage (swh.storage.interface.StorageInterface): instance of swh storage (either local or remote) rev_start (bytes): a revision identifier path (str): the path in the source tree to retrieve the history @@ -441,7 +441,7 @@ Args: rev (dict): A dict describing a revision as returned by - :meth:`swh.storage.storage.Storage.revision_get` + :meth:`swh.storage.interface.StorageInterface.revision_get` """ rev_path_id = self._get_path_id(rev["id"]) @@ -473,7 +473,7 @@ Args: rev (dict): A dict describing a revision as returned by - :meth:`swh.storage.storage.Storage.revision_get` + :meth:`swh.storage.interface.StorageInterface.revision_get` Returns: bool: Whether to return the revision in the iteration diff --git a/swh/storage/algos/snapshot.py b/swh/storage/algos/snapshot.py --- a/swh/storage/algos/snapshot.py +++ b/swh/storage/algos/snapshot.py @@ -22,7 +22,7 @@ """Get all the branches for a given snapshot Args: - storage (swh.storage.storage.Storage): the storage instance + storage (swh.storage.interface.StorageInterface): the storage instance snapshot_id (bytes): the snapshot's identifier Returns: dict: a dict with two keys: diff --git a/swh/storage/backfill.py b/swh/storage/backfill.py --- a/swh/storage/backfill.py +++ b/swh/storage/backfill.py @@ -31,7 +31,7 @@ SnapshotBranch, TargetType, ) -from swh.storage.converters import ( +from swh.storage.postgresql.converters import ( db_to_raw_extrinsic_metadata, db_to_release, db_to_revision, diff --git a/swh/storage/common.py b/swh/storage/common.py --- a/swh/storage/common.py +++ b/swh/storage/common.py @@ -1,6 +1,11 @@ -# Copyright (C) 2015-2016 The Software Heritage developers +# Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE 
file for more information -from swh.core.db.common import * # noqa +from swh.model.hashutil import MultiHash + + +def origin_url_to_sha1(origin_url: str) -> bytes: + """Convert an origin URL to a sha1. Encodes URL to utf-8.""" + return MultiHash.from_data(origin_url.encode("utf-8"), {"sha1"}).digest()["sha1"] diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -51,7 +51,7 @@ from swh.storage.interface import ListOrder from swh.storage.objstorage import ObjStorage -from .converters import origin_url_to_sha1 +from .common import origin_url_to_sha1 from .writer import JournalWriter diff --git a/swh/storage/postgresql/__init__.py b/swh/storage/postgresql/__init__.py new file mode 100644 diff --git a/swh/storage/converters.py b/swh/storage/postgresql/converters.py rename from swh/storage/converters.py rename to swh/storage/postgresql/converters.py --- a/swh/storage/converters.py +++ b/swh/storage/postgresql/converters.py @@ -23,9 +23,8 @@ Timestamp, TimestampWithTimezone, ) -from swh.model.hashutil import MultiHash -from .utils import map_optional +from ..utils import map_optional DEFAULT_AUTHOR = { @@ -323,8 +322,3 @@ path=row["path"], directory=map_optional(parse_swhid, row["directory"]), ) - - -def origin_url_to_sha1(origin_url: str) -> bytes: - """Convert an origin URL to a sha1. 
Encodes URL to utf-8.""" - return MultiHash.from_data(origin_url.encode("utf-8"), {"sha1"}).digest()["sha1"] diff --git a/swh/storage/db.py b/swh/storage/postgresql/db.py rename from swh/storage/db.py rename to swh/storage/postgresql/db.py diff --git a/swh/storage/storage.py b/swh/storage/postgresql/storage.py rename from swh/storage/storage.py rename to swh/storage/postgresql/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/postgresql/storage.py @@ -27,6 +27,7 @@ import psycopg2.errors from swh.core.api.serializers import msgpack_loads, msgpack_dumps +from swh.core.db.common import db_transaction_generator, db_transaction from swh.model.identifiers import SWHID from swh.model.model import ( Content, @@ -50,23 +51,26 @@ RawExtrinsicMetadata, ) from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex +from swh.storage.algos import diff +from swh.storage.exc import StorageArgumentException, StorageDBError, HashCollision from swh.storage.interface import ( ListOrder, PagedResult, PartialBranches, VISIT_STATUSES, ) +from swh.storage.metrics import timed, send_metric, process_metrics from swh.storage.objstorage import ObjStorage -from swh.storage.utils import now +from swh.storage.utils import ( + get_partition_bounds_bytes, + extract_collision_hash, + map_optional, + now, +) +from swh.storage.writer import JournalWriter from . 
import converters -from .common import db_transaction_generator, db_transaction from .db import Db -from .exc import StorageArgumentException, StorageDBError, HashCollision -from .algos import diff -from .metrics import timed, send_metric, process_metrics -from .utils import get_partition_bounds_bytes, extract_collision_hash, map_optional -from .writer import JournalWriter # Max block size of contents to return diff --git a/swh/storage/tests/algos/test_revisions_walker.py b/swh/storage/tests/algos/test_revisions_walker.py --- a/swh/storage/tests/algos/test_revisions_walker.py +++ b/swh/storage/tests/algos/test_revisions_walker.py @@ -385,7 +385,7 @@ def check_revisions_ordering( self, rev_walker_type, expected_result, truncated_history ): - with patch("swh.storage.storage.Storage") as MockStorage: + with patch("swh.storage.postgresql.storage.Storage") as MockStorage: storage = MockStorage() if not truncated_history: storage.revision_log.return_value = _revisions_list diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/storage_tests.py rename from swh/storage/tests/test_storage.py rename to swh/storage/tests/storage_tests.py --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/storage_tests.py @@ -41,7 +41,7 @@ ) from swh.model.hypothesis_strategies import objects from swh.storage import get_storage -from swh.storage.converters import origin_url_to_sha1 as sha1 +from swh.storage.common import origin_url_to_sha1 as sha1 from swh.storage.exc import HashCollision, StorageArgumentException from swh.storage.interface import ListOrder, PagedResult, StorageInterface from swh.storage.utils import content_hex_hashes, now diff --git a/swh/storage/tests/test_api_client.py b/swh/storage/tests/test_api_client.py --- a/swh/storage/tests/test_api_client.py +++ b/swh/storage/tests/test_api_client.py @@ -6,7 +6,7 @@ import pytest import swh.storage.api.server as server -import swh.storage.storage +import swh.storage from swh.storage 
import get_storage
-from swh.storage.tests.test_storage import ( +from swh.storage.tests.storage_tests import ( TestStorageGeneratedData as _TestStorageGeneratedData, diff --git a/swh/storage/tests/test_in_memory.py b/swh/storage/tests/test_in_memory.py --- a/swh/storage/tests/test_in_memory.py +++ b/swh/storage/tests/test_in_memory.py @@ -9,8 +9,8 @@ from swh.storage.cassandra.model import BaseRow from swh.storage.in_memory import Table -from swh.storage.tests.test_storage import TestStorage as _TestStorage -from swh.storage.tests.test_storage import ( +from swh.storage.tests.storage_tests import TestStorage as _TestStorage +from swh.storage.tests.storage_tests import ( TestStorageGeneratedData as _TestStorageGeneratedData, ) diff --git a/swh/storage/tests/test_init.py b/swh/storage/tests/test_init.py --- a/swh/storage/tests/test_init.py +++ b/swh/storage/tests/test_init.py @@ -10,14 +10,14 @@ from swh.storage import get_storage from swh.storage.api.client import RemoteStorage -from swh.storage.storage import Storage as DbStorage +from swh.storage.postgresql.storage import Storage as DbStorage from swh.storage.in_memory import InMemoryStorage from swh.storage.buffer import BufferingProxyStorage from swh.storage.filter import FilteringProxyStorage from swh.storage.retry import RetryingProxyStorage -@patch("swh.storage.storage.psycopg2.pool") +@patch("swh.storage.postgresql.storage.psycopg2.pool") def test_get_storage(mock_pool): """Instantiating an existing storage should be ok @@ -40,7 +40,7 @@ assert isinstance(actual_storage, real_class) -@patch("swh.storage.storage.psycopg2.pool") +@patch("swh.storage.postgresql.storage.psycopg2.pool") def test_get_storage_legacy_args(mock_pool): """Instantiating an existing storage should be ok even with the legacy explicit 'args' keys diff --git a/swh/storage/tests/test_postgresql.py b/swh/storage/tests/test_postgresql.py new file mode 100644 --- /dev/null +++ b/swh/storage/tests/test_postgresql.py @@ -0,0 +1,256 @@ +# Copyright (C) 2015-2020 The Software 
Heritage developers +# See the AUTHORS
file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from contextlib import contextmanager +import queue +import threading +from unittest.mock import Mock + +import attr +import pytest + +from swh.storage.tests.storage_tests import TestStorage # noqa +from swh.storage.tests.storage_tests import TestStorageGeneratedData # noqa +from swh.storage.utils import now + + +@contextmanager +def db_transaction(storage): + with storage.db() as db: + with db.transaction() as cur: + yield db, cur + + +@pytest.mark.db +class TestLocalStorage: + """Test the local storage""" + + # This test is only relevant on the local storage, with an actual + # objstorage raising an exception + def test_content_add_objstorage_exception(self, swh_storage, sample_data): + content = sample_data.content + + swh_storage.objstorage.content_add = Mock( + side_effect=Exception("mocked broken objstorage") + ) + + with pytest.raises(Exception, match="mocked broken"): + swh_storage.content_add([content]) + + missing = list(swh_storage.content_missing([content.hashes()])) + assert missing == [content.sha1] + + +@pytest.mark.db +class TestStorageRaceConditions: + @pytest.mark.xfail + def test_content_add_race(self, swh_storage, sample_data): + content = sample_data.content + + results = queue.Queue() + + def thread(): + try: + with db_transaction(swh_storage) as (db, cur): + ret = swh_storage.content_add([content], db=db, cur=cur) + results.put((threading.get_ident(), "data", ret)) + except Exception as e: + results.put((threading.get_ident(), "exc", e)) + + t1 = threading.Thread(target=thread) + t2 = threading.Thread(target=thread) + t1.start() + # this avoids the race condition + # import time + # time.sleep(1) + t2.start() + t1.join() + t2.join() + + r1 = results.get(block=False) + r2 = results.get(block=False) + + with pytest.raises(queue.Empty): + results.get(block=False) + 
assert r1[0] != r2[0] + assert r1[1] == "data", "Got exception %r in Thread%s" % (r1[2], r1[0]) + assert r2[1] == "data", "Got exception %r in Thread%s" % (r2[2], r2[0]) + + +@pytest.mark.db +class TestPgStorage: + """This class is dedicated for the rare case where the schema needs to + be altered dynamically. + + Otherwise, the tests could be blocking when ran altogether. + + """ + + def test_content_update_with_new_cols(self, swh_storage, sample_data): + content, content2 = sample_data.contents[:2] + + swh_storage.journal_writer.journal = None # TODO, not supported + + with db_transaction(swh_storage) as (_, cur): + cur.execute( + """alter table content + add column test text default null, + add column test2 text default null""" + ) + + swh_storage.content_add([content]) + + cont = content.to_dict() + cont["test"] = "value-1" + cont["test2"] = "value-2" + + swh_storage.content_update([cont], keys=["test", "test2"]) + with db_transaction(swh_storage) as (_, cur): + cur.execute( + """SELECT sha1, sha1_git, sha256, length, status, + test, test2 + FROM content WHERE sha1 = %s""", + (cont["sha1"],), + ) + + datum = cur.fetchone() + + assert datum == ( + cont["sha1"], + cont["sha1_git"], + cont["sha256"], + cont["length"], + "visible", + cont["test"], + cont["test2"], + ) + + with db_transaction(swh_storage) as (_, cur): + cur.execute( + """alter table content drop column test, + drop column test2""" + ) + + def test_content_add_db(self, swh_storage, sample_data): + content = sample_data.content + + actual_result = swh_storage.content_add([content]) + + assert actual_result == { + "content:add": 1, + "content:add:bytes": content.length, + } + + if hasattr(swh_storage, "objstorage"): + assert content.sha1 in swh_storage.objstorage.objstorage + + with db_transaction(swh_storage) as (_, cur): + cur.execute( + "SELECT sha1, sha1_git, sha256, length, status" + " FROM content WHERE sha1 = %s", + (content.sha1,), + ) + datum = cur.fetchone() + + assert datum == ( + 
content.sha1, + content.sha1_git, + content.sha256, + content.length, + "visible", + ) + + contents = [ + obj + for (obj_type, obj) in swh_storage.journal_writer.journal.objects + if obj_type == "content" + ] + assert len(contents) == 1 + assert contents[0] == attr.evolve(content, data=None) + + def test_content_add_metadata_db(self, swh_storage, sample_data): + content = attr.evolve(sample_data.content, data=None, ctime=now()) + + actual_result = swh_storage.content_add_metadata([content]) + + assert actual_result == { + "content:add": 1, + } + + if hasattr(swh_storage, "objstorage"): + assert content.sha1 not in swh_storage.objstorage.objstorage + with db_transaction(swh_storage) as (_, cur): + cur.execute( + "SELECT sha1, sha1_git, sha256, length, status" + " FROM content WHERE sha1 = %s", + (content.sha1,), + ) + datum = cur.fetchone() + assert datum == ( + content.sha1, + content.sha1_git, + content.sha256, + content.length, + "visible", + ) + + contents = [ + obj + for (obj_type, obj) in swh_storage.journal_writer.journal.objects + if obj_type == "content" + ] + assert len(contents) == 1 + assert contents[0] == content + + def test_skipped_content_add_db(self, swh_storage, sample_data): + content, cont2 = sample_data.skipped_contents[:2] + content2 = attr.evolve(cont2, blake2s256=None) + + actual_result = swh_storage.skipped_content_add([content, content, content2]) + + assert 2 <= actual_result.pop("skipped_content:add") <= 3 + assert actual_result == {} + + with db_transaction(swh_storage) as (_, cur): + cur.execute( + "SELECT sha1, sha1_git, sha256, blake2s256, " + "length, status, reason " + "FROM skipped_content ORDER BY sha1_git" + ) + + dbdata = cur.fetchall() + + assert len(dbdata) == 2 + assert dbdata[0] == ( + content.sha1, + content.sha1_git, + content.sha256, + content.blake2s256, + content.length, + "absent", + "Content too long", + ) + + assert dbdata[1] == ( + content2.sha1, + content2.sha1_git, + content2.sha256, + content2.blake2s256, + 
content2.length, + "absent", + "Content too long", + ) + + def test_clear_buffers(self, swh_storage): + """Calling clear buffers on real storage does nothing + + """ + assert swh_storage.clear_buffers() is None + + def test_flush(self, swh_storage): + """Calling flush on real storage does nothing + + """ + assert swh_storage.flush() == {} diff --git a/swh/storage/tests/test_converters.py b/swh/storage/tests/test_postgresql_converters.py rename from swh/storage/tests/test_converters.py rename to swh/storage/tests/test_postgresql_converters.py --- a/swh/storage/tests/test_converters.py +++ b/swh/storage/tests/test_postgresql_converters.py @@ -13,7 +13,7 @@ TimestampWithTimezone, ) -from swh.storage import converters +from swh.storage.postgresql import converters def test_date_to_db(): diff --git a/swh/storage/tests/test_revision_bw_compat.py b/swh/storage/tests/test_revision_bw_compat.py --- a/swh/storage/tests/test_revision_bw_compat.py +++ b/swh/storage/tests/test_revision_bw_compat.py @@ -8,7 +8,7 @@ from swh.core.utils import decode_with_escape from swh.model.model import Revision from swh.storage import get_storage -from swh.storage.tests.test_storage import db_transaction +from swh.storage.tests.test_postgresql import db_transaction def headers_to_db(git_headers):