diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -16,8 +16,8 @@ from swh.core.config import SWHConfig from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError -from swh.model import hashutil from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY +from swh.model import hashutil class DiskIndexer: @@ -213,19 +213,6 @@ return self.idx_storage.indexer_configuration_add(tools) - @abc.abstractmethod - def filter(self, ids): - """Filter missing ids for that particular indexer. - - Args: - ids ([bytes]): list of ids - - Yields: - iterator of missing ids - - """ - pass - @abc.abstractmethod def index(self, id, data): """Index computation for the id and associated raw data. @@ -311,15 +298,28 @@ class ContentIndexer(BaseIndexer): - """An object type indexer, inherits from the :class:`BaseIndexer` and - implements Content indexing using the run method + """A content indexer working on a list of ids directly. + + To work on indexer range, use the :class:`ContentRangeIndexer` + instead. - Note: the :class:`ContentIndexer` is not an instantiable - object. To use it in another context, one should inherit from this - class and override the methods mentioned in the - :class:`BaseIndexer` class. + Note: :class:`ContentIndexer` is not an instantiable object. To + use it, one should inherit from this class and override the + methods mentioned in the :class:`BaseIndexer` class. """ + @abc.abstractmethod + def filter(self, ids): + """Filter missing ids for that particular indexer. + + Args: + ids ([bytes]): list of ids + + Yields: + iterator of missing ids + + """ + pass def run(self, ids, policy_update, next_step=None, **kwargs): @@ -361,6 +361,96 @@ 'Problem when reading contents metadata.') +class ContentRangeIndexer(BaseIndexer): + """A content range indexer. + + This expects as input a range of ids to index. 
+ + To work on a list of ids, use the :class:`ContentIndexer` instead. + + Note: :class:`ContentRangeIndexer` is not an instantiable + object. To use it, one should inherit from this class and override + the methods mentioned in the :class:`BaseIndexer` class. + + """ + @abc.abstractmethod + def indexed_contents_in_range(self, start, end): + """Retrieve indexed contents within range [start, end]. + + Args: + **start** (bytes): Starting bound from range identifier + **end** (bytes): End range identifier + + Yields: + Content identifier (bytes) present in the range [start, end] + + """ + pass + + def list_contents_to_index(self, start, end, indexed): + """Compute from storage the new contents to index in the range [start, + end]. The already indexed contents are skipped. + + Args: + **start** (bytes): Starting bound from range identifier + **end** (bytes): End range identifier + **indexed** (Set[bytes]): Set of content already indexed. + + """ + while start: + result = self.storage.content_get_range(start, end) + contents = result['contents'] + for c in contents: + _id = c['sha1'] + if _id in indexed: + continue + yield _id + start = result['next'] + + def run(self, start, end, policy_update, **kwargs): + """Given a range of content ids, compute the indexing computation on + the contents within. Either only new ones (policy_update to + 'update-dups') or all (policy_update to 'ignore-dups'). 
+ + Args: + **start** (bytes/str): Starting range identifier + **end** (bytes/str): Ending range identifier + **policy_update** (str): either 'update-dups' to only + compute new contents, or + 'ignore-dups' to do all + **kwargs: passed to the `index` method + + """ + results = [] + try: + if isinstance(start, str): + start = hashutil.hash_to_bytes(start) + if isinstance(end, str): + end = hashutil.hash_to_bytes(end) + + if policy_update == 'update-dups': # incremental + indexed = set(self.indexed_contents_in_range(start, end)) + else: + indexed = set() + + for sha1 in self.list_contents_to_index(start, end, indexed): + try: + raw_content = self.objstorage.get(sha1) + except ObjNotFoundError: + self.log.warning('Content %s not found in objstorage' % + hashutil.hash_to_hex(sha1)) + continue + res = self.index(sha1, raw_content, **kwargs) + if res: # If no results, skip it + results.append(res) + + self.persist_index_computations(results, policy_update) + return results + except Exception: + self.log.exception( + 'Problem when computing metadata.') + + class OriginIndexer(BaseIndexer): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Origin indexing using the run method diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py --- a/swh/indexer/mimetype.py +++ b/swh/indexer/mimetype.py @@ -9,7 +9,7 @@ from swh.model import hashutil from swh.scheduler import get_scheduler -from .indexer import ContentIndexer +from .indexer import ContentIndexer, ContentRangeIndexer def compute_mimetype_encoding(raw_content): @@ -30,22 +30,19 @@ } -class ContentMimetypeIndexer(ContentIndexer): - """Indexer in charge of: +class MixinMimetypeIndexer: + """Mixin mimetype indexer. 
- - filtering out content already indexed - - reading content from objstorage per the content's id (sha1) - - computing {mimetype, encoding} from that content - - store result in storage + See :class:`ContentMimetypeIndexer` and :class:`MimetypeRangeIndexer` """ ADDITIONAL_CONFIG = { - 'scheduler': { + 'scheduler': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5008', }, - }, + }), 'tools': ('dict', { 'name': 'file', 'version': '1:5.30-1+deb9u1', @@ -63,17 +60,6 @@ self.scheduler = get_scheduler(**self.config['scheduler']) self.tool = self.tools[0] - def filter(self, ids): - """Filter out known sha1s and return only missing ones. - - """ - yield from self.idx_storage.content_mimetype_missing(( - { - 'id': sha1, - 'indexer_configuration_id': self.tool['id'], - } for sha1 in ids - )) - def index(self, id, data): """Index sha1s' content and store result. @@ -121,6 +107,64 @@ results, conflict_update=(policy_update == 'update-dups')) +class ContentMimetypeIndexer(MixinMimetypeIndexer, ContentIndexer): + """Mimetype Indexer working on list of content identifiers. + + It: + - (optionally) filters out content already indexed (cf. :callable:`filter`) + - reads content from objstorage per the content's id (sha1) + - computes {mimetype, encoding} from that content + - stores result in storage + + FIXME: + - 1. Rename redundant ContentMimetypeIndexer to MimetypeIndexer + - 2. Do we keep it afterwards? ~> i think this can be used with the journal + + """ + def filter(self, ids): + """Filter out known sha1s and return only missing ones. + + """ + yield from self.idx_storage.content_mimetype_missing(( + { + 'id': sha1, + 'indexer_configuration_id': self.tool['id'], + } for sha1 in ids + )) + + +class MimetypeRangeIndexer(MixinMimetypeIndexer, ContentRangeIndexer): + """Mimetype Range Indexer working on range of content identifiers. 
+ + It: + - (optionally) filters out content already indexed (cf :callable:`range`) + - reads content from objstorage per the content's id (sha1) + - computes {mimetype, encoding} from that content + - stores result in storage + + """ + def indexed_contents_in_range(self, start, end): + """Retrieve indexed content id within range [start, end]. + + Args: + **start** (bytes): Starting bound from range identifier + **end** (bytes): End range identifier + + Yields: + Content identifier (bytes) present in the range [start, end] + + """ + while start: + result = self.idx_storage.content_mimetype_get_range( + start, end, self.tool['id']) + contents = result['ids'] + for _id in contents: + yield _id + start = result['next'] + if start is None: + break + + @click.command() @click.option('--path', help="Path to execute index on") def main(path): diff --git a/swh/indexer/storage/api/server.py b/swh/indexer/storage/api/server.py --- a/swh/indexer/storage/api/server.py +++ b/swh/indexer/storage/api/server.py @@ -9,9 +9,9 @@ from swh.core import config from swh.core.api import (SWHServerAPIApp, error_handler, encode_data_server as encode_data) -from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY - -from .. import IndexerStorage +from swh.indexer.storage import ( + get_indexer_storage, INDEXER_CFG_KEY, IndexerStorage +) DEFAULT_CONFIG_PATH = 'storage/indexer' diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py --- a/swh/indexer/tasks.py +++ b/swh/indexer/tasks.py @@ -7,7 +7,7 @@ from swh.scheduler.task import Task as SchedulerTask -from .mimetype import ContentMimetypeIndexer +from .mimetype import ContentMimetypeIndexer, MimetypeRangeIndexer from .language import ContentLanguageIndexer from .ctags import CtagsIndexer from .fossology_license import ContentFossologyLicenseIndexer @@ -19,6 +19,9 @@ class Task(SchedulerTask): + """Task whose results are needed for other computations. 
+ + """ def run_task(self, *args, **kwargs): indexer = self.Indexer().run(*args, **kwargs) if hasattr(indexer, 'results'): # indexer tasks @@ -26,6 +29,15 @@ return indexer +class StatusTask(SchedulerTask): + """Task which returns a status either eventful or uneventful. + + """ + def run_task(self, *args, **kwargs): + results = self.Indexer().run(*args, **kwargs) + return {'status': 'eventful' if results else 'uneventful'} + + class RevisionMetadata(Task): task_queue = 'swh_indexer_revision_metadata' @@ -46,23 +58,29 @@ Indexer = OriginHeadIndexer -class ContentMimetype(Task): - """Task which computes the mimetype, encoding from the sha1's content. +class ContentMimetype(StatusTask): + """Compute (mimetype, encoding) from the sha1's content. """ task_queue = 'swh_indexer_content_mimetype' - Indexer = ContentMimetypeIndexer +class ContentRangeMimetype(StatusTask): + """Compute (mimetype, encoding) on a range of sha1s. + + """ + task_queue = 'swh_indexer_content_mimetype_range' + Indexer = MimetypeRangeIndexer + + class ContentLanguage(Task): """Task which computes the language from the sha1's content. 
""" task_queue = 'swh_indexer_content_language' - def run_task(self, *args, **kwargs): - ContentLanguageIndexer().run(*args, **kwargs) + Indexer = ContentLanguageIndexer class Ctags(Task): diff --git a/swh/indexer/tests/storage/__init__.py b/swh/indexer/tests/storage/__init__.py --- a/swh/indexer/tests/storage/__init__.py +++ b/swh/indexer/tests/storage/__init__.py @@ -10,7 +10,6 @@ from hypothesis.strategies import (composite, sets, one_of, uuids, tuples, sampled_from) - SQL_DIR = path.join(path.dirname(swh.indexer.__file__), 'sql') diff --git a/swh/indexer/tests/test_mimetype.py b/swh/indexer/tests/test_mimetype.py --- a/swh/indexer/tests/test_mimetype.py +++ b/swh/indexer/tests/test_mimetype.py @@ -6,19 +6,90 @@ import unittest import logging -from swh.indexer.mimetype import ContentMimetypeIndexer +from swh.indexer.mimetype import ( + ContentMimetypeIndexer, MimetypeRangeIndexer +) from swh.indexer.tests.test_utils import MockObjStorage +from swh.model import hashutil + + +class _MockStorage(): + """In memory implementation to fake the content_get_range api. + + FIXME: To remove when the actual in-memory lands. + + """ + contents = [] + + def __init__(self, contents): + self.contents = contents + + def content_get_range(self, start, end, limit=1000): + # to make input test data consilient with actual runtime the + # other way of doing properly things would be to rewrite all + # tests (that's another task entirely so not right now) + if isinstance(start, bytes): + start = hashutil.hash_to_hex(start) + if isinstance(end, bytes): + end = hashutil.hash_to_hex(end) + results = [] + _next_id = None + counter = 0 + for c in self.contents: + _id = c['sha1'] + if start <= _id and _id <= end: + results.append(c) + if counter >= limit: + break + counter += 1 + + return { + 'contents': results, + 'next': _next_id + } class _MockIndexerStorage(): """Mock storage to simplify reading indexers' outputs. 
""" + state = [] + def content_mimetype_add(self, mimetypes, conflict_update=None): self.state = mimetypes self.conflict_update = conflict_update + def content_mimetype_get_range(self, start, end, indexer_configuration_id, + limit=1000): + """Basic in-memory implementation (limit is unused). + + """ + # to make input test data consilient with actual runtime the + # other way of doing properly things would be to rewrite all + # tests (that's another task entirely so not right now) + if isinstance(start, bytes): + start = hashutil.hash_to_hex(start) + if isinstance(end, bytes): + end = hashutil.hash_to_hex(end) + results = [] + _next = None + counter = 0 + for m in self.state: + _id = m['id'] + _tool_id = m['indexer_configuration_id'] + if (start <= _id and _id <= end and + _tool_id == indexer_configuration_id): + results.append(_id) + if counter >= limit: + break + counter += 1 + + return { + 'ids': results, + 'next': _next + } + def indexer_configuration_add(self, tools): return [{ 'id': 10, @@ -146,3 +217,94 @@ self.assertTrue(self.indexer.idx_storage.conflict_update) self.assertEqual(expected_results, self.indexer.idx_storage.state) + + +class MimetypeRangeIndexerTest(MimetypeRangeIndexer): + """Specific mimetype whose configuration is enough to satisfy the + indexing tests. 
+ + """ + def prepare(self): + self.config = { + 'tools': { + 'name': 'file', + 'version': '1:5.30-1+deb9u1', + 'configuration': { + "type": "library", + "debian-package": "python3-magic" + }, + }, + } + self.idx_storage = _MockIndexerStorage() + self.log = logging.getLogger('swh.indexer') + # this hardcodes some contents, will use this to setup the storage + self.objstorage = MockObjStorage() + # sync objstorage and storage + contents = [{'sha1': c_id} for c_id in self.objstorage] + self.storage = _MockStorage(contents) + self.tools = self.register_tools(self.config['tools']) + self.tool = self.tools[0] + + +class TestMimetypeRangeIndexer(unittest.TestCase): + def setUp(self): + self.indexer = MimetypeRangeIndexerTest() + # will play along with the objstorage's mocked contents for now + self.contents = sorted(self.indexer.objstorage) + # FIXME: leverage swh.objstorage.in_memory_storage's + # InMemoryObjStorage, swh.storage.tests's gen_contents, and + # hypothesis to generate data to actually run indexer on those + + self.expected_results = { + '01c9379dfc33803963d07c1ccc748d3fe4c96bb5': { + 'encoding': b'us-ascii', + 'id': '01c9379dfc33803963d07c1ccc748d3fe4c96bb5', + 'indexer_configuration_id': 10, + 'mimetype': b'text/plain'}, + '02fb2c89e14f7fab46701478c83779c7beb7b069': { + 'encoding': b'us-ascii', + 'id': '02fb2c89e14f7fab46701478c83779c7beb7b069', + 'indexer_configuration_id': 10, + 'mimetype': b'text/x-python'}, + '103bc087db1d26afc3a0283f38663d081e9b01e6': { + 'encoding': b'us-ascii', + 'id': '103bc087db1d26afc3a0283f38663d081e9b01e6', + 'indexer_configuration_id': 10, + 'mimetype': b'text/plain'} + } + + def assert_mimetypes_ok(self, start, end, actual_results): + for mimetype in actual_results: + _id = mimetype['id'] + self.assertEqual(mimetype, self.expected_results[_id]) + self.assertTrue(start <= _id and _id <= end) + _tool_id = mimetype['indexer_configuration_id'] + self.assertEqual(_tool_id, self.indexer.tool['id']) + + def 
test_generate_content_mimetype_get(self): + """Optimal indexing should result in persisted computations + + """ + start, end = [self.contents[0], self.contents[2]] # output hex ids + # given + actual_results = self.indexer.run( + start, end, policy_update='update-dups') + + # then + self.assert_mimetypes_ok(start, end, actual_results) + + def test_generate_content_mimetype_get_input_as_bytes(self): + """Optimal indexing should result in persisted computations + + Input are in bytes here. + + """ + _start, _end = [self.contents[0], self.contents[2]] # output hex ids + start, end = map(hashutil.hash_to_bytes, (_start, _end)) + + # given + actual_results = self.indexer.run( # checks the bytes input this time + start, end, policy_update='ignore-dups') # no data so same result + + # then + self.assert_mimetypes_ok(_start, _end, actual_results)