diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py --- a/swh/indexer/storage/__init__.py +++ b/swh/indexer/storage/__init__.py @@ -33,6 +33,7 @@ OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow, ) +from .writer import JournalWriter INDEXER_CFG_KEY = "indexer_storage" @@ -121,12 +122,15 @@ """ - def __init__(self, db, min_pool_conns=1, max_pool_conns=10): + def __init__(self, db, min_pool_conns=1, max_pool_conns=10, journal_writer=None): """ Args: - db_conn: either a libpq connection string, or a psycopg2 connection + db: either a libpq connection string, or a psycopg2 connection + journal_writer: configuration passed to + `swh.journal.writer.get_journal_writer` """ + self.journal_writer = JournalWriter(self._tool_get_from_id, journal_writer) try: if isinstance(db, psycopg2.extensions.connection): self._pool = None @@ -271,6 +275,7 @@ ) -> Dict[str, int]: check_id_duplicates(mimetypes) mimetypes.sort(key=lambda m: m.id) + self.journal_writer.write_additions("content_mimetype", mimetypes) db.mktemp_content_mimetype(cur) db.copy_to( [m.to_dict() for m in mimetypes], @@ -320,6 +325,7 @@ ) -> Dict[str, int]: check_id_duplicates(languages) languages.sort(key=lambda m: m.id) + self.journal_writer.write_additions("content_language", languages) db.mktemp_content_language(cur) # empty language is mapped to 'unknown' db.copy_to( @@ -366,6 +372,7 @@ ) -> Dict[str, int]: check_id_duplicates(ctags) ctags.sort(key=lambda m: m.id) + self.journal_writer.write_additions("content_ctags", ctags) db.mktemp_content_ctags(cur) db.copy_to( @@ -417,6 +424,7 @@ ) -> Dict[str, int]: check_id_duplicates(licenses) licenses.sort(key=lambda m: m.id) + self.journal_writer.write_additions("content_fossology_license", licenses) db.mktemp_content_fossology_license(cur) db.copy_to( [license.to_dict() for license in licenses], @@ -478,6 +486,7 @@ ) -> Dict[str, int]: check_id_duplicates(metadata) metadata.sort(key=lambda m: m.id) + 
self.journal_writer.write_additions("content_metadata", metadata) db.mktemp_content_metadata(cur) @@ -524,6 +533,7 @@ ) -> Dict[str, int]: check_id_duplicates(metadata) metadata.sort(key=lambda m: m.id) + self.journal_writer.write_additions("revision_intrinsic_metadata", metadata) db.mktemp_revision_intrinsic_metadata(cur) @@ -560,6 +570,7 @@ ) -> Dict[str, int]: check_id_duplicates(metadata) metadata.sort(key=lambda m: m.id) + self.journal_writer.write_additions("origin_intrinsic_metadata", metadata) db.mktemp_origin_intrinsic_metadata(cur) @@ -695,3 +706,18 @@ if not idx: return None return dict(zip(db.indexer_configuration_cols, idx)) + + @db_transaction() + def _tool_get_from_id(self, id_, db, cur): + tool = dict( + zip( + db.indexer_configuration_cols, + db.indexer_configuration_get_from_id(id_, cur), + ) + ) + return { + "id": tool["id"], + "name": tool["tool_name"], + "version": tool["tool_version"], + "configuration": tool["tool_configuration"], + } diff --git a/swh/indexer/storage/db.py b/swh/indexer/storage/db.py --- a/swh/indexer/storage/db.py +++ b/swh/indexer/storage/db.py @@ -536,3 +536,15 @@ ) return cur.fetchone() + + def indexer_configuration_get_from_id(self, id_, cur=None): + cur = self._cursor(cur) + cur.execute( + """select %s + from indexer_configuration + where id=%%s""" + % (",".join(self.indexer_configuration_cols)), + (id_,), + ) + + return cur.fetchone() diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py --- a/swh/indexer/storage/in_memory.py +++ b/swh/indexer/storage/in_memory.py @@ -41,9 +41,12 @@ OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow, ) +from .writer import JournalWriter SHA1_DIGEST_SIZE = 160 +ToolId = int + def _transform_tool(tool): return { @@ -64,7 +67,6 @@ return tuple(sorted(d.items())) -ToolId = int TValue = TypeVar("TValue", bound=BaseRow) @@ -74,11 +76,12 @@ _data: Dict[Sha1, Dict[Tuple, Dict[str, Any]]] _tools_per_id: Dict[Sha1, Set[ToolId]] - def __init__(self, row_class: 
Type[TValue], tools): + def __init__(self, row_class: Type[TValue], tools, journal_writer): self.row_class = row_class self._tools = tools self._sorted_ids = SortedList[bytes, Sha1]() self._data = defaultdict(dict) + self._journal_writer = journal_writer self._tools_per_id = defaultdict(set) def _key_from_dict(self, d) -> Tuple: @@ -207,6 +210,8 @@ """ data = list(data) check_id_duplicates(data) + object_type = self.row_class.object_type # type: ignore + self._journal_writer.write_additions(object_type, data) count = 0 for obj in data: item = obj.to_dict() @@ -224,19 +229,29 @@ class IndexerStorage: """In-memory SWH indexer storage.""" - def __init__(self): + def __init__(self, journal_writer=None): self._tools = {} - self._mimetypes = SubStorage(ContentMimetypeRow, self._tools) - self._languages = SubStorage(ContentLanguageRow, self._tools) - self._content_ctags = SubStorage(ContentCtagsRow, self._tools) - self._licenses = SubStorage(ContentLicenseRow, self._tools) - self._content_metadata = SubStorage(ContentMetadataRow, self._tools) + + def tool_getter(id_): + tool = self._tools[id_] + return { + "id": tool["id"], + "name": tool["tool_name"], + "version": tool["tool_version"], + "configuration": tool["tool_configuration"], + } + + self.journal_writer = JournalWriter(tool_getter, journal_writer) + args = (self._tools, self.journal_writer) + self._mimetypes = SubStorage(ContentMimetypeRow, *args) + self._languages = SubStorage(ContentLanguageRow, *args) + self._content_ctags = SubStorage(ContentCtagsRow, *args) + self._licenses = SubStorage(ContentLicenseRow, *args) + self._content_metadata = SubStorage(ContentMetadataRow, *args) self._revision_intrinsic_metadata = SubStorage( - RevisionIntrinsicMetadataRow, self._tools - ) - self._origin_intrinsic_metadata = SubStorage( - OriginIntrinsicMetadataRow, self._tools + RevisionIntrinsicMetadataRow, *args ) + self._origin_intrinsic_metadata = SubStorage(OriginIntrinsicMetadataRow, *args) def check_config(self, *, 
check_write): return True diff --git a/swh/indexer/storage/model.py b/swh/indexer/storage/model.py --- a/swh/indexer/storage/model.py +++ b/swh/indexer/storage/model.py @@ -11,6 +11,7 @@ from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar import attr +from typing_extensions import Final from swh.model.model import Sha1Git, dictify @@ -64,6 +65,8 @@ @attr.s class ContentMimetypeRow(BaseRow): + object_type: Final = "content_mimetype" + id = attr.ib(type=Sha1Git) mimetype = attr.ib(type=str) encoding = attr.ib(type=str) @@ -71,12 +74,15 @@ @attr.s class ContentLanguageRow(BaseRow): + object_type: Final = "content_language" + id = attr.ib(type=Sha1Git) lang = attr.ib(type=str) @attr.s class ContentCtagsRow(BaseRow): + object_type: Final = "content_ctags" UNIQUE_KEY_FIELDS = ( "id", "indexer_configuration_id", @@ -95,6 +101,7 @@ @attr.s class ContentLicenseRow(BaseRow): + object_type: Final = "content_fossology_license" UNIQUE_KEY_FIELDS = ("id", "indexer_configuration_id", "license") id = attr.ib(type=Sha1Git) @@ -103,12 +110,16 @@ @attr.s class ContentMetadataRow(BaseRow): + object_type: Final = "content_metadata" + id = attr.ib(type=Sha1Git) metadata = attr.ib(type=Dict[str, Any]) @attr.s class RevisionIntrinsicMetadataRow(BaseRow): + object_type: Final = "revision_intrinsic_metadata" + id = attr.ib(type=Sha1Git) metadata = attr.ib(type=Dict[str, Any]) mappings = attr.ib(type=List[str]) @@ -116,6 +127,8 @@ @attr.s class OriginIntrinsicMetadataRow(BaseRow): + object_type: Final = "origin_intrinsic_metadata" + id = attr.ib(type=str) metadata = attr.ib(type=Dict[str, Any]) from_revision = attr.ib(type=Sha1Git) diff --git a/swh/indexer/storage/writer.py b/swh/indexer/storage/writer.py new file mode 100644 --- /dev/null +++ b/swh/indexer/storage/writer.py @@ -0,0 +1,64 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any 
later version +# See top-level LICENSE file for more information + +from typing import Any, Callable, Dict, Iterable + +import attr + +try: + from swh.journal.writer import get_journal_writer +except ImportError: + get_journal_writer = None # type: ignore + # mypy limitation, see https://github.com/python/mypy/issues/1153 + +from .model import BaseRow + + +class JournalWriter: + """Journal writer storage collaborator. It's in charge of adding objects to + the journal. + + """ + + def __init__(self, tool_getter: Callable[[int], Dict[str, Any]], journal_writer): + """ + Args: + tool_getter: a callable that takes a tool_id and returns a dict representing + a tool object + journal_writer: configuration passed to + `swh.journal.writer.get_journal_writer` + """ + self._tool_getter = tool_getter + if journal_writer: + if get_journal_writer is None: + raise EnvironmentError( + "You need the swh.journal package to use the " + "journal_writer feature" + ) + self.journal = get_journal_writer(**journal_writer) + else: + self.journal = None + + def write_additions(self, obj_type, entries: Iterable[BaseRow]) -> None: + if not self.journal: + return + + # usually, all the additions in a batch are from the same indexer, + # so this cache allows doing a single query for all the entries. 
+ tool_cache = {} + + for entry in entries: + assert entry.object_type == obj_type # type: ignore + # get the tool used to generate this addition + tool_id = entry.indexer_configuration_id + assert tool_id + if tool_id not in tool_cache: + tool_cache[tool_id] = self._tool_getter(tool_id) + entry = attr.evolve( + entry, tool=tool_cache[tool_id], indexer_configuration_id=None + ) + + # write to kafka + self.journal.write_addition(obj_type, entry) diff --git a/swh/indexer/tests/storage/conftest.py b/swh/indexer/tests/storage/conftest.py --- a/swh/indexer/tests/storage/conftest.py +++ b/swh/indexer/tests/storage/conftest.py @@ -73,4 +73,8 @@ @pytest.fixture def swh_indexer_storage(swh_indexer_storage_postgresql): - return get_indexer_storage("local", db=swh_indexer_storage_postgresql.dsn) + return get_indexer_storage( + "local", + db=swh_indexer_storage_postgresql.dsn, + journal_writer={"cls": "memory",}, + ) diff --git a/swh/indexer/tests/storage/test_api_client.py b/swh/indexer/tests/storage/test_api_client.py --- a/swh/indexer/tests/storage/test_api_client.py +++ b/swh/indexer/tests/storage/test_api_client.py @@ -13,9 +13,18 @@ @pytest.fixture -def app(swh_indexer_storage_postgresql): - server.storage = get_indexer_storage("local", db=swh_indexer_storage_postgresql.dsn) - return server.app +def app_server(swh_indexer_storage_postgresql): + server.storage = get_indexer_storage( + "local", + db=swh_indexer_storage_postgresql.dsn, + journal_writer={"cls": "memory",}, + ) + yield server + + +@pytest.fixture +def app(app_server): + return app_server.app @pytest.fixture @@ -27,9 +36,19 @@ @pytest.fixture -def swh_indexer_storage(swh_rpc_client, app): +def swh_indexer_storage(swh_rpc_client, app_server): # This version of the swh_storage fixture uses the swh_rpc_client fixture # to instantiate a RemoteStorage (see swh_rpc_client_class above) that # proxies, via the swh.core RPC mechanism, the local (in memory) storage # configured in the app fixture above. 
- return swh_rpc_client + # + # Also note that, for the sake of + # making it easier to write tests, the in-memory journal writer of the + # in-memory backend storage is attached to the RemoteStorage as its + # journal_writer attribute. + storage = swh_rpc_client + + journal_writer = getattr(storage, "journal_writer", None) + storage.journal_writer = app_server.storage.journal_writer + yield storage + storage.journal_writer = journal_writer diff --git a/swh/indexer/tests/storage/test_in_memory.py b/swh/indexer/tests/storage/test_in_memory.py --- a/swh/indexer/tests/storage/test_in_memory.py +++ b/swh/indexer/tests/storage/test_in_memory.py @@ -12,4 +12,4 @@ @pytest.fixture def swh_indexer_storage(): - return get_indexer_storage("memory") + return get_indexer_storage("memory", journal_writer={"cls": "memory",}) diff --git a/swh/indexer/tests/storage/test_storage.py b/swh/indexer/tests/storage/test_storage.py --- a/swh/indexer/tests/storage/test_storage.py +++ b/swh/indexer/tests/storage/test_storage.py @@ -314,13 +314,16 @@ ] assert actual_data == expected_data - def test_get( + def test_add( self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any] ) -> None: storage, data = swh_indexer_storage_with_data etype = self.endpoint_type tool = data.tools[self.tool_name] + # conftest fills it with mimetypes + storage.journal_writer.journal.objects = [] # type: ignore + query = [data.sha1_2, data.sha1_1] data1 = self.row_class.from_dict( { @@ -346,6 +349,12 @@ assert actual_data == expected_data + journal_objects = storage.journal_writer.journal.objects # type: ignore + actual_journal_data = [ + obj for (obj_type, obj) in journal_objects if obj_type == self.endpoint_type + ] + assert list(sorted(actual_journal_data)) == list(sorted(expected_data)) + class TestIndexerStorageContentMimetypes(StorageETypeTester): """Test Indexer Storage content_mimetype related methods @@ -1018,7 +1027,7 @@ class TestIndexerStorageOriginIntrinsicMetadata: - def 
test_origin_intrinsic_metadata_get( + def test_origin_intrinsic_metadata_add( self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any] ) -> None: storage, data = swh_indexer_storage_with_data @@ -1064,6 +1073,14 @@ assert actual_metadata == expected_metadata + journal_objects = storage.journal_writer.journal.objects # type: ignore + actual_journal_metadata = [ + obj + for (obj_type, obj) in journal_objects + if obj_type == "origin_intrinsic_metadata" + ] + assert list(sorted(actual_journal_metadata)) == list(sorted(expected_metadata)) + def test_origin_intrinsic_metadata_add_update_in_place_duplicate( self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any] ) -> None: