Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/storage/in_memory.py
Show All 19 Lines | from typing import ( | ||||
Set, | Set, | ||||
Tuple, | Tuple, | ||||
Type, | Type, | ||||
TypeVar, | TypeVar, | ||||
) | ) | ||||
from swh.core.collections import SortedList | from swh.core.collections import SortedList | ||||
from swh.model.hashutil import hash_to_bytes, hash_to_hex | from swh.model.hashutil import hash_to_bytes, hash_to_hex | ||||
from swh.model.model import SHA1_SIZE | from swh.model.model import SHA1_SIZE, Sha1Git | ||||
from swh.storage.utils import get_partition_bounds_bytes | from swh.storage.utils import get_partition_bounds_bytes | ||||
from . import MAPPING_NAMES, check_id_duplicates | from . import MAPPING_NAMES, check_id_duplicates, converters | ||||
from .exc import IndexerStorageArgumentException | from .exc import IndexerStorageArgumentException | ||||
from .interface import PagedResult, Sha1 | from .interface import PagedResult, Sha1 | ||||
from .model import ( | from .model import ( | ||||
BaseRow, | BaseRow, | ||||
ContentCtagsRow, | ContentCtagsRow, | ||||
ContentLanguageRow, | ContentLanguageRow, | ||||
ContentLicensesRow, | ContentLicenseRow, | ||||
ContentMetadataRow, | ContentMetadataRow, | ||||
ContentMimetypeRow, | ContentMimetypeRow, | ||||
KeyDict, | |||||
OriginIntrinsicMetadataRow, | OriginIntrinsicMetadataRow, | ||||
RevisionIntrinsicMetadataRow, | RevisionIntrinsicMetadataRow, | ||||
) | ) | ||||
SHA1_DIGEST_SIZE = 160 | SHA1_DIGEST_SIZE = 160 | ||||
def _transform_tool(tool): | def _transform_tool(tool): | ||||
return { | return { | ||||
"id": tool["id"], | "id": tool["id"], | ||||
"name": tool["tool_name"], | "name": tool["tool_name"], | ||||
"version": tool["tool_version"], | "version": tool["tool_version"], | ||||
"configuration": tool["tool_configuration"], | "configuration": tool["tool_configuration"], | ||||
} | } | ||||
def check_id_types(data: List[Dict[str, Any]]): | def check_id_types(data: List[Dict[str, Any]]): | ||||
"""Checks all elements of the list have an 'id' whose type is 'bytes'.""" | """Checks all elements of the list have an 'id' whose type is 'bytes'.""" | ||||
if not all(isinstance(item.get("id"), bytes) for item in data): | if not all(isinstance(item.get("id"), bytes) for item in data): | ||||
raise IndexerStorageArgumentException("identifiers must be bytes.") | raise IndexerStorageArgumentException("identifiers must be bytes.") | ||||
def _key_from_dict(d): | |||||
return tuple(sorted(d.items())) | |||||
ToolId = int | ToolId = int | ||||
TValue = TypeVar("TValue", bound=BaseRow) | TValue = TypeVar("TValue", bound=BaseRow) | ||||
class SubStorage(Generic[TValue]): | class SubStorage(Generic[TValue]): | ||||
"""Implements common missing/get/add logic for each indexer type.""" | """Implements common missing/get/add logic for each indexer type.""" | ||||
_data: Dict[Tuple[Sha1, ToolId], Dict[str, Any]] | _data: Dict[Sha1, Dict[Tuple, Dict[str, Any]]] | ||||
_tools_per_id: Dict[Sha1, Set[ToolId]] | _tools_per_id: Dict[Sha1, Set[ToolId]] | ||||
def __init__(self, row_class: Type[TValue], tools): | def __init__(self, row_class: Type[TValue], tools): | ||||
self.row_class = row_class | self.row_class = row_class | ||||
self._tools = tools | self._tools = tools | ||||
self._sorted_ids = SortedList[bytes, Sha1]() | self._sorted_ids = SortedList[bytes, Sha1]() | ||||
self._data = {} | self._data = defaultdict(dict) | ||||
self._tools_per_id = defaultdict(set) | self._tools_per_id = defaultdict(set) | ||||
def missing(self, keys: Iterable[KeyDict]) -> Iterator[Sha1]: | def _key_from_dict(self, d) -> Tuple: | ||||
"""Like the global _key_from_dict, but filters out dict keys that don't | |||||
belong in the unique key.""" | |||||
return _key_from_dict({k: d[k] for k in self.row_class.UNIQUE_KEY_FIELDS}) | |||||
def missing(self, keys: Iterable[Dict]) -> Iterator[Sha1]: | |||||
"""List data missing from storage. | """List data missing from storage. | ||||
Args: | Args: | ||||
data (iterable): dictionaries with keys: | data (iterable): dictionaries with keys: | ||||
- **id** (bytes): sha1 identifier | - **id** (bytes): sha1 identifier | ||||
- **indexer_configuration_id** (int): tool used to compute | - **indexer_configuration_id** (int): tool used to compute | ||||
the results | the results | ||||
Show All 18 Lines | def get(self, ids: Iterable[Sha1]) -> Iterator[TValue]: | ||||
dict: dictionaries with the following keys: | dict: dictionaries with the following keys: | ||||
- **id** (bytes) | - **id** (bytes) | ||||
- **tool** (dict): tool used to compute metadata | - **tool** (dict): tool used to compute metadata | ||||
- arbitrary data (as provided to `add`) | - arbitrary data (as provided to `add`) | ||||
""" | """ | ||||
for id_ in ids: | for id_ in ids: | ||||
for tool_id in self._tools_per_id.get(id_, set()): | for entry in self._data[id_].values(): | ||||
key = (id_, tool_id) | entry = entry.copy() | ||||
tool_id = entry.pop("indexer_configuration_id") | |||||
yield self.row_class( | yield self.row_class( | ||||
id=id_, | id=id_, tool=_transform_tool(self._tools[tool_id]), **entry, | ||||
tool=_transform_tool(self._tools[tool_id]), | |||||
**self._data[key], | |||||
) | ) | ||||
def get_all(self) -> Iterator[TValue]: | def get_all(self) -> Iterator[TValue]: | ||||
yield from self.get(self._sorted_ids) | yield from self.get(self._sorted_ids) | ||||
def get_partition( | def get_partition( | ||||
self, | self, | ||||
indexer_configuration_id: int, | indexer_configuration_id: int, | ||||
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | def add(self, data: Iterable[TValue], conflict_update: bool) -> int: | ||||
results | results | ||||
- arbitrary data | - arbitrary data | ||||
conflict_update (bool): Flag to determine if we want to overwrite | conflict_update (bool): Flag to determine if we want to overwrite | ||||
(true) or skip duplicates (false) | (true) or skip duplicates (false) | ||||
""" | """ | ||||
data = list(data) | data = list(data) | ||||
check_id_duplicates(obj.to_dict() for obj in data) | check_id_duplicates(data) | ||||
count = 0 | count = 0 | ||||
for obj in data: | for obj in data: | ||||
item = obj.to_dict() | item = obj.to_dict() | ||||
tool_id = item.pop("indexer_configuration_id") | |||||
id_ = item.pop("id") | id_ = item.pop("id") | ||||
data_item = item | tool_id = item["indexer_configuration_id"] | ||||
if not conflict_update and tool_id in self._tools_per_id.get(id_, set()): | key = _key_from_dict(obj.unique_key()) | ||||
if not conflict_update and key in self._data[id_]: | |||||
# Duplicate, should not be updated | # Duplicate, should not be updated | ||||
continue | continue | ||||
key = (id_, tool_id) | self._data[id_][key] = item | ||||
self._data[key] = data_item | |||||
self._tools_per_id[id_].add(tool_id) | self._tools_per_id[id_].add(tool_id) | ||||
count += 1 | count += 1 | ||||
if id_ not in self._sorted_ids: | if id_ not in self._sorted_ids: | ||||
self._sorted_ids.add(id_) | self._sorted_ids.add(id_) | ||||
return count | return count | ||||
def add_merge( | def delete(self, entries: List[Dict]) -> int: | ||||
self, new_data: Iterable[TValue], conflict_update: bool, merged_key: str | |||||
) -> int: | |||||
added = 0 | |||||
all_subitems: List | |||||
for new_obj in new_data: | |||||
new_item = new_obj.to_dict() | |||||
id_ = new_item["id"] | |||||
tool_id = new_item["indexer_configuration_id"] | |||||
if conflict_update: | |||||
all_subitems = [] | |||||
else: | |||||
existing = list(self.get([id_])) | |||||
all_subitems = [ | |||||
old_subitem | |||||
for existing_item in existing | |||||
if existing_item.tool["id"] == tool_id # type: ignore | |||||
for old_subitem in getattr(existing_item, merged_key) | |||||
] | |||||
for new_subitem in new_item[merged_key]: | |||||
if new_subitem not in all_subitems: | |||||
all_subitems.append(new_subitem) | |||||
added += self.add( | |||||
[ | |||||
self.row_class( | |||||
id=id_, | |||||
indexer_configuration_id=tool_id, | |||||
**{merged_key: all_subitems}, # type: ignore | |||||
) # FIXME: this only works for classes with three attributes | |||||
], | |||||
conflict_update=True, | |||||
) | |||||
if id_ not in self._sorted_ids: | |||||
self._sorted_ids.add(id_) | |||||
return added | |||||
def delete(self, entries: List[KeyDict]) -> int: | |||||
"""Delete entries and return the number of entries deleted. | """Delete entries and return the number of entries deleted. | ||||
""" | """ | ||||
deleted = 0 | deleted = 0 | ||||
for entry in entries: | for entry in entries: | ||||
(id_, tool_id) = (entry["id"], entry["indexer_configuration_id"]) | (id_, tool_id) = (entry["id"], entry["indexer_configuration_id"]) | ||||
key = (id_, tool_id) | |||||
if tool_id in self._tools_per_id[id_]: | if tool_id in self._tools_per_id[id_]: | ||||
self._tools_per_id[id_].remove(tool_id) | self._tools_per_id[id_].remove(tool_id) | ||||
if key in self._data: | if id_ in self._data: | ||||
key = self._key_from_dict(entry) | |||||
if key in self._data[id_]: | |||||
deleted += 1 | deleted += 1 | ||||
del self._data[key] | del self._data[id_][key] | ||||
return deleted | return deleted | ||||
class IndexerStorage: | class IndexerStorage: | ||||
"""In-memory SWH indexer storage.""" | """In-memory SWH indexer storage.""" | ||||
def __init__(self): | def __init__(self): | ||||
self._tools = {} | self._tools = {} | ||||
self._mimetypes = SubStorage(ContentMimetypeRow, self._tools) | self._mimetypes = SubStorage(ContentMimetypeRow, self._tools) | ||||
self._languages = SubStorage(ContentLanguageRow, self._tools) | self._languages = SubStorage(ContentLanguageRow, self._tools) | ||||
self._content_ctags = SubStorage(ContentCtagsRow, self._tools) | self._content_ctags = SubStorage(ContentCtagsRow, self._tools) | ||||
self._licenses = SubStorage(ContentLicensesRow, self._tools) | self._licenses = SubStorage(ContentLicenseRow, self._tools) | ||||
self._content_metadata = SubStorage(ContentMetadataRow, self._tools) | self._content_metadata = SubStorage(ContentMetadataRow, self._tools) | ||||
self._revision_intrinsic_metadata = SubStorage( | self._revision_intrinsic_metadata = SubStorage( | ||||
RevisionIntrinsicMetadataRow, self._tools | RevisionIntrinsicMetadataRow, self._tools | ||||
) | ) | ||||
self._origin_intrinsic_metadata = SubStorage( | self._origin_intrinsic_metadata = SubStorage( | ||||
OriginIntrinsicMetadataRow, self._tools | OriginIntrinsicMetadataRow, self._tools | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | ) -> Dict[str, int]: | ||||
) | ) | ||||
return {"content_language:add": added} | return {"content_language:add": added} | ||||
def content_ctags_missing(self, ctags): | def content_ctags_missing(self, ctags): | ||||
yield from self._content_ctags.missing(ctags) | yield from self._content_ctags.missing(ctags) | ||||
def content_ctags_get(self, ids): | def content_ctags_get(self, ids): | ||||
for item in self._content_ctags.get(ids): | for item in self._content_ctags.get(ids): | ||||
for item_ctags_item in item.ctags: | yield {"id": item.id, "tool": item.tool, **item.to_dict()} | ||||
yield {"id": item.id, "tool": item.tool, **item_ctags_item.to_dict()} | |||||
def content_ctags_add( | def content_ctags_add( | ||||
self, ctags: List[Dict], conflict_update: bool = False | self, ctags: List[Dict], conflict_update: bool = False | ||||
) -> Dict[str, int]: | ) -> Dict[str, int]: | ||||
check_id_types(ctags) | check_id_types(ctags) | ||||
added = self._content_ctags.add_merge( | added = self._content_ctags.add( | ||||
map(ContentCtagsRow.from_dict, ctags), conflict_update, "ctags" | map( | ||||
ContentCtagsRow.from_dict, | |||||
itertools.chain.from_iterable(map(converters.ctags_to_db, ctags)), | |||||
), | |||||
conflict_update, | |||||
) | ) | ||||
return {"content_ctags:add": added} | return {"content_ctags:add": added} | ||||
def content_ctags_search(self, expression, limit=10, last_sha1=None): | def content_ctags_search(self, expression, limit=10, last_sha1=None): | ||||
anlambert: Could you add types to that method signature ? | |||||
Done Inline ActionsThat's on my todo-list for next couple of weeks. vlorentz: That's on my todo-list for next couple of weeks. | |||||
nb_matches = 0 | nb_matches = 0 | ||||
items_per_id: Dict[Tuple[Sha1Git, ToolId], List[ContentCtagsRow]] = {} | |||||
for item in sorted(self._content_ctags.get_all()): | for item in sorted(self._content_ctags.get_all()): | ||||
if item.id <= (last_sha1 or bytes(0 for _ in range(SHA1_DIGEST_SIZE))): | if item.id <= (last_sha1 or bytes(0 for _ in range(SHA1_DIGEST_SIZE))): | ||||
continue | continue | ||||
for ctags_item in item.ctags: | items_per_id.setdefault( | ||||
if ctags_item.name != expression: | (item.id, item.indexer_configuration_id), [] | ||||
).append(item) | |||||
for items in items_per_id.values(): | |||||
ctags = [] | |||||
for item in items: | |||||
if item.name != expression: | |||||
continue | continue | ||||
nb_matches += 1 | nb_matches += 1 | ||||
yield { | if nb_matches > limit: | ||||
"id": item.id, | break | ||||
"tool": item.tool, | item_dict = item.to_dict() | ||||
**ctags_item.to_dict(), | id_ = item_dict.pop("id") | ||||
} | tool = item_dict.pop("tool") | ||||
if nb_matches >= limit: | ctags.append(item_dict) | ||||
return | |||||
if ctags: | |||||
for ctag in ctags: | |||||
yield {"id": id_, "tool": tool, **ctag} | |||||
def content_fossology_license_get(self, ids): | def content_fossology_license_get(self, ids): | ||||
Not Done Inline Actionssame here anlambert: same here | |||||
# Rewrites the output of SubStorage.get from the old format to | # Rewrites the output of SubStorage.get from the old format to | ||||
# the new one. SubStorage.get should be updated once all other | # the new one. SubStorage.get should be updated once all other | ||||
# *_get methods use the new format. | # *_get methods use the new format. | ||||
# See: https://forge.softwareheritage.org/T1433 | # See: https://forge.softwareheritage.org/T1433 | ||||
res = {} | for id_ in ids: | ||||
for obj in self._licenses.get(ids): | items = {} | ||||
d = obj.to_dict() | for obj in self._licenses.get([id_]): | ||||
res.setdefault(d.pop("id"), []).append(d) | items.setdefault(obj.tool["id"], (obj.tool, []))[1].append(obj.license) | ||||
for (id_, facts) in res.items(): | if items: | ||||
yield {id_: facts} | yield { | ||||
id_: [ | |||||
{"tool": tool, "licenses": licenses} | |||||
for (tool, licenses) in items.values() | |||||
] | |||||
} | |||||
def content_fossology_license_add( | def content_fossology_license_add( | ||||
self, licenses: List[Dict], conflict_update: bool = False | self, licenses: List[Dict], conflict_update: bool = False | ||||
) -> Dict[str, int]: | ) -> Dict[str, int]: | ||||
check_id_types(licenses) | check_id_types(licenses) | ||||
added = self._licenses.add_merge( | added = self._licenses.add( | ||||
map(ContentLicensesRow.from_dict, licenses), conflict_update, "licenses" | map( | ||||
ContentLicenseRow.from_dict, | |||||
itertools.chain.from_iterable( | |||||
map(converters.fossology_license_to_db, licenses) | |||||
), | |||||
), | |||||
conflict_update, | |||||
) | ) | ||||
return {"fossology_license_add:add": added} | return {"content_fossology_license:add": added} | ||||
def content_fossology_license_get_partition( | def content_fossology_license_get_partition( | ||||
self, | self, | ||||
indexer_configuration_id: int, | indexer_configuration_id: int, | ||||
partition_id: int, | partition_id: int, | ||||
nb_partitions: int, | nb_partitions: int, | ||||
page_token: Optional[str] = None, | page_token: Optional[str] = None, | ||||
limit: int = 1000, | limit: int = 1000, | ||||
▲ Show 20 Lines • Show All 153 Lines • Show Last 20 Lines |
Could you add types to that method signature ?