diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -5,7 +5,7 @@
 
 import logging
 from time import time
-from typing import Any, Callable, Dict, Iterable, List, Optional
+from typing import Callable, Dict, List, Optional
 
 from sentry_sdk import capture_exception, push_scope
 
@@ -43,7 +42,6 @@
     ObjNotFoundError,
     ObjStorage,
 )
-from swh.storage.exc import HashCollision
 
 logger = logging.getLogger(__name__)
 
@@ -79,42 +78,6 @@
         notify("WATCHDOG=1")
 
 
-def collision_aware_content_add(
-    content_add_fn: Callable[[Iterable[Any]], None], contents: List[BaseContent]
-) -> None:
-    """Add contents to storage. If a hash collision is detected, an error is
-    logged. Then this adds the other non colliding contents to the storage.
-
-    Args:
-        content_add_fn: Storage content callable
-        contents: List of contents or skipped contents to add to storage
-
-    """
-    if not contents:
-        return
-    colliding_content_hashes: List[Dict[str, Any]] = []
-    while True:
-        try:
-            content_add_fn(contents)
-        except HashCollision as e:
-            colliding_content_hashes.append(
-                {
-                    "algo": e.algo,
-                    "hash": e.hash_id,  # hex hash id
-                    "objects": e.colliding_contents,  # hex hashes
-                }
-            )
-            colliding_hashes = e.colliding_content_hashes()
-            # Drop the colliding contents from the transaction
-            contents = [c for c in contents if c.hashes() not in colliding_hashes]
-        else:
-            # Successfully added contents, we are done
-            break
-    if colliding_content_hashes:
-        for collision in colliding_content_hashes:
-            logger.error("Collision detected: %(collision)s", {"collision": collision})
-
-
 def _insert_objects(object_type: str, objects: List[Dict], storage) -> None:
     """Insert objects of type object_type in the storage.
 
@@ -131,8 +94,8 @@
             else:
                 contents.append(c)
 
-        collision_aware_content_add(storage.skipped_content_add, skipped_contents)
-        collision_aware_content_add(storage.content_add_metadata, contents)
+        storage.skipped_content_add(skipped_contents)
+        storage.content_add_metadata(contents)
     elif object_type == "origin_visit":
         visits: List[OriginVisit] = []
         origins: List[Origin] = []
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -28,7 +28,7 @@
 from .utils import MockedJournalClient, MockedKafkaWriter
 
 
-storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]}
+storage_config = {"cls": "pipeline", "steps": [{"cls": "retry"}, {"cls": "memory"},]}
 
 
 def make_topic(kafka_prefix: str, object_type: str) -> str: