diff --git a/swh/indexer/storage/__init__.py b/swh/indexer/storage/__init__.py
--- a/swh/indexer/storage/__init__.py
+++ b/swh/indexer/storage/__init__.py
@@ -5,6 +5,7 @@
 from collections import Counter, defaultdict
+import itertools
 import json
 from typing import Dict, List, Optional
@@ -22,6 +23,15 @@
 from .exc import DuplicateId, IndexerStorageArgumentException
 from .interface import PagedResult, Sha1
 from .metrics import process_metrics, send_metric, timed
+from .model import (
+    ContentCtagsRow,
+    ContentLanguageRow,
+    ContentLicenseRow,
+    ContentMetadataRow,
+    ContentMimetypeRow,
+    OriginIntrinsicMetadataRow,
+    RevisionIntrinsicMetadataRow,
+)
 
 INDEXER_CFG_KEY = "indexer_storage"
 
@@ -59,7 +69,7 @@ def check_id_duplicates(data):
     """
-    If any two dictionaries in `data` have the same id, raises
-    a `ValueError`.
+    If any two row models in `data` have the same unique key, raises
+    a `DuplicateId` error.
 
-    Values associated to the key must be hashable.
+    Values associated with the unique key must be hashable.
 
     Args:
@@ -68,21 +78,21 @@
-        data (List[dict]): List of dictionaries to be inserted
+        data (List): List of rows to be inserted
 
     >>> check_id_duplicates([
-    ...     {'id': 'foo', 'data': 'spam'},
-    ...     {'id': 'bar', 'data': 'egg'},
+    ...     ContentLanguageRow(id=b'foo', indexer_configuration_id=42, lang="python"),
+    ...     ContentLanguageRow(id=b'foo', indexer_configuration_id=32, lang="python"),
     ... ])
     >>> check_id_duplicates([
-    ...     {'id': 'foo', 'data': 'spam'},
-    ...     {'id': 'foo', 'data': 'egg'},
+    ...     ContentLanguageRow(id=b'foo', indexer_configuration_id=42, lang="python"),
+    ...     ContentLanguageRow(id=b'foo', indexer_configuration_id=42, lang="python"),
     ... ])
     Traceback (most recent call last):
       ...
-    swh.indexer.storage.exc.DuplicateId: ['foo']
-    """
-    counter = Counter(item["id"] for item in data)
+    swh.indexer.storage.exc.DuplicateId: [{'id': b'foo', 'indexer_configuration_id': 42}]
+    """  # noqa
+    counter = Counter(tuple(sorted(item.unique_key().items())) for item in data)
     duplicates = [id_ for (id_, count) in counter.items() if count >= 2]
     if duplicates:
-        raise DuplicateId(duplicates)
+        raise DuplicateId(list(map(dict, duplicates)))
 
 
 class IndexerStorage:
@@ -244,7 +254,7 @@
         A dict with the number of new elements added to the storage.
 
         """
-        check_id_duplicates(mimetypes)
+        check_id_duplicates(map(ContentMimetypeRow.from_dict, mimetypes))
         mimetypes.sort(key=lambda m: m["id"])
         db.mktemp_content_mimetype(cur)
         db.copy_to(
@@ -280,7 +290,7 @@
     def content_language_add(
         self, languages: List[Dict], conflict_update: bool = False, db=None, cur=None
     ) -> Dict[str, int]:
-        check_id_duplicates(languages)
+        check_id_duplicates(map(ContentLanguageRow.from_dict, languages))
         languages.sort(key=lambda m: m["id"])
         db.mktemp_content_language(cur)
         # empty language is mapped to 'unknown'
@@ -319,19 +329,13 @@
     def content_ctags_add(
         self, ctags: List[Dict], conflict_update: bool = False, db=None, cur=None
    ) -> Dict[str, int]:
-        check_id_duplicates(ctags)
+        rows = list(itertools.chain.from_iterable(map(converters.ctags_to_db, ctags)))
+        check_id_duplicates(map(ContentCtagsRow.from_dict, rows))
         ctags.sort(key=lambda m: m["id"])
 
-        def _convert_ctags(__ctags):
-            """Convert ctags dict to list of ctags.
-
-            """
-            for ctags in __ctags:
-                yield from converters.ctags_to_db(ctags)
-
         db.mktemp_content_ctags(cur)
         db.copy_to(
-            list(_convert_ctags(ctags)),
+            rows,
             tblname="tmp_content_ctags",
             columns=["id", "name", "kind", "line", "lang", "indexer_configuration_id"],
             cur=cur,
         )
@@ -367,19 +371,16 @@
     def content_fossology_license_add(
         self, licenses: List[Dict], conflict_update: bool = False, db=None, cur=None
     ) -> Dict[str, int]:
-        check_id_duplicates(licenses)
+        rows = list(
+            itertools.chain.from_iterable(
+                map(converters.fossology_license_to_db, licenses)
+            )
+        )
+        check_id_duplicates(map(ContentLicenseRow.from_dict, rows))
         licenses.sort(key=lambda m: m["id"])
         db.mktemp_content_fossology_license(cur)
         db.copy_to(
-            (
-                {
-                    "id": sha1["id"],
-                    "indexer_configuration_id": sha1["indexer_configuration_id"],
-                    "license": license,
-                }
-                for sha1 in licenses
-                for license in sha1["licenses"]
-            ),
+            rows,
             tblname="tmp_content_fossology_license",
             columns=["id", "license", "indexer_configuration_id"],
             cur=cur,
@@ -429,7 +430,7 @@
     def content_metadata_add(
         self, metadata: List[Dict], conflict_update: bool = False, db=None, cur=None
     ) -> Dict[str, int]:
-        check_id_duplicates(metadata)
+        check_id_duplicates(map(ContentMetadataRow.from_dict, metadata))
         metadata.sort(key=lambda m: m["id"])
 
         db.mktemp_content_metadata(cur)
@@ -465,7 +466,7 @@
     def revision_intrinsic_metadata_add(
         self, metadata: List[Dict], conflict_update: bool = False, db=None, cur=None
     ) -> Dict[str, int]:
-        check_id_duplicates(metadata)
+        check_id_duplicates(map(RevisionIntrinsicMetadataRow.from_dict, metadata))
         metadata.sort(key=lambda m: m["id"])
 
         db.mktemp_revision_intrinsic_metadata(cur)
@@ -504,7 +505,7 @@
     def origin_intrinsic_metadata_add(
         self, metadata: List[Dict], conflict_update: bool = False, db=None, cur=None
     ) -> Dict[str, int]:
-        check_id_duplicates(metadata)
+        check_id_duplicates(map(OriginIntrinsicMetadataRow.from_dict, metadata))
         metadata.sort(key=lambda m: m["id"])
 
         db.mktemp_origin_intrinsic_metadata(cur)
diff --git a/swh/indexer/storage/converters.py b/swh/indexer/storage/converters.py
--- a/swh/indexer/storage/converters.py
+++ b/swh/indexer/storage/converters.py
@@ -42,6 +42,18 @@
     }
 
 
+def fossology_license_to_db(licenses):
+    """Similar to ctags_to_db, but for licenses: yields one flat row per license."""
+    id = licenses["id"]
+    tool_id = licenses["indexer_configuration_id"]
+    for license in licenses["licenses"]:
+        yield {
+            "id": id,
+            "indexer_configuration_id": tool_id,
+            "license": license,
+        }
+
+
 def db_to_ctags(ctag):
     """Convert a ctags entry into a ready ctags entry.
 
diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py
--- a/swh/indexer/storage/in_memory.py
+++ b/swh/indexer/storage/in_memory.py
@@ -25,20 +25,19 @@
 from swh.core.collections import SortedList
 from swh.model.hashutil import hash_to_bytes, hash_to_hex
-from swh.model.model import SHA1_SIZE
+from swh.model.model import SHA1_SIZE, Sha1Git
 from swh.storage.utils import get_partition_bounds_bytes
 
-from . import MAPPING_NAMES, check_id_duplicates
+from . import MAPPING_NAMES, check_id_duplicates, converters
 from .exc import IndexerStorageArgumentException
 from .interface import PagedResult, Sha1
 from .model import (
     BaseRow,
     ContentCtagsRow,
     ContentLanguageRow,
-    ContentLicensesRow,
+    ContentLicenseRow,
     ContentMetadataRow,
     ContentMimetypeRow,
-    KeyDict,
     OriginIntrinsicMetadataRow,
     RevisionIntrinsicMetadataRow,
 )
@@ -61,6 +60,10 @@
         raise IndexerStorageArgumentException("identifiers must be bytes.")
 
 
+def _key_from_dict(d):
+    return tuple(sorted(d.items()))
+
+
 ToolId = int
 
 TValue = TypeVar("TValue", bound=BaseRow)
@@ -68,17 +71,22 @@
 
 class SubStorage(Generic[TValue]):
     """Implements common missing/get/add logic for each indexer type."""
 
-    _data: Dict[Tuple[Sha1, ToolId], Dict[str, Any]]
+    _data: Dict[Sha1, Dict[Tuple, Dict[str, Any]]]
     _tools_per_id: Dict[Sha1, Set[ToolId]]
 
     def __init__(self, row_class: Type[TValue], tools):
        self.row_class = row_class
         self._tools = tools
         self._sorted_ids = SortedList[bytes, Sha1]()
-        self._data = {}
+        self._data = defaultdict(dict)
         self._tools_per_id = defaultdict(set)
 
-    def missing(self, keys: Iterable[KeyDict]) -> Iterator[Sha1]:
+    def _key_from_dict(self, d) -> Tuple:
+        """Like the global _key_from_dict, but filters out dict keys that don't
+        belong in the unique key."""
+        return _key_from_dict({k: d[k] for k in self.row_class.UNIQUE_KEY_FIELDS})
+
+    def missing(self, keys: Iterable[Dict]) -> Iterator[Sha1]:
         """List data missing from storage.
 
         Args:
@@ -113,12 +121,11 @@
         """
         for id_ in ids:
-            for tool_id in self._tools_per_id.get(id_, set()):
-                key = (id_, tool_id)
+            for entry in self._data[id_].values():
+                entry = entry.copy()
+                tool_id = entry.pop("indexer_configuration_id")
                 yield self.row_class(
-                    id=id_,
-                    tool=_transform_tool(self._tools[tool_id]),
-                    **self._data[key],
+                    id=id_, tool=_transform_tool(self._tools[tool_id]), **entry,
                 )
 
     def get_all(self) -> Iterator[TValue]:
@@ -196,73 +203,37 @@
         """
         data = list(data)
-        check_id_duplicates(obj.to_dict() for obj in data)
+        check_id_duplicates(data)
         count = 0
         for obj in data:
             item = obj.to_dict()
-            tool_id = item.pop("indexer_configuration_id")
             id_ = item.pop("id")
-            data_item = item
-            if not conflict_update and tool_id in self._tools_per_id.get(id_, set()):
+            tool_id = item["indexer_configuration_id"]
+            key = _key_from_dict(obj.unique_key())
+            if not conflict_update and key in self._data[id_]:
                 # Duplicate, should not be updated
                 continue
-            key = (id_, tool_id)
-            self._data[key] = data_item
+            self._data[id_][key] = item
             self._tools_per_id[id_].add(tool_id)
             count += 1
             if id_ not in self._sorted_ids:
                 self._sorted_ids.add(id_)
         return count
 
-    def add_merge(
-        self, new_data: Iterable[TValue], conflict_update: bool, merged_key: str
-    ) -> int:
-        added = 0
-        all_subitems: List
-        for new_obj in new_data:
-            new_item = new_obj.to_dict()
-            id_ = new_item["id"]
-            tool_id = new_item["indexer_configuration_id"]
-            if conflict_update:
-                all_subitems = []
-            else:
-                existing = list(self.get([id_]))
-                all_subitems = [
-                    old_subitem
-                    for existing_item in existing
-                    if existing_item.tool["id"] == tool_id  # type: ignore
-                    for old_subitem in getattr(existing_item, merged_key)
-                ]
-            for new_subitem in new_item[merged_key]:
-                if new_subitem not in all_subitems:
-                    all_subitems.append(new_subitem)
-            added += self.add(
-                [
-                    self.row_class(
-                        id=id_,
-                        indexer_configuration_id=tool_id,
-                        **{merged_key: all_subitems},  # type: ignore
-                    )  # FIXME: this only works for classes with three attributes
-                ],
-                conflict_update=True,
-            )
-            if id_ not in self._sorted_ids:
-                self._sorted_ids.add(id_)
-        return added
-
-    def delete(self, entries: List[KeyDict]) -> int:
+    def delete(self, entries: List[Dict]) -> int:
         """Delete entries and return the number of entries deleted.
 
         """
         deleted = 0
         for entry in entries:
             (id_, tool_id) = (entry["id"], entry["indexer_configuration_id"])
-            key = (id_, tool_id)
             if tool_id in self._tools_per_id[id_]:
                 self._tools_per_id[id_].remove(tool_id)
-            if key in self._data:
-                deleted += 1
-                del self._data[key]
+            if id_ in self._data:
+                key = self._key_from_dict(entry)
+                if key in self._data[id_]:
+                    deleted += 1
+                    del self._data[id_][key]
         return deleted
 
@@ -274,7 +245,7 @@
         self._mimetypes = SubStorage(ContentMimetypeRow, self._tools)
         self._languages = SubStorage(ContentLanguageRow, self._tools)
         self._content_ctags = SubStorage(ContentCtagsRow, self._tools)
-        self._licenses = SubStorage(ContentLicensesRow, self._tools)
+        self._licenses = SubStorage(ContentLicenseRow, self._tools)
         self._content_metadata = SubStorage(ContentMetadataRow, self._tools)
         self._revision_intrinsic_metadata = SubStorage(
             RevisionIntrinsicMetadataRow, self._tools
@@ -333,55 +304,79 @@
     def content_ctags_get(self, ids):
         for item in self._content_ctags.get(ids):
-            for item_ctags_item in item.ctags:
-                yield {"id": item.id, "tool": item.tool, **item_ctags_item.to_dict()}
+            yield {"id": item.id, "tool": item.tool, **item.to_dict()}
 
     def content_ctags_add(
         self, ctags: List[Dict], conflict_update: bool = False
     ) -> Dict[str, int]:
         check_id_types(ctags)
-        added = self._content_ctags.add_merge(
-            map(ContentCtagsRow.from_dict, ctags), conflict_update, "ctags"
+        added = self._content_ctags.add(
+            map(
+                ContentCtagsRow.from_dict,
+                itertools.chain.from_iterable(map(converters.ctags_to_db, ctags)),
+            ),
+            conflict_update,
         )
         return {"content_ctags:add": added}
 
     def content_ctags_search(self, expression, limit=10, last_sha1=None):
         nb_matches = 0
+        items_per_id: Dict[Tuple[Sha1Git, ToolId], List[ContentCtagsRow]] = {}
         for item in sorted(self._content_ctags.get_all()):
             if item.id <= (last_sha1 or bytes(0 for _ in range(SHA1_DIGEST_SIZE))):
                 continue
-            for ctags_item in item.ctags:
-                if ctags_item.name != expression:
+            items_per_id.setdefault(
+                (item.id, item.indexer_configuration_id), []
+            ).append(item)
+
+        for items in items_per_id.values():
+            ctags = []
+            for item in items:
+                if item.name != expression:
                     continue
                 nb_matches += 1
-                yield {
-                    "id": item.id,
-                    "tool": item.tool,
-                    **ctags_item.to_dict(),
-                }
-                if nb_matches >= limit:
-                    return
+                if nb_matches > limit:
+                    break
+                item_dict = item.to_dict()
+                id_ = item_dict.pop("id")
+                tool = item_dict.pop("tool")
+                ctags.append(item_dict)
+
+            if ctags:
+                for ctag in ctags:
+                    yield {"id": id_, "tool": tool, **ctag}
 
     def content_fossology_license_get(self, ids):
         # Rewrites the output of SubStorage.get from the old format to
         # the new one. SubStorage.get should be updated once all other
         # *_get methods use the new format.
         # See: https://forge.softwareheritage.org/T1433
-        res = {}
-        for obj in self._licenses.get(ids):
-            d = obj.to_dict()
-            res.setdefault(d.pop("id"), []).append(d)
-        for (id_, facts) in res.items():
-            yield {id_: facts}
+        for id_ in ids:
+            items = {}
+            for obj in self._licenses.get([id_]):
+                items.setdefault(obj.tool["id"], (obj.tool, []))[1].append(obj.license)
+            if items:
+                yield {
+                    id_: [
+                        {"tool": tool, "licenses": licenses}
+                        for (tool, licenses) in items.values()
+                    ]
+                }
 
     def content_fossology_license_add(
         self, licenses: List[Dict], conflict_update: bool = False
     ) -> Dict[str, int]:
         check_id_types(licenses)
-        added = self._licenses.add_merge(
-            map(ContentLicensesRow.from_dict, licenses), conflict_update, "licenses"
+        added = self._licenses.add(
+            map(
+                ContentLicenseRow.from_dict,
+                itertools.chain.from_iterable(
+                    map(converters.fossology_license_to_db, licenses)
+                ),
+            ),
+            conflict_update,
         )
-        return {"fossology_license_add:add": added}
+        return {"content_fossology_license:add": added}
 
     def content_fossology_license_get_partition(
         self,
diff --git a/swh/indexer/storage/model.py b/swh/indexer/storage/model.py
--- a/swh/indexer/storage/model.py
+++ b/swh/indexer/storage/model.py
@@ -8,24 +8,19 @@
 from __future__ import annotations
 
-from typing import Any, Dict, List, Optional, Type, TypeVar
+from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar
 
 import attr
-from typing_extensions import TypedDict
 
 from swh.model.model import Sha1Git, dictify
 
-
-class KeyDict(TypedDict):
-    id: Sha1Git
-    indexer_configuration_id: int
-
-
 TSelf = TypeVar("TSelf")
 
 
 @attr.s
 class BaseRow:
+    UNIQUE_KEY_FIELDS: Tuple = ("id", "indexer_configuration_id")
+
     id = attr.ib(type=Sha1Git)
     indexer_configuration_id = attr.ib(type=Optional[int], default=None, kw_only=True)
     tool = attr.ib(type=Optional[Dict], default=None, kw_only=True)
@@ -58,6 +53,13 @@
     def from_dict(cls: Type[TSelf], d) -> TSelf:
         return cls(**d)  # type: ignore
 
+    def unique_key(self) -> Dict:
+        if self.indexer_configuration_id is None:
+            raise ValueError(
+                "Can only call unique_key() on objects with indexer_configuration_id."
+            )
+        return {key: getattr(self, key) for key in self.UNIQUE_KEY_FIELDS}
+
 
 @attr.s
 class ContentMimetypeRow(BaseRow):
@@ -71,34 +73,27 @@
 
 
 @attr.s
-class ContentCtagsEntry:
+class ContentCtagsRow(BaseRow):
+    UNIQUE_KEY_FIELDS = (
+        "id",
+        "indexer_configuration_id",
+        "name",
+        "kind",
+        "line",
+        "lang",
+    )
+
     name = attr.ib(type=str)
     kind = attr.ib(type=str)
     line = attr.ib(type=int)
     lang = attr.ib(type=str)
 
-    def to_dict(self) -> Dict[str, Any]:
-        return dictify(attr.asdict(self, recurse=False))
-
-    @classmethod
-    def from_dict(cls, d) -> ContentCtagsEntry:
-        return cls(**d)
-
 
 @attr.s
-class ContentCtagsRow(BaseRow):
-    ctags = attr.ib(type=List[ContentCtagsEntry])
+class ContentLicenseRow(BaseRow):
+    UNIQUE_KEY_FIELDS = ("id", "indexer_configuration_id", "license")
 
-    @classmethod
-    def from_dict(cls, d) -> ContentCtagsRow:
-        d = d.copy()
-        items = d.pop("ctags")
-        return cls(ctags=[ContentCtagsEntry.from_dict(item) for item in items], **d,)
-
-
-@attr.s
-class ContentLicensesRow(BaseRow):
-    licenses = attr.ib(type=List[str])
+    license = attr.ib(type=str)
 
 
 @attr.s
diff --git a/swh/indexer/tests/storage/test_storage.py b/swh/indexer/tests/storage/test_storage.py
--- a/swh/indexer/tests/storage/test_storage.py
+++ b/swh/indexer/tests/storage/test_storage.py
@@ -812,6 +812,28 @@
         ]
         assert actual_ctags == expected_ctags
 
+    def test_add_empty(self, swh_indexer_storage_with_data):
+        (storage, data) = swh_indexer_storage_with_data
+        etype = self.endpoint_type
+        tool = data.tools[self.tool_name]
+
+        summary = endpoint(storage, etype, "add")(
+            [{"id": data.sha1_2, "indexer_configuration_id": tool["id"], "ctags": [],}]
+        )
+        assert summary == {"content_ctags:add": 0}
+
+        actual_ctags = list(endpoint(storage, etype, "get")([data.sha1_2]))
+
+        assert actual_ctags == []
+
+    def test_get_unknown(self, swh_indexer_storage_with_data):
+        (storage, data) = swh_indexer_storage_with_data
+        etype = self.endpoint_type
+
+        actual_ctags = list(endpoint(storage, etype, "get")([data.sha1_2]))
+
+        assert actual_ctags == []
+
 
 class TestIndexerStorageContentMetadata(StorageETypeTester):
     """Test Indexer Storage content_metadata related methods
     """
@@ -900,7 +922,10 @@
     )
 
 
-class TestIndexerStorageContentFossologyLicence:
+class TestIndexerStorageContentFossologyLicense:
+    endpoint_type = "content_fossology_license"
+    tool_name = "nomos"
+
     def test_content_fossology_license_add__new_license_added(
         self, swh_indexer_storage_with_data
     ):
@@ -1100,6 +1125,34 @@
         for actual_id in actual_ids:
             assert actual_id in expected_ids
 
+    def test_add_empty(self, swh_indexer_storage_with_data):
+        (storage, data) = swh_indexer_storage_with_data
+        etype = self.endpoint_type
+        tool = data.tools[self.tool_name]
+
+        summary = endpoint(storage, etype, "add")(
+            [
+                {
+                    "id": data.sha1_2,
+                    "indexer_configuration_id": tool["id"],
+                    "licenses": [],
+                }
+            ]
+        )
+        assert summary == {"content_fossology_license:add": 0}
+
+        actual_license = list(endpoint(storage, etype, "get")([data.sha1_2]))
+
+        assert actual_license == []
+
+    def test_get_unknown(self, swh_indexer_storage_with_data):
+        (storage, data) = swh_indexer_storage_with_data
+        etype = self.endpoint_type
+
+        actual_license = list(endpoint(storage, etype, "get")([data.sha1_2]))
+
+        assert actual_license == []
+
 
 class TestIndexerStorageOriginIntrinsicMetadata:
     def test_origin_intrinsic_metadata_get(self, swh_indexer_storage_with_data):
diff --git a/swh/indexer/tests/test_fossology_license.py b/swh/indexer/tests/test_fossology_license.py
--- a/swh/indexer/tests/test_fossology_license.py
+++ b/swh/indexer/tests/test_fossology_license.py
@@ -100,7 +100,7 @@
         self.expected_results = {
             self.id0: {"tool": tool, "licenses": SHA1_TO_LICENSES[self.id0],},
             self.id1: {"tool": tool, "licenses": SHA1_TO_LICENSES[self.id1],},
-            self.id2: {"tool": tool, "licenses": SHA1_TO_LICENSES[self.id2],},
+            self.id2: None,
         }
 
     def tearDown(self):
diff --git a/swh/indexer/tests/utils.py b/swh/indexer/tests/utils.py
--- a/swh/indexer/tests/utils.py
+++ b/swh/indexer/tests/utils.py
@@ -631,15 +631,18 @@
         expected_results = self.expected_results
 
         self.assertEqual(
-            len(expected_results),
-            len(actual_results),
+            sum(res is not None for res in expected_results.values()),
+            sum(sum(map(len, res.values())) for res in actual_results),
             (expected_results, actual_results),
         )
         for indexed_data in actual_results:
             (_id, indexed_data) = list(indexed_data.items())[0]
-            expected_data = expected_results[hashutil.hash_to_hex(_id)].copy()
-            expected_data = [expected_data]
-            self.assertEqual(indexed_data, expected_data)
+            if expected_results.get(hashutil.hash_to_hex(_id)) is None:
+                self.assertEqual(indexed_data, [])
+            else:
+                expected_data = expected_results[hashutil.hash_to_hex(_id)].copy()
+                expected_data = [expected_data]
+                self.assertEqual(indexed_data, expected_data)
 
     def test_index(self):
         """Known sha1 have their data indexed
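
Note, not part of the patch: a minimal sketch of the duplicate-detection semantics introduced above, assuming the modules are importable as laid out in this diff (the id, tool id, and license values are invented for illustration). Because ContentLicenseRow lists "license" in its UNIQUE_KEY_FIELDS, two rows sharing an id and a tool but carrying different licenses are no longer duplicates; only a full unique-key collision raises DuplicateId:

    from swh.indexer.storage import check_id_duplicates
    from swh.indexer.storage.exc import DuplicateId
    from swh.indexer.storage.model import ContentLicenseRow

    # Same id and tool, different licenses: distinct unique keys, no error.
    check_id_duplicates([
        ContentLicenseRow(id=b"foo", indexer_configuration_id=1, license="MIT"),
        ContentLicenseRow(id=b"foo", indexer_configuration_id=1, license="GPL-3.0"),
    ])

    # Identical unique keys: raises, reporting the colliding key as a dict.
    try:
        check_id_duplicates([
            ContentLicenseRow(id=b"foo", indexer_configuration_id=1, license="MIT"),
            ContentLicenseRow(id=b"foo", indexer_configuration_id=1, license="MIT"),
        ])
    except DuplicateId as exc:
        print(exc)  # [{'id': b'foo', 'indexer_configuration_id': 1, 'license': 'MIT'}]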
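In the same spirit (also not part of the patch, values invented for illustration): the new converters.fossology_license_to_db helper flattens one input dict into one row per license, which is what both backends now feed to check_id_duplicates and to copy_to / SubStorage.add:

    from swh.indexer.storage import converters

    rows = list(
        converters.fossology_license_to_db(
            {
                "id": b"foo",
                "indexer_configuration_id": 1,
                "licenses": ["MIT", "GPL-3.0"],
            }
        )
    )
    # One flat row per license, ready for the tmp_content_fossology_license copy.
    assert rows == [
        {"id": b"foo", "indexer_configuration_id": 1, "license": "MIT"},
        {"id": b"foo", "indexer_configuration_id": 1, "license": "GPL-3.0"},
    ]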