diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py
index 8e061fd..75c7fe4 100644
--- a/swh/indexer/storage/in_memory.py
+++ b/swh/indexer/storage/in_memory.py
@@ -1,494 +1,545 @@
 # Copyright (C) 2018-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import Counter, defaultdict
 import itertools
 import json
 import math
 import operator
 import re
-from typing import Any, Dict, List, Optional
+from typing import (
+    Any,
+    Dict,
+    Generic,
+    Iterable,
+    Iterator,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+    TypeVar,
+)

 from swh.core.collections import SortedList
 from swh.model.hashutil import hash_to_bytes, hash_to_hex
 from swh.model.model import SHA1_SIZE
 from swh.storage.utils import get_partition_bounds_bytes

 from . import MAPPING_NAMES, check_id_duplicates
 from .exc import IndexerStorageArgumentException
 from .interface import PagedResult, Sha1
+from .model import (
+    BaseRow,
+    ContentCtagsRow,
+    ContentLanguageRow,
+    ContentLicensesRow,
+    ContentMetadataRow,
+    ContentMimetypeRow,
+    KeyDict,
+    OriginIntrinsicMetadataRow,
+    RevisionIntrinsicMetadataRow,
+)

 SHA1_DIGEST_SIZE = 160


 def _transform_tool(tool):
     return {
         "id": tool["id"],
         "name": tool["tool_name"],
         "version": tool["tool_version"],
         "configuration": tool["tool_configuration"],
     }


 def check_id_types(data: List[Dict[str, Any]]):
     """Checks all elements of the list have an 'id' whose type is 'bytes'."""
     if not all(isinstance(item.get("id"), bytes) for item in data):
         raise IndexerStorageArgumentException("identifiers must be bytes.")


-class SubStorage:
+ToolId = int
+TValue = TypeVar("TValue", bound=BaseRow)
+
+
+class SubStorage(Generic[TValue]):
     """Implements common missing/get/add logic for each indexer type."""

-    def __init__(self, tools):
+    _data: Dict[Tuple[Sha1, ToolId], Dict[str, Any]]
+    _tools_per_id: Dict[Sha1, Set[ToolId]]
+
+    def __init__(self, row_class: Type[TValue], tools):
+        self.row_class = row_class
         self._tools = tools
-        self._sorted_ids = SortedList[bytes, bytes]()
-        self._data = {}  # map (id_, tool_id) -> metadata_dict
-        self._tools_per_id = defaultdict(set)  # map id_ -> Set[tool_id]
+        self._sorted_ids = SortedList[bytes, Sha1]()
+        self._data = {}
+        self._tools_per_id = defaultdict(set)

-    def missing(self, ids):
+    def missing(self, keys: Iterable[KeyDict]) -> Iterator[Sha1]:
         """List data missing from storage.

         Args:
             data (iterable): dictionaries with keys:

                 - **id** (bytes): sha1 identifier
                 - **indexer_configuration_id** (int): tool used to compute
                   the results

         Yields:
             missing sha1s

         """
-        for id_ in ids:
-            tool_id = id_["indexer_configuration_id"]
-            id_ = id_["id"]
+        for key in keys:
+            tool_id = key["indexer_configuration_id"]
+            id_ = key["id"]
             if tool_id not in self._tools_per_id.get(id_, set()):
                 yield id_

-    def get(self, ids):
+    def get(self, ids: Iterable[Sha1]) -> Iterator[TValue]:
         """Retrieve data per id.

         Args:
             ids (iterable): sha1 checksums

         Yields:
             dict: dictionaries with the following keys:

                 - **id** (bytes)
                 - **tool** (dict): tool used to compute metadata
                 - arbitrary data (as provided to `add`)

         """
         for id_ in ids:
             for tool_id in self._tools_per_id.get(id_, set()):
                 key = (id_, tool_id)
-                yield {
-                    "id": id_,
-                    "tool": _transform_tool(self._tools[tool_id]),
+                yield self.row_class(
+                    id=id_,
+                    tool=_transform_tool(self._tools[tool_id]),
                     **self._data[key],
-                }
+                )

-    def get_all(self):
+    def get_all(self) -> Iterator[TValue]:
         yield from self.get(self._sorted_ids)

     def get_partition(
         self,
         indexer_configuration_id: int,
         partition_id: int,
         nb_partitions: int,
         page_token: Optional[str] = None,
         limit: int = 1000,
     ) -> PagedResult[Sha1]:
         """Retrieve ids of content with `indexer_type` within partition partition_id
         bound by limit.

         Args:
             **indexer_type**: Type of data content to index (mimetype, language, etc...)
             **indexer_configuration_id**: The tool used to index data
             **partition_id**: index of the partition to fetch
             **nb_partitions**: total number of partitions to split into
             **page_token**: opaque token used for pagination
             **limit**: Limit result (default to 1000)
             **with_textual_data** (bool): Deal with only textual content (True) or all
                 content (all contents by defaults, False)

         Raises:
             IndexerStorageArgumentException for;
             - limit to None
             - wrong indexer_type provided

         Returns:
             PagedResult of Sha1. If next_page_token is None, there is no more data to
             fetch

         """
         if limit is None:
             raise IndexerStorageArgumentException("limit should not be None")
         (start, end) = get_partition_bounds_bytes(
             partition_id, nb_partitions, SHA1_SIZE
         )

         if page_token:
             start = hash_to_bytes(page_token)
         if end is None:
             end = b"\xff" * SHA1_SIZE

         next_page_token: Optional[str] = None
         ids: List[Sha1] = []
         sha1s = (sha1 for sha1 in self._sorted_ids.iter_from(start))
         for counter, sha1 in enumerate(sha1s):
             if sha1 > end:
                 break
             if counter >= limit:
                 next_page_token = hash_to_hex(sha1)
                 break
             ids.append(sha1)

         assert len(ids) <= limit
         return PagedResult(results=ids, next_page_token=next_page_token)

-    def add(self, data: List[Dict], conflict_update: bool) -> int:
+    def add(self, data: Iterable[TValue], conflict_update: bool) -> int:
         """Add data not present in storage.

         Args:
             data (iterable): dictionaries with keys:

               - **id**: sha1
               - **indexer_configuration_id**: tool used to compute the
                 results
               - arbitrary data

             conflict_update (bool): Flag to determine if we want to
               overwrite (true) or skip duplicates (false)

         """
         data = list(data)
-        check_id_duplicates(data)
+        check_id_duplicates(obj.to_dict() for obj in data)
         count = 0
-        for item in data:
-            item = item.copy()
+        for obj in data:
+            item = obj.to_dict()
             tool_id = item.pop("indexer_configuration_id")
             id_ = item.pop("id")
             data_item = item
             if not conflict_update and tool_id in self._tools_per_id.get(id_, set()):
                 # Duplicate, should not be updated
                 continue
             key = (id_, tool_id)
             self._data[key] = data_item
             self._tools_per_id[id_].add(tool_id)
             count += 1
             if id_ not in self._sorted_ids:
                 self._sorted_ids.add(id_)
         return count

     def add_merge(
-        self, new_data: List[Dict], conflict_update: bool, merged_key: str
+        self, new_data: Iterable[TValue], conflict_update: bool, merged_key: str
     ) -> int:
         added = 0
         all_subitems: List
-        for new_item in new_data:
+        for new_obj in new_data:
+            new_item = new_obj.to_dict()
             id_ = new_item["id"]
             tool_id = new_item["indexer_configuration_id"]
             if conflict_update:
                 all_subitems = []
             else:
                 existing = list(self.get([id_]))
                 all_subitems = [
                     old_subitem
                     for existing_item in existing
-                    if existing_item["tool"]["id"] == tool_id
-                    for old_subitem in existing_item[merged_key]
+                    if existing_item.tool["id"] == tool_id  # type: ignore
+                    for old_subitem in getattr(existing_item, merged_key)
                 ]
             for new_subitem in new_item[merged_key]:
                 if new_subitem not in all_subitems:
                     all_subitems.append(new_subitem)
             added += self.add(
                 [
-                    {
-                        "id": id_,
-                        "indexer_configuration_id": tool_id,
-                        merged_key: all_subitems,
-                    }
+                    self.row_class(
+                        id=id_,
+                        indexer_configuration_id=tool_id,
+                        **{merged_key: all_subitems},  # type: ignore
+                    )  # FIXME: this only works for classes with three attributes
                 ],
                 conflict_update=True,
             )
             if id_ not in self._sorted_ids:
                 self._sorted_ids.add(id_)
         return added

-    def delete(self, entries: List[Dict]) -> int:
+    def delete(self, entries: List[KeyDict]) -> int:
         """Delete entries and return the number of entries deleted.
""" deleted = 0 for entry in entries: (id_, tool_id) = (entry["id"], entry["indexer_configuration_id"]) key = (id_, tool_id) if tool_id in self._tools_per_id[id_]: self._tools_per_id[id_].remove(tool_id) if key in self._data: deleted += 1 del self._data[key] return deleted class IndexerStorage: """In-memory SWH indexer storage.""" def __init__(self): self._tools = {} - self._mimetypes = SubStorage(self._tools) - self._languages = SubStorage(self._tools) - self._content_ctags = SubStorage(self._tools) - self._licenses = SubStorage(self._tools) - self._content_metadata = SubStorage(self._tools) - self._revision_intrinsic_metadata = SubStorage(self._tools) - self._origin_intrinsic_metadata = SubStorage(self._tools) + self._mimetypes = SubStorage(ContentMimetypeRow, self._tools) + self._languages = SubStorage(ContentLanguageRow, self._tools) + self._content_ctags = SubStorage(ContentCtagsRow, self._tools) + self._licenses = SubStorage(ContentLicensesRow, self._tools) + self._content_metadata = SubStorage(ContentMetadataRow, self._tools) + self._revision_intrinsic_metadata = SubStorage( + RevisionIntrinsicMetadataRow, self._tools + ) + self._origin_intrinsic_metadata = SubStorage( + OriginIntrinsicMetadataRow, self._tools + ) def check_config(self, *, check_write): return True def content_mimetype_missing(self, mimetypes): yield from self._mimetypes.missing(mimetypes) def content_mimetype_get_partition( self, indexer_configuration_id: int, partition_id: int, nb_partitions: int, page_token: Optional[str] = None, limit: int = 1000, ) -> PagedResult[Sha1]: return self._mimetypes.get_partition( indexer_configuration_id, partition_id, nb_partitions, page_token, limit ) def content_mimetype_add( self, mimetypes: List[Dict], conflict_update: bool = False ) -> Dict[str, int]: check_id_types(mimetypes) - added = self._mimetypes.add(mimetypes, conflict_update) + added = self._mimetypes.add( + map(ContentMimetypeRow.from_dict, mimetypes), conflict_update + ) return {"content_mimetype:add": added} def content_mimetype_get(self, ids): - yield from self._mimetypes.get(ids) + yield from (obj.to_dict() for obj in self._mimetypes.get(ids)) def content_language_missing(self, languages): yield from self._languages.missing(languages) def content_language_get(self, ids): - yield from self._languages.get(ids) + yield from (obj.to_dict() for obj in self._languages.get(ids)) def content_language_add( self, languages: List[Dict], conflict_update: bool = False ) -> Dict[str, int]: check_id_types(languages) - added = self._languages.add(languages, conflict_update) + added = self._languages.add( + map(ContentLanguageRow.from_dict, languages), conflict_update + ) return {"content_language:add": added} def content_ctags_missing(self, ctags): yield from self._content_ctags.missing(ctags) def content_ctags_get(self, ids): for item in self._content_ctags.get(ids): - for item_ctags_item in item["ctags"]: - yield {"id": item["id"], "tool": item["tool"], **item_ctags_item} + for item_ctags_item in item.ctags: + yield {"id": item.id, "tool": item.tool, **item_ctags_item.to_dict()} def content_ctags_add( self, ctags: List[Dict], conflict_update: bool = False ) -> Dict[str, int]: check_id_types(ctags) - added = self._content_ctags.add_merge(ctags, conflict_update, "ctags") + added = self._content_ctags.add_merge( + map(ContentCtagsRow.from_dict, ctags), conflict_update, "ctags" + ) return {"content_ctags:add": added} def content_ctags_search(self, expression, limit=10, last_sha1=None): nb_matches = 0 - for ((id_, tool_id), item) in 
-        for ((id_, tool_id), item) in sorted(self._content_ctags._data.items()):
-            if id_ <= (last_sha1 or bytes(0 for _ in range(SHA1_DIGEST_SIZE))):
+        for item in sorted(self._content_ctags.get_all()):
+            if item.id <= (last_sha1 or bytes(0 for _ in range(SHA1_DIGEST_SIZE))):
                 continue
-            for ctags_item in item["ctags"]:
-                if ctags_item["name"] != expression:
+            for ctags_item in item.ctags:
+                if ctags_item.name != expression:
                     continue
                 nb_matches += 1
                 yield {
-                    "id": id_,
-                    "tool": _transform_tool(self._tools[tool_id]),
-                    **ctags_item,
+                    "id": item.id,
+                    "tool": item.tool,
+                    **ctags_item.to_dict(),
                 }
                 if nb_matches >= limit:
                     return

     def content_fossology_license_get(self, ids):
         # Rewrites the output of SubStorage.get from the old format to
         # the new one. SubStorage.get should be updated once all other
         # *_get methods use the new format.
         # See: https://forge.softwareheritage.org/T1433
         res = {}
-        for d in self._licenses.get(ids):
+        for obj in self._licenses.get(ids):
+            d = obj.to_dict()
             res.setdefault(d.pop("id"), []).append(d)
         for (id_, facts) in res.items():
             yield {id_: facts}

     def content_fossology_license_add(
         self, licenses: List[Dict], conflict_update: bool = False
     ) -> Dict[str, int]:
         check_id_types(licenses)
-        added = self._licenses.add_merge(licenses, conflict_update, "licenses")
+        added = self._licenses.add_merge(
+            map(ContentLicensesRow.from_dict, licenses), conflict_update, "licenses"
+        )
         return {"fossology_license_add:add": added}

     def content_fossology_license_get_partition(
         self,
         indexer_configuration_id: int,
         partition_id: int,
         nb_partitions: int,
         page_token: Optional[str] = None,
         limit: int = 1000,
     ) -> PagedResult[Sha1]:
         return self._licenses.get_partition(
             indexer_configuration_id, partition_id, nb_partitions, page_token, limit
         )

     def content_metadata_missing(self, metadata):
         yield from self._content_metadata.missing(metadata)

     def content_metadata_get(self, ids):
-        yield from self._content_metadata.get(ids)
+        yield from (obj.to_dict() for obj in self._content_metadata.get(ids))

     def content_metadata_add(
         self, metadata: List[Dict], conflict_update: bool = False
     ) -> Dict[str, int]:
         check_id_types(metadata)
-        added = self._content_metadata.add(metadata, conflict_update)
+        added = self._content_metadata.add(
+            map(ContentMetadataRow.from_dict, metadata), conflict_update
+        )
         return {"content_metadata:add": added}

     def revision_intrinsic_metadata_missing(self, metadata):
         yield from self._revision_intrinsic_metadata.missing(metadata)

     def revision_intrinsic_metadata_get(self, ids):
-        yield from self._revision_intrinsic_metadata.get(ids)
+        yield from (obj.to_dict() for obj in self._revision_intrinsic_metadata.get(ids))

     def revision_intrinsic_metadata_add(
         self, metadata: List[Dict], conflict_update: bool = False
     ) -> Dict[str, int]:
         check_id_types(metadata)
-        added = self._revision_intrinsic_metadata.add(metadata, conflict_update)
+        added = self._revision_intrinsic_metadata.add(
+            map(RevisionIntrinsicMetadataRow.from_dict, metadata), conflict_update
+        )
         return {"revision_intrinsic_metadata:add": added}

     def revision_intrinsic_metadata_delete(self, entries: List[Dict]) -> Dict:
         deleted = self._revision_intrinsic_metadata.delete(entries)
         return {"revision_intrinsic_metadata:del": deleted}

     def origin_intrinsic_metadata_get(self, ids):
-        yield from self._origin_intrinsic_metadata.get(ids)
+        yield from (obj.to_dict() for obj in self._origin_intrinsic_metadata.get(ids))

     def origin_intrinsic_metadata_add(
         self, metadata: List[Dict], conflict_update: bool = False
     ) -> Dict[str, int]:
-        added = self._origin_intrinsic_metadata.add(metadata, conflict_update)
+        added = self._origin_intrinsic_metadata.add(
+            map(OriginIntrinsicMetadataRow.from_dict, metadata), conflict_update
+        )
         return {"origin_intrinsic_metadata:add": added}

     def origin_intrinsic_metadata_delete(self, entries: List[Dict]) -> Dict:
         deleted = self._origin_intrinsic_metadata.delete(entries)
         return {"origin_intrinsic_metadata:del": deleted}

     def origin_intrinsic_metadata_search_fulltext(self, conjunction, limit=100):
         # A very crude fulltext search implementation, but that's enough
         # to work on English metadata
         tokens_re = re.compile("[a-zA-Z0-9]+")
         search_tokens = list(itertools.chain(*map(tokens_re.findall, conjunction)))

         def rank(data):
             # Tokenize the metadata
-            text = json.dumps(data["metadata"])
+            text = json.dumps(data.metadata)
             text_tokens = tokens_re.findall(text)
             text_token_occurences = Counter(text_tokens)

             # Count the number of occurrences of search tokens in the text
             score = 0
             for search_token in search_tokens:
                 if text_token_occurences[search_token] == 0:
                     # Search token is not in the text.
                     return 0
                 score += text_token_occurences[search_token]

             # Normalize according to the text's length
             return score / math.log(len(text_tokens))

         results = [
             (rank(data), data) for data in self._origin_intrinsic_metadata.get_all()
         ]
         results = [(rank_, data) for (rank_, data) in results if rank_ > 0]
         results.sort(
             key=operator.itemgetter(0), reverse=True  # Don't try to order 'data'
         )

         for (rank_, result) in results[:limit]:
-            yield result
+            yield result.to_dict()

     def origin_intrinsic_metadata_search_by_producer(
         self, page_token="", limit=100, ids_only=False, mappings=None, tool_ids=None
     ):
         assert isinstance(page_token, str)
         nb_results = 0
         if mappings is not None:
             mappings = frozenset(mappings)
         if tool_ids is not None:
             tool_ids = frozenset(tool_ids)

         origins = []

         # we go to limit+1 to check whether we should add next_page_token in
         # the response
         for entry in self._origin_intrinsic_metadata.get_all():
-            if entry["id"] <= page_token:
+            if entry.id <= page_token:
                 continue
             if nb_results >= (limit + 1):
                 break
-            if mappings is not None and mappings.isdisjoint(entry["mappings"]):
+            if mappings is not None and mappings.isdisjoint(entry.mappings):
                 continue
-            if tool_ids is not None and entry["tool"]["id"] not in tool_ids:
+            if tool_ids is not None and entry.tool["id"] not in tool_ids:
                 continue
-            origins.append(entry)
+            origins.append(entry.to_dict())
             nb_results += 1

         result = {}
         if len(origins) > limit:
             origins = origins[:limit]
             result["next_page_token"] = origins[-1]["id"]
         if ids_only:
             origins = [origin["id"] for origin in origins]
         result["origins"] = origins
         return result

     def origin_intrinsic_metadata_stats(self):
         mapping_count = {m: 0 for m in MAPPING_NAMES}
         total = non_empty = 0
         for data in self._origin_intrinsic_metadata.get_all():
             total += 1
-            if set(data["metadata"]) - {"@context"}:
+            if set(data.metadata) - {"@context"}:
                 non_empty += 1
-            for mapping in data["mappings"]:
+            for mapping in data.mappings:
                 mapping_count[mapping] += 1
         return {"per_mapping": mapping_count, "total": total, "non_empty": non_empty}

     def indexer_configuration_add(self, tools):
         inserted = []
         for tool in tools:
             tool = tool.copy()
             id_ = self._tool_key(tool)
             tool["id"] = id_
             self._tools[id_] = tool
             inserted.append(tool)
         return inserted

     def indexer_configuration_get(self, tool):
         return self._tools.get(self._tool_key(tool))

     def _tool_key(self, tool):
         return hash(
             (
                 tool["tool_name"],
                 tool["tool_version"],
                 json.dumps(tool["tool_configuration"], sort_keys=True),
             )
         )
diff --git a/swh/indexer/storage/model.py b/swh/indexer/storage/model.py
new file mode 100644
index 0000000..0c12ca8
--- /dev/null
+++ b/swh/indexer/storage/model.py
@@ -0,0 +1,119 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Classes used internally by the in-memory idx-storage, and will be
+used for the interface of the idx-storage in the near future."""
+
+from __future__ import annotations
+
+from typing import Any, Dict, List, Optional, Type, TypeVar
+
+import attr
+from typing_extensions import TypedDict
+
+from swh.model.model import Sha1Git, dictify
+
+
+class KeyDict(TypedDict):
+    id: Sha1Git
+    indexer_configuration_id: int
+
+
+TSelf = TypeVar("TSelf")
+
+
+@attr.s
+class BaseRow:
+    id = attr.ib(type=Sha1Git)
+    indexer_configuration_id = attr.ib(type=Optional[int], default=None, kw_only=True)
+    tool = attr.ib(type=Optional[Dict], default=None, kw_only=True)
+
+    def __attrs_post_init__(self):
+        if self.indexer_configuration_id is None and self.tool is None:
+            raise TypeError("Either indexer_configuration_id or tool must be not None.")
+        if self.indexer_configuration_id is not None and self.tool is not None:
+            raise TypeError(
+                "indexer_configuration_id and tool are mutually exclusive; "
+                "only one may be not None."
+            )
+
+    def anonymize(self: TSelf) -> Optional[TSelf]:
+        # Needed to implement swh.journal.writer.ValueProtocol
+        return None
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Wrapper of `attr.asdict` that can be overridden by subclasses
+        that have special handling of some of the fields."""
+        d = dictify(attr.asdict(self, recurse=False))
+        if d["indexer_configuration_id"] is None:
+            del d["indexer_configuration_id"]
+        if d["tool"] is None:
+            del d["tool"]
+
+        return d
+
+    @classmethod
+    def from_dict(cls: Type[TSelf], d) -> TSelf:
+        return cls(**d)  # type: ignore
+
+
+@attr.s
+class ContentMimetypeRow(BaseRow):
+    mimetype = attr.ib(type=str)
+    encoding = attr.ib(type=str)
+
+
+@attr.s
+class ContentLanguageRow(BaseRow):
+    lang = attr.ib(type=str)
+
+
+@attr.s
+class ContentCtagsEntry:
+    name = attr.ib(type=str)
+    kind = attr.ib(type=str)
+    line = attr.ib(type=int)
+    lang = attr.ib(type=str)
+
+    def to_dict(self) -> Dict[str, Any]:
+        return dictify(attr.asdict(self, recurse=False))
+
+    @classmethod
+    def from_dict(cls, d) -> ContentCtagsEntry:
+        return cls(**d)
+
+
+@attr.s
+class ContentCtagsRow(BaseRow):
+    ctags = attr.ib(type=List[ContentCtagsEntry])
+
+    @classmethod
+    def from_dict(cls, d) -> ContentCtagsRow:
+        d = d.copy()
+        items = d.pop("ctags")
+        return cls(ctags=[ContentCtagsEntry.from_dict(item) for item in items], **d,)
+
+
+@attr.s
+class ContentLicensesRow(BaseRow):
+    licenses = attr.ib(type=List[str])
+
+
+@attr.s
+class ContentMetadataRow(BaseRow):
+    metadata = attr.ib(type=Dict[str, Any])
+
+
+@attr.s
+class RevisionIntrinsicMetadataRow(BaseRow):
+    metadata = attr.ib(type=Dict[str, Any])
+    mappings = attr.ib(type=List[str])
+
+
+@attr.s
+class OriginIntrinsicMetadataRow(BaseRow):
+    metadata = attr.ib(type=Dict[str, Any])
+    from_revision = attr.ib(type=Sha1Git)
+    mappings = attr.ib(type=List[str])
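
Reviewer note (not part of the patch): a minimal usage sketch of how the new attrs rows round-trip through the unchanged dict-based API of the in-memory IndexerStorage, under the imports introduced above. The tool description and the 20-byte sha1 are made-up illustration values.

# Sketch only; values below are invented for illustration.
from swh.indexer.storage.in_memory import IndexerStorage
from swh.indexer.storage.model import ContentMimetypeRow

storage = IndexerStorage()

# Register a tool; the in-memory backend computes its id itself.
tool = storage.indexer_configuration_add(
    [{"tool_name": "file", "tool_version": "5.38", "tool_configuration": {}}]
)[0]

# The public API still takes and returns plain dicts; rows stay internal.
sha1 = b"\x01" * 20  # made-up content sha1
storage.content_mimetype_add(
    [
        {
            "id": sha1,
            "mimetype": "text/plain",
            "encoding": "us-ascii",
            "indexer_configuration_id": tool["id"],
        }
    ]
)
# Yields dicts with "id", "tool", "mimetype", "encoding".
print(list(storage.content_mimetype_get([sha1])))

# Internally, each dict is converted to a row and back via from_dict/to_dict.
row = ContentMimetypeRow.from_dict(
    {
        "id": sha1,
        "mimetype": "text/plain",
        "encoding": "us-ascii",
        "indexer_configuration_id": tool["id"],
    }
)
assert row.to_dict()["mimetype"] == "text/plain"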