diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py --- a/swh/indexer/storage/__init__.py +++ b/swh/indexer/storage/__init__.py @@ -33,6 +33,7 @@ OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow, ) +from .writer import JournalWriter INDEXER_CFG_KEY = "indexer_storage" @@ -121,12 +122,13 @@ """ - def __init__(self, db, min_pool_conns=1, max_pool_conns=10): + def __init__(self, db, min_pool_conns=1, max_pool_conns=10, journal_writer=None): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection """ + self._writer = JournalWriter(self.indexer_configuration_get, journal_writer) try: if isinstance(db, psycopg2.extensions.connection): self._pool = None diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py --- a/swh/indexer/storage/in_memory.py +++ b/swh/indexer/storage/in_memory.py @@ -41,9 +41,12 @@ OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow, ) +from .writer import JournalWriter SHA1_DIGEST_SIZE = 160 +ToolId = int + def _transform_tool(tool): return { @@ -64,7 +67,6 @@ return tuple(sorted(d.items())) -ToolId = int TValue = TypeVar("TValue", bound=BaseRow) @@ -74,11 +76,13 @@ _data: Dict[Sha1, Dict[Tuple, Dict[str, Any]]] _tools_per_id: Dict[Sha1, Set[ToolId]] - def __init__(self, row_class: Type[TValue], tools): + def __init__(self, obj_type: str, row_class: Type[TValue], tools, journal_writer): self.row_class = row_class + self._obj_type = obj_type self._tools = tools self._sorted_ids = SortedList[bytes, Sha1]() self._data = defaultdict(dict) + self._journal_writer = journal_writer self._tools_per_id = defaultdict(set) def _key_from_dict(self, d) -> Tuple: @@ -210,6 +214,7 @@ """ data = list(data) check_id_duplicates(data) + self._journal_writer.write_additions(self._obj_type, data) count = 0 for obj in data: item = obj.to_dict() @@ -231,6 +236,7 @@ """ deleted = 0 + self._journal_writer.write_deletions(self._obj_type, entries) for entry in entries: (id_, tool_id) = 
(entry["id"], entry["indexer_configuration_id"]) if tool_id in self._tools_per_id[id_]: @@ -246,18 +252,23 @@ class IndexerStorage: """In-memory SWH indexer storage.""" - def __init__(self): + def __init__(self, journal_writer=None): self._tools = {} - self._mimetypes = SubStorage(ContentMimetypeRow, self._tools) - self._languages = SubStorage(ContentLanguageRow, self._tools) - self._content_ctags = SubStorage(ContentCtagsRow, self._tools) - self._licenses = SubStorage(ContentLicenseRow, self._tools) - self._content_metadata = SubStorage(ContentMetadataRow, self._tools) + tool_getter = self._tools.__getitem__ + self._writer = JournalWriter(tool_getter, journal_writer) + args = (self._tools, self._writer) + self._mimetypes = SubStorage("mimetype", ContentMimetypeRow, *args) + self._languages = SubStorage("language", ContentLanguageRow, *args) + self._content_ctags = SubStorage("content_ctag", ContentCtagsRow, *args) + self._licenses = SubStorage("license", ContentLicenseRow, *args) + self._content_metadata = SubStorage( + "content_metadata", ContentMetadataRow, *args + ) self._revision_intrinsic_metadata = SubStorage( - RevisionIntrinsicMetadataRow, self._tools + "revision_intrinsic_metadata", RevisionIntrinsicMetadataRow, *args ) self._origin_intrinsic_metadata = SubStorage( - OriginIntrinsicMetadataRow, self._tools + "origin_intrinsic_metadata", OriginIntrinsicMetadataRow, *args ) def check_config(self, *, check_write): diff --git a/swh/indexer/storage/writer.py b/swh/indexer/storage/writer.py new file mode 100644 --- /dev/null +++ b/swh/indexer/storage/writer.py @@ -0,0 +1,61 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Dict, Iterable + +import attr + +try: + from swh.journal.writer import get_journal_writer +except ImportError: + 
class JournalWriter:
    """Journal writer storage collaborator.

    It is in charge of adding indexer objects to the journal. When no journal
    configuration is given, ``journal`` is ``None`` and every write method is
    a no-op.

    """

    def __init__(self, tool_getter, journal_writer):
        """
        Args:
            tool_getter: callable taking a tool id (the value of an entry's
                ``indexer_configuration_id``) and returning the matching tool
            journal_writer: keyword arguments forwarded to
                ``get_journal_writer``; a falsy value (``None`` or an empty
                mapping) disables journal writes

        Raises:
            EnvironmentError: a journal configuration was given but the
                swh.journal package could not be imported

        """
        self._tool_getter = tool_getter
        self.journal = None
        if not journal_writer:
            return
        if get_journal_writer is None:
            raise EnvironmentError(
                "You need the swh.journal package to use the "
                "journal_writer feature"
            )
        self.journal = get_journal_writer(**journal_writer)

    def write_additions(self, obj_type, entries) -> None:
        """Write every entry of ``entries`` to the journal as ``obj_type``.

        Each entry is an attrs object exposing ``indexer_configuration_id``;
        before being written, that id is replaced by the tool it refers to
        (``tool`` is set, ``indexer_configuration_id`` is reset to None).

        Does nothing (and does not consume ``entries``) when no journal is
        configured.

        """
        # Test for presence, not truthiness: a configured journal object may
        # legitimately be falsy (e.g. it defines __len__ and is empty).
        if self.journal is None:
            return

        # usually, all the additions in a batch are from the same indexer,
        # so this cache allows doing a single query for all the entries.
        tool_cache: Dict = {}

        for entry in entries:
            # get the tool used to generate this addition
            tool_id = entry.indexer_configuration_id
            if tool_id not in tool_cache:
                tool_cache[tool_id] = self._tool_getter(tool_id)
            entry = attr.evolve(
                entry, tool=tool_cache[tool_id], indexer_configuration_id=None
            )

            # write to kafka
            self.journal.write_addition(obj_type, entry)

    def write_deletions(self, obj_type, entries) -> None:
        """Refuse deletions whenever a journal is configured.

        Raises:
            NotImplementedError: a journal is configured (regardless of the
                content of ``entries``)

        """
        if self.journal is not None:
            raise NotImplementedError("deletions are not supported by the journal.")

    def revision_intrinsic_metadata_add(self, metadata: Iterable) -> None:
        """Write ``metadata`` (row objects, as accepted by
        :meth:`write_additions`) as "revision_intrinsic_metadata" objects."""
        self.write_additions("revision_intrinsic_metadata", metadata)

    def origin_intrinsic_metadata_add(self, metadata: Iterable) -> None:
        """Write ``metadata`` (row objects, as accepted by
        :meth:`write_additions`) as "origin_intrinsic_metadata" objects."""
        self.write_additions("origin_intrinsic_metadata", metadata)
db=swh_indexer_storage_postgresql.dsn, + journal_writer={"cls": "memory",}, + ) diff --git a/swh/indexer/tests/storage/test_api_client.py b/swh/indexer/tests/storage/test_api_client.py --- a/swh/indexer/tests/storage/test_api_client.py +++ b/swh/indexer/tests/storage/test_api_client.py @@ -14,7 +14,11 @@ @pytest.fixture def app(swh_indexer_storage_postgresql): - server.storage = get_indexer_storage("local", db=swh_indexer_storage_postgresql.dsn) + server.storage = get_indexer_storage( + "local", + db=swh_indexer_storage_postgresql.dsn, + journal_writer={"cls": "memory",}, + ) return server.app diff --git a/swh/indexer/tests/storage/test_in_memory.py b/swh/indexer/tests/storage/test_in_memory.py --- a/swh/indexer/tests/storage/test_in_memory.py +++ b/swh/indexer/tests/storage/test_in_memory.py @@ -12,4 +12,4 @@ @pytest.fixture def swh_indexer_storage(): - return get_indexer_storage("memory") + return get_indexer_storage("memory", journal_writer={"cls": "memory",})