Index: swh/indexer/indexer.py
===================================================================
--- swh/indexer/indexer.py
+++ swh/indexer/indexer.py
@@ -18,6 +18,7 @@
 from swh.objstorage.exc import ObjNotFoundError
 from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY
 from swh.model import hashutil
+from swh.core import utils
 
 
 class DiskIndexer:
@@ -396,6 +397,9 @@
             **end** (bytes): End range identifier
             **indexed** (Set[bytes]): Set of content already indexed.
 
+        Yields:
+            Identifier (bytes) of contents to index.
+
         """
         while start:
             result = self.storage.content_get_range(start, end)
@@ -407,6 +411,30 @@
                 yield _id
             start = result['next']
 
+    def index_contents(self, start, end, indexed, **kwargs):
+        """Index the contents from within range [start, end]
+
+        Args:
+            **start** (bytes): Starting bound from range identifier
+            **end** (bytes): End range identifier
+            **indexed** (Set[bytes]): Set of content already indexed.
+            **kwargs: passed to the `index` method
+
+        Yields:
+            Data indexed (dict) to persist using the indexer storage
+
+        """
+        for sha1 in self.list_contents_to_index(start, end, indexed):
+            try:
+                raw_content = self.objstorage.get(sha1)
+            except ObjNotFoundError:
+                self.log.warning('Content %s not found in objstorage',
+                                 hashutil.hash_to_hex(sha1))
+                continue
+            res = self.index(sha1, raw_content, **kwargs)
+            if res:  # If no results, skip it
+                yield res
+
     def run(self, ids, policy_update, **kwargs):
         """Given a range of content ids, compute the indexing computation on
            the contents within. Either only new ones (policy_update to
@@ -420,10 +448,15 @@
                                      only compute new ones
             **kwargs: passed to the `index` method
 
+        Returns:
+            None if no data was indexed, otherwise the last batch of index
+            computations persisted (list of dict). Partial because the
+            result is not really used later.
+
         """
         if len(ids) != 2:  # range
             raise ValueError('Range of ids expected')
-        results = []
+        results = None
         try:
             [start, end] = ids
             if isinstance(start, str):
@@ -436,19 +469,17 @@
             else:
                 indexed = set()
 
-            for sha1 in self.list_contents_to_index(start, end, indexed):
-                try:
-                    raw_content = self.objstorage.get(sha1)
-                except ObjNotFoundError:
-                    self.log.warning('Content %s not found in objstorage' %
-                                     hashutil.hash_to_hex(sha1))
-                    continue
-                res = self.index(sha1, raw_content, **kwargs)
-                if res:  # If no results, skip it
-                    results.append(res)
-
-            self.persist_index_computations(results, policy_update)
-            return results
+            index_computations = self.index_contents(start, end, indexed,
+                                                     **kwargs)
+            # 'batch_write' may be missing from some indexer configurations:
+            # fall back to the mimetype indexer's default instead of failing.
+            batch_size = self.config.get('batch_write', 100)
+            for batch in utils.grouper(index_computations, n=batch_size):
+                # Materialize each batch: groups yielded by grouper may be
+                # lazy iterables, exhausted once persisted.
+                results = list(batch)
+                self.persist_index_computations(results, policy_update)
+            return results  # last persisted batch (partial result), if any
         except Exception:
             self.log.exception(
                 'Problem when computing metadata.')
Index: swh/indexer/mimetype.py
===================================================================
--- swh/indexer/mimetype.py
+++ swh/indexer/mimetype.py
@@ -37,12 +37,12 @@
     """
 
     ADDITIONAL_CONFIG = {
-        'scheduler': {
+        'scheduler': ('dict', {
             'cls': 'remote',
             'args': {
                 'url': 'http://localhost:5008',
             },
-        },
+        }),
         'tools': ('dict', {
             'name': 'file',
             'version': '1:5.30-1+deb9u1',
@@ -51,6 +51,7 @@
                 "debian-package": "python3-magic"
             },
         }),
+        'batch_write': ('int', 100),
     }
 
     CONFIG_BASE_FILENAME = 'indexer/mimetype'
Index: swh/indexer/tests/test_mimetype.py
===================================================================
--- swh/indexer/tests/test_mimetype.py
+++ swh/indexer/tests/test_mimetype.py
@@ -234,6 +234,7 @@
                     "debian-package": "python3-magic"
                 },
             },
+            'batch_write': 100,
         }
         self.idx_storage = _MockIndexerStorage()
         self.log = logging.getLogger('swh.indexer')