swh/indexer/storage/in_memory.py
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import bisect
from collections import defaultdict
import json

SHA1_DIGEST_SIZE = 160

def _transform_tool(tool):
    return {
        'id': tool['id'],
        'name': tool['tool_name'],
        'version': tool['tool_version'],
        'configuration': tool['tool_configuration'],
    }

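# A quick sketch of what _transform_tool produces, using a hypothetical
# tool row (the keys on the left are the ones read above):
#
#   >>> _transform_tool({'id': 7, 'tool_name': 'nomos',
#   ...                  'tool_version': '3.1', 'tool_configuration': {}})
#   {'id': 7, 'name': 'nomos', 'version': '3.1', 'configuration': {}}
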
class SubStorage:
    """Implements common missing/get/add logic for each indexer type."""
    def __init__(self, tools):
        self._tools = tools
        self._sorted_ids = []
        self._data = {}  # map (id_, tool_id) -> metadata_dict
        self._tools_per_id = defaultdict(set)  # map id_ -> Set[tool_id]

    def missing(self, ids):
        """List data missing from storage.

        Args:
            ids (iterable): dictionaries with keys:

            - **id** (bytes): sha1 identifier
            - **indexer_configuration_id** (int): tool used to compute
              the results

        Yields:
            missing sha1s

        """
        for id_ in ids:
            tool_id = id_['indexer_configuration_id']
            id_ = id_['id']
            if tool_id not in self._tools_per_id.get(id_, set()):
                yield id_

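    # Contract sketch (hypothetical id and tool id): a pair is reported
    # missing until a result for it has been stored with add().
    #
    #   >>> sub = SubStorage(tools={})
    #   >>> list(sub.missing([{'id': b'\x01' * 20,
    #   ...                    'indexer_configuration_id': 7}]))
    #   [b'\x01\x01\x01...']    # nothing stored yet, so the id is yielded
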
    def get(self, ids):
        """Retrieve data per id.

        Args:
            ids (iterable): sha1 checksums

        Yields:
            dict: dictionaries with the following keys:

            - **id** (bytes)
            - **tool** (dict): tool used to compute metadata
            - arbitrary data (as provided to `add`)

        """
        for id_ in ids:
            for tool_id in self._tools_per_id.get(id_, set()):
                key = (id_, tool_id)
                yield {
                    'id': id_,
                    'tool': _transform_tool(self._tools[tool_id]),
                    **self._data[key],
                }

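    # Result-shape sketch, assuming a mimetype result was stored earlier
    # with add() for a hypothetical id and tool: the stored dict is
    # splatted back into the yielded row next to 'id' and 'tool'.
    #
    #   >>> next(sub.get([b'\x01' * 20]))
    #   {'id': b'\x01...', 'tool': {'id': 7, 'name': ..., ...},
    #    'mimetype': b'text/plain', 'encoding': b'utf-8'}
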
    def get_range(self, start, end, indexer_configuration_id, limit):
        """Retrieve data within range [start, end] bound by limit.

        Args:
            **start** (bytes): Starting identifier range (expected smaller
              than end)
            **end** (bytes): Ending identifier range (expected larger
              than start)
            **indexer_configuration_id** (int): The tool used to index data
            **limit** (int): Limit result

        Raises:
            ValueError: if limit is None

        Returns:
            a dict with keys:

            - **ids** [bytes]: iterable of content ids within the range.
            - **next** (Optional[bytes]): The next range of sha1 starts at
              this sha1 if any

        """
        if limit is None:
            raise ValueError('Development error: limit should not be None')
        from_index = bisect.bisect_left(self._sorted_ids, start)
        to_index = bisect.bisect_right(self._sorted_ids, end, lo=from_index)
        if to_index - from_index > limit:
            # Strictly more ids in range than the limit: return one page,
            # plus the in-range id at which the next page starts. (A `>=`
            # test here would read past the end of the list when the range
            # holds exactly `limit` ids.)
            return {
                'ids': self._sorted_ids[from_index:from_index+limit],
                'next': self._sorted_ids[from_index+limit],
            }
        else:
            return {
                'ids': self._sorted_ids[from_index:to_index],
                'next': None,
            }

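    # Pagination sketch, assuming three stored ids A < B < C that all fall
    # within [start, end], and limit=2:
    #
    #   get_range(start, end, tool, 2)  -> {'ids': [A, B], 'next': C}
    #   get_range(C, end, tool, 2)      -> {'ids': [C], 'next': None}
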
    def add(self, data, conflict_update):
        """Add data not present in storage.

        Args:
            data (iterable): dictionaries with keys:

            - **id**: sha1
            - **indexer_configuration_id**: tool used to compute the
              results
            - arbitrary data

            conflict_update (bool): Flag to determine if we want to overwrite
              (true) or skip duplicates (false)

        """
        for item in data:
            item = item.copy()
            tool_id = item.pop('indexer_configuration_id')
            id_ = item.pop('id')
            if not conflict_update and \
                    tool_id in self._tools_per_id.get(id_, set()):
                # Duplicate, should not be updated
                continue
            key = (id_, tool_id)
            self._data[key] = item
            self._tools_per_id[id_].add(tool_id)
            if id_ not in self._sorted_ids:
                bisect.insort(self._sorted_ids, id_)

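    # Conflict-handling sketch (hypothetical id and tool id): a second add
    # for the same (id, tool) pair is skipped unless conflict_update=True.
    #
    #   >>> row = {'id': b'\x01' * 20, 'indexer_configuration_id': 7}
    #   >>> sub.add([{**row, 'mimetype': b'text/plain'}], False)
    #   >>> sub.add([{**row, 'mimetype': b'text/x-c'}], False)   # no-op
    #   >>> sub.add([{**row, 'mimetype': b'text/x-c'}], True)    # overwrite
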
    def add_merge(self, new_data, conflict_update, merged_key):
        """Add data, merging the lists stored under `merged_key`: for each
        (id, tool) pair, new sub-items are appended to the sub-items
        already stored (skipping exact duplicates), unless conflict_update
        is true, in which case the stored list is discarded first."""
        for new_item in new_data:
            id_ = new_item['id']
            tool_id = new_item['indexer_configuration_id']
            if conflict_update:
                all_subitems = []
            else:
                existing = list(self.get([id_]))
                all_subitems = [
                    old_subitem
                    for existing_item in existing
                    if existing_item['tool']['id'] == tool_id
                    for old_subitem in existing_item[merged_key]
                ]
            for new_subitem in new_item[merged_key]:
                if new_subitem not in all_subitems:
                    all_subitems.append(new_subitem)
            self.add([
                {
                    'id': id_,
                    'indexer_configuration_id': tool_id,
                    merged_key: all_subitems,
                }
            ], conflict_update=True)
            if id_ not in self._sorted_ids:
                bisect.insort(self._sorted_ids, id_)

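    # Merge sketch with hypothetical ctags-style sub-items x and y already
    # stored / being added for the same (id, tool) pair:
    #
    #   stored                          : {'ctags': [x]}
    #   add_merge(..., False, 'ctags')  : {'ctags': [x, y]}   # merged
    #   add_merge(..., True,  'ctags')  : {'ctags': [y]}      # replaced
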
class IndexerStorage:
    """In-memory SWH indexer storage."""

    def __init__(self):
        self._tools = {}
        self._mimetypes = SubStorage(self._tools)
        self._languages = SubStorage(self._tools)
        self._content_ctags = SubStorage(self._tools)
        self._licenses = SubStorage(self._tools)
        self._content_metadata = SubStorage(self._tools)
        self._revision_metadata = SubStorage(self._tools)

    def content_mimetype_missing(self, mimetypes):
        """Generate mimetypes missing from storage.

        Args:
            mimetypes (iterable): iterable of dict with keys:

            - **id** (bytes): sha1 identifier
            - **indexer_configuration_id** (int): tool used to compute the
              results

        Yields:
            the sha1 of each (id, indexer_configuration_id) pair missing
            from storage

        """
        yield from self._mimetypes.missing(mimetypes)

    def content_mimetype_get_range(
            self, start, end, indexer_configuration_id, limit=1000):
        """Retrieve mimetypes within range [start, end] bound by limit.

        Args:
            **start** (bytes): Starting identifier range (expected smaller
              than end)
            **end** (bytes): Ending identifier range (expected larger
              than start)
            **indexer_configuration_id** (int): The tool used to index data
            **limit** (int): Limit result (default to 1000)

        Raises:
            ValueError: if limit is None

        Returns:
            a dict with keys:

            - **ids** [bytes]: iterable of content ids within the range.
            - **next** (Optional[bytes]): The next range of sha1 starts at
              this sha1 if any

        """
        return self._mimetypes.get_range(
            start, end, indexer_configuration_id, limit)

    def content_mimetype_add(self, mimetypes, conflict_update=False):
        """Add mimetypes not present in storage.

        Args:
            mimetypes (iterable): dictionaries with keys:

            - **id** (bytes): sha1 identifier
            - **mimetype** (bytes): raw content's mimetype
            - **encoding** (bytes): raw content's encoding
            - **indexer_configuration_id** (int): tool's id used to
              compute the results

            conflict_update (bool): Flag to determine if we want to
              overwrite (``True``) or skip duplicates (``False``, the
              default)

        """
        if not all(isinstance(x['id'], bytes) for x in mimetypes):
            raise TypeError('identifiers must be bytes.')
        self._mimetypes.add(mimetypes, conflict_update)

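    # Round-trip sketch with a hypothetical sha1 and tool id (the tool is
    # assumed to have been registered via indexer_configuration_add):
    #
    #   >>> storage.content_mimetype_add([{
    #   ...     'id': b'\x34' * 20, 'mimetype': b'text/plain',
    #   ...     'encoding': b'utf-8', 'indexer_configuration_id': 7}])
    #   >>> next(storage.content_mimetype_get([b'\x34' * 20]))['mimetype']
    #   b'text/plain'
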
    def content_mimetype_get(self, ids, db=None, cur=None):
        """Retrieve full content mimetype per ids.

        Args:
            ids (iterable): sha1 identifier

        Yields:
            mimetypes (iterable): dictionaries with keys:

            - **id** (bytes): sha1 identifier
            - **mimetype** (bytes): raw content's mimetype
            - **encoding** (bytes): raw content's encoding
            - **tool** (dict): Tool used to compute the mimetype

        """
        yield from self._mimetypes.get(ids)

    def content_language_missing(self, languages):
        """List languages missing from storage.

        Args:
            languages (iterable): dictionaries with keys:

            - **id** (bytes): sha1 identifier
            - **indexer_configuration_id** (int): tool used to compute
              the results

        Yields:
            the sha1 of each (id, indexer_configuration_id) pair missing
            from storage

        """
        yield from self._languages.missing(languages)

    def content_language_get(self, ids):
        """Retrieve full content language per ids.

        Args:
            ids (iterable): sha1 identifier

        Yields:
            languages (iterable): dictionaries with keys:

            - **id** (bytes): sha1 identifier
            - **lang** (bytes): raw content's language
            - **tool** (dict): Tool used to compute the language

        """
        yield from self._languages.get(ids)

    def content_language_add(self, languages, conflict_update=False):
        """Add languages not present in storage.

        Args:
            languages (iterable): dictionaries with keys:

            - **id** (bytes): sha1
            - **lang** (bytes): language detected
            - **indexer_configuration_id** (int): tool used to compute
              the results

            conflict_update (bool): Flag to determine if we want to
              overwrite (true) or skip duplicates (false, the default)

        """
        self._languages.add(languages, conflict_update)

    def content_ctags_missing(self, ctags):
        """List ctags missing from storage.

        Args:
            ctags (iterable): dicts with keys:

            - **id** (bytes): sha1 identifier
            - **indexer_configuration_id** (int): tool used to compute
              the results

        Yields:
            the sha1 of each (id, indexer_configuration_id) pair missing
            from storage

        """
        yield from self._content_ctags.missing(ctags)

    def content_ctags_get(self, ids):
        """Retrieve ctags per id.

        Args:
            ids (iterable): sha1 checksums

        Yields:
            Dictionaries with keys:

            - **id** (bytes): content's identifier
            - **name** (str): symbol's name
            - **kind** (str): symbol's kind
            - **line** (int): symbol's line
            - **lang** (str): language for that content
            - **tool** (dict): tool used to compute the ctags' info

        """
        for item in self._content_ctags.get(ids):
            for item_ctags_item in item['ctags']:
                yield {
                    'id': item['id'],
                    'tool': item['tool'],
                    **item_ctags_item
                }

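    # Flattening sketch (hypothetical id ID and tool T): a single stored
    # entry holding two ctags sub-items is yielded as two rows, each
    # carrying the content id and the tool dict:
    #
    #   stored : {'id': ID, 'ctags': [{'name': 'main', ...},
    #                                 {'name': 'usage', ...}]}
    #   yields : {'id': ID, 'tool': T, 'name': 'main', ...}
    #            {'id': ID, 'tool': T, 'name': 'usage', ...}
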
    def content_ctags_add(self, ctags, conflict_update=False):
        """Add ctags not present in storage.

        Args:
            ctags (iterable): dictionaries with keys:

            - **id** (bytes): sha1
            - **ctags** (list): list of dictionaries with keys: name,
              kind, line, lang
            - **indexer_configuration_id**: tool used to compute the
              results

            conflict_update (bool): Flag to determine if we want to
              overwrite (true) or merge with existing ctags (false, the
              default)

        """
        if not all(isinstance(x['id'], bytes) for x in ctags):
            raise TypeError('identifiers must be bytes.')
        self._content_ctags.add_merge(ctags, conflict_update, 'ctags')

    def content_ctags_search(self, expression,
                             limit=10, last_sha1=None, db=None, cur=None):
        """Search through content's raw ctags symbols.

        Args:
            expression (str): Expression to search for
            limit (int): Number of rows to return (default to 10).
            last_sha1 (bytes): Offset from which to retrieve data
              (default to None, i.e. from the beginning).

        Yields:
            rows of ctags including id, name, lang, kind, line, etc...

        """
        nb_matches = 0
        for ((id_, tool_id), item) in \
                sorted(self._content_ctags._data.items()):
            if id_ <= (last_sha1 or bytes(0 for _ in range(SHA1_DIGEST_SIZE))):
                continue
            for ctags_item in item['ctags']:
                if ctags_item['name'] != expression:
                    continue
                # Count rows actually yielded, so that `limit` bounds the
                # number of results rather than the number of contents
                # scanned.
                nb_matches += 1
                yield {
                    'id': id_,
                    'tool': _transform_tool(self._tools[tool_id]),
                    **ctags_item
                }
                if nb_matches >= limit:
                    return

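    # Paged-search sketch, assuming two stored contents with ids A < B
    # that both define a symbol named 'main': passing last_sha1=A resumes
    # the scan strictly after A, so only B's rows are yielded.
    #
    #   >>> rows = list(storage.content_ctags_search('main', limit=10,
    #   ...                                          last_sha1=A))
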
    def content_fossology_license_get(self, ids):
        """Retrieve licenses per id.

        Args:
            ids (iterable): sha1 checksums

        Yields:
            `{id: facts}` where `facts` is a list of dicts with the
            following keys:

            - **licenses** ([str]): associated licenses for that content
            - **tool** (dict): Tool used to compute the license

        """
        # TODO: remove this reformatting in order to yield items with the
        # same format as other _get methods.
        res = {}
        for d in self._licenses.get(ids):
            res.setdefault(d.pop('id'), []).append(d)
        for (id_, facts) in res.items():
            yield {id_: facts}

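    # Shape sketch (hypothetical id ID and license): unlike the other _get
    # methods, each yielded item maps the content id to its list of facts:
    #
    #   {ID: [{'licenses': ['GPL-3.0'], 'tool': {...}}]}
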
    def content_fossology_license_add(self, licenses, conflict_update=False):
        """Add licenses not present in storage.

        Args:
            licenses (iterable): dictionaries with keys:

            - **id**: sha1
            - **licenses** ([bytes]): List of licenses associated to sha1
            - **indexer_configuration_id**: tool used to compute the
              results

            conflict_update: Flag to determine if we want to overwrite (true)
              or merge with existing licenses (false, the default)

        """
        if not all(isinstance(x['id'], bytes) for x in licenses):
            raise TypeError('identifiers must be bytes.')
        self._licenses.add_merge(licenses, conflict_update, 'licenses')

    def content_fossology_license_get_range(
            self, start, end, indexer_configuration_id, limit=1000):
        """Retrieve licenses within range [start, end] bound by limit.

        Args:
            **start** (bytes): Starting identifier range (expected smaller
              than end)
            **end** (bytes): Ending identifier range (expected larger
              than start)
            **indexer_configuration_id** (int): The tool used to index data
            **limit** (int): Limit result (default to 1000)

        Raises:
            ValueError: if limit is None

        Returns:
            a dict with keys:

            - **ids** [bytes]: iterable of content ids within the range.
            - **next** (Optional[bytes]): The next range of sha1 starts at
              this sha1 if any

        """
        return self._licenses.get_range(
            start, end, indexer_configuration_id, limit)

    def content_metadata_missing(self, metadata):
        """List metadata missing from storage.

        Args:
            metadata (iterable): dictionaries with keys:

            - **id** (bytes): sha1 identifier

    # [... 32 lines not shown in the changeset view ...]

    def content_metadata_add(self, metadata, conflict_update=False):
        """Add metadata not present in storage.

        Args:
            metadata (iterable): dictionaries with keys:

            - **id**: sha1
            - **translated_metadata**: arbitrary dict
            - **indexer_configuration_id**: tool used to compute the
              results

            conflict_update: Flag to determine if we want to overwrite (true)
              or skip duplicates (false, the default)

        """
        if not all(isinstance(x['id'], bytes) for x in metadata):
            raise TypeError('identifiers must be bytes.')
        self._content_metadata.add(metadata, conflict_update)

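    # Sketch (hypothetical sha1 and tool id): once added, the same
    # (id, tool) pair is no longer reported by content_metadata_missing.
    #
    #   >>> pair = {'id': b'\xaa' * 20, 'indexer_configuration_id': 7}
    #   >>> storage.content_metadata_add([{
    #   ...     **pair, 'translated_metadata': {'name': 'example'}}])
    #   >>> list(storage.content_metadata_missing([pair]))
    #   []
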
    def revision_metadata_missing(self, metadata):
        """List metadata missing from storage.

        Args:
            metadata (iterable): dictionaries with keys:

    # [... 32 lines not shown in the changeset view ...]

    def revision_metadata_add(self, metadata, conflict_update=False):
        """Add metadata not present in storage.

        Args:
            metadata (iterable): dictionaries with keys:
            - **id**: sha1_git of revision
            - **translated_metadata**: arbitrary dict
            - **indexer_configuration_id**: tool used to compute metadata

            conflict_update: Flag to determine if we want to overwrite (true)
              or skip duplicates (false, the default)

        """
        if not all(isinstance(x['id'], bytes) for x in metadata):
            raise TypeError('identifiers must be bytes.')
        self._revision_metadata.add(metadata, conflict_update)

    def indexer_configuration_add(self, tools):
        """Add new tools to the storage.

        Args:
            tools ([dict]): List of dictionaries representing the tools to
              insert in the db. Each dictionary has the following keys:
    # [... remaining 42 lines not shown in the changeset view ...]