Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9312575
D8934.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
19 KB
Subscribers
None
D8934.diff
View Options
diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py
--- a/swh/indexer/storage/__init__.py
+++ b/swh/indexer/storage/__init__.py
@@ -9,6 +9,7 @@
from typing import Dict, Iterable, List, Optional, Tuple, Union
import warnings
+import attr
import psycopg2
import psycopg2.pool
@@ -115,17 +116,19 @@
Args:
data (List[dict]): List of dictionaries to be inserted
+ >>> tool1 = {"name": "foo", "version": "1.2.3", "configuration": {}}
+ >>> tool2 = {"name": "foo", "version": "1.2.4", "configuration": {}}
>>> check_id_duplicates([
- ... ContentLicenseRow(id=b'foo', indexer_configuration_id=42, license="GPL"),
- ... ContentLicenseRow(id=b'foo', indexer_configuration_id=32, license="GPL"),
+ ... ContentLicenseRow(id=b'foo', tool=tool1, license="GPL"),
+ ... ContentLicenseRow(id=b'foo', tool=tool2, license="GPL"),
... ])
>>> check_id_duplicates([
- ... ContentLicenseRow(id=b'foo', indexer_configuration_id=42, license="AGPL"),
- ... ContentLicenseRow(id=b'foo', indexer_configuration_id=42, license="AGPL"),
+ ... ContentLicenseRow(id=b'foo', tool=tool1, license="AGPL"),
+ ... ContentLicenseRow(id=b'foo', tool=tool1, license="AGPL"),
... ])
Traceback (most recent call last):
...
- swh.indexer.storage.exc.DuplicateId: [{'id': b'foo', 'indexer_configuration_id': 42, 'license': 'AGPL'}]
+ swh.indexer.storage.exc.DuplicateId: [{'id': b'foo', 'license': 'AGPL', 'tool_configuration': '{}', 'tool_name': 'foo', 'tool_version': '1.2.3'}]
""" # noqa
counter = Counter(tuple(sorted(item.unique_key().items())) for item in data)
@@ -147,7 +150,7 @@
`swh.journal.writer.get_journal_writer`
"""
- self.journal_writer = JournalWriter(self._tool_get_from_id, journal_writer)
+ self.journal_writer = JournalWriter(journal_writer)
try:
if isinstance(db, psycopg2.extensions.connection):
self._pool = None
@@ -169,6 +172,32 @@
if db is not self._db:
db.put_conn()
+ def _join_indexer_configuration(self, entries, db, cur):
+ """Replaces ``entry.indexer_configuration_id`` with a full tool dict
+ in ``entry.tool``."""
+ joined_entries = []
+
+ # usually, all the additions in a batch are from the same indexer,
+ # so this cache allows doing a single query for all the entries.
+ tool_cache = {}
+
+ for entry in entries:
+ # get the tool used to generate this addition
+ tool_id = entry.indexer_configuration_id
+ assert tool_id
+ if tool_id not in tool_cache:
+ tool_cache[tool_id] = dict(
+ self._tool_get_from_id(tool_id, db=db, cur=cur)
+ )
+ del tool_cache[tool_id]["id"]
+ entry = attr.evolve(
+ entry, tool=tool_cache[tool_id], indexer_configuration_id=None
+ )
+
+ joined_entries.append(entry)
+
+ return joined_entries
+
@timed
@db_transaction()
def check_config(self, *, check_write, db=None, cur=None):
@@ -293,8 +322,11 @@
db=None,
cur=None,
) -> Dict[str, int]:
- check_id_duplicates(mimetypes)
- self.journal_writer.write_additions("content_mimetype", mimetypes)
+ mimetypes_with_tools = self._join_indexer_configuration(
+ mimetypes, db=db, cur=cur
+ )
+ check_id_duplicates(mimetypes_with_tools)
+ self.journal_writer.write_additions("content_mimetype", mimetypes_with_tools)
db.mktemp_content_mimetype(cur)
db.copy_to(
[m.to_dict() for m in mimetypes],
@@ -340,8 +372,11 @@
db=None,
cur=None,
) -> Dict[str, int]:
- check_id_duplicates(licenses)
- self.journal_writer.write_additions("content_fossology_license", licenses)
+ licenses_with_tools = self._join_indexer_configuration(licenses, db=db, cur=cur)
+ check_id_duplicates(licenses_with_tools)
+ self.journal_writer.write_additions(
+ "content_fossology_license", licenses_with_tools
+ )
db.mktemp_content_fossology_license(cur)
db.copy_to(
[license.to_dict() for license in licenses],
@@ -404,8 +439,9 @@
db=None,
cur=None,
) -> Dict[str, int]:
- check_id_duplicates(metadata)
- self.journal_writer.write_additions("content_metadata", metadata)
+ metadata_with_tools = self._join_indexer_configuration(metadata, db=db, cur=cur)
+ check_id_duplicates(metadata_with_tools)
+ self.journal_writer.write_additions("content_metadata", metadata_with_tools)
db.mktemp_content_metadata(cur)
@@ -457,8 +493,11 @@
db=None,
cur=None,
) -> Dict[str, int]:
- check_id_duplicates(metadata)
- self.journal_writer.write_additions("directory_intrinsic_metadata", metadata)
+ metadata_with_tools = self._join_indexer_configuration(metadata, db=db, cur=cur)
+ check_id_duplicates(metadata_with_tools)
+ self.journal_writer.write_additions(
+ "directory_intrinsic_metadata", metadata_with_tools
+ )
db.mktemp_directory_intrinsic_metadata(cur)
@@ -500,8 +539,11 @@
db=None,
cur=None,
) -> Dict[str, int]:
- check_id_duplicates(metadata)
- self.journal_writer.write_additions("origin_intrinsic_metadata", metadata)
+ metadata_with_tools = self._join_indexer_configuration(metadata, db=db, cur=cur)
+ check_id_duplicates(metadata_with_tools)
+ self.journal_writer.write_additions(
+ "origin_intrinsic_metadata", metadata_with_tools
+ )
db.mktemp_origin_intrinsic_metadata(cur)
@@ -641,8 +683,11 @@
db=None,
cur=None,
) -> Dict[str, int]:
- check_id_duplicates(metadata)
- self.journal_writer.write_additions("origin_extrinsic_metadata", metadata)
+ metadata_with_tools = self._join_indexer_configuration(metadata, db=db, cur=cur)
+ check_id_duplicates(metadata_with_tools)
+ self.journal_writer.write_additions(
+ "origin_extrinsic_metadata", metadata_with_tools
+ )
db.mktemp_origin_extrinsic_metadata(cur)
diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py
--- a/swh/indexer/storage/in_memory.py
+++ b/swh/indexer/storage/in_memory.py
@@ -23,6 +23,8 @@
Union,
)
+import attr
+
from swh.core.collections import SortedList
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.model import SHA1_SIZE
@@ -83,6 +85,30 @@
self._journal_writer = journal_writer
self._tools_per_id = defaultdict(set)
+ def _join_indexer_configuration(self, entries):
+ """Replaces ``entry.indexer_configuration_id`` with a full tool dict
+ in ``entry.tool``."""
+ joined_entries = []
+
+ for entry in entries:
+ # get the tool used to generate this addition
+ tool_id = entry.indexer_configuration_id
+ assert tool_id
+ tool = self._tools[tool_id]
+ entry = attr.evolve(
+ entry,
+ tool={
+ "name": tool["tool_name"],
+ "version": tool["tool_version"],
+ "configuration": tool["tool_configuration"],
+ },
+ indexer_configuration_id=None,
+ )
+
+ joined_entries.append(entry)
+
+ return joined_entries
+
def _key_from_dict(self, d) -> Tuple:
"""Like the global _key_from_dict, but filters out dict keys that don't
belong in the unique key."""
@@ -210,15 +236,16 @@
"""
data = list(data)
- check_id_duplicates(data)
+ data_with_tools = self._join_indexer_configuration(data)
+ check_id_duplicates(data_with_tools)
object_type = self.row_class.object_type # type: ignore
- self._journal_writer.write_additions(object_type, data)
+ self._journal_writer.write_additions(object_type, data_with_tools)
count = 0
- for obj in data:
+ for (obj, obj_with_tool) in zip(data, data_with_tools):
item = obj.to_dict()
id_ = item.pop("id")
tool_id = item["indexer_configuration_id"]
- key = _key_from_dict(obj.unique_key())
+ key = _key_from_dict(obj_with_tool.unique_key())
self._data[id_][key] = item
self._tools_per_id[id_].add(tool_id)
count += 1
@@ -233,16 +260,7 @@
def __init__(self, journal_writer=None):
self._tools = {}
- def tool_getter(id_):
- tool = self._tools[id_]
- return {
- "id": tool["id"],
- "name": tool["tool_name"],
- "version": tool["tool_version"],
- "configuration": tool["tool_configuration"],
- }
-
- self.journal_writer = JournalWriter(tool_getter, journal_writer)
+ self.journal_writer = JournalWriter(journal_writer)
args = (self._tools, self.journal_writer)
self._mimetypes = SubStorage(ContentMimetypeRow, *args)
self._licenses = SubStorage(ContentLicenseRow, *args)
diff --git a/swh/indexer/storage/model.py b/swh/indexer/storage/model.py
--- a/swh/indexer/storage/model.py
+++ b/swh/indexer/storage/model.py
@@ -8,6 +8,7 @@
from __future__ import annotations
+import json
from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar
import attr
@@ -20,7 +21,7 @@
@attr.s
class BaseRow:
- UNIQUE_KEY_FIELDS: Tuple = ("id", "indexer_configuration_id")
+ UNIQUE_KEY_FIELDS: Tuple = ("id",)
id = attr.ib(type=Any)
indexer_configuration_id = attr.ib(type=Optional[int], default=None, kw_only=True)
@@ -55,15 +56,24 @@
return cls(**d)
def unique_key(self) -> Dict:
- obj = self
+ if not self.tool:
+ raise ValueError(
+ f"Cannot compute unique_key of {self.__class__.__name__} with no tool "
+ f"dictionary (indexer_configuration_id was given instead)"
+ )
- # tool["id"] and obj.indexer_configuration_id are the same value, but
- # only one of them is set for any given object
- if obj.indexer_configuration_id is None:
- assert obj.tool # constructors ensures tool XOR indexer_configuration_id
- obj = attr.evolve(obj, indexer_configuration_id=obj.tool["id"], tool=None)
+ tool_dict = {
+ "tool_name": self.tool["name"],
+ "tool_version": self.tool["version"],
+ "tool_configuration": json.dumps(
+ self.tool["configuration"], sort_keys=True
+ ),
+ }
- return {key: getattr(obj, key) for key in self.UNIQUE_KEY_FIELDS}
+ return {
+ **{key: getattr(self, key) for key in self.UNIQUE_KEY_FIELDS},
+ **tool_dict,
+ }
@attr.s
@@ -78,7 +88,7 @@
@attr.s
class ContentLicenseRow(BaseRow):
object_type: Final = "content_fossology_license"
- UNIQUE_KEY_FIELDS = ("id", "indexer_configuration_id", "license")
+ UNIQUE_KEY_FIELDS = ("id", "license")
id = attr.ib(type=Sha1Git)
license = attr.ib(type=str)
diff --git a/swh/indexer/storage/writer.py b/swh/indexer/storage/writer.py
--- a/swh/indexer/storage/writer.py
+++ b/swh/indexer/storage/writer.py
@@ -1,11 +1,9 @@
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2020-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import Any, Callable, Dict, Iterable, Optional
-
-import attr
+from typing import Any, Dict, Iterable, Optional
try:
from swh.journal.writer import JournalWriterInterface, get_journal_writer
@@ -24,15 +22,12 @@
journal: Optional[JournalWriterInterface]
- def __init__(self, tool_getter: Callable[[int], Dict[str, Any]], journal_writer):
+ def __init__(self, journal_writer: Dict[str, Any]):
"""
Args:
- tool_getter: a callable that takes a tool_id and return a dict representing
- a tool object
journal_writer: configuration passed to
`swh.journal.writer.get_journal_writer`
"""
- self._tool_getter = tool_getter
if journal_writer:
if get_journal_writer is None:
raise EnvironmentError(
@@ -52,20 +47,21 @@
translated = []
- # usually, all the additions in a batch are from the same indexer,
- # so this cache allows doing a single query for all the entries.
- tool_cache = {}
-
for entry in entries:
assert entry.object_type == obj_type # type: ignore
- # get the tool used to generate this addition
- tool_id = entry.indexer_configuration_id
- assert tool_id
- if tool_id not in tool_cache:
- tool_cache[tool_id] = self._tool_getter(tool_id)
- entry = attr.evolve(
- entry, tool=tool_cache[tool_id], indexer_configuration_id=None
- )
+
+ # ids are internal to the database and should not be sent to the journal (Kafka)
+ if entry.indexer_configuration_id is not None:
+ raise ValueError(
+ f"{entry} passed to JournalWriter.write_additions has "
+ f"indexer_configuration_id instead of full tool dict"
+ )
+ assert entry.tool, "Missing both indexer_configuration_id and tool dict"
+ if "id" in entry.tool:
+ raise ValueError(
+ f"{entry} passed to JournalWriter.write_additions "
+ f"contains a tool id"
+ )
translated.append(entry)
diff --git a/swh/indexer/tests/storage/test_model.py b/swh/indexer/tests/storage/test_model.py
--- a/swh/indexer/tests/storage/test_model.py
+++ b/swh/indexer/tests/storage/test_model.py
@@ -1,26 +1,57 @@
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2020-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import pytest
+
from swh.indexer.storage.model import BaseRow, ContentLicenseRow
+def test_unique_key__no_tool_dict():
+ with pytest.raises(ValueError, match="indexer_configuration_id"):
+ BaseRow(id=12, indexer_configuration_id=34).unique_key()
+ with pytest.raises(ValueError, match="indexer_configuration_id"):
+ ContentLicenseRow(
+ id=12, indexer_configuration_id=34, license="BSD"
+ ).unique_key()
+
+
def test_unique_key():
- assert BaseRow(id=12, indexer_configuration_id=34).unique_key() == {
+ assert BaseRow(
+ id=12, tool={"id": 34, "name": "foo", "version": "1.2.3", "configuration": {}}
+ ).unique_key() == {
"id": 12,
- "indexer_configuration_id": 34,
+ "tool_name": "foo",
+ "tool_version": "1.2.3",
+ "tool_configuration": "{}",
}
- assert BaseRow(id=12, tool={"id": 34, "name": "foo"}).unique_key() == {
+ assert ContentLicenseRow(
+ id=12,
+ tool={"id": 34, "name": "foo", "version": "1.2.3", "configuration": {}},
+ license="BSD",
+ ).unique_key() == {
"id": 12,
- "indexer_configuration_id": 34,
+ "license": "BSD",
+ "tool_name": "foo",
+ "tool_version": "1.2.3",
+ "tool_configuration": "{}",
}
assert ContentLicenseRow(
- id=12, indexer_configuration_id=34, license="BSD"
- ).unique_key() == {"id": 12, "indexer_configuration_id": 34, "license": "BSD"}
-
- assert ContentLicenseRow(
- id=12, tool={"id": 34, "name": "foo"}, license="BSD"
- ).unique_key() == {"id": 12, "indexer_configuration_id": 34, "license": "BSD"}
+ id=12,
+ tool={
+ "id": 34,
+ "name": "foo",
+ "version": "1.2.3",
+ "configuration": {"foo": 1, "bar": 2},
+ },
+ license="BSD",
+ ).unique_key() == {
+ "id": 12,
+ "license": "BSD",
+ "tool_name": "foo",
+ "tool_version": "1.2.3",
+ "tool_configuration": '{"bar": 2, "foo": 1}',
+ }
diff --git a/swh/indexer/tests/storage/test_storage.py b/swh/indexer/tests/storage/test_storage.py
--- a/swh/indexer/tests/storage/test_storage.py
+++ b/swh/indexer/tests/storage/test_storage.py
@@ -24,6 +24,15 @@
from swh.model.hashutil import hash_to_bytes
+def _remove_tool_ids(rows):
+ results = []
+ for row in rows:
+ tool = dict(row.tool)
+ del tool["id"]
+ results.append(attr.evolve(row, tool=tool))
+ return results
+
+
def prepare_mimetypes_from_licenses(
fossology_licenses: List[ContentLicenseRow],
) -> List[ContentMimetypeRow]:
@@ -358,11 +367,13 @@
assert actual_data == expected_data
+ expected_journal_data = _remove_tool_ids(expected_data)
+
journal_objects = storage.journal_writer.journal.objects # type: ignore
actual_journal_data = [
obj for (obj_type, obj) in journal_objects if obj_type == self.endpoint_type
]
- assert list(sorted(actual_journal_data)) == list(sorted(expected_data))
+ assert list(sorted(actual_journal_data)) == list(sorted(expected_journal_data))
class TestIndexerStorageContentMimetypes(StorageETypeTester):
@@ -574,11 +585,13 @@
assert actual_data in (expected_data_postgresql, expected_data_verbatim)
+ expected_journal_data = _remove_tool_ids(expected_data_verbatim)
+
journal_objects = storage.journal_writer.journal.objects # type: ignore
actual_journal_data = [
obj for (obj_type, obj) in journal_objects if obj_type == self.endpoint_type
]
- assert list(sorted(actual_journal_data)) == list(sorted(expected_data_verbatim))
+ assert list(sorted(actual_journal_data)) == list(sorted(expected_journal_data))
class TestIndexerStorageDirectoryIntrinsicMetadata(StorageETypeTester):
@@ -912,13 +925,17 @@
assert actual_metadata == expected_metadata
+ expected_journal_metadata = _remove_tool_ids(expected_metadata)
+
journal_objects = storage.journal_writer.journal.objects # type: ignore
actual_journal_metadata = [
obj
for (obj_type, obj) in journal_objects
if obj_type == "origin_intrinsic_metadata"
]
- assert list(sorted(actual_journal_metadata)) == list(sorted(expected_metadata))
+ assert list(sorted(actual_journal_metadata)) == list(
+ sorted(expected_journal_metadata)
+ )
def test_origin_intrinsic_metadata_add_update_in_place_duplicate(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
@@ -1527,13 +1544,17 @@
assert actual_metadata == expected_metadata
+ expected_journal_metadata = _remove_tool_ids(expected_metadata)
+
journal_objects = storage.journal_writer.journal.objects # type: ignore
actual_journal_metadata = [
obj
for (obj_type, obj) in journal_objects
if obj_type == "origin_extrinsic_metadata"
]
- assert list(sorted(actual_journal_metadata)) == list(sorted(expected_metadata))
+ assert list(sorted(actual_journal_metadata)) == list(
+ sorted(expected_journal_metadata)
+ )
def test_origin_extrinsic_metadata_add_update_in_place_duplicate(
self, swh_indexer_storage_with_data: Tuple[IndexerStorageInterface, Any]
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Wed, Jul 2, 10:58 AM (1 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3214916
Attached To
D8934: Remove tool ids from Kafka messages
Event Timeline
Log In to Comment