diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py --- a/swh/indexer/storage/__init__.py +++ b/swh/indexer/storage/__init__.py @@ -9,6 +9,7 @@ from typing import Dict, Iterable, List, Optional, Tuple, Union import warnings +import attr import psycopg2 import psycopg2.pool @@ -115,17 +116,19 @@ Args: data (List[dict]): List of dictionaries to be inserted + >>> tool1 = {"name": "foo", "version": "1.2.3", "configuration": {}} + >>> tool2 = {"name": "foo", "version": "1.2.4", "configuration": {}} >>> check_id_duplicates([ - ... ContentLicenseRow(id=b'foo', indexer_configuration_id=42, license="GPL"), - ... ContentLicenseRow(id=b'foo', indexer_configuration_id=32, license="GPL"), + ... ContentLicenseRow(id=b'foo', tool=tool1, license="GPL"), + ... ContentLicenseRow(id=b'foo', tool=tool2, license="GPL"), ... ]) >>> check_id_duplicates([ - ... ContentLicenseRow(id=b'foo', indexer_configuration_id=42, license="AGPL"), - ... ContentLicenseRow(id=b'foo', indexer_configuration_id=42, license="AGPL"), + ... ContentLicenseRow(id=b'foo', tool=tool1, license="AGPL"), + ... ContentLicenseRow(id=b'foo', tool=tool1, license="AGPL"), ... ]) Traceback (most recent call last): ... 
- swh.indexer.storage.exc.DuplicateId: [{'id': b'foo', 'indexer_configuration_id': 42, 'license': 'AGPL'}] + swh.indexer.storage.exc.DuplicateId: [{'id': b'foo', 'license': 'AGPL', 'tool_configuration': '{}', 'tool_name': 'foo', 'tool_version': '1.2.3'}] """ # noqa counter = Counter(tuple(sorted(item.unique_key().items())) for item in data) @@ -147,7 +150,7 @@ `swh.journal.writer.get_journal_writer` """ - self.journal_writer = JournalWriter(self._tool_get_from_id, journal_writer) + self.journal_writer = JournalWriter(journal_writer) try: if isinstance(db, psycopg2.extensions.connection): self._pool = None @@ -169,6 +172,32 @@ if db is not self._db: db.put_conn() + def _join_indexer_configuration(self, entries, db, cur): + """Replaces ``entry.indexer_configuration_id`` with a full tool dict + in ``entry.tool``.""" + joined_entries = [] + + # usually, all the additions in a batch are from the same indexer, + # so this cache allows doing a single query for all the entries. + tool_cache = {} + + for entry in entries: + # get the tool used to generate this addition + tool_id = entry.indexer_configuration_id + assert tool_id + if tool_id not in tool_cache: + tool_cache[tool_id] = dict( + self._tool_get_from_id(tool_id, db=db, cur=cur) + ) + del tool_cache[tool_id]["id"] + entry = attr.evolve( + entry, tool=tool_cache[tool_id], indexer_configuration_id=None + ) + + joined_entries.append(entry) + + return joined_entries + @timed @db_transaction() def check_config(self, *, check_write, db=None, cur=None): @@ -293,8 +322,11 @@ db=None, cur=None, ) -> Dict[str, int]: - check_id_duplicates(mimetypes) - self.journal_writer.write_additions("content_mimetype", mimetypes) + mimetypes_with_tools = self._join_indexer_configuration( + mimetypes, db=db, cur=cur + ) + check_id_duplicates(mimetypes_with_tools) + self.journal_writer.write_additions("content_mimetype", mimetypes_with_tools) db.mktemp_content_mimetype(cur) db.copy_to( [m.to_dict() for m in mimetypes], @@ -340,8 +372,11 @@ 
db=None, cur=None, ) -> Dict[str, int]: - check_id_duplicates(licenses) - self.journal_writer.write_additions("content_fossology_license", licenses) + licenses_with_tools = self._join_indexer_configuration(licenses, db=db, cur=cur) + check_id_duplicates(licenses_with_tools) + self.journal_writer.write_additions( + "content_fossology_license", licenses_with_tools + ) db.mktemp_content_fossology_license(cur) db.copy_to( [license.to_dict() for license in licenses], @@ -404,8 +439,9 @@ db=None, cur=None, ) -> Dict[str, int]: - check_id_duplicates(metadata) - self.journal_writer.write_additions("content_metadata", metadata) + metadata_with_tools = self._join_indexer_configuration(metadata, db=db, cur=cur) + check_id_duplicates(metadata_with_tools) + self.journal_writer.write_additions("content_metadata", metadata_with_tools) db.mktemp_content_metadata(cur) @@ -457,8 +493,11 @@ db=None, cur=None, ) -> Dict[str, int]: - check_id_duplicates(metadata) - self.journal_writer.write_additions("directory_intrinsic_metadata", metadata) + metadata_with_tools = self._join_indexer_configuration(metadata, db=db, cur=cur) + check_id_duplicates(metadata_with_tools) + self.journal_writer.write_additions( + "directory_intrinsic_metadata", metadata_with_tools + ) db.mktemp_directory_intrinsic_metadata(cur) @@ -500,8 +539,11 @@ db=None, cur=None, ) -> Dict[str, int]: - check_id_duplicates(metadata) - self.journal_writer.write_additions("origin_intrinsic_metadata", metadata) + metadata_with_tools = self._join_indexer_configuration(metadata, db=db, cur=cur) + check_id_duplicates(metadata_with_tools) + self.journal_writer.write_additions( + "origin_intrinsic_metadata", metadata_with_tools + ) db.mktemp_origin_intrinsic_metadata(cur) @@ -641,8 +683,11 @@ db=None, cur=None, ) -> Dict[str, int]: - check_id_duplicates(metadata) - self.journal_writer.write_additions("origin_extrinsic_metadata", metadata) + metadata_with_tools = self._join_indexer_configuration(metadata, db=db, cur=cur) + 
check_id_duplicates(metadata_with_tools) + self.journal_writer.write_additions( + "origin_extrinsic_metadata", metadata_with_tools + ) db.mktemp_origin_extrinsic_metadata(cur) diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py --- a/swh/indexer/storage/in_memory.py +++ b/swh/indexer/storage/in_memory.py @@ -23,6 +23,8 @@ Union, ) +import attr + from swh.core.collections import SortedList from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.model import SHA1_SIZE @@ -83,6 +85,30 @@ self._journal_writer = journal_writer self._tools_per_id = defaultdict(set) + def _join_indexer_configuration(self, entries): + """Replaces ``entry.indexer_configuration_id`` with a full tool dict + in ``entry.tool``.""" + joined_entries = [] + + for entry in entries: + # get the tool used to generate this addition + tool_id = entry.indexer_configuration_id + assert tool_id + tool = self._tools[tool_id] + entry = attr.evolve( + entry, + tool={ + "name": tool["tool_name"], + "version": tool["tool_version"], + "configuration": tool["tool_configuration"], + }, + indexer_configuration_id=None, + ) + + joined_entries.append(entry) + + return joined_entries + def _key_from_dict(self, d) -> Tuple: """Like the global _key_from_dict, but filters out dict keys that don't belong in the unique key.""" @@ -210,15 +236,16 @@ """ data = list(data) - check_id_duplicates(data) + data_with_tools = self._join_indexer_configuration(data) + check_id_duplicates(data_with_tools) object_type = self.row_class.object_type # type: ignore - self._journal_writer.write_additions(object_type, data) + self._journal_writer.write_additions(object_type, data_with_tools) count = 0 - for obj in data: + for (obj, obj_with_tool) in zip(data, data_with_tools): item = obj.to_dict() id_ = item.pop("id") tool_id = item["indexer_configuration_id"] - key = _key_from_dict(obj.unique_key()) + key = _key_from_dict(obj_with_tool.unique_key()) self._data[id_][key] = item 
self._tools_per_id[id_].add(tool_id) count += 1 @@ -233,16 +260,7 @@ def __init__(self, journal_writer=None): self._tools = {} - def tool_getter(id_): - tool = self._tools[id_] - return { - "id": tool["id"], - "name": tool["tool_name"], - "version": tool["tool_version"], - "configuration": tool["tool_configuration"], - } - - self.journal_writer = JournalWriter(tool_getter, journal_writer) + self.journal_writer = JournalWriter(journal_writer) args = (self._tools, self.journal_writer) self._mimetypes = SubStorage(ContentMimetypeRow, *args) self._licenses = SubStorage(ContentLicenseRow, *args) diff --git a/swh/indexer/storage/model.py b/swh/indexer/storage/model.py --- a/swh/indexer/storage/model.py +++ b/swh/indexer/storage/model.py @@ -8,6 +8,7 @@ from __future__ import annotations +import json from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar import attr @@ -20,7 +21,7 @@ @attr.s class BaseRow: - UNIQUE_KEY_FIELDS: Tuple = ("id", "indexer_configuration_id") + UNIQUE_KEY_FIELDS: Tuple = ("id",) id = attr.ib(type=Any) indexer_configuration_id = attr.ib(type=Optional[int], default=None, kw_only=True) @@ -55,15 +56,24 @@ return cls(**d) def unique_key(self) -> Dict: - obj = self + if not self.tool: + raise ValueError( + f"Cannot compute unique_key of {self.__class__.__name__} with no tool " + f"dictionary (indexer_configuration_id was given instead)" + ) - # tool["id"] and obj.indexer_configuration_id are the same value, but - # only one of them is set for any given object - if obj.indexer_configuration_id is None: - assert obj.tool # constructors ensures tool XOR indexer_configuration_id - obj = attr.evolve(obj, indexer_configuration_id=obj.tool["id"], tool=None) + tool_dict = { + "tool_name": self.tool["name"], + "tool_version": self.tool["version"], + "tool_configuration": json.dumps( + self.tool["configuration"], sort_keys=True + ), + } - return {key: getattr(obj, key) for key in self.UNIQUE_KEY_FIELDS} + return { + **{key: getattr(self, key) for 
key in self.UNIQUE_KEY_FIELDS}, + **tool_dict, + } @attr.s @@ -78,7 +88,7 @@ @attr.s class ContentLicenseRow(BaseRow): object_type: Final = "content_fossology_license" - UNIQUE_KEY_FIELDS = ("id", "indexer_configuration_id", "license") + UNIQUE_KEY_FIELDS = ("id", "license") id = attr.ib(type=Sha1Git) license = attr.ib(type=str) diff --git a/swh/indexer/storage/writer.py b/swh/indexer/storage/writer.py --- a/swh/indexer/storage/writer.py +++ b/swh/indexer/storage/writer.py @@ -1,11 +1,9 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Callable, Dict, Iterable, Optional - -import attr +from typing import Any, Dict, Iterable, Optional try: from swh.journal.writer import JournalWriterInterface, get_journal_writer @@ -24,15 +22,12 @@ journal: Optional[JournalWriterInterface] - def __init__(self, tool_getter: Callable[[int], Dict[str, Any]], journal_writer): + def __init__(self, journal_writer: Dict[str, Any]): """ Args: - tool_getter: a callable that takes a tool_id and return a dict representing - a tool object journal_writer: configuration passed to `swh.journal.writer.get_journal_writer` """ - self._tool_getter = tool_getter if journal_writer: if get_journal_writer is None: raise EnvironmentError( @@ -52,20 +47,21 @@ translated = [] - # usually, all the additions in a batch are from the same indexer, - # so this cache allows doing a single query for all the entries. 
- tool_cache = {} - for entry in entries: assert entry.object_type == obj_type # type: ignore - # get the tool used to generate this addition - tool_id = entry.indexer_configuration_id - assert tool_id - if tool_id not in tool_cache: - tool_cache[tool_id] = self._tool_getter(tool_id) - entry = attr.evolve( - entry, tool=tool_cache[tool_id], indexer_configuration_id=None - ) + + # ids are internal to the database and should not be sent to the journal + if entry.indexer_configuration_id is not None: + raise ValueError( + f"{entry} passed to JournalWriter.write_additions has " + f"indexer_configuration_id instead of full tool dict" + ) + assert entry.tool, "Missing both indexer_configuration_id and tool dict" + if "id" in entry.tool: + raise ValueError( + f"{entry} passed to JournalWriter.write_additions " + f"contains a tool id" + ) translated.append(entry) diff --git a/swh/indexer/tests/storage/test_model.py b/swh/indexer/tests/storage/test_model.py --- a/swh/indexer/tests/storage/test_model.py +++ b/swh/indexer/tests/storage/test_model.py @@ -1,26 +1,57 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import pytest + from swh.indexer.storage.model import BaseRow, ContentLicenseRow +def test_unique_key__no_tool_dict(): + with pytest.raises(ValueError, match="indexer_configuration_id"): + BaseRow(id=12, indexer_configuration_id=34).unique_key() + with pytest.raises(ValueError, match="indexer_configuration_id"): + ContentLicenseRow( + id=12, indexer_configuration_id=34, license="BSD" + ).unique_key() + + def test_unique_key(): - assert BaseRow(id=12, indexer_configuration_id=34).unique_key() == { + assert BaseRow( + id=12, tool={"id": 34, "name": "foo", "version": "1.2.3", "configuration": {}} + ).unique_key() == 
{ "id": 12, - "indexer_configuration_id": 34, + "tool_name": "foo", + "tool_version": "1.2.3", + "tool_configuration": "{}", } - assert BaseRow(id=12, tool={"id": 34, "name": "foo"}).unique_key() == { + assert ContentLicenseRow( + id=12, + tool={"id": 34, "name": "foo", "version": "1.2.3", "configuration": {}}, + license="BSD", + ).unique_key() == { "id": 12, - "indexer_configuration_id": 34, + "license": "BSD", + "tool_name": "foo", + "tool_version": "1.2.3", + "tool_configuration": "{}", } assert ContentLicenseRow( - id=12, indexer_configuration_id=34, license="BSD" - ).unique_key() == {"id": 12, "indexer_configuration_id": 34, "license": "BSD"} - - assert ContentLicenseRow( - id=12, tool={"id": 34, "name": "foo"}, license="BSD" - ).unique_key() == {"id": 12, "indexer_configuration_id": 34, "license": "BSD"} + id=12, + tool={ + "id": 34, + "name": "foo", + "version": "1.2.3", + "configuration": {"foo": 1, "bar": 2}, + }, + license="BSD", + ).unique_key() == { + "id": 12, + "license": "BSD", + "tool_name": "foo", + "tool_version": "1.2.3", + "tool_configuration": '{"bar": 2, "foo": 1}', + } diff --git a/swh/indexer/tests/storage/test_storage.py b/swh/indexer/tests/storage/test_storage.py --- a/swh/indexer/tests/storage/test_storage.py +++ b/swh/indexer/tests/storage/test_storage.py @@ -24,6 +24,15 @@ from swh.model.hashutil import hash_to_bytes +def _remove_tool_ids(rows): + results = [] + for row in rows: + tool = dict(row.tool) + del tool["id"] + results.append(attr.evolve(row, tool=tool)) + return results + + def prepare_mimetypes_from_licenses( fossology_licenses: List[ContentLicenseRow], ) -> List[ContentMimetypeRow]: @@ -358,11 +367,13 @@ assert actual_data == expected_data + expected_journal_data = _remove_tool_ids(expected_data) + journal_objects = storage.journal_writer.journal.objects # type: ignore actual_journal_data = [ obj for (obj_type, obj) in journal_objects if obj_type == self.endpoint_type ] - assert list(sorted(actual_journal_data)) == 
list(sorted(expected_data)) + assert list(sorted(actual_journal_data)) == list(sorted(expected_journal_data)) class TestIndexerStorageContentMimetypes(StorageETypeTester): @@ -574,11 +585,13 @@ assert actual_data in (expected_data_postgresql, expected_data_verbatim) + expected_journal_data = _remove_tool_ids(expected_data_verbatim) + journal_objects = storage.journal_writer.journal.objects # type: ignore actual_journal_data = [ obj for (obj_type, obj) in journal_objects if obj_type == self.endpoint_type ] - assert list(sorted(actual_journal_data)) == list(sorted(expected_data_verbatim)) + assert list(sorted(actual_journal_data)) == list(sorted(expected_journal_data)) class TestIndexerStorageDirectoryIntrinsicMetadata(StorageETypeTester): @@ -912,13 +925,17 @@ assert actual_metadata == expected_metadata + expected_journal_metadata = _remove_tool_ids(expected_metadata) + journal_objects = storage.journal_writer.journal.objects # type: ignore actual_journal_metadata = [ obj for (obj_type, obj) in journal_objects if obj_type == "origin_intrinsic_metadata" ] - assert list(sorted(actual_journal_metadata)) == list(sorted(expected_metadata)) + assert list(sorted(actual_journal_metadata)) == list( + sorted(expected_journal_metadata) + ) def test_origin_intrinsic_metadata_add_update_in_place_duplicate( self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any] @@ -1527,13 +1544,17 @@ assert actual_metadata == expected_metadata + expected_journal_metadata = _remove_tool_ids(expected_metadata) + journal_objects = storage.journal_writer.journal.objects # type: ignore actual_journal_metadata = [ obj for (obj_type, obj) in journal_objects if obj_type == "origin_extrinsic_metadata" ] - assert list(sorted(actual_journal_metadata)) == list(sorted(expected_metadata)) + assert list(sorted(actual_journal_metadata)) == list( + sorted(expected_journal_metadata) + ) def test_origin_extrinsic_metadata_add_update_in_place_duplicate( self, swh_indexer_storage_with_data: 
Tuple[IndexerStorageInterface, Any]