diff --git a/debian/control b/debian/control --- a/debian/control +++ b/debian/control @@ -16,7 +16,7 @@ python3-swh.model (>= 0.0.15~), python3-swh.objstorage (>= 0.0.13~), python3-swh.scheduler (>= 0.0.35~), - python3-swh.storage (>= 0.0.102~), + python3-swh.storage (>= 0.0.110~), python3-vcversioner Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/diffusion/78/ @@ -27,7 +27,7 @@ python3-swh.model (>= 0.0.15~), python3-swh.objstorage (>= 0.0.13~), python3-swh.scheduler (>= 0.0.35~), - python3-swh.storage (>= 0.0.102~), + python3-swh.storage (>= 0.0.110~), ${misc:Depends}, ${python3:Depends} Description: Software Heritage Content Indexer Storage @@ -39,7 +39,7 @@ python3-swh.model (>= 0.0.15~), python3-swh.objstorage (>= 0.0.13~), python3-swh.scheduler (>= 0.0.35~), - python3-swh.storage (>= 0.0.102~), + python3-swh.storage (>= 0.0.110~), python3-swh.indexer.storage (= ${binary:Version}), universal-ctags (>= 0.8~), fossology-nomossa (>= 3.1~), diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -2,4 +2,4 @@ swh.model >= 0.0.15 swh.objstorage >= 0.0.13 swh.scheduler >= 0.0.35 -swh.storage >= 0.0.102 +swh.storage >= 0.0.110 diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -18,6 +18,7 @@ from swh.objstorage.exc import ObjNotFoundError from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY from swh.model import hashutil +from swh.core import utils class DiskIndexer: @@ -387,7 +388,7 @@ """ pass - def list_contents_to_index(self, start, end, indexed): + def _list_contents_to_index(self, start, end, indexed): """Compute from storage the new contents to index in the range [start, end]. The already indexed contents are skipped. @@ -396,6 +397,9 @@ **end** (bytes): End range identifier **indexed** (Set[bytes]): Set of content already indexed. + Yields: + Identifier (bytes) of contents to index. + """ while start: result = self.storage.content_get_range(start, end) @@ -407,21 +411,47 @@ yield _id start = result['next'] + def _index_contents(self, start, end, indexed, **kwargs): + """Index the contents from within range [start, end] + + Args: + **start** (bytes): Starting bound from range identifier + **end** (bytes): End range identifier + **indexed** (Set[bytes]): Set of content already indexed. + + Yields: + Data indexed (dict) to persist using the indexer storage + + """ + for sha1 in self._list_contents_to_index(start, end, indexed): + try: + raw_content = self.objstorage.get(sha1) + except ObjNotFoundError: + self.log.warning('Content %s not found in objstorage' % + hashutil.hash_to_hex(sha1)) + continue + res = self.index(sha1, raw_content, **kwargs) + if res: + yield res + def run(self, start, end, policy_update, **kwargs): """Given a range of content ids, compute the indexing computation on the contents within. Either only new ones (policy_update to 'update-dups') or all (policy_update to 'ignore-dups'. Args: - **start** (bytes/str): Starting range identifier - **end** (bytes/str): Ending range identifier **policy_update** (str): either 'update-dups' to do all contents, or 'ignore-dups' to only compute new ones + **start** (Union[bytes, str]): Starting range identifier + **end** (Union[bytes, str]): Ending range identifier **kwargs: passed to the `index` method + Returns: + a boolean. True if data was indexed, False otherwise. + """ - results = [] + with_indexed_data = False try: if isinstance(start, str): start = hashutil.hash_to_bytes(start) @@ -433,19 +463,12 @@ else: indexed = set() - for sha1 in self.list_contents_to_index(start, end, indexed): - try: - raw_content = self.objstorage.get(sha1) - except ObjNotFoundError: - self.log.warning('Content %s not found in objstorage' % - hashutil.hash_to_hex(sha1)) - continue - res = self.index(sha1, raw_content, **kwargs) - if res: # If no results, skip it - results.append(res) - - self.persist_index_computations(results, policy_update) - return results + index_computations = self._index_contents(start, end, indexed) + for results in utils.grouper(index_computations, + n=self.config['write_batch_size']): + self.persist_index_computations(results, policy_update) + with_indexed_data = True + return with_indexed_data except Exception: self.log.exception( 'Problem when computing metadata.') diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py --- a/swh/indexer/mimetype.py +++ b/swh/indexer/mimetype.py @@ -51,6 +51,7 @@ "debian-package": "python3-magic" }, }), + 'write_batch_size': ('int', 100), } CONFIG_BASE_FILENAME = 'indexer/mimetype' diff --git a/swh/indexer/tests/test_mimetype.py b/swh/indexer/tests/test_mimetype.py --- a/swh/indexer/tests/test_mimetype.py +++ b/swh/indexer/tests/test_mimetype.py @@ -234,6 +234,7 @@ "debian-package": "python3-magic" }, }, + 'write_batch_size': 100, } self.idx_storage = _MockIndexerStorage() self.log = logging.getLogger('swh.indexer') @@ -273,16 +274,53 @@ 'mimetype': b'text/plain'} } - def assert_mimetypes_ok(self, start, end, actual_results): + def assert_mimetypes_ok(self, start, end, actual_results, + expected_results=None): + if expected_results is None: + expected_results = self.expected_results + for mimetype in actual_results: _id = mimetype['id'] - self.assertEqual(mimetype, self.expected_results[_id]) + self.assertEqual(mimetype, expected_results[_id]) self.assertTrue(start <= _id and _id <= end) _tool_id = mimetype['indexer_configuration_id'] self.assertEqual(_tool_id, self.indexer.tool['id']) + def test__index_contents(self): + """Indexing contents without existing data results in indexed data + + """ + start, end = [self.contents[0], self.contents[2]] # output hex ids + # given + actual_results = list(self.indexer._index_contents( + start, end, indexed={})) + + self.assert_mimetypes_ok(start, end, actual_results) + + def test__index_contents_with_indexed_data(self): + """Indexing contents with existing data results in less indexed data + + """ + start, end = [self.contents[0], self.contents[2]] # output hex ids + data_indexed = [ + '01c9379dfc33803963d07c1ccc748d3fe4c96bb5', + '103bc087db1d26afc3a0283f38663d081e9b01e6' + ] + + # given + actual_results = self.indexer._index_contents( + start, end, indexed=set(data_indexed)) + + # craft the expected results + expected_results = self.expected_results.copy() + for already_indexed_key in data_indexed: + expected_results.pop(already_indexed_key) + + self.assert_mimetypes_ok( + start, end, actual_results, expected_results) + def test_generate_content_mimetype_get(self): - """Optimal indexing should result in persisted computations + """Optimal indexing should result in indexed data """ start, end = [self.contents[0], self.contents[2]] # output hex ids @@ -291,10 +329,10 @@ start, end, policy_update='update-dups') # then - self.assert_mimetypes_ok(start, end, actual_results) + self.assertTrue(actual_results) def test_generate_content_mimetype_get_input_as_bytes(self): - """Optimal indexing should result in persisted computations + """Optimal indexing should result in indexed data Input are in bytes here. @@ -307,4 +345,15 @@ start, end, policy_update='ignore-dups') # no data so same result # then - self.assert_mimetypes_ok(_start, _end, actual_results) + self.assertTrue(actual_results) + + def test_generate_content_mimetype_get_no_result(self): + """No result indexed returns False""" + start, end = ['0000000000000000000000000000000000000000', + '0000000000000000000000000000000000000001'] + # given + actual_results = self.indexer.run( + start, end, policy_update='update-dups') + + # then + self.assertFalse(actual_results)