
diff --git a/swh/indexer/storage/in_memory.py b/swh/indexer/storage/in_memory.py
index 8e061fd..75c7fe4 100644
--- a/swh/indexer/storage/in_memory.py
+++ b/swh/indexer/storage/in_memory.py
@@ -1,494 +1,545 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import Counter, defaultdict
import itertools
import json
import math
import operator
import re
-from typing import Any, Dict, List, Optional
+from typing import (
+ Any,
+ Dict,
+ Generic,
+ Iterable,
+ Iterator,
+ List,
+ Optional,
+ Set,
+ Tuple,
+ Type,
+ TypeVar,
+)
from swh.core.collections import SortedList
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.model import SHA1_SIZE
from swh.storage.utils import get_partition_bounds_bytes
from . import MAPPING_NAMES, check_id_duplicates
from .exc import IndexerStorageArgumentException
from .interface import PagedResult, Sha1
+from .model import (
+ BaseRow,
+ ContentCtagsRow,
+ ContentLanguageRow,
+ ContentLicensesRow,
+ ContentMetadataRow,
+ ContentMimetypeRow,
+ KeyDict,
+ OriginIntrinsicMetadataRow,
+ RevisionIntrinsicMetadataRow,
+)
SHA1_DIGEST_SIZE = 160
def _transform_tool(tool):
return {
"id": tool["id"],
"name": tool["tool_name"],
"version": tool["tool_version"],
"configuration": tool["tool_configuration"],
}
def check_id_types(data: List[Dict[str, Any]]):
"""Checks all elements of the list have an 'id' whose type is 'bytes'."""
if not all(isinstance(item.get("id"), bytes) for item in data):
raise IndexerStorageArgumentException("identifiers must be bytes.")
-class SubStorage:
+ToolId = int
+TValue = TypeVar("TValue", bound=BaseRow)
+
+
+class SubStorage(Generic[TValue]):
"""Implements common missing/get/add logic for each indexer type."""
- def __init__(self, tools):
+ _data: Dict[Tuple[Sha1, ToolId], Dict[str, Any]]
+ _tools_per_id: Dict[Sha1, Set[ToolId]]
+
+ def __init__(self, row_class: Type[TValue], tools):
+ self.row_class = row_class
self._tools = tools
- self._sorted_ids = SortedList[bytes, bytes]()
- self._data = {} # map (id_, tool_id) -> metadata_dict
- self._tools_per_id = defaultdict(set) # map id_ -> Set[tool_id]
+ self._sorted_ids = SortedList[bytes, Sha1]()
+ self._data = {}
+ self._tools_per_id = defaultdict(set)
- def missing(self, ids):
+ def missing(self, keys: Iterable[KeyDict]) -> Iterator[Sha1]:
"""List data missing from storage.
Args:
keys (iterable): dictionaries with keys:
- **id** (bytes): sha1 identifier
- **indexer_configuration_id** (int): tool used to compute
the results
Yields:
missing sha1s
"""
- for id_ in ids:
- tool_id = id_["indexer_configuration_id"]
- id_ = id_["id"]
+ for key in keys:
+ tool_id = key["indexer_configuration_id"]
+ id_ = key["id"]
if tool_id not in self._tools_per_id.get(id_, set()):
yield id_
- def get(self, ids):
+ def get(self, ids: Iterable[Sha1]) -> Iterator[TValue]:
"""Retrieve data per id.
Args:
ids (iterable): sha1 checksums
Yields:
row objects (instances of `self.row_class`) with the following attributes:
- **id** (bytes)
- **tool** (dict): tool used to compute metadata
- arbitrary data (as provided to `add`)
"""
for id_ in ids:
for tool_id in self._tools_per_id.get(id_, set()):
key = (id_, tool_id)
- yield {
- "id": id_,
- "tool": _transform_tool(self._tools[tool_id]),
+ yield self.row_class(
+ id=id_,
+ tool=_transform_tool(self._tools[tool_id]),
**self._data[key],
- }
+ )
- def get_all(self):
+ def get_all(self) -> Iterator[TValue]:
yield from self.get(self._sorted_ids)
def get_partition(
self,
indexer_configuration_id: int,
partition_id: int,
nb_partitions: int,
page_token: Optional[str] = None,
limit: int = 1000,
) -> PagedResult[Sha1]:
"""Retrieve ids of content with `indexer_type` within partition partition_id
bound by limit.
Args:
**indexer_type**: Type of data content to index (mimetype, language, etc...)
**indexer_configuration_id**: The tool used to index data
**partition_id**: index of the partition to fetch
**nb_partitions**: total number of partitions to split into
**page_token**: opaque token used for pagination
**limit**: Limit result (default to 1000)
**with_textual_data** (bool): Deal with only textual content (True) or all
content (all contents by default, False)
Raises:
IndexerStorageArgumentException for:
- limit set to None
- wrong indexer_type provided
Returns:
PagedResult of Sha1. If next_page_token is None, there is no more data to
fetch
"""
if limit is None:
raise IndexerStorageArgumentException("limit should not be None")
(start, end) = get_partition_bounds_bytes(
partition_id, nb_partitions, SHA1_SIZE
)
if page_token:
start = hash_to_bytes(page_token)
if end is None:
end = b"\xff" * SHA1_SIZE
next_page_token: Optional[str] = None
ids: List[Sha1] = []
sha1s = (sha1 for sha1 in self._sorted_ids.iter_from(start))
for counter, sha1 in enumerate(sha1s):
if sha1 > end:
break
if counter >= limit:
next_page_token = hash_to_hex(sha1)
break
ids.append(sha1)
assert len(ids) <= limit
return PagedResult(results=ids, next_page_token=next_page_token)
- def add(self, data: List[Dict], conflict_update: bool) -> int:
+ def add(self, data: Iterable[TValue], conflict_update: bool) -> int:
"""Add data not present in storage.
Args:
data (iterable): dictionaries with keys:
- **id**: sha1
- **indexer_configuration_id**: tool used to compute the
results
- arbitrary data
conflict_update (bool): Flag to determine if we want to overwrite
(true) or skip duplicates (false)
"""
data = list(data)
- check_id_duplicates(data)
+ check_id_duplicates(obj.to_dict() for obj in data)
count = 0
- for item in data:
- item = item.copy()
+ for obj in data:
+ item = obj.to_dict()
tool_id = item.pop("indexer_configuration_id")
id_ = item.pop("id")
data_item = item
if not conflict_update and tool_id in self._tools_per_id.get(id_, set()):
# Duplicate, should not be updated
continue
key = (id_, tool_id)
self._data[key] = data_item
self._tools_per_id[id_].add(tool_id)
count += 1
if id_ not in self._sorted_ids:
self._sorted_ids.add(id_)
return count
def add_merge(
- self, new_data: List[Dict], conflict_update: bool, merged_key: str
+ self, new_data: Iterable[TValue], conflict_update: bool, merged_key: str
) -> int:
added = 0
all_subitems: List
- for new_item in new_data:
+ for new_obj in new_data:
+ new_item = new_obj.to_dict()
id_ = new_item["id"]
tool_id = new_item["indexer_configuration_id"]
if conflict_update:
all_subitems = []
else:
existing = list(self.get([id_]))
all_subitems = [
old_subitem
for existing_item in existing
- if existing_item["tool"]["id"] == tool_id
- for old_subitem in existing_item[merged_key]
+ if existing_item.tool["id"] == tool_id # type: ignore
+ for old_subitem in getattr(existing_item, merged_key)
]
for new_subitem in new_item[merged_key]:
if new_subitem not in all_subitems:
all_subitems.append(new_subitem)
added += self.add(
[
- {
- "id": id_,
- "indexer_configuration_id": tool_id,
- merged_key: all_subitems,
- }
+ self.row_class(
+ id=id_,
+ indexer_configuration_id=tool_id,
+ **{merged_key: all_subitems}, # type: ignore
+ ) # FIXME: this only works for classes with three attributes
],
conflict_update=True,
)
if id_ not in self._sorted_ids:
self._sorted_ids.add(id_)
return added
- def delete(self, entries: List[Dict]) -> int:
+ def delete(self, entries: List[KeyDict]) -> int:
"""Delete entries and return the number of entries deleted.
"""
deleted = 0
for entry in entries:
(id_, tool_id) = (entry["id"], entry["indexer_configuration_id"])
key = (id_, tool_id)
if tool_id in self._tools_per_id[id_]:
self._tools_per_id[id_].remove(tool_id)
if key in self._data:
deleted += 1
del self._data[key]
return deleted
class IndexerStorage:
"""In-memory SWH indexer storage."""
def __init__(self):
self._tools = {}
- self._mimetypes = SubStorage(self._tools)
- self._languages = SubStorage(self._tools)
- self._content_ctags = SubStorage(self._tools)
- self._licenses = SubStorage(self._tools)
- self._content_metadata = SubStorage(self._tools)
- self._revision_intrinsic_metadata = SubStorage(self._tools)
- self._origin_intrinsic_metadata = SubStorage(self._tools)
+ self._mimetypes = SubStorage(ContentMimetypeRow, self._tools)
+ self._languages = SubStorage(ContentLanguageRow, self._tools)
+ self._content_ctags = SubStorage(ContentCtagsRow, self._tools)
+ self._licenses = SubStorage(ContentLicensesRow, self._tools)
+ self._content_metadata = SubStorage(ContentMetadataRow, self._tools)
+ self._revision_intrinsic_metadata = SubStorage(
+ RevisionIntrinsicMetadataRow, self._tools
+ )
+ self._origin_intrinsic_metadata = SubStorage(
+ OriginIntrinsicMetadataRow, self._tools
+ )
def check_config(self, *, check_write):
return True
def content_mimetype_missing(self, mimetypes):
yield from self._mimetypes.missing(mimetypes)
def content_mimetype_get_partition(
self,
indexer_configuration_id: int,
partition_id: int,
nb_partitions: int,
page_token: Optional[str] = None,
limit: int = 1000,
) -> PagedResult[Sha1]:
return self._mimetypes.get_partition(
indexer_configuration_id, partition_id, nb_partitions, page_token, limit
)
def content_mimetype_add(
self, mimetypes: List[Dict], conflict_update: bool = False
) -> Dict[str, int]:
check_id_types(mimetypes)
- added = self._mimetypes.add(mimetypes, conflict_update)
+ added = self._mimetypes.add(
+ map(ContentMimetypeRow.from_dict, mimetypes), conflict_update
+ )
return {"content_mimetype:add": added}
def content_mimetype_get(self, ids):
- yield from self._mimetypes.get(ids)
+ yield from (obj.to_dict() for obj in self._mimetypes.get(ids))
def content_language_missing(self, languages):
yield from self._languages.missing(languages)
def content_language_get(self, ids):
- yield from self._languages.get(ids)
+ yield from (obj.to_dict() for obj in self._languages.get(ids))
def content_language_add(
self, languages: List[Dict], conflict_update: bool = False
) -> Dict[str, int]:
check_id_types(languages)
- added = self._languages.add(languages, conflict_update)
+ added = self._languages.add(
+ map(ContentLanguageRow.from_dict, languages), conflict_update
+ )
return {"content_language:add": added}
def content_ctags_missing(self, ctags):
yield from self._content_ctags.missing(ctags)
def content_ctags_get(self, ids):
for item in self._content_ctags.get(ids):
- for item_ctags_item in item["ctags"]:
- yield {"id": item["id"], "tool": item["tool"], **item_ctags_item}
+ for item_ctags_item in item.ctags:
+ yield {"id": item.id, "tool": item.tool, **item_ctags_item.to_dict()}
def content_ctags_add(
self, ctags: List[Dict], conflict_update: bool = False
) -> Dict[str, int]:
check_id_types(ctags)
- added = self._content_ctags.add_merge(ctags, conflict_update, "ctags")
+ added = self._content_ctags.add_merge(
+ map(ContentCtagsRow.from_dict, ctags), conflict_update, "ctags"
+ )
return {"content_ctags:add": added}
def content_ctags_search(self, expression, limit=10, last_sha1=None):
nb_matches = 0
- for ((id_, tool_id), item) in sorted(self._content_ctags._data.items()):
- if id_ <= (last_sha1 or bytes(0 for _ in range(SHA1_DIGEST_SIZE))):
+ for item in sorted(self._content_ctags.get_all()):
+ if item.id <= (last_sha1 or bytes(0 for _ in range(SHA1_DIGEST_SIZE))):
continue
- for ctags_item in item["ctags"]:
- if ctags_item["name"] != expression:
+ for ctags_item in item.ctags:
+ if ctags_item.name != expression:
continue
nb_matches += 1
yield {
- "id": id_,
- "tool": _transform_tool(self._tools[tool_id]),
- **ctags_item,
+ "id": item.id,
+ "tool": item.tool,
+ **ctags_item.to_dict(),
}
if nb_matches >= limit:
return
def content_fossology_license_get(self, ids):
# Rewrites the output of SubStorage.get from the old format to
# the new one. SubStorage.get should be updated once all other
# *_get methods use the new format.
# See: https://forge.softwareheritage.org/T1433
res = {}
- for d in self._licenses.get(ids):
+ for obj in self._licenses.get(ids):
+ d = obj.to_dict()
res.setdefault(d.pop("id"), []).append(d)
for (id_, facts) in res.items():
yield {id_: facts}
def content_fossology_license_add(
self, licenses: List[Dict], conflict_update: bool = False
) -> Dict[str, int]:
check_id_types(licenses)
- added = self._licenses.add_merge(licenses, conflict_update, "licenses")
+ added = self._licenses.add_merge(
+ map(ContentLicensesRow.from_dict, licenses), conflict_update, "licenses"
+ )
return {"fossology_license_add:add": added}
def content_fossology_license_get_partition(
self,
indexer_configuration_id: int,
partition_id: int,
nb_partitions: int,
page_token: Optional[str] = None,
limit: int = 1000,
) -> PagedResult[Sha1]:
return self._licenses.get_partition(
indexer_configuration_id, partition_id, nb_partitions, page_token, limit
)
def content_metadata_missing(self, metadata):
yield from self._content_metadata.missing(metadata)
def content_metadata_get(self, ids):
- yield from self._content_metadata.get(ids)
+ yield from (obj.to_dict() for obj in self._content_metadata.get(ids))
def content_metadata_add(
self, metadata: List[Dict], conflict_update: bool = False
) -> Dict[str, int]:
check_id_types(metadata)
- added = self._content_metadata.add(metadata, conflict_update)
+ added = self._content_metadata.add(
+ map(ContentMetadataRow.from_dict, metadata), conflict_update
+ )
return {"content_metadata:add": added}
def revision_intrinsic_metadata_missing(self, metadata):
yield from self._revision_intrinsic_metadata.missing(metadata)
def revision_intrinsic_metadata_get(self, ids):
- yield from self._revision_intrinsic_metadata.get(ids)
+ yield from (obj.to_dict() for obj in self._revision_intrinsic_metadata.get(ids))
def revision_intrinsic_metadata_add(
self, metadata: List[Dict], conflict_update: bool = False
) -> Dict[str, int]:
check_id_types(metadata)
- added = self._revision_intrinsic_metadata.add(metadata, conflict_update)
+ added = self._revision_intrinsic_metadata.add(
+ map(RevisionIntrinsicMetadataRow.from_dict, metadata), conflict_update
+ )
return {"revision_intrinsic_metadata:add": added}
def revision_intrinsic_metadata_delete(self, entries: List[Dict]) -> Dict:
deleted = self._revision_intrinsic_metadata.delete(entries)
return {"revision_intrinsic_metadata:del": deleted}
def origin_intrinsic_metadata_get(self, ids):
- yield from self._origin_intrinsic_metadata.get(ids)
+ yield from (obj.to_dict() for obj in self._origin_intrinsic_metadata.get(ids))
def origin_intrinsic_metadata_add(
self, metadata: List[Dict], conflict_update: bool = False
) -> Dict[str, int]:
- added = self._origin_intrinsic_metadata.add(metadata, conflict_update)
+ added = self._origin_intrinsic_metadata.add(
+ map(OriginIntrinsicMetadataRow.from_dict, metadata), conflict_update
+ )
return {"origin_intrinsic_metadata:add": added}
def origin_intrinsic_metadata_delete(self, entries: List[Dict]) -> Dict:
deleted = self._origin_intrinsic_metadata.delete(entries)
return {"origin_intrinsic_metadata:del": deleted}
def origin_intrinsic_metadata_search_fulltext(self, conjunction, limit=100):
# A very crude fulltext search implementation, but that's enough
# to work on English metadata
tokens_re = re.compile("[a-zA-Z0-9]+")
search_tokens = list(itertools.chain(*map(tokens_re.findall, conjunction)))
def rank(data):
# Tokenize the metadata
- text = json.dumps(data["metadata"])
+ text = json.dumps(data.metadata)
text_tokens = tokens_re.findall(text)
text_token_occurences = Counter(text_tokens)
# Count the number of occurrences of search tokens in the text
score = 0
for search_token in search_tokens:
if text_token_occurences[search_token] == 0:
# Search token is not in the text.
return 0
score += text_token_occurences[search_token]
# Normalize according to the text's length
return score / math.log(len(text_tokens))
results = [
(rank(data), data) for data in self._origin_intrinsic_metadata.get_all()
]
results = [(rank_, data) for (rank_, data) in results if rank_ > 0]
results.sort(
key=operator.itemgetter(0), reverse=True # Don't try to order 'data'
)
for (rank_, result) in results[:limit]:
- yield result
+ yield result.to_dict()
def origin_intrinsic_metadata_search_by_producer(
self, page_token="", limit=100, ids_only=False, mappings=None, tool_ids=None
):
assert isinstance(page_token, str)
nb_results = 0
if mappings is not None:
mappings = frozenset(mappings)
if tool_ids is not None:
tool_ids = frozenset(tool_ids)
origins = []
# we go to limit+1 to check whether we should add next_page_token in
# the response
for entry in self._origin_intrinsic_metadata.get_all():
- if entry["id"] <= page_token:
+ if entry.id <= page_token:
continue
if nb_results >= (limit + 1):
break
- if mappings is not None and mappings.isdisjoint(entry["mappings"]):
+ if mappings is not None and mappings.isdisjoint(entry.mappings):
continue
- if tool_ids is not None and entry["tool"]["id"] not in tool_ids:
+ if tool_ids is not None and entry.tool["id"] not in tool_ids:
continue
- origins.append(entry)
+ origins.append(entry.to_dict())
nb_results += 1
result = {}
if len(origins) > limit:
origins = origins[:limit]
result["next_page_token"] = origins[-1]["id"]
if ids_only:
origins = [origin["id"] for origin in origins]
result["origins"] = origins
return result
def origin_intrinsic_metadata_stats(self):
mapping_count = {m: 0 for m in MAPPING_NAMES}
total = non_empty = 0
for data in self._origin_intrinsic_metadata.get_all():
total += 1
- if set(data["metadata"]) - {"@context"}:
+ if set(data.metadata) - {"@context"}:
non_empty += 1
- for mapping in data["mappings"]:
+ for mapping in data.mappings:
mapping_count[mapping] += 1
return {"per_mapping": mapping_count, "total": total, "non_empty": non_empty}
def indexer_configuration_add(self, tools):
inserted = []
for tool in tools:
tool = tool.copy()
id_ = self._tool_key(tool)
tool["id"] = id_
self._tools[id_] = tool
inserted.append(tool)
return inserted
def indexer_configuration_get(self, tool):
return self._tools.get(self._tool_key(tool))
def _tool_key(self, tool):
return hash(
(
tool["tool_name"],
tool["tool_version"],
json.dumps(tool["tool_configuration"], sort_keys=True),
)
)
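
Before the new file below, here is a minimal sketch (not part of the patch; the sha1 and tool values are made up) of how the refactored in-memory backend is exercised after this change: the public *_add/*_get methods still accept and return plain dicts, while SubStorage now converts them to and from the attrs-based row classes internally.

from swh.indexer.storage.in_memory import IndexerStorage

storage = IndexerStorage()

# Register a tool; indexer_configuration_add() returns the inserted tools with their ids.
tool = storage.indexer_configuration_add(
    [{"tool_name": "file", "tool_version": "5.38", "tool_configuration": {}}]
)[0]

# Made-up 20-byte sha1, for illustration only.
sha1 = bytes.fromhex("34973274ccef6ab4dfaaf86599792fa9c3fe4689")

# content_mimetype_add() now maps each dict through ContentMimetypeRow.from_dict().
storage.content_mimetype_add(
    [
        {
            "id": sha1,
            "mimetype": "text/plain",
            "encoding": "us-ascii",
            "indexer_configuration_id": tool["id"],
        }
    ]
)

# content_mimetype_get() still yields plain dicts, now produced by row.to_dict().
assert list(storage.content_mimetype_get([sha1]))[0]["mimetype"] == "text/plain"
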
diff --git a/swh/indexer/storage/model.py b/swh/indexer/storage/model.py
new file mode 100644
index 0000000..0c12ca8
--- /dev/null
+++ b/swh/indexer/storage/model.py
@@ -0,0 +1,119 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Classes used internally by the in-memory idx-storage, and will be
+used for the interface of the idx-storage in the near future."""
+
+from __future__ import annotations
+
+from typing import Any, Dict, List, Optional, Type, TypeVar
+
+import attr
+from typing_extensions import TypedDict
+
+from swh.model.model import Sha1Git, dictify
+
+
+class KeyDict(TypedDict):
+ id: Sha1Git
+ indexer_configuration_id: int
+
+
+TSelf = TypeVar("TSelf")
+
+
+@attr.s
+class BaseRow:
+ id = attr.ib(type=Sha1Git)
+ indexer_configuration_id = attr.ib(type=Optional[int], default=None, kw_only=True)
+ tool = attr.ib(type=Optional[Dict], default=None, kw_only=True)
+
+ def __attrs_post_init__(self):
+ if self.indexer_configuration_id is None and self.tool is None:
+ raise TypeError("Either indexer_configuration_id or tool must be not None.")
+ if self.indexer_configuration_id is not None and self.tool is not None:
+ raise TypeError(
+ "indexer_configuration_id and tool are mutually exclusive; "
+ "only one may be not None."
+ )
+
+ def anonymize(self: TSelf) -> Optional[TSelf]:
+ # Needed to implement swh.journal.writer.ValueProtocol
+ return None
+
+ def to_dict(self) -> Dict[str, Any]:
+ """Wrapper of `attr.asdict` that can be overridden by subclasses
+ that have special handling of some of the fields."""
+ d = dictify(attr.asdict(self, recurse=False))
+ if d["indexer_configuration_id"] is None:
+ del d["indexer_configuration_id"]
+ if d["tool"] is None:
+ del d["tool"]
+
+ return d
+
+ @classmethod
+ def from_dict(cls: Type[TSelf], d) -> TSelf:
+ return cls(**d) # type: ignore
+
+
+@attr.s
+class ContentMimetypeRow(BaseRow):
+ mimetype = attr.ib(type=str)
+ encoding = attr.ib(type=str)
+
+
+@attr.s
+class ContentLanguageRow(BaseRow):
+ lang = attr.ib(type=str)
+
+
+@attr.s
+class ContentCtagsEntry:
+ name = attr.ib(type=str)
+ kind = attr.ib(type=str)
+ line = attr.ib(type=int)
+ lang = attr.ib(type=str)
+
+ def to_dict(self) -> Dict[str, Any]:
+ return dictify(attr.asdict(self, recurse=False))
+
+ @classmethod
+ def from_dict(cls, d) -> ContentCtagsEntry:
+ return cls(**d)
+
+
+@attr.s
+class ContentCtagsRow(BaseRow):
+ ctags = attr.ib(type=List[ContentCtagsEntry])
+
+ @classmethod
+ def from_dict(cls, d) -> ContentCtagsRow:
+ d = d.copy()
+ items = d.pop("ctags")
+ return cls(ctags=[ContentCtagsEntry.from_dict(item) for item in items], **d,)
+
+
+@attr.s
+class ContentLicensesRow(BaseRow):
+ licenses = attr.ib(type=List[str])
+
+
+@attr.s
+class ContentMetadataRow(BaseRow):
+ metadata = attr.ib(type=Dict[str, Any])
+
+
+@attr.s
+class RevisionIntrinsicMetadataRow(BaseRow):
+ metadata = attr.ib(type=Dict[str, Any])
+ mappings = attr.ib(type=List[str])
+
+
+@attr.s
+class OriginIntrinsicMetadataRow(BaseRow):
+ metadata = attr.ib(type=Dict[str, Any])
+ from_revision = attr.ib(type=Sha1Git)
+ mappings = attr.ib(type=List[str])
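
As a further illustration (again not part of the patch, values made up): the row classes round-trip through to_dict()/from_dict(), BaseRow.__attrs_post_init__ requires exactly one of indexer_configuration_id and tool, and ContentCtagsRow rebuilds its nested ContentCtagsEntry objects in from_dict().

from swh.indexer.storage.model import (
    ContentCtagsEntry,
    ContentCtagsRow,
    ContentMimetypeRow,
)

sha1 = bytes.fromhex("34973274ccef6ab4dfaaf86599792fa9c3fe4689")  # made-up value

# Exactly one of indexer_configuration_id / tool must be given; here the tool dict is used.
row = ContentMimetypeRow(
    id=sha1,
    tool={"id": 1, "name": "file", "version": "5.38", "configuration": {}},
    mimetype="text/plain",
    encoding="us-ascii",
)
# to_dict() drops the unset key, so the dict round-trips back to an equal row.
assert ContentMimetypeRow.from_dict(row.to_dict()) == row

# Omitting both indexer_configuration_id and tool is rejected.
try:
    ContentMimetypeRow(id=sha1, mimetype="text/plain", encoding="us-ascii")
except TypeError:
    pass

# ContentCtagsRow.from_dict() builds the nested ContentCtagsEntry values.
ctags_row = ContentCtagsRow.from_dict(
    {
        "id": sha1,
        "indexer_configuration_id": 1,
        "ctags": [{"name": "main", "kind": "function", "line": 3, "lang": "C"}],
    }
)
assert isinstance(ctags_row.ctags[0], ContentCtagsEntry)
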
