diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py
--- a/swh/indexer/storage/__init__.py
+++ b/swh/indexer/storage/__init__.py
@@ -22,6 +22,7 @@
 from .exc import DuplicateId, IndexerStorageArgumentException
 from .interface import PagedResult, Sha1
 from .metrics import process_metrics, send_metric, timed
+from .writer import JournalWriter
 
 INDEXER_CFG_KEY = "indexer_storage"
 
@@ -90,12 +91,13 @@
 
     """
 
-    def __init__(self, db, min_pool_conns=1, max_pool_conns=10):
+    def __init__(self, db, min_pool_conns=1, max_pool_conns=10, journal_writer=None):
         """
         Args:
             db_conn: either a libpq connection string, or a psycopg2 connection
 
         """
+        self._writer = JournalWriter(self.indexer_configuration_get, journal_writer)
         try:
             if isinstance(db, psycopg2.extensions.connection):
                 self._pool = None
diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py
--- a/swh/indexer/storage/in_memory.py
+++ b/swh/indexer/storage/in_memory.py
@@ -9,19 +9,22 @@
 import math
 import operator
 import re
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Set, Tuple
 
 from swh.core.collections import SortedList
 from swh.model.hashutil import hash_to_bytes, hash_to_hex
-from swh.model.model import SHA1_SIZE
+from swh.model.model import SHA1_SIZE, Sha1Git
 from swh.storage.utils import get_partition_bounds_bytes
 
 from . import MAPPING_NAMES, check_id_duplicates
 from .exc import IndexerStorageArgumentException
 from .interface import PagedResult, Sha1
+from .writer import JournalWriter
 
 SHA1_DIGEST_SIZE = 160
 
+ToolId = int
+
 
 def _transform_tool(tool):
     return {
@@ -41,11 +44,16 @@
 class SubStorage:
     """Implements common missing/get/add logic for each indexer type."""
 
-    def __init__(self, tools):
+    _data: Dict[Tuple[Sha1Git, ToolId], Dict]  # (id_, tool_id) -> metadata_dict
+    _tools_per_id: Dict[Sha1Git, Set[ToolId]]
+
+    def __init__(self, obj_type: str, tools, journal_writer):
+        self._obj_type = obj_type
         self._tools = tools
+        self._journal_writer = journal_writer
         self._sorted_ids = SortedList[bytes, bytes]()
-        self._data = {}  # map (id_, tool_id) -> metadata_dict
-        self._tools_per_id = defaultdict(set)  # map id_ -> Set[tool_id]
+        self._data = {}
+        self._tools_per_id = defaultdict(set)
 
     def missing(self, ids):
         """List data missing from storage.
@@ -166,6 +174,7 @@
         """
         data = list(data)
         check_id_duplicates(data)
+        self._journal_writer.write_additions(self._obj_type, data)
         count = 0
         for item in data:
             item = item.copy()
@@ -186,8 +195,9 @@
     def add_merge(
         self, new_data: List[Dict], conflict_update: bool, merged_key: str
     ) -> int:
         added = 0
         all_subitems: List
+        self._journal_writer.write_merges(self._obj_type, new_data)
         for new_item in new_data:
             id_ = new_item["id"]
             tool_id = new_item["indexer_configuration_id"]
@@ -223,6 +233,7 @@
 
         """
         deleted = 0
+        self._journal_writer.write_deletions(self._obj_type, entries)
         for entry in entries:
             (id_, tool_id) = (entry["id"], entry["indexer_configuration_id"])
             key = (id_, tool_id)
@@ -237,15 +248,20 @@
 class IndexerStorage:
     """In-memory SWH indexer storage."""
 
-    def __init__(self):
+    def __init__(self, journal_writer=None):
         self._tools = {}
-        self._mimetypes = SubStorage(self._tools)
-        self._languages = SubStorage(self._tools)
-        self._content_ctags = SubStorage(self._tools)
-        self._licenses = SubStorage(self._tools)
-        self._content_metadata = SubStorage(self._tools)
-        self._revision_intrinsic_metadata = SubStorage(self._tools)
-        self._origin_intrinsic_metadata = SubStorage(self._tools)
+        tool_getter = self._tools.__getitem__
+        self._writer = JournalWriter(tool_getter, journal_writer)
+        args = (self._tools, self._writer)
+        self._mimetypes = SubStorage("mimetype", *args)
+        self._languages = SubStorage("language", *args)
+        self._content_ctags = SubStorage("content_ctag", *args)
+        self._licenses = SubStorage("license", *args)
+        self._content_metadata = SubStorage("content_metadata", *args)
+        self._revision_intrinsic_metadata = SubStorage(
+            "revision_intrinsic_metadata", *args
+        )
+        self._origin_intrinsic_metadata = SubStorage("origin_intrinsic_metadata", *args)
 
     def check_config(self, *, check_write):
         return True
diff --git a/swh/indexer/storage/writer.py b/swh/indexer/storage/writer.py
new file mode 100644
--- /dev/null
+++ b/swh/indexer/storage/writer.py
@@ -0,0 +1,63 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Dict, Iterable
+
+try:
+    from swh.journal.writer import get_journal_writer
+except ImportError:
+    get_journal_writer = None  # type: ignore
+    # mypy limitation, see https://github.com/python/mypy/issues/1153
+
+
+class JournalWriter:
+    """Journal writer storage collaborator. It's in charge of adding objects to
+    the journal.
+
+    """
+
+    def __init__(self, tool_getter, journal_writer):
+        self._tool_getter = tool_getter
+        if journal_writer:
+            if get_journal_writer is None:
+                raise EnvironmentError(
+                    "You need the swh.journal package to use the "
+                    "journal_writer feature"
+                )
+            self.journal = get_journal_writer(**journal_writer)
+        else:
+            self.journal = None
+
+    def write_additions(self, obj_type, entries) -> None:
+        if self.journal:
+            # usually, all the additions in a batch are from the same indexer,
+            # so this cache allows doing a single query for all the entries.
+            tool_cache = {}
+
+            for entry in entries:
+                entry = entry.copy()
+
+                # get the tool used to generate this addition
+                tool_id = entry.pop("indexer_configuration_id")
+                if tool_id not in tool_cache:
+                    tool_cache[tool_id] = self._tool_getter(tool_id)
+                entry["tool"] = tool_cache[tool_id]
+
+                # write to kafka
+                self.journal.write_addition(obj_type, entry)
+
+    def write_merges(self, obj_type, entries) -> None:
+        if self.journal:
+            raise NotImplementedError("merges are not supported by the journal.")
+
+    def write_deletions(self, obj_type, entries) -> None:
+        if self.journal:
+            raise NotImplementedError("deletions are not supported by the journal.")
+
+    def revision_intrinsic_metadata_add(self, metadata: Iterable[Dict]) -> None:
+        self.write_additions("revision_intrinsic_metadata", metadata)
+
+    def origin_intrinsic_metadata_add(self, metadata: Iterable[Dict]) -> None:
+        self.write_additions("origin_intrinsic_metadata", metadata)
diff --git a/swh/indexer/tests/storage/conftest.py b/swh/indexer/tests/storage/conftest.py
--- a/swh/indexer/tests/storage/conftest.py
+++ b/swh/indexer/tests/storage/conftest.py
@@ -69,6 +69,9 @@
 def swh_indexer_storage(swh_indexer_storage_postgresql):
     storage_config = {
         "cls": "local",
-        "args": {"db": swh_indexer_storage_postgresql.dsn,},
+        "args": {
+            "db": swh_indexer_storage_postgresql.dsn,
+            "journal_writer": {"cls": "memory",},
+        },
     }
     return get_indexer_storage(**storage_config)
diff --git a/swh/indexer/tests/storage/test_api_client.py b/swh/indexer/tests/storage/test_api_client.py
--- a/swh/indexer/tests/storage/test_api_client.py
+++ b/swh/indexer/tests/storage/test_api_client.py
@@ -16,7 +16,10 @@
 def app(swh_indexer_storage_postgresql):
     storage_config = {
         "cls": "local",
-        "args": {"db": swh_indexer_storage_postgresql.dsn,},
+        "args": {
+            "db": swh_indexer_storage_postgresql.dsn,
+            "journal_writer": {"cls": "memory",},
+        },
     }
     server.storage = get_indexer_storage(**storage_config)
     return server.app
diff --git a/swh/indexer/tests/storage/test_in_memory.py b/swh/indexer/tests/storage/test_in_memory.py
--- a/swh/indexer/tests/storage/test_in_memory.py
+++ b/swh/indexer/tests/storage/test_in_memory.py
@@ -14,6 +14,6 @@
 def swh_indexer_storage():
     storage_config = {
         "cls": "memory",
-        "args": {},
+        "args": {"journal_writer": {"cls": "memory",}},
     }
     return get_indexer_storage(**storage_config)