diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py --- a/swh/indexer/cli.py +++ b/swh/indexer/cli.py @@ -205,7 +205,7 @@ origins = list_origins_by_producer(idx_storage, mappings, tool_ids) - kwargs = {"policy_update": "update-dups", "retries_left": 1} + kwargs = {"retries_left": 1} schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs) diff --git a/swh/indexer/ctags.py b/swh/indexer/ctags.py --- a/swh/indexer/ctags.py +++ b/swh/indexer/ctags.py @@ -136,16 +136,12 @@ return ctags def persist_index_computations( - self, results: List[ContentCtagsRow], policy_update: str + self, results: List[ContentCtagsRow] ) -> Dict[str, int]: """Persist the results in storage. Args: results: list of ctags returned by index() - policy_update: either 'update-dups' or 'ignore-dups' to - respectively update duplicates or ignore them """ - return self.idx_storage.content_ctags_add( - results, conflict_update=(policy_update == "update-dups") - ) + return self.idx_storage.content_ctags_add(results) diff --git a/swh/indexer/fossology_license.py b/swh/indexer/fossology_license.py --- a/swh/indexer/fossology_license.py +++ b/swh/indexer/fossology_license.py @@ -114,7 +114,7 @@ ] def persist_index_computations( - self, results: List[ContentLicenseRow], policy_update: str + self, results: List[ContentLicenseRow] ) -> Dict[str, int]: """Persist the results in storage. @@ -126,13 +126,8 @@ - license (bytes): license in bytes - path (bytes): path - policy_update: either 'update-dups' or 'ignore-dups' to - respectively update duplicates or ignore them - """ - return self.idx_storage.content_fossology_license_add( - results, conflict_update=(policy_update == "update-dups") - ) + return self.idx_storage.content_fossology_license_add(results) class FossologyLicenseIndexer( diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -10,6 +10,7 @@ import shutil import tempfile from typing import Any, Dict, Generic, Iterator, List, Optional, Set, TypeVar, Union +import warnings from swh.core import utils from swh.core.config import load_from_envvar, merge_configs @@ -250,17 +251,13 @@ yield from ids @abc.abstractmethod - def persist_index_computations( - self, results: List[TResult], policy_update: str - ) -> Dict[str, int]: + def persist_index_computations(self, results: List[TResult]) -> Dict[str, int]: """Persist the computation resulting from the index. Args: results: List of results. One result is the result of the index function. - policy_update: either 'update-dups' or 'ignore-dups' to - respectively update duplicates or ignore them Returns: a summary dict of what has been inserted in the storage @@ -281,24 +278,28 @@ """ - def run(self, ids: List[Sha1], policy_update: str, **kwargs) -> Dict: + def run(self, ids: List[Sha1], **kwargs) -> Dict: """Given a list of ids: - retrieve the content from the storage - execute the indexing computations - - store the results (according to policy_update) + - store the results Args: ids (Iterable[Union[bytes, str]]): sha1's identifier list - policy_update (str): either 'update-dups' or 'ignore-dups' to - respectively update duplicates or ignore - them **kwargs: passed to the `index` method Returns: A summary Dict of the task's status """ + if "policy_update" in kwargs: + warnings.warn( + "'policy_update' argument is deprecated and ignored.", + DeprecationWarning, + ) + del kwargs["policy_update"] + sha1s = [ hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids ] @@ -318,7 +319,7 @@ if res: # If no results, skip it results.extend(res) summary["status"] = "eventful" - summary = self.persist_index_computations(results, policy_update) + summary = self.persist_index_computations(results) self.results = results except Exception: if not self.catch_exceptions: @@ -478,9 +479,7 @@ count_object_added_key: Optional[str] = None for contents in utils.grouper(gen, n=self.config["write_batch_size"]): - res = self.persist_index_computations( - list(contents), policy_update="update-dups" - ) + res = self.persist_index_computations(list(contents)) if not count_object_added_key: count_object_added_key = list(res.keys())[0] count += res[count_object_added_key] @@ -508,22 +507,24 @@ """ - def run( - self, origin_urls: List[str], policy_update: str = "update-dups", **kwargs - ) -> Dict: + def run(self, origin_urls: List[str], **kwargs) -> Dict: """Given a list of origin urls: - retrieve origins from storage - execute the indexing computations - - store the results (according to policy_update) + - store the results Args: origin_urls: list of origin urls. - policy_update: either 'update-dups' or 'ignore-dups' to - respectively update duplicates (default) or ignore them **kwargs: passed to the `index` method """ + if "policy_update" in kwargs: + warnings.warn( + "'policy_update' argument is deprecated and ignored.", + DeprecationWarning, + ) + del kwargs["policy_update"] summary: Dict[str, Any] = {"status": "uneventful"} try: results = self.index_list(origin_urls, **kwargs) @@ -533,7 +534,7 @@ summary["status"] = "failed" return summary - summary_persist = self.persist_index_computations(results, policy_update) + summary_persist = self.persist_index_computations(results) self.results = results if summary_persist: for value in summary_persist.values(): @@ -564,19 +565,23 @@ """ - def run(self, ids: List[Sha1Git], policy_update: str) -> Dict: + def run(self, ids: List[Sha1Git], **kwargs) -> Dict: """Given a list of sha1_gits: - retrieve revisions from storage - execute the indexing computations - - store the results (according to policy_update) + - store the results Args: ids: sha1_git's identifier list - policy_update: either 'update-dups' or 'ignore-dups' to - respectively update duplicates or ignore them """ + if "policy_update" in kwargs: + warnings.warn( + "'policy_update' argument is deprecated and ignored.", + DeprecationWarning, + ) + del kwargs["policy_update"] summary: Dict[str, Any] = {"status": "uneventful"} results = [] @@ -599,7 +604,7 @@ summary["status"] = "failed" return summary - summary_persist = self.persist_index_computations(results, policy_update) + summary_persist = self.persist_index_computations(results) if summary_persist: for value in summary_persist.values(): if value > 0: diff --git a/swh/indexer/journal_client.py b/swh/indexer/journal_client.py --- a/swh/indexer/journal_client.py +++ b/swh/indexer/journal_client.py @@ -36,7 +36,6 @@ task_names["origin_metadata"], "oneshot", visit_urls, - policy_update="update-dups", retries_left=1, ) ) diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py --- a/swh/indexer/metadata.py +++ b/swh/indexer/metadata.py @@ -109,7 +109,7 @@ ] def persist_index_computations( - self, results: List[ContentMetadataRow], policy_update: str + self, results: List[ContentMetadataRow] ) -> Dict[str, int]: """Persist the results in storage. @@ -118,13 +118,9 @@ following keys: - id (bytes): content's identifier (sha1) - metadata (jsonb): detected metadata - policy_update: either 'update-dups' or 'ignore-dups' to - respectively update duplicates or ignore them """ - return self.idx_storage.content_metadata_add( - results, conflict_update=(policy_update == "update-dups") - ) + return self.idx_storage.content_metadata_add(results) DEFAULT_CONFIG: Dict[str, Any] = { @@ -217,7 +213,7 @@ ] def persist_index_computations( - self, results: List[RevisionIntrinsicMetadataRow], policy_update: str + self, results: List[RevisionIntrinsicMetadataRow] ) -> Dict[str, int]: """Persist the results in storage. @@ -227,15 +223,11 @@ - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes - policy_update: either 'update-dups' or 'ignore-dups' to - respectively update duplicates or ignore them """ # TODO: add functions in storage to keep data in # revision_intrinsic_metadata - return self.idx_storage.revision_intrinsic_metadata_add( - results, conflict_update=(policy_update == "update-dups") - ) + return self.idx_storage.revision_intrinsic_metadata_add(results) def translate_revision_intrinsic_metadata( self, detected_files: Dict[str, List[Any]], log_suffix: str @@ -291,9 +283,7 @@ # content indexing try: c_metadata_indexer.run( - sha1s_filtered, - policy_update="ignore-dups", - log_suffix=log_suffix, + sha1s_filtered, log_suffix=log_suffix, ) # on the fly possibility: for result in c_metadata_indexer.results: @@ -364,10 +354,7 @@ def persist_index_computations( self, results: List[Tuple[OriginIntrinsicMetadataRow, RevisionIntrinsicMetadataRow]], - policy_update: str, ) -> Dict[str, int]: - conflict_update = policy_update == "update-dups" - # Deduplicate revisions rev_metadata: List[RevisionIntrinsicMetadataRow] = [] orig_metadata: List[OriginIntrinsicMetadataRow] = [] @@ -382,14 +369,10 @@ orig_metadata.append(orig_item) if rev_metadata: - summary_rev = self.idx_storage.revision_intrinsic_metadata_add( - rev_metadata, conflict_update=conflict_update - ) + summary_rev = self.idx_storage.revision_intrinsic_metadata_add(rev_metadata) summary.update(summary_rev) if orig_metadata: - summary_ori = self.idx_storage.origin_intrinsic_metadata_add( - orig_metadata, conflict_update=conflict_update - ) + summary_ori = self.idx_storage.origin_intrinsic_metadata_add(orig_metadata) summary.update(summary_ori) return summary diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py --- a/swh/indexer/mimetype.py +++ b/swh/indexer/mimetype.py @@ -95,7 +95,7 @@ ] def persist_index_computations( - self, results: List[ContentMimetypeRow], policy_update: str + self, results: List[ContentMimetypeRow] ) -> Dict[str, int]: """Persist the results in storage. @@ -103,13 +103,8 @@ results: list of content's mimetype dicts (see :meth:`.index`) - policy_update: either 'update-dups' or 'ignore-dups' to - respectively update duplicates or ignore them - """ - return self.idx_storage.content_mimetype_add( - results, conflict_update=(policy_update == "update-dups") - ) + return self.idx_storage.content_mimetype_add(results) class MimetypeIndexer(MixinMimetypeIndexer, ContentIndexer[ContentMimetypeRow]): diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py --- a/swh/indexer/origin_head.py +++ b/swh/indexer/origin_head.py @@ -25,9 +25,7 @@ USE_TOOLS = False - def persist_index_computations( - self, results: Any, policy_update: str - ) -> Dict[str, int]: + def persist_index_computations(self, results: Any) -> Dict[str, int]: """Do nothing. The indexer's results are not persistent, they should only be piped to another indexer.""" return {} diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py --- a/swh/indexer/tests/test_cli.py +++ b/swh/indexer/tests/test_cli.py @@ -74,7 +74,7 @@ def _assert_tasks_for_origins(tasks, origins): - expected_kwargs = {"policy_update": "update-dups"} + expected_kwargs = {} assert {task["type"] for task in tasks} == {"index-origin-metadata"} assert all(len(task["arguments"]["args"]) == 1 for task in tasks) for task in tasks: diff --git a/swh/indexer/tests/test_indexer.py b/swh/indexer/tests/test_indexer.py --- a/swh/indexer/tests/test_indexer.py +++ b/swh/indexer/tests/test_indexer.py @@ -31,7 +31,7 @@ ) -> List[Dict[str, Any]]: raise _TestException() - def persist_index_computations(self, results, policy_update) -> Dict[str, int]: + def persist_index_computations(self, results) -> Dict[str, int]: return {} def indexed_contents_in_partition( @@ -61,12 +61,12 @@ indexer.objstorage = Mock() indexer.objstorage.get.return_value = b"content" - assert indexer.run([b"foo"], policy_update=True) == {"status": "failed"} + assert indexer.run([b"foo"]) == {"status": "failed"} indexer.catch_exceptions = False with pytest.raises(_TestException): - indexer.run([b"foo"], policy_update=True) + indexer.run([b"foo"]) def test_revision_indexer_catch_exceptions(): @@ -74,25 +74,23 @@ indexer.storage = Mock() indexer.storage.revision_get.return_value = ["rev"] - assert indexer.run([b"foo"], policy_update=True) == {"status": "failed"} + assert indexer.run([b"foo"]) == {"status": "failed"} indexer.catch_exceptions = False with pytest.raises(_TestException): - indexer.run([b"foo"], policy_update=True) + indexer.run([b"foo"]) def test_origin_indexer_catch_exceptions(): indexer = CrashingOriginIndexer(config=BASE_TEST_CONFIG) - assert indexer.run(["http://example.org"], policy_update=True) == { - "status": "failed" - } + assert indexer.run(["http://example.org"]) == {"status": "failed"} indexer.catch_exceptions = False with pytest.raises(_TestException): - indexer.run(["http://example.org"], policy_update=True) + indexer.run(["http://example.org"]) def test_content_partition_indexer_catch_exceptions(): @@ -100,9 +98,9 @@ config={**BASE_TEST_CONFIG, "write_batch_size": 42} ) - assert indexer.run(0, 42, policy_update=True) == {"status": "failed"} + assert indexer.run(0, 42) == {"status": "failed"} indexer.catch_exceptions = False with pytest.raises(_TestException): - indexer.run(0, 42, policy_update=True) + indexer.run(0, 42) diff --git a/swh/indexer/tests/test_journal_client.py b/swh/indexer/tests/test_journal_client.py --- a/swh/indexer/tests/test_journal_client.py +++ b/swh/indexer/tests/test_journal_client.py @@ -30,10 +30,7 @@ ( [ { - "arguments": { - "kwargs": {"policy_update": "update-dups"}, - "args": (["file:///dev/zero"],), - }, + "arguments": {"kwargs": {}, "args": (["file:///dev/zero"],),}, "policy": "oneshot", "type": "task-name", "retries_left": 1, @@ -64,10 +61,7 @@ ( [ { - "arguments": { - "kwargs": {"policy_update": "update-dups"}, - "args": (["file:///dev/zero"],), - }, + "arguments": {"kwargs": {}, "args": (["file:///dev/zero"],),}, "policy": "oneshot", "type": "task-name", "retries_left": 1, @@ -100,7 +94,7 @@ [ { "arguments": { - "kwargs": {"policy_update": "update-dups"}, + "kwargs": {}, "args": (["file:///dev/zero", "file:///tmp/foobar"],), }, "policy": "oneshot", @@ -138,7 +132,7 @@ [ { "arguments": { - "kwargs": {"policy_update": "update-dups"}, + "kwargs": {}, "args": (["file:///dev/zero", "file:///tmp/foobar"],), }, "policy": "oneshot", @@ -147,7 +141,7 @@ }, { "arguments": { - "kwargs": {"policy_update": "update-dups"}, + "kwargs": {}, "args": (["file:///tmp/spamegg"],), }, "policy": "oneshot", diff --git a/swh/indexer/tests/test_metadata.py b/swh/indexer/tests/test_metadata.py --- a/swh/indexer/tests/test_metadata.py +++ b/swh/indexer/tests/test_metadata.py @@ -140,7 +140,7 @@ fill_storage(metadata_indexer.storage) # when - metadata_indexer.run(sha1s, policy_update="ignore-dups") + metadata_indexer.run(sha1s) results = list(metadata_indexer.idx_storage.content_metadata_get(sha1s)) expected_results = [ @@ -1122,7 +1122,7 @@ ] ) - metadata_indexer.run([rev.id], "update-dups") + metadata_indexer.run([rev.id]) results = list( metadata_indexer.idx_storage.revision_intrinsic_metadata_get([REVISION.id]) @@ -1183,7 +1183,7 @@ ] ) - metadata_indexer.run([new_rev.id], "update-dups") + metadata_indexer.run([new_rev.id]) results = list( metadata_indexer.idx_storage.revision_intrinsic_metadata_get([new_rev.id]) diff --git a/swh/indexer/tests/test_origin_head.py b/swh/indexer/tests/test_origin_head.py --- a/swh/indexer/tests/test_origin_head.py +++ b/swh/indexer/tests/test_origin_head.py @@ -46,7 +46,7 @@ indexing tests. """ - def persist_index_computations(self, results, policy_update): + def persist_index_computations(self, results): self.results = results diff --git a/swh/indexer/tests/utils.py b/swh/indexer/tests/utils.py --- a/swh/indexer/tests/utils.py +++ b/swh/indexer/tests/utils.py @@ -262,7 +262,6 @@ class MockStorage(): def content_mimetype_add(self, mimetypes): self.state = mimetypes - self.conflict_update = conflict_update def indexer_configuration_add(self, tools): return [{ @@ -606,12 +605,12 @@ sha1s = [self.id0, self.id1, self.id2] # when - self.indexer.run(sha1s, policy_update="update-dups") + self.indexer.run(sha1s) self.assert_results_ok(sha1s) # 2nd pass - self.indexer.run(sha1s, policy_update="ignore-dups") + self.indexer.run(sha1s) self.assert_results_ok(sha1s) @@ -624,7 +623,7 @@ ] # unknown # when - self.indexer.run(sha1s, policy_update="update-dups") + self.indexer.run(sha1s) # then expected_results = [