Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/indexer.py
Show All 32 Lines | class DiskIndexer: | ||||
""" | """ | ||||
def write_to_temp(self, filename, data): | def write_to_temp(self, filename, data): | ||||
"""Write the sha1's content in a temporary file. | """Write the sha1's content in a temporary file. | ||||
Args: | Args: | ||||
filename (str): one of sha1's many filenames | filename (str): one of sha1's many filenames | ||||
data (bytes): the sha1's content to write in temporary | data (bytes): the sha1's content to write in temporary | ||||
file | file | ||||
Returns: | Returns: | ||||
The path to the temporary file created. That file is | The path to the temporary file created. That file is | ||||
filled in with the raw content's data. | filled in with the raw content's data. | ||||
""" | """ | ||||
os.makedirs(self.working_directory, exist_ok=True) | os.makedirs(self.working_directory, exist_ok=True) | ||||
temp_dir = tempfile.mkdtemp(dir=self.working_directory) | temp_dir = tempfile.mkdtemp(dir=self.working_directory) | ||||
Show All 27 Lines | class BaseIndexer(SWHConfig, metaclass=abc.ABCMeta): | ||||
- filter out ids whose data has already been indexed. | - filter out ids whose data has already been indexed. | ||||
- retrieve ids data from storage or objstorage | - retrieve ids data from storage or objstorage | ||||
- index this data depending on the object and store the result in | - index this data depending on the object and store the result in | ||||
storage. | storage. | ||||
To implement a new object type indexer, inherit from the | To implement a new object type indexer, inherit from the | ||||
BaseIndexer and implement indexing: | BaseIndexer and implement indexing: | ||||
:func:`run`: | :meth:`.run`: | ||||
object_ids are different depending on object. For example: sha1 for | object_ids are different depending on object. For example: sha1 for | ||||
content, sha1_git for revision, directory, release, and id for origin | content, sha1_git for revision, directory, release, and id for origin | ||||
To implement a new concrete indexer, inherit from the object level | To implement a new concrete indexer, inherit from the object level | ||||
classes: :class:`ContentIndexer`, :class:`RevisionIndexer`, | classes: :class:`ContentIndexer`, :class:`RevisionIndexer`, | ||||
:class:`OriginIndexer`. | :class:`OriginIndexer`. | ||||
Then you need to implement the following functions: | Then you need to implement the following functions: | ||||
:func:`filter`: | :meth:`.filter`: | ||||
filter out data already indexed (in storage). | filter out data already indexed (in storage). | ||||
:func:`index_object`: | :meth:`.index_object`: | ||||
compute index on id with data (retrieved from the storage or the | compute index on id with data (retrieved from the storage or the | ||||
objstorage by the id key) and return the resulting index computation. | objstorage by the id key) and return the resulting index computation. | ||||
:func:`persist_index_computations`: | :meth:`.persist_index_computations`: | ||||
persist the results of multiple index computations in the storage. | persist the results of multiple index computations in the storage. | ||||
The new indexer implementation can also override the following functions: | The new indexer implementation can also override the following functions: | ||||
:func:`prepare`: | :meth:`.prepare`: | ||||
Configuration preparation for the indexer. When overriding, this must | Configuration preparation for the indexer. When overriding, this must | ||||
call the `super().prepare()` instruction. | call the `super().prepare()` instruction. | ||||
:func:`check`: | :meth:`.check`: | ||||
Configuration check for the indexer. When overriding, this must call the | Configuration check for the indexer. When overriding, this must call the | ||||
`super().check()` instruction. | `super().check()` instruction. | ||||
:func:`register_tools`: | :meth:`.register_tools`: | ||||
This should return a dict of the tool(s) to use when indexing or | This should return a dict of the tool(s) to use when indexing or | ||||
filtering. | filtering. | ||||
""" | """ | ||||
CONFIG = 'indexer/base' | CONFIG = 'indexer/base' | ||||
DEFAULT_CONFIG = { | DEFAULT_CONFIG = { | ||||
INDEXER_CFG_KEY: ('dict', { | INDEXER_CFG_KEY: ('dict', { | ||||
▲ Show 20 Lines • Show All 71 Lines • ▼ Show 20 Lines | def register_tools(self, tools): | ||||
Expects the self.config['tools'] property to be set with | Expects the self.config['tools'] property to be set with | ||||
one or more tools. | one or more tools. | ||||
Args: | Args: | ||||
tools (dict/[dict]): Either a dict or a list of dict. | tools (dict/[dict]): Either a dict or a list of dict. | ||||
Returns: | Returns: | ||||
List of dict with additional id key. | list: List of dicts with additional id key. | ||||
Raises: | Raises: | ||||
ValueError if not a list nor a dict. | ValueError: if not a list nor a dict. | ||||
""" | """ | ||||
if isinstance(tools, list): | if isinstance(tools, list): | ||||
tools = list(map(self._prepare_tool, tools)) | tools = list(map(self._prepare_tool, tools)) | ||||
elif isinstance(tools, dict): | elif isinstance(tools, dict): | ||||
tools = [self._prepare_tool(tools)] | tools = [self._prepare_tool(tools)] | ||||
else: | else: | ||||
raise ValueError('Configuration tool(s) must be a dict or list!') | raise ValueError('Configuration tool(s) must be a dict or list!') | ||||
if tools: | if tools: | ||||
return self.idx_storage.indexer_configuration_add(tools) | return self.idx_storage.indexer_configuration_add(tools) | ||||
else: | else: | ||||
return [] | return [] | ||||
@abc.abstractmethod | @abc.abstractmethod | ||||
def index(self, id, data): | def index(self, id, data): | ||||
"""Index computation for the id and associated raw data. | """Index computation for the id and associated raw data. | ||||
Args: | Args: | ||||
id (bytes): identifier | id (bytes): identifier | ||||
data (bytes): id's data from storage or objstorage depending on | data (bytes): id's data from storage or objstorage depending on | ||||
object type | object type | ||||
Returns: | Returns: | ||||
a dict that makes sense for the persist_index_computations | dict: a dict that makes sense for the | ||||
function. | :meth:`.persist_index_computations` method. | ||||
""" | """ | ||||
pass | pass | ||||
@abc.abstractmethod | @abc.abstractmethod | ||||
def persist_index_computations(self, results, policy_update): | def persist_index_computations(self, results, policy_update): | ||||
"""Persist the computation resulting from the index. | """Persist the computation resulting from the index. | ||||
Args: | Args: | ||||
results ([result]): List of results. One result is the | results ([result]): List of results. One result is the | ||||
result of the index function. | result of the index function. | ||||
policy_update ([str]): either 'update-dups' or 'ignore-dups' to | policy_update ([str]): either 'update-dups' or 'ignore-dups' to | ||||
respectively update duplicates or ignore | respectively update duplicates or ignore them | ||||
them | |||||
Returns: | Returns: | ||||
None | None | ||||
""" | """ | ||||
pass | pass | ||||
def next_step(self, results, task): | def next_step(self, results, task): | ||||
"""Do something else with computations results (e.g. send to another | """Do something else with computations results (e.g. send to another | ||||
queue, ...). | queue, ...). | ||||
(This is not an abstractmethod since it is optional). | (This is not an abstractmethod since it is optional). | ||||
Args: | Args: | ||||
results ([result]): List of results (dict) as returned | results ([result]): List of results (dict) as returned | ||||
by index function. | by index function. | ||||
task (dict): a dict in the form expected by | task (dict): a dict in the form expected by | ||||
`scheduler.backend.SchedulerBackend.create_tasks` | `scheduler.backend.SchedulerBackend.create_tasks` | ||||
without `next_run`, plus an optional `result_name` key. | without `next_run`, plus an optional `result_name` key. | ||||
Returns: | Returns: | ||||
None | None | ||||
""" | """ | ||||
if task: | if task: | ||||
if getattr(self, 'scheduler', None): | if getattr(self, 'scheduler', None): | ||||
scheduler = self.scheduler | scheduler = self.scheduler | ||||
Show All 13 Lines | def run(self, ids, policy_update, | ||||
- retrieves the data from the storage | - retrieves the data from the storage | ||||
- executes the indexing computations | - executes the indexing computations | ||||
- stores the results (according to policy_update) | - stores the results (according to policy_update) | ||||
Args: | Args: | ||||
ids ([bytes]): id's identifier list | ids ([bytes]): id's identifier list | ||||
policy_update (str): either 'update-dups' or 'ignore-dups' to | policy_update (str): either 'update-dups' or 'ignore-dups' to | ||||
respectively update duplicates or ignore them | respectively update duplicates or ignore them | ||||
next_step (dict): a dict in the form expected by | next_step (dict): a dict in the form expected by | ||||
`scheduler.backend.SchedulerBackend.create_tasks` | `scheduler.backend.SchedulerBackend.create_tasks` | ||||
without `next_run`, plus a `result_name` key. | without `next_run`, plus a `result_name` key. | ||||
**kwargs: passed to the `index` method | **kwargs: passed to the `index` method | ||||
""" | """ | ||||
pass | pass | ||||
class ContentIndexer(BaseIndexer): | class ContentIndexer(BaseIndexer): | ||||
"""A content indexer working on a list of ids directly. | """A content indexer working on a list of ids directly. | ||||
▲ Show 20 Lines • Show All 70 Lines • ▼ Show 20 Lines | class ContentRangeIndexer(BaseIndexer): | ||||
object. To use it, one should inherit from this class and override | object. To use it, one should inherit from this class and override | ||||
the methods mentioned in the :class:`BaseIndexer` class. | the methods mentioned in the :class:`BaseIndexer` class. | ||||
""" | """ | ||||
@abc.abstractmethod | @abc.abstractmethod | ||||
def indexed_contents_in_range(self, start, end): | def indexed_contents_in_range(self, start, end): | ||||
"""Retrieve indexed contents within range [start, end]. | """Retrieve indexed contents within range [start, end]. | ||||
Args | Args: | ||||
**start** (bytes): Starting bound from range identifier | start (bytes): Starting bound from range identifier | ||||
**end** (bytes): End range identifier | end (bytes): End range identifier | ||||
Yields: | Yields: | ||||
Content identifier (bytes) present in the range [start, end] | bytes: Content identifier present in the range ``[start, end]`` | ||||
""" | """ | ||||
pass | pass | ||||
def _list_contents_to_index(self, start, end, indexed): | def _list_contents_to_index(self, start, end, indexed): | ||||
"""Compute from storage the new contents to index in the range [start, | """Compute from storage the new contents to index in the range [start, | ||||
end]. The already indexed contents are skipped. | end]. The already indexed contents are skipped. | ||||
Args: | Args: | ||||
**start** (bytes): Starting bound from range identifier | start (bytes): Starting bound from range identifier | ||||
**end** (bytes): End range identifier | end (bytes): End range identifier | ||||
**indexed** (Set[bytes]): Set of content already indexed. | indexed (Set[bytes]): Set of content already indexed. | ||||
Yields: | Yields: | ||||
Identifier (bytes) of contents to index. | bytes: Identifier of contents to index. | ||||
""" | """ | ||||
while start: | while start: | ||||
result = self.storage.content_get_range(start, end) | result = self.storage.content_get_range(start, end) | ||||
contents = result['contents'] | contents = result['contents'] | ||||
for c in contents: | for c in contents: | ||||
_id = c['sha1'] | _id = c['sha1'] | ||||
if _id in indexed: | if _id in indexed: | ||||
continue | continue | ||||
yield _id | yield _id | ||||
start = result['next'] | start = result['next'] | ||||
def _index_contents(self, start, end, indexed, **kwargs): | def _index_contents(self, start, end, indexed, **kwargs): | ||||
"""Index the contents from within range [start, end] | """Index the contents from within range [start, end] | ||||
Args: | Args: | ||||
**start** (bytes): Starting bound from range identifier | start (bytes): Starting bound from range identifier | ||||
**end** (bytes): End range identifier | end (bytes): End range identifier | ||||
**indexed** (Set[bytes]): Set of content already indexed. | indexed (Set[bytes]): Set of content already indexed. | ||||
Yields: | Yields: | ||||
Data indexed (dict) to persist using the indexer storage | dict: Data indexed to persist using the indexer storage | ||||
""" | """ | ||||
for sha1 in self._list_contents_to_index(start, end, indexed): | for sha1 in self._list_contents_to_index(start, end, indexed): | ||||
try: | try: | ||||
raw_content = self.objstorage.get(sha1) | raw_content = self.objstorage.get(sha1) | ||||
except ObjNotFoundError: | except ObjNotFoundError: | ||||
self.log.warning('Content %s not found in objstorage' % | self.log.warning('Content %s not found in objstorage' % | ||||
hashutil.hash_to_hex(sha1)) | hashutil.hash_to_hex(sha1)) | ||||
continue | continue | ||||
res = self.index(sha1, raw_content, **kwargs) | res = self.index(sha1, raw_content, **kwargs) | ||||
if res: | if res: | ||||
yield res | yield res | ||||
def _index_with_skipping_already_done(self, start, end): | def _index_with_skipping_already_done(self, start, end): | ||||
"""Index not already indexed contents in range [start, end]. | """Index not already indexed contents in range [start, end]. | ||||
Args: | Args: | ||||
**start** (Union[bytes, str]): Starting range identifier | start** (Union[bytes, str]): Starting range identifier | ||||
**end** (Union[bytes, str]): Ending range identifier | end (Union[bytes, str]): Ending range identifier | ||||
Yields: | Yields: | ||||
Content identifier (bytes) present in the range [start, | bytes: Content identifier present in the range | ||||
end] which are not already indexed. | ``[start, end]`` which are not already indexed. | ||||
""" | """ | ||||
while start: | while start: | ||||
indexed_page = self.indexed_contents_in_range(start, end) | indexed_page = self.indexed_contents_in_range(start, end) | ||||
contents = indexed_page['ids'] | contents = indexed_page['ids'] | ||||
_end = contents[-1] if contents else end | _end = contents[-1] if contents else end | ||||
yield from self._index_contents( | yield from self._index_contents( | ||||
start, _end, contents) | start, _end, contents) | ||||
start = indexed_page['next'] | start = indexed_page['next'] | ||||
def run(self, start, end, skip_existing=True, **kwargs): | def run(self, start, end, skip_existing=True, **kwargs): | ||||
"""Given a range of content ids, compute the indexing computations on | """Given a range of content ids, compute the indexing computations on | ||||
the contents within. Either the indexer is incremental | the contents within. Either the indexer is incremental | ||||
(filter out existing computed data) or not (compute | (filter out existing computed data) or not (compute | ||||
everything from scratch). | everything from scratch). | ||||
Args: | Args: | ||||
**start** (Union[bytes, str]): Starting range identifier | start (Union[bytes, str]): Starting range identifier | ||||
**end** (Union[bytes, str]): Ending range identifier | end (Union[bytes, str]): Ending range identifier | ||||
**skip_existing** (bool): Skip existing indexed data | skip_existing (bool): Skip existing indexed data | ||||
(default) or not | (default) or not | ||||
**kwargs: passed to the `index` method | **kwargs: passed to the `index` method | ||||
Returns: | Returns: | ||||
a boolean. True if data was indexed, False otherwise. | bool: True if data was indexed, False otherwise. | ||||
""" | """ | ||||
with_indexed_data = False | with_indexed_data = False | ||||
try: | try: | ||||
if isinstance(start, str): | if isinstance(start, str): | ||||
start = hashutil.hash_to_bytes(start) | start = hashutil.hash_to_bytes(start) | ||||
if isinstance(end, str): | if isinstance(end, str): | ||||
end = hashutil.hash_to_bytes(end) | end = hashutil.hash_to_bytes(end) | ||||
Show All 30 Lines | def run(self, ids, policy_update='update-dups', parse_ids=True, | ||||
"""Given a list of origin ids: | """Given a list of origin ids: | ||||
- retrieve origins from storage | - retrieve origins from storage | ||||
- execute the indexing computations | - execute the indexing computations | ||||
- store the results (according to policy_update) | - store the results (according to policy_update) | ||||
Args: | Args: | ||||
ids ([Union[int, Tuple[str, bytes]]]): list of origin ids or | ids ([Union[int, Tuple[str, bytes]]]): list of origin ids or | ||||
(type, url) tuples. | (type, url) tuples. | ||||
policy_update (str): either 'update-dups' or 'ignore-dups' to | policy_update (str): either 'update-dups' or 'ignore-dups' to | ||||
respectively update duplicates (default) | respectively update duplicates (default) or ignore them | ||||
or ignore them | |||||
next_step (dict): a dict in the form expected by | next_step (dict): a dict in the form expected by | ||||
`scheduler.backend.SchedulerBackend.create_tasks` | `scheduler.backend.SchedulerBackend.create_tasks` without | ||||
without `next_run`, plus an optional `result_name` key. | `next_run`, plus an optional `result_name` key. | ||||
parse_ids (bool): Do we need to parse id or not (default) | parse_ids (bool): Do we need to parse id or not (default) | ||||
**kwargs: passed to the `index` method | **kwargs: passed to the `index` method | ||||
""" | """ | ||||
if parse_ids: | if parse_ids: | ||||
ids = [o.split('+', 1) if ':' in o else int(o) # type+url or id | ids = [o.split('+', 1) if ':' in o else int(o) # type+url or id | ||||
for o in ids] | for o in ids] | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | def run(self, ids, policy_update, next_step=None): | ||||
- retrieve revisions from storage | - retrieve revisions from storage | ||||
- execute the indexing computations | - execute the indexing computations | ||||
- store the results (according to policy_update) | - store the results (according to policy_update) | ||||
Args: | Args: | ||||
ids ([bytes or str]): sha1_git's identifier list | ids ([bytes or str]): sha1_git's identifier list | ||||
policy_update (str): either 'update-dups' or 'ignore-dups' to | policy_update (str): either 'update-dups' or 'ignore-dups' to | ||||
respectively update duplicates or ignore | respectively update duplicates or ignore them | ||||
them | |||||
""" | """ | ||||
results = [] | results = [] | ||||
ids = [hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ | ids = [hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ | ||||
for id_ in ids] | for id_ in ids] | ||||
revs = self.storage.revision_get(ids) | revs = self.storage.revision_get(ids) | ||||
for rev in revs: | for rev in revs: | ||||
Show All 14 Lines |