Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/storage/in_memory.py
Show All 35 Lines | from .model import ( | ||||
ContentCtagsRow, | ContentCtagsRow, | ||||
ContentLanguageRow, | ContentLanguageRow, | ||||
ContentLicenseRow, | ContentLicenseRow, | ||||
ContentMetadataRow, | ContentMetadataRow, | ||||
ContentMimetypeRow, | ContentMimetypeRow, | ||||
OriginIntrinsicMetadataRow, | OriginIntrinsicMetadataRow, | ||||
RevisionIntrinsicMetadataRow, | RevisionIntrinsicMetadataRow, | ||||
) | ) | ||||
from .writer import JournalWriter | |||||
SHA1_DIGEST_SIZE = 160 | SHA1_DIGEST_SIZE = 160 | ||||
ToolId = int | |||||
def _transform_tool(tool): | def _transform_tool(tool): | ||||
return { | return { | ||||
"id": tool["id"], | "id": tool["id"], | ||||
"name": tool["tool_name"], | "name": tool["tool_name"], | ||||
"version": tool["tool_version"], | "version": tool["tool_version"], | ||||
"configuration": tool["tool_configuration"], | "configuration": tool["tool_configuration"], | ||||
} | } | ||||
def check_id_types(data: List[Dict[str, Any]]):
    """Check every element of ``data`` has an ``id`` key holding bytes.

    Raises:
        IndexerStorageArgumentException: if any element is missing an
            ``id`` or its value is not of type ``bytes``.
    """
    for item in data:
        if not isinstance(item.get("id"), bytes):
            raise IndexerStorageArgumentException("identifiers must be bytes.")
def _key_from_dict(d): | def _key_from_dict(d): | ||||
return tuple(sorted(d.items())) | return tuple(sorted(d.items())) | ||||
ToolId = int | |||||
TValue = TypeVar("TValue", bound=BaseRow) | TValue = TypeVar("TValue", bound=BaseRow) | ||||
class SubStorage(Generic[TValue]): | class SubStorage(Generic[TValue]): | ||||
"""Implements common missing/get/add logic for each indexer type.""" | """Implements common missing/get/add logic for each indexer type.""" | ||||
_data: Dict[Sha1, Dict[Tuple, Dict[str, Any]]] | _data: Dict[Sha1, Dict[Tuple, Dict[str, Any]]] | ||||
_tools_per_id: Dict[Sha1, Set[ToolId]] | _tools_per_id: Dict[Sha1, Set[ToolId]] | ||||
def __init__(self, row_class: Type[TValue], tools): | def __init__(self, obj_type: str, row_class: Type[TValue], tools, journal_writer): | ||||
self.row_class = row_class | self.row_class = row_class | ||||
self._obj_type = obj_type | |||||
self._tools = tools | self._tools = tools | ||||
self._sorted_ids = SortedList[bytes, Sha1]() | self._sorted_ids = SortedList[bytes, Sha1]() | ||||
self._data = defaultdict(dict) | self._data = defaultdict(dict) | ||||
self._journal_writer = journal_writer | |||||
self._tools_per_id = defaultdict(set) | self._tools_per_id = defaultdict(set) | ||||
def _key_from_dict(self, d) -> Tuple: | def _key_from_dict(self, d) -> Tuple: | ||||
"""Like the global _key_from_dict, but filters out dict keys that don't | """Like the global _key_from_dict, but filters out dict keys that don't | ||||
belong in the unique key.""" | belong in the unique key.""" | ||||
return _key_from_dict({k: d[k] for k in self.row_class.UNIQUE_KEY_FIELDS}) | return _key_from_dict({k: d[k] for k in self.row_class.UNIQUE_KEY_FIELDS}) | ||||
def missing(self, keys: Iterable[Dict]) -> List[Sha1]: | def missing(self, keys: Iterable[Dict]) -> List[Sha1]: | ||||
▲ Show 20 Lines • Show All 115 Lines • ▼ Show 20 Lines | def add(self, data: Iterable[TValue], conflict_update: bool) -> int: | ||||
- arbitrary data | - arbitrary data | ||||
conflict_update (bool): Flag to determine if we want to overwrite | conflict_update (bool): Flag to determine if we want to overwrite | ||||
(true) or skip duplicates (false) | (true) or skip duplicates (false) | ||||
""" | """ | ||||
data = list(data) | data = list(data) | ||||
check_id_duplicates(data) | check_id_duplicates(data) | ||||
self._journal_writer.write_additions(self._obj_type, data) | |||||
count = 0 | count = 0 | ||||
for obj in data: | for obj in data: | ||||
item = obj.to_dict() | item = obj.to_dict() | ||||
id_ = item.pop("id") | id_ = item.pop("id") | ||||
tool_id = item["indexer_configuration_id"] | tool_id = item["indexer_configuration_id"] | ||||
key = _key_from_dict(obj.unique_key()) | key = _key_from_dict(obj.unique_key()) | ||||
if not conflict_update and key in self._data[id_]: | if not conflict_update and key in self._data[id_]: | ||||
# Duplicate, should not be updated | # Duplicate, should not be updated | ||||
continue | continue | ||||
self._data[id_][key] = item | self._data[id_][key] = item | ||||
self._tools_per_id[id_].add(tool_id) | self._tools_per_id[id_].add(tool_id) | ||||
count += 1 | count += 1 | ||||
if id_ not in self._sorted_ids: | if id_ not in self._sorted_ids: | ||||
self._sorted_ids.add(id_) | self._sorted_ids.add(id_) | ||||
return count | return count | ||||
def delete(self, entries: List[Dict]) -> int: | def delete(self, entries: List[Dict]) -> int: | ||||
"""Delete entries and return the number of entries deleted. | """Delete entries and return the number of entries deleted. | ||||
""" | """ | ||||
deleted = 0 | deleted = 0 | ||||
self._journal_writer.write_deletions(self._obj_type, entries) | |||||
for entry in entries: | for entry in entries: | ||||
(id_, tool_id) = (entry["id"], entry["indexer_configuration_id"]) | (id_, tool_id) = (entry["id"], entry["indexer_configuration_id"]) | ||||
if tool_id in self._tools_per_id[id_]: | if tool_id in self._tools_per_id[id_]: | ||||
self._tools_per_id[id_].remove(tool_id) | self._tools_per_id[id_].remove(tool_id) | ||||
if id_ in self._data: | if id_ in self._data: | ||||
key = self._key_from_dict(entry) | key = self._key_from_dict(entry) | ||||
if key in self._data[id_]: | if key in self._data[id_]: | ||||
deleted += 1 | deleted += 1 | ||||
del self._data[id_][key] | del self._data[id_][key] | ||||
return deleted | return deleted | ||||
class IndexerStorage: | class IndexerStorage: | ||||
"""In-memory SWH indexer storage.""" | """In-memory SWH indexer storage.""" | ||||
def __init__(self): | def __init__(self, journal_writer=None): | ||||
self._tools = {} | self._tools = {} | ||||
self._mimetypes = SubStorage(ContentMimetypeRow, self._tools) | tool_getter = self._tools.__getitem__ | ||||
self._languages = SubStorage(ContentLanguageRow, self._tools) | self._writer = JournalWriter(tool_getter, journal_writer) | ||||
self._content_ctags = SubStorage(ContentCtagsRow, self._tools) | args = (self._tools, self._writer) | ||||
self._licenses = SubStorage(ContentLicenseRow, self._tools) | self._mimetypes = SubStorage("mimetype", ContentMimetypeRow, *args) | ||||
self._content_metadata = SubStorage(ContentMetadataRow, self._tools) | self._languages = SubStorage("language", ContentLanguageRow, *args) | ||||
self._content_ctags = SubStorage("content_ctag", ContentCtagsRow, *args) | |||||
self._licenses = SubStorage("license", ContentLicenseRow, *args) | |||||
self._content_metadata = SubStorage( | |||||
"content_metadata", ContentMetadataRow, *args | |||||
) | |||||
self._revision_intrinsic_metadata = SubStorage( | self._revision_intrinsic_metadata = SubStorage( | ||||
RevisionIntrinsicMetadataRow, self._tools | "revision_intrinsic_metadata", RevisionIntrinsicMetadataRow, *args | ||||
) | ) | ||||
self._origin_intrinsic_metadata = SubStorage( | self._origin_intrinsic_metadata = SubStorage( | ||||
OriginIntrinsicMetadataRow, self._tools | "origin_intrinsic_metadata", OriginIntrinsicMetadataRow, *args | ||||
) | ) | ||||
def check_config(self, *, check_write): | def check_config(self, *, check_write): | ||||
return True | return True | ||||
def content_mimetype_missing( | def content_mimetype_missing( | ||||
self, mimetypes: Iterable[Dict] | self, mimetypes: Iterable[Dict] | ||||
) -> List[Tuple[Sha1, int]]: | ) -> List[Tuple[Sha1, int]]: | ||||
▲ Show 20 Lines • Show All 252 Lines • Show Last 20 Lines |