Changeset View
Standalone View
swh/indexer/storage/in_memory.py
# Copyright (C) 2018 The Software Heritage developers | # Copyright (C) 2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import json | import json | ||||
# Size of a sha1 digest, expressed in bits (160 bits == 20 bytes).
# NOTE(review): content_ctags_search uses this as a *byte* count when
# building its all-zeros sentinel (``bytes(0 for _ in range(...))``),
# yielding 160 zero bytes instead of 20. Harmless for the lexicographic
# comparison against 20-byte sha1s, but worth confirming the intent.
SHA1_DIGEST_SIZE = 160
class MetadataStorage: | |||||
"""Implements missing/get/add logic for both content_metadata and | |||||
revision_metadata.""" | |||||
def __init__(self, tools): | |||||
self._tools = tools | |||||
self._metadata = {} # map (id_, tool_id) -> metadata_dict | |||||
self._tools_per_id = defaultdict(set) # map id_ -> Set[tool_id] | |||||
def _transform_tool(self, tool): | def _transform_tool(tool): | ||||
return { | return { | ||||
'id': tool['id'], | 'id': tool['id'], | ||||
'name': tool['tool_name'], | 'name': tool['tool_name'], | ||||
'version': tool['tool_version'], | 'version': tool['tool_version'], | ||||
'configuration': tool['tool_configuration'], | 'configuration': tool['tool_configuration'], | ||||
} | } | ||||
class SubStorage:
    """Implements common missing/get/add logic for each indexer type."""
    def __init__(self, tools, columns):
        # Names of the payload columns this sub-storage holds.
        # NOTE(review): kept for interface symmetry with the SQL backend;
        # not read anywhere in this class — confirm before removing.
        self._columns = columns
        # Shared tool registry (tool_id -> tool row), owned by the caller.
        self._tools = tools
        self._data = {}  # map (id_, tool_id) -> data dict
        self._tools_per_id = defaultdict(set)  # map id_ -> Set[tool_id]

    def missing(self, ids):
        """List data missing from storage.

        Args:
            ids (iterable): dictionaries with keys:

                - **id** (bytes): sha1 identifier
                - **indexer_configuration_id** (int): tool used to compute
                  the results

        Yields:
            missing sha1s

        """
        for id_ in ids:
            tool_id = id_['indexer_configuration_id']
            id_ = id_['id']
            if tool_id not in self._tools_per_id.get(id_, set()):
                yield id_

    def get(self, ids):
        """Retrieve data per id.

        Args:
            ids (iterable): sha1 checksums

        Yields:
            dict: dictionaries with the following keys:

                - **id** (bytes)
                - **tool** (dict): tool used to compute metadata
                - arbitrary data (as provided to `add`)

        """
        for id_ in ids:
            for tool_id in self._tools_per_id.get(id_, set()):
                key = (id_, tool_id)
                yield {
                    'id': id_,
                    'tool': _transform_tool(self._tools[tool_id]),
                    **self._data[key],
                }

    def add(self, data, conflict_update):
        """Add data not present in storage.

        Args:
            data (iterable): dictionaries with keys:

                - **id**: sha1
                - **indexer_configuration_id**: tool used to compute the
                  results
                - arbitrary data

            conflict_update (bool): Flag to determine if we want to overwrite
                (true) or skip duplicates (false)

        """
        for item in data:
            # Copy so callers' dicts are not mutated by the pops below.
            item = item.copy()
            tool_id = item.pop('indexer_configuration_id')
            id_ = item.pop('id')
            # (fixed) the original rebound the `data` parameter here
            # (`data = item`), shadowing the iterable being looped over.
            if not conflict_update and \
                    tool_id in self._tools_per_id.get(id_, set()):
                # Duplicate, should not be updated
                continue
            key = (id_, tool_id)
            self._data[key] = item
            self._tools_per_id[id_].add(tool_id)
class IndexerStorage:
    """In-memory SWH indexer storage."""
    def __init__(self):
        # Tool registry shared (by reference) with every sub-storage below.
        self._tools = {}
        ctags_columns = ['name', 'kind', 'lang']
        self._content_ctags = SubStorage(self._tools, ctags_columns)
        self._content_metadata = SubStorage(self._tools, ['metadata'])
        self._revision_metadata = SubStorage(self._tools, ['metadata'])
def content_ctags_missing(self, ctags):
    """List ctags missing from storage.

    Args:
        ctags (iterable): dicts with keys:

            - **id** (bytes): sha1 identifier
            - **indexer_configuration_id** (int): tool used to compute
              the results

    Yields:
        an iterable of missing id for the tuple (id,
        indexer_configuration_id)

    """
    # Delegate straight to the ctags sub-storage.
    missing_ids = self._content_ctags.missing(ctags)
    yield from missing_ids
def content_ctags_get(self, ids):
    """Retrieve ctags per id.

    Args:
        ids (iterable): sha1 checksums

    Yields:
        Dictionaries with keys:

            - **id** (bytes): content's identifier
            - **name** (str): symbol's name
            - **kind** (str): symbol's kind
            - **lang** (str): language for that content
            - **tool** (dict): tool used to compute the ctags' info

    """
    # Each stored row carries a list of symbols; flatten it to one
    # yielded dict per symbol.
    for row in self._content_ctags.get(ids):
        base = {'id': row['id'], 'tool': row['tool']}
        for symbol in row['ctags']:
            yield {**base, **symbol}
def content_ctags_add(self, ctags, conflict_update=False):
    """Add ctags not present in storage

    Args:
        ctags (iterable): dictionaries with keys:

            - **id** (bytes): sha1
            - **ctags** (list): List of dictionary with keys: name, kind,
              line, lang
            - **indexer_configuration_id**: tool used to compute the
              results

    """
    for item in ctags:
        tool_id = item['indexer_configuration_id']
        if conflict_update:
            symbols = []
        else:
            # TODO: this merges old ctags with new ctags. This is
            # pointless, new ctags should replace the old ones.
            symbols = []
            for existing_row in self._content_ctags.get([item['id']]):
                if existing_row['tool']['id'] != tool_id:
                    continue
                for old_symbol in existing_row['ctags']:
                    symbols.append({
                        key: old_symbol[key]
                        for key in ('name', 'kind', 'line', 'lang')
                    })
        # Append the incoming symbols, skipping exact duplicates.
        for new_symbol in item['ctags']:
            if new_symbol not in symbols:
                symbols.append(new_symbol)
        # Always overwrite the stored row with the merged list.
        self._content_ctags.add([
            {
                'id': item['id'],
                'indexer_configuration_id': tool_id,
                'ctags': symbols,
            }
        ], conflict_update=True)
def content_ctags_search(self, expression,
                         limit=10, last_sha1=None, db=None, cur=None):
    """Search through content's raw ctags symbols.

    Args:
        expression (str): Expression to search for
        limit (int): Number of rows to return (default to 10).
        last_sha1 (bytes): Offset from which retrieving data (default to '').
        db, cur: unused; kept for signature compatibility with the
            postgresql backend.

    Yields:
        rows of ctags including id, name, lang, kind, line, etc...

    """
    nb_matches = 0
    # Hoisted loop-invariant: the lower bound for pagination. With no
    # offset, use an all-zeros sentinel smaller than any real sha1.
    lower_bound = last_sha1 or bytes(0 for _ in range(SHA1_DIGEST_SIZE))
    for ((id_, tool_id), item) in \
            sorted(self._content_ctags._data.items()):
        if id_ <= lower_bound:
            continue
        for ctags_item in item['ctags']:
            if ctags_item['name'] != expression:
                continue
            # (fixed) count *returned rows*, not scanned content ids:
            # previously non-matching ids consumed the limit and a single
            # id with many matching symbols could exceed it.
            nb_matches += 1
            yield {
                'id': id_,
                'tool': _transform_tool(self._tools[tool_id]),
                **ctags_item
            }
            if nb_matches >= limit:
                return
def content_metadata_missing(self, metadata): | def content_metadata_missing(self, metadata): | ||||
"""List metadata missing from storage. | """List metadata missing from storage. | ||||
Args: | Args: | ||||
metadata (iterable): dictionaries with keys: | metadata (iterable): dictionaries with keys: | ||||
- **id** (bytes): sha1 identifier | - **id** (bytes): sha1 identifier | ||||
▲ Show 20 Lines • Show All 138 Lines • Show Last 20 Lines |
This is not pointless.
This is an implementation detail from the indexer storage.
I expected the multiple ctags implementations (universal, exuberant, etc...) to be idempotent in their computations (still do).
So in the indexer storage, the function that add those data simply ignore the conflicted data (which should be exactly the same as before). In the end, only read operations are expected when we pass yet again on the same content.
Why would we be expected to pass over the same content again, you might ask?
Because not so long ago, the indexers were a pipeline; adding a new indexer would have triggered exactly that behavior.
The orchestrator would have broadcast the same contents yet again to the new indexer, and possibly to the other indexers as well.
As it's an implementation detail, in theory, you could implement this as you wish here as long as tests are fine ;)