Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/replay.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from time import time | from time import time | ||||
import logging | import logging | ||||
from concurrent.futures import ThreadPoolExecutor | from concurrent.futures import ThreadPoolExecutor | ||||
from swh.storage import HashCollision | from swh.core.statsd import statsd | ||||
from swh.model.hashutil import hash_to_hex | from swh.model.hashutil import hash_to_hex | ||||
from swh.model.model import SHA1_SIZE | |||||
from swh.objstorage.objstorage import ID_HASH_ALGO | from swh.objstorage.objstorage import ID_HASH_ALGO | ||||
from swh.core.statsd import statsd | from swh.storage import HashCollision | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
SHA1_SIZE = 20 | |||||
def process_replay_objects(all_objects, *, storage):
    """Insert a batch of journal objects into *storage*, one object type
    at a time.

    Args:
        all_objects: mapping of object type name to the list of objects
            of that type received from the journal.
        storage: the storage instance the objects are written to.
    """
    for object_type in all_objects:
        _insert_objects(object_type, all_objects[object_type], storage)
def _insert_objects(object_type, objects, storage): | def _insert_objects(object_type, objects, storage): | ||||
if object_type == 'content': | if object_type == 'content': | ||||
# TODO: insert 'content' in batches | # TODO: insert 'content' in batches | ||||
▲ Show 20 Lines • Show All 89 Lines • ▼ Show 20 Lines | try: | ||||
'swh_journal_content_replayer_bytes_total', | 'swh_journal_content_replayer_bytes_total', | ||||
len(obj)) | len(obj)) | ||||
except Exception: | except Exception: | ||||
obj = '' | obj = '' | ||||
logger.exception('Failed to copy %s', hash_to_hex(obj_id)) | logger.exception('Failed to copy %s', hash_to_hex(obj_id)) | ||||
return len(obj) | return len(obj) | ||||
def process_replay_objects_content(all_objects, *, src, dst, concurrency=8,
                                   exclude_fn=None):
    """
    Takes a list of records from Kafka (see
    :py:func:`swh.journal.client.JournalClient.process`) and copies them
    from the `src` objstorage to the `dst` objstorage, if:

    * `obj['status']` is `'visible'`
    * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided)

    Args:
        all_objects Dict[str, List[dict]]: Objects passed by the Kafka client.
            Most importantly, `all_objects['content'][*]['sha1']` is the
            sha1 hash of each content
        src: An object storage (see :py:func:`swh.objstorage.get_objstorage`)
        dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`)
        exclude_fn Optional[Callable[dict, bool]]: Determines whether
            an object should be copied.

    Example:

    >>> from swh.objstorage import get_objstorage
    >>> src = get_objstorage('memory', {})
    >>> dst = get_objstorage('memory', {})
    >>> id1 = src.add(b'foo bar')
    >>> id2 = src.add(b'baz qux')
    >>> kafka_partitions = {
    ...     'content': [
    ...         {
    ...             'sha1': id1,
    ...             'status': 'visible',
    ...         },
    ...         {
    ...             'sha1': id2,
    ...             'status': 'visible',
    ...         },
    ...     ]
    ... }
    >>> process_replay_objects_content(
    ...     kafka_partitions, src=src, dst=dst,
    ...     exclude_fn=lambda obj: obj['sha1'] == id1)
    >>> id1 in dst
    False
    >>> id2 in dst
    True
    """
    # Sizes (in bytes) of the objects copied so far; a falsy entry means
    # the copy of that object failed (copy_object returned an empty obj).
    copied_sizes = []
    nb_skipped = 0
    start = time()
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        for object_type, objects in all_objects.items():
            # Only 'content' partitions carry copyable objects.
            if object_type != 'content':
                logger.warning(
                    'Received a series of %s, this should not happen',
                    object_type)
                continue
            for obj in objects:
                obj_id = obj[ID_HASH_ALGO]
                # Skip non-visible objects first, then apply the
                # caller-provided exclusion predicate, if any.
                if obj['status'] != 'visible':
                    nb_skipped += 1
                    logger.debug('skipped %s (status=%s)',
                                 hash_to_hex(obj_id), obj['status'])
                    continue
                if exclude_fn and exclude_fn(obj):
                    nb_skipped += 1
                    logger.debug('skipped %s (manually excluded)',
                                 hash_to_hex(obj_id))
                    continue
                future = pool.submit(copy_object, obj_id, src, dst)
                # list.append is atomic under the GIL, so callbacks from
                # worker threads may record their result directly.
                future.add_done_callback(
                    lambda fut: copied_sizes.append(fut.result()))
    elapsed = time() - start
    logger.info(
        'processed %s content objects in %.1fsec '
        '(%.1f obj/sec, %.1fMB/sec) - %d failures - %d skipped',
        len(copied_sizes), elapsed,
        len(copied_sizes) / elapsed,
        sum(copied_sizes) / 1024 / 1024 / elapsed,
        len([size for size in copied_sizes if not size]),
        nb_skipped)