Changeset View
Standalone View
swh/journal/replay.py
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from concurrent.futures import ThreadPoolExecutor
from time import time

from swh.core.statsd import statsd
from swh.model.hashutil import hash_to_hex
from swh.model.identifiers import normalize_timestamp
from swh.model.model import SHA1_SIZE
from swh.objstorage.objstorage import ID_HASH_ALGO
from swh.storage import HashCollision
▲ Show 20 Lines • Show All 237 Lines • ▼ Show 20 Lines | try: | ||||
with statsd.timed(statsd_name % 'put'): | with statsd.timed(statsd_name % 'put'): | ||||
dst.add(obj, obj_id=obj_id, check_presence=False) | dst.add(obj, obj_id=obj_id, check_presence=False) | ||||
logger.debug('copied %s', hash_to_hex(obj_id)) | logger.debug('copied %s', hash_to_hex(obj_id)) | ||||
statsd.increment( | statsd.increment( | ||||
'swh_journal_content_replayer_bytes_total', | 'swh_journal_content_replayer_bytes_total', | ||||
len(obj)) | len(obj)) | ||||
except Exception: | except Exception: | ||||
obj = '' | obj = '' | ||||
logger.exception('Failed to copy %s', hash_to_hex(obj_id)) | logger.error('Failed to copy %s', hash_to_hex(obj_id)) | ||||
raise | |||||
return len(obj) | return len(obj) | ||||
def process_replay_objects_content(all_objects, *, src, dst, concurrency=8,
                                   exclude_fn=None):
    """Replay content objects from a journal into a destination objstorage.

    Takes a list of records from Kafka (see
    :py:func:`swh.journal.client.JournalClient.process`) and copies them
    from the `src` objstorage to the `dst` objstorage, if:

    * `obj['status']` is `'visible'`
    * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided)

    Args:
        all_objects: mapping of object_type to a list of content dicts, as
            handed out by the journal client.  Only the ``'content'`` type
            is processed; any other type is logged and skipped.
        src: source objstorage (read side).
        dst: destination objstorage (write side).
        concurrency: number of worker threads used to copy objects in
            parallel.
        exclude_fn: optional predicate taking a content dict; contents for
            which it returns a truthy value are not copied.
    """
    # Bytes copied per object; falsy entries (0) mark failed copies and are
    # what the "%d failures" figure below is computed from.
    vol = []
    nb_skipped = 0
    t0 = time()

    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        futures = []
        for (object_type, objects) in all_objects.items():
            if object_type != 'content':
                logger.warning(
                    'Received a series of %s, this should not happen',
                    object_type)
                continue
            for obj in objects:
                obj_id = obj[ID_HASH_ALGO]
                if obj['status'] != 'visible':
                    nb_skipped += 1
                    logger.debug('skipped %s (status=%s)',
                                 hash_to_hex(obj_id), obj['status'])
                elif exclude_fn and exclude_fn(obj):
                    nb_skipped += 1
                    logger.debug('skipped %s (manually excluded)',
                                 hash_to_hex(obj_id))
                else:
                    futures.append(
                        executor.submit(copy_object, obj_id, src, dst))

        # Collect results on the main thread rather than through
        # add_done_callback: copy_object re-raises on failure, and an
        # exception raised by fut.result() inside a done-callback is
        # swallowed by the executor, so failed copies would never be
        # appended to `vol` and the failure count would always read 0.
        for fut in futures:
            try:
                vol.append(fut.result())
            except Exception:
                # copy_object already logged the error; record the failure
                # as a zero-byte copy so it is counted below.
                vol.append(0)

    dt = time() - t0
    logger.info(
        'processed %s content objects in %.1fsec '
        '(%.1f obj/sec, %.1fMB/sec) - %d failures - %d skipped',
        len(vol), dt,
        len(vol)/dt,
        sum(vol)/1024/1024/dt,
        len([x for x in vol if not x]),
        nb_skipped)