diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py
--- a/swh/indexer/storage/in_memory.py
+++ b/swh/indexer/storage/in_memory.py
@@ -9,7 +9,19 @@
 import math
 import operator
 import re
-from typing import Any, Dict, List, Optional
+from typing import (
+    Any,
+    Dict,
+    Generic,
+    Iterable,
+    Iterator,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+    TypeVar,
+)

 from swh.core.collections import SortedList
 from swh.model.hashutil import hash_to_bytes, hash_to_hex
@@ -19,6 +31,17 @@
 from . import MAPPING_NAMES, check_id_duplicates
 from .exc import IndexerStorageArgumentException
 from .interface import PagedResult, Sha1
+from .model import (
+    BaseRow,
+    ContentCtagsRow,
+    ContentLanguageRow,
+    ContentLicensesRow,
+    ContentMetadataRow,
+    ContentMimetypeRow,
+    KeyDict,
+    OriginIntrinsicMetadataRow,
+    RevisionIntrinsicMetadataRow,
+)


 SHA1_DIGEST_SIZE = 160
@@ -38,16 +61,24 @@
         raise IndexerStorageArgumentException("identifiers must be bytes.")


-class SubStorage:
+ToolId = int
+TValue = TypeVar("TValue", bound=BaseRow)
+
+
+class SubStorage(Generic[TValue]):
     """Implements common missing/get/add logic for each indexer type."""

-    def __init__(self, tools):
+    _data: Dict[Tuple[Sha1, ToolId], Dict[str, Any]]
+    _tools_per_id: Dict[Sha1, Set[ToolId]]
+
+    def __init__(self, row_class: Type[TValue], tools):
+        self.row_class = row_class
         self._tools = tools
-        self._sorted_ids = SortedList[bytes, bytes]()
-        self._data = {}  # map (id_, tool_id) -> metadata_dict
-        self._tools_per_id = defaultdict(set)  # map id_ -> Set[tool_id]
+        self._sorted_ids = SortedList[bytes, Sha1]()
+        self._data = {}
+        self._tools_per_id = defaultdict(set)

-    def missing(self, ids):
+    def missing(self, keys: Iterable[KeyDict]) -> Iterator[Sha1]:
         """List data missing from storage.

         Args:
@@ -61,13 +92,13 @@
             missing sha1s

         """
-        for id_ in ids:
-            tool_id = id_["indexer_configuration_id"]
-            id_ = id_["id"]
+        for key in keys:
+            tool_id = key["indexer_configuration_id"]
+            id_ = key["id"]
             if tool_id not in self._tools_per_id.get(id_, set()):
                 yield id_

-    def get(self, ids):
+    def get(self, ids: Iterable[Sha1]) -> Iterator[TValue]:
         """Retrieve data per id.

         Args:
@@ -84,13 +115,13 @@
         for id_ in ids:
             for tool_id in self._tools_per_id.get(id_, set()):
                 key = (id_, tool_id)
-                yield {
-                    "id": id_,
-                    "tool": _transform_tool(self._tools[tool_id]),
+                yield self.row_class(
+                    id=id_,
+                    tool=_transform_tool(self._tools[tool_id]),
                     **self._data[key],
-                }
+                )

-    def get_all(self):
+    def get_all(self) -> Iterator[TValue]:
         yield from self.get(self._sorted_ids)

     def get_partition(
@@ -149,7 +180,7 @@
         assert len(ids) <= limit
         return PagedResult(results=ids, next_page_token=next_page_token)

-    def add(self, data: List[Dict], conflict_update: bool) -> int:
+    def add(self, data: Iterable[TValue], conflict_update: bool) -> int:
         """Add data not present in storage.

         Args:
@@ -165,10 +196,10 @@

         """
         data = list(data)
-        check_id_duplicates(data)
+        check_id_duplicates(obj.to_dict() for obj in data)
         count = 0
-        for item in data:
-            item = item.copy()
+        for obj in data:
+            item = obj.to_dict()
             tool_id = item.pop("indexer_configuration_id")
             id_ = item.pop("id")
             data_item = item
@@ -184,11 +215,12 @@
         return count

     def add_merge(
-        self, new_data: List[Dict], conflict_update: bool, merged_key: str
+        self, new_data: Iterable[TValue], conflict_update: bool, merged_key: str
     ) -> int:
         added = 0
         all_subitems: List
-        for new_item in new_data:
+        for new_obj in new_data:
+            new_item = new_obj.to_dict()
             id_ = new_item["id"]
             tool_id = new_item["indexer_configuration_id"]
             if conflict_update:
@@ -198,19 +230,19 @@
                 all_subitems = [
                     old_subitem
                     for existing_item in existing
-                    if existing_item["tool"]["id"] == tool_id
-                    for old_subitem in existing_item[merged_key]
+                    if existing_item.tool["id"] == tool_id  # type: ignore
+                    for old_subitem in getattr(existing_item, merged_key)
                 ]
                 for new_subitem in new_item[merged_key]:
                     if new_subitem not in all_subitems:
                         all_subitems.append(new_subitem)
             added += self.add(
                 [
-                    {
-                        "id": id_,
-                        "indexer_configuration_id": tool_id,
-                        merged_key: all_subitems,
-                    }
+                    self.row_class(
+                        id=id_,
+                        indexer_configuration_id=tool_id,
+                        **{merged_key: all_subitems},  # type: ignore
+                    )  # FIXME: this only works for classes with three attributes
                 ],
                 conflict_update=True,
             )
@@ -218,7 +250,7 @@
             self._sorted_ids.add(id_)
         return added

-    def delete(self, entries: List[Dict]) -> int:
+    def delete(self, entries: List[KeyDict]) -> int:
         """Delete entries and return the number of entries deleted.

         """
@@ -239,13 +271,17 @@

     def __init__(self):
         self._tools = {}
-        self._mimetypes = SubStorage(self._tools)
-        self._languages = SubStorage(self._tools)
-        self._content_ctags = SubStorage(self._tools)
-        self._licenses = SubStorage(self._tools)
-        self._content_metadata = SubStorage(self._tools)
-        self._revision_intrinsic_metadata = SubStorage(self._tools)
-        self._origin_intrinsic_metadata = SubStorage(self._tools)
+        self._mimetypes = SubStorage(ContentMimetypeRow, self._tools)
+        self._languages = SubStorage(ContentLanguageRow, self._tools)
+        self._content_ctags = SubStorage(ContentCtagsRow, self._tools)
+        self._licenses = SubStorage(ContentLicensesRow, self._tools)
+        self._content_metadata = SubStorage(ContentMetadataRow, self._tools)
+        self._revision_intrinsic_metadata = SubStorage(
+            RevisionIntrinsicMetadataRow, self._tools
+        )
+        self._origin_intrinsic_metadata = SubStorage(
+            OriginIntrinsicMetadataRow, self._tools
+        )

     def check_config(self, *, check_write):
         return True
@@ -269,23 +305,27 @@
         self, mimetypes: List[Dict], conflict_update: bool = False
     ) -> Dict[str, int]:
         check_id_types(mimetypes)
-        added = self._mimetypes.add(mimetypes, conflict_update)
+        added = self._mimetypes.add(
+            map(ContentMimetypeRow.from_dict, mimetypes), conflict_update
+        )
         return {"content_mimetype:add": added}

     def content_mimetype_get(self, ids):
-        yield from self._mimetypes.get(ids)
+        yield from (obj.to_dict() for obj in self._mimetypes.get(ids))

     def content_language_missing(self, languages):
         yield from self._languages.missing(languages)

     def content_language_get(self, ids):
-        yield from self._languages.get(ids)
+        yield from (obj.to_dict() for obj in self._languages.get(ids))

     def content_language_add(
         self, languages: List[Dict], conflict_update: bool = False
     ) -> Dict[str, int]:
         check_id_types(languages)
-        added = self._languages.add(languages, conflict_update)
+        added = self._languages.add(
+            map(ContentLanguageRow.from_dict, languages), conflict_update
+        )
         return {"content_language:add": added}

     def content_ctags_missing(self, ctags):
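The pattern above repeats for every indexer type: the public `*_add` methods still accept plain dicts, convert them to row objects with `from_dict` before handing them to the typed `SubStorage`, and the `*_get` methods convert rows back to dicts with `to_dict`, so callers see no API change. A usage sketch (illustration only, not part of the patch; it assumes the `indexer_configuration_add` method of the full class, which this diff does not touch):

    from swh.indexer.storage.in_memory import IndexerStorage

    storage = IndexerStorage()
    # Register a tool to obtain an indexer_configuration_id.
    tool = storage.indexer_configuration_add(
        [{"tool_name": "file", "tool_version": "5.38", "tool_configuration": {}}]
    )[0]
    # The caller still passes and receives plain dicts; rows are internal.
    storage.content_mimetype_add(
        [
            {
                "id": b"\x01" * 20,  # sha1 of some content
                "mimetype": "text/plain",
                "encoding": "us-ascii",
                "indexer_configuration_id": tool["id"],
            }
        ]
    )
    result = next(storage.content_mimetype_get([b"\x01" * 20]))
    assert result["mimetype"] == "text/plain"  # dict out, with "tool" expanded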
@@ -293,29 +333,31 @@

     def content_ctags_get(self, ids):
         for item in self._content_ctags.get(ids):
-            for item_ctags_item in item["ctags"]:
-                yield {"id": item["id"], "tool": item["tool"], **item_ctags_item}
+            for item_ctags_item in item.ctags:
+                yield {"id": item.id, "tool": item.tool, **item_ctags_item.to_dict()}

     def content_ctags_add(
         self, ctags: List[Dict], conflict_update: bool = False
     ) -> Dict[str, int]:
         check_id_types(ctags)
-        added = self._content_ctags.add_merge(ctags, conflict_update, "ctags")
+        added = self._content_ctags.add_merge(
+            map(ContentCtagsRow.from_dict, ctags), conflict_update, "ctags"
+        )
         return {"content_ctags:add": added}

     def content_ctags_search(self, expression, limit=10, last_sha1=None):
         nb_matches = 0
-        for ((id_, tool_id), item) in sorted(self._content_ctags._data.items()):
-            if id_ <= (last_sha1 or bytes(0 for _ in range(SHA1_DIGEST_SIZE))):
+        for item in sorted(self._content_ctags.get_all()):
+            if item.id <= (last_sha1 or bytes(0 for _ in range(SHA1_DIGEST_SIZE))):
                 continue
-            for ctags_item in item["ctags"]:
-                if ctags_item["name"] != expression:
+            for ctags_item in item.ctags:
+                if ctags_item.name != expression:
                     continue
                 nb_matches += 1
                 yield {
-                    "id": id_,
-                    "tool": _transform_tool(self._tools[tool_id]),
-                    **ctags_item,
+                    "id": item.id,
+                    "tool": item.tool,
+                    **ctags_item.to_dict(),
                 }
                 if nb_matches >= limit:
                     return
@@ -326,7 +368,8 @@
         # *_get methods use the new format.
         # See: https://forge.softwareheritage.org/T1433
         res = {}
-        for d in self._licenses.get(ids):
+        for obj in self._licenses.get(ids):
+            d = obj.to_dict()
             res.setdefault(d.pop("id"), []).append(d)
         for (id_, facts) in res.items():
             yield {id_: facts}
@@ -335,7 +378,9 @@
         self, licenses: List[Dict], conflict_update: bool = False
     ) -> Dict[str, int]:
         check_id_types(licenses)
-        added = self._licenses.add_merge(licenses, conflict_update, "licenses")
+        added = self._licenses.add_merge(
+            map(ContentLicensesRow.from_dict, licenses), conflict_update, "licenses"
+        )
         return {"fossology_license_add:add": added}

     def content_fossology_license_get_partition(
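Both ctags and licenses go through `add_merge`, which merges each tool's sublist (`ctags`/`licenses`) with what is already stored instead of overwriting it, and `content_ctags_get` then flattens every row back into one dict per ctags entry. A sketch of that flattening, using the new model classes (illustration only, not part of the patch):

    from swh.indexer.storage.model import ContentCtagsEntry, ContentCtagsRow

    row = ContentCtagsRow(
        id=b"\x34" * 20,
        tool={"id": 7, "name": "ctags", "version": "5.8", "configuration": {}},
        ctags=[ContentCtagsEntry(name="main", kind="function", line=3, lang="C")],
    )
    # One flat dict per ctags entry, as content_ctags_get yields them:
    flat = [{"id": row.id, "tool": row.tool, **entry.to_dict()} for entry in row.ctags]
    assert flat[0]["name"] == "main" and flat[0]["line"] == 3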
@@ -354,26 +399,30 @@
         yield from self._content_metadata.missing(metadata)

     def content_metadata_get(self, ids):
-        yield from self._content_metadata.get(ids)
+        yield from (obj.to_dict() for obj in self._content_metadata.get(ids))

     def content_metadata_add(
         self, metadata: List[Dict], conflict_update: bool = False
     ) -> Dict[str, int]:
         check_id_types(metadata)
-        added = self._content_metadata.add(metadata, conflict_update)
+        added = self._content_metadata.add(
+            map(ContentMetadataRow.from_dict, metadata), conflict_update
+        )
         return {"content_metadata:add": added}

     def revision_intrinsic_metadata_missing(self, metadata):
         yield from self._revision_intrinsic_metadata.missing(metadata)

     def revision_intrinsic_metadata_get(self, ids):
-        yield from self._revision_intrinsic_metadata.get(ids)
+        yield from (obj.to_dict() for obj in self._revision_intrinsic_metadata.get(ids))

     def revision_intrinsic_metadata_add(
         self, metadata: List[Dict], conflict_update: bool = False
     ) -> Dict[str, int]:
         check_id_types(metadata)
-        added = self._revision_intrinsic_metadata.add(metadata, conflict_update)
+        added = self._revision_intrinsic_metadata.add(
+            map(RevisionIntrinsicMetadataRow.from_dict, metadata), conflict_update
+        )
         return {"revision_intrinsic_metadata:add": added}

     def revision_intrinsic_metadata_delete(self, entries: List[Dict]) -> Dict:
@@ -381,12 +430,14 @@
         return {"revision_intrinsic_metadata:del": deleted}

     def origin_intrinsic_metadata_get(self, ids):
-        yield from self._origin_intrinsic_metadata.get(ids)
+        yield from (obj.to_dict() for obj in self._origin_intrinsic_metadata.get(ids))

     def origin_intrinsic_metadata_add(
         self, metadata: List[Dict], conflict_update: bool = False
     ) -> Dict[str, int]:
-        added = self._origin_intrinsic_metadata.add(metadata, conflict_update)
+        added = self._origin_intrinsic_metadata.add(
+            map(OriginIntrinsicMetadataRow.from_dict, metadata), conflict_update
+        )
         return {"origin_intrinsic_metadata:add": added}

     def origin_intrinsic_metadata_delete(self, entries: List[Dict]) -> Dict:
@@ -401,7 +452,7 @@

         def rank(data):
             # Tokenize the metadata
-            text = json.dumps(data["metadata"])
+            text = json.dumps(data.metadata)
             text_tokens = tokens_re.findall(text)
             text_token_occurences = Counter(text_tokens)

@@ -424,7 +475,7 @@
             key=operator.itemgetter(0), reverse=True  # Don't try to order 'data'
         )
         for (rank_, result) in results[:limit]:
-            yield result
+            yield result.to_dict()

     def origin_intrinsic_metadata_search_by_producer(
         self, page_token="", limit=100, ids_only=False, mappings=None, tool_ids=None
@@ -440,15 +491,15 @@
         # we go to limit+1 to check whether we should add next_page_token in
         # the response
         for entry in self._origin_intrinsic_metadata.get_all():
-            if entry["id"] <= page_token:
+            if entry.id <= page_token:
                 continue
             if nb_results >= (limit + 1):
                 break
-            if mappings is not None and mappings.isdisjoint(entry["mappings"]):
+            if mappings is not None and mappings.isdisjoint(entry.mappings):
                 continue
-            if tool_ids is not None and entry["tool"]["id"] not in tool_ids:
+            if tool_ids is not None and entry.tool["id"] not in tool_ids:
                 continue
-            origins.append(entry)
+            origins.append(entry.to_dict())
             nb_results += 1

         result = {}
@@ -465,9 +516,9 @@
         total = non_empty = 0
         for data in self._origin_intrinsic_metadata.get_all():
             total += 1
-            if set(data["metadata"]) - {"@context"}:
+            if set(data.metadata) - {"@context"}:
                 non_empty += 1
-            for mapping in data["mappings"]:
+            for mapping in data.mappings:
                 mapping_count[mapping] += 1

         return {"per_mapping": mapping_count, "total": total, "non_empty": non_empty}
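The `SubStorage` refactoring above leans on the row model defined in the new file below: each `SubStorage` is constructed with a concrete `row_class`, so a checker like mypy can narrow `get()` and `get_all()` to that class. A minimal standalone sketch of the pattern (hypothetical names, not part of the patch):

    from typing import Generic, Iterator, List, Type, TypeVar

    import attr


    @attr.s
    class DemoRow:  # hypothetical stand-in for a BaseRow subclass
        id = attr.ib(type=bytes)


    TRow = TypeVar("TRow", bound=DemoRow)


    class DemoSubStorage(Generic[TRow]):
        def __init__(self, row_class: Type[TRow]) -> None:
            self.row_class = row_class
            self._rows: List[TRow] = []

        def add(self, row: TRow) -> None:
            self._rows.append(row)

        def get_all(self) -> Iterator[TRow]:
            # mypy infers Iterator[DemoRow] for DemoSubStorage[DemoRow]
            yield from self._rows


    store: DemoSubStorage[DemoRow] = DemoSubStorage(DemoRow)
    store.add(DemoRow(id=b"\x00" * 20))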
TypeError( + "indexer_configuration_id and tool are mutually exclusive; " + "only one may be not None." + ) + + def anonymize(self: TSelf) -> Optional[TSelf]: + # Needed to implement swh.journal.writer.ValueProtocol + return None + + def to_dict(self) -> Dict[str, Any]: + """Wrapper of `attr.asdict` that can be overridden by subclasses + that have special handling of some of the fields.""" + d = dictify(attr.asdict(self, recurse=False)) + if d["indexer_configuration_id"] is None: + del d["indexer_configuration_id"] + if d["tool"] is None: + del d["tool"] + + return d + + @classmethod + def from_dict(cls: Type[TSelf], d) -> TSelf: + return cls(**d) # type: ignore + + +@attr.s +class ContentMimetypeRow(BaseRow): + mimetype = attr.ib(type=str) + encoding = attr.ib(type=str) + + +@attr.s +class ContentLanguageRow(BaseRow): + lang = attr.ib(type=str) + + +@attr.s +class ContentCtagsEntry: + name = attr.ib(type=str) + kind = attr.ib(type=str) + line = attr.ib(type=int) + lang = attr.ib(type=str) + + def to_dict(self) -> Dict[str, Any]: + return dictify(attr.asdict(self, recurse=False)) + + @classmethod + def from_dict(cls, d) -> ContentCtagsEntry: + return cls(**d) + + +@attr.s +class ContentCtagsRow(BaseRow): + ctags = attr.ib(type=List[ContentCtagsEntry]) + + @classmethod + def from_dict(cls, d) -> ContentCtagsRow: + d = d.copy() + items = d.pop("ctags") + return cls(ctags=[ContentCtagsEntry.from_dict(item) for item in items], **d,) + + +@attr.s +class ContentLicensesRow(BaseRow): + licenses = attr.ib(type=List[str]) + + +@attr.s +class ContentMetadataRow(BaseRow): + metadata = attr.ib(type=Dict[str, Any]) + + +@attr.s +class RevisionIntrinsicMetadataRow(BaseRow): + metadata = attr.ib(type=Dict[str, Any]) + mappings = attr.ib(type=List[str]) + + +@attr.s +class OriginIntrinsicMetadataRow(BaseRow): + metadata = attr.ib(type=Dict[str, Any]) + from_revision = attr.ib(type=Sha1Git) + mappings = attr.ib(type=List[str])