diff --git a/swh/journal/replay.py b/swh/journal/replay.py --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -4,8 +4,9 @@ # See top-level LICENSE file for more information import copy -from time import time import logging +from time import time +from typing import Callable, Dict, List, Optional try: from systemd.daemon import notify @@ -16,7 +17,7 @@ from swh.model.identifiers import normalize_timestamp from swh.model.hashutil import hash_to_hex from swh.model.model import SHA1_SIZE -from swh.objstorage.objstorage import ID_HASH_ALGO +from swh.objstorage.objstorage import ID_HASH_ALGO, ObjStorage from swh.storage import HashCollision logger = logging.getLogger(__name__) @@ -321,8 +322,14 @@ return len(obj) -def process_replay_objects_content(all_objects, *, src, dst, - exclude_fn=None, check_dst=True): +def process_replay_objects_content( + all_objects: Dict[str, List[dict]], + *, + src: ObjStorage, + dst: ObjStorage, + exclude_fn: Optional[Callable[[dict], bool]] = None, + check_dst: bool = True, +): """ Takes a list of records from Kafka (see :py:func:`swh.journal.client.JournalClient.process`) and copies them @@ -330,15 +337,17 @@ * `obj['status']` is `'visible'` * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided) + * `obj['sha1'] not in dst` (if `check_dst` is True) Args: - all_objects Dict[str, List[dict]]: Objects passed by the Kafka client. - Most importantly, `all_objects['content'][*]['sha1']` is the - sha1 hash of each content + all_objects: Objects passed by the Kafka client. Most importantly, + `all_objects['content'][*]['sha1']` is the sha1 hash of each + content. src: An object storage (see :py:func:`swh.objstorage.get_objstorage`) dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`) - exclude_fn Optional[Callable[dict, bool]]: Determines whether - an object should be copied. + exclude_fn: Determines whether an object should be copied. + check_dst: Determines whether we should check the destination + objstorage before copying. Example: