swh/objstorage/replayer/replay.py
# Copyright (C) 2019-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor
from concurrent.futures import wait as futures_wait
import logging
from time import time
from typing import Callable, Dict, List, Optional

import msgpack
from sentry_sdk import capture_exception, push_scope

try:
# [... 197 lines elided ...]
def process_replay_objects_content(
    all_objects: Dict[str, List[dict]],
    *,
    src: ObjStorage,
    dst: ObjStorage,
    exclude_fn: Optional[Callable[[dict], bool]] = None,
    check_dst: bool = True,
    concurrency: int = 16,
):
    """
    Takes a list of records from Kafka (see
    :py:func:`swh.journal.client.JournalClient.process`) and copies them
    from the `src` objstorage to the `dst` objstorage, if:

    * `obj['status']` is `'visible'`
    * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided)
    [... 36 lines elided ...]
    >>> id2 in dst
    True
    """
    vol = []
    nb_skipped = 0
    nb_failures = 0
    t0 = time()
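
    # Per-object copy logic, extracted into a closure so each record can be
    # handled by a worker thread; the nonlocal counters are used for the
    # summary log line at the end.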
    def _copy_object(obj):
        nonlocal nb_skipped
        nonlocal nb_failures
        obj_id = obj[ID_HASH_ALGO]
        if obj["status"] != "visible":
            nb_skipped += 1
            logger.debug("skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"])
            statsd.increment(
                CONTENT_OPERATIONS_METRIC,
                tags={"decision": "skipped", "status": obj["status"]},
            )
        elif exclude_fn and exclude_fn(obj):
            nb_skipped += 1
            logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id))
            statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"})
        elif check_dst and obj_in_objstorage(obj_id, dst):
            nb_skipped += 1
            logger.debug("skipped %s (in dst)", hash_to_hex(obj_id))
            statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"})
        else:
            try:
                copied = copy_object(obj_id, src, dst)
            except ObjNotFoundError:
                nb_skipped += 1
                statsd.increment(
                    CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"}
                )
            else:
                if copied is None:
                    nb_failures += 1
                    statsd.increment(
                        CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"}
                    )
                else:
                    vol.append(copied)
                    statsd.increment(
                        CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"}
                    )
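
    # Fan the content records out to a bounded pool of copier threads;
    # non-"content" topics are unexpected on this client and only logged.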
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        futures = []
        for (object_type, objects) in all_objects.items():
            if object_type != "content":
                logger.warning(
                    "Received a series of %s, this should not happen", object_type
                )
                continue
            for obj in objects:
                futures.append(pool.submit(_copy_object, obj=obj))

        futures_wait(futures, return_when=FIRST_EXCEPTION)
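        # Propagate the first failure to the caller: skip futures that are
        # still running, and let f.result() re-raise the exception of any
        # future that failed.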
        for f in futures:
            if f.running():
                continue
            exc = f.exception()
            if exc:
                pool.shutdown(wait=False)
                f.result()
                raise exc
    dt = time() - t0
    logger.info(
        "processed %s content objects in %.1fsec "
        "(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped",
        len(vol),
        dt,
        len(vol) / dt,
        sum(vol) / 1024 / 1024 / dt,
        nb_failures,
        nb_skipped,
    )
    if notify:
        notify("WATCHDOG=1")
Inline discussion (on the executor error-handling block):

vlorentz: It seems simpler to implement it using [[ https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.dummy | multiprocessing.dummy ]]:

douardda: I don't know; maybe it is possible to simplify this code a bit, but about using `mp.dummy`, the documentation states: […]

vlorentz: The rationale is only about the features of the API, which we don't use here. Anyway, no big deal.
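For reference, the alternative vlorentz suggests might look roughly like the sketch below. This is hypothetical and not part of the diff; it assumes the `_copy_object` closure from the function above is in scope. `multiprocessing.dummy.Pool` is backed by threads while exposing the `multiprocessing` API, so the closure can be passed directly without pickling, and `Pool.map` re-raises the first worker exception in the calling thread:

    # Hypothetical multiprocessing.dummy variant of the dispatch loop;
    # not part of the diff. Thread-backed, so _copy_object needs no pickling.
    from multiprocessing.dummy import Pool

    def process_contents(all_objects, concurrency=16):
        contents = [
            obj
            for object_type, objects in all_objects.items()
            if object_type == "content"
            for obj in objects
        ]
        with Pool(concurrency) as pool:
            # map() blocks until every call returns and re-raises the
            # first worker exception in the caller.
            pool.map(_copy_object, contents)

The trade-off, as the thread notes, is mostly about API surface: `Pool.map` gives a shorter fail-fast dispatch, while the explicit `ThreadPoolExecutor` version keeps per-future control.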