Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/replay.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from time import time | from time import time | ||||
import logging | import logging | ||||
from contextlib import contextmanager | |||||
from swh.core.statsd import statsd | from swh.core.statsd import statsd | ||||
from swh.model.identifiers import normalize_timestamp | from swh.model.identifiers import normalize_timestamp | ||||
from swh.model.hashutil import hash_to_hex | from swh.model.hashutil import hash_to_hex | ||||
from swh.model.model import SHA1_SIZE | from swh.model.model import SHA1_SIZE | ||||
from swh.objstorage.objstorage import ID_HASH_ALGO | from swh.objstorage.objstorage import ID_HASH_ALGO | ||||
from swh.storage import HashCollision | from swh.storage import HashCollision | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
def process_replay_objects(all_objects, *, storage):
    """Insert every batch of replayed journal objects into ``storage``.

    Args:
        all_objects: mapping whose keys are object types and whose values
            are the sized collections of objects of that type.
        storage: destination handed through to ``_insert_objects``.
    """
    for kind, batch in all_objects.items():
        # Kept at debug level: the CLI already logs enough at info level.
        logger.debug("Inserting %s %s objects", len(batch), kind)
        _insert_objects(kind, batch, storage)
def _fix_revision_pypi_empty_string(rev): | def _fix_revision_pypi_empty_string(rev): | ||||
"""PyPI loader failed to encode empty strings as bytes, see: | """PyPI loader failed to encode empty strings as bytes, see: | ||||
swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 | swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 | ||||
or https://forge.softwareheritage.org/D1772 | or https://forge.softwareheritage.org/D1772 | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 211 Lines • ▼ Show 20 Lines | while left < right-1: | ||||
return True | return True | ||||
elif pivot < hash_: | elif pivot < hash_: | ||||
left = middle | left = middle | ||||
else: | else: | ||||
right = middle | right = middle | ||||
return get_hash(left) == hash_ | return get_hash(left) == hash_ | ||||
def retry(max_retries):
    """Build a decorator that attempts a call up to ``max_retries`` times.

    A ``with`` block cannot be executed again by its context manager (a
    ``@contextmanager`` generator must yield exactly once), so retrying is
    implemented by wrapping the callable instead.

    Args:
        max_retries: total number of attempts (must be at least 1).

    Returns:
        A decorator; the decorated callable returns the result of the first
        successful attempt.

    Raises:
        ValueError: if ``max_retries`` is smaller than 1.
        Exception: the exception raised by the last attempt, unchanged, when
            every attempt failed.
    """
    # Function-scope import keeps this block self-contained.
    from functools import wraps

    if max_retries < 1:
        raise ValueError('max_retries must be at least 1')

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Count down the attempts still available after the current one.
            for remaining in range(max_retries - 1, -1, -1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if not remaining:
                        # Out of attempts: surface the original error.
                        raise
        return wrapper
    return decorator


def copy_object(obj_id, src, dst, max_retries=3):
    """Copy one object from ``src`` to ``dst``, retrying each side on error.

    The object is read with ``src.get(obj_id)`` and written with
    ``dst.add(obj, obj_id=obj_id, check_presence=False)``. Each of the two
    operations is attempted up to ``max_retries`` times and is timed with
    statsd (the timing covers all attempts of that operation).

    Args:
        obj_id: identifier of the object to copy.
        src: source object storage (must provide ``get``).
        dst: destination object storage (must provide ``add``).
        max_retries: maximum number of attempts for the read and for the
            write, counted separately.

    Returns:
        The length of the copied object.

    Raises:
        Exception: whatever the last failed attempt raised; the failure is
            logged before being re-raised.
    """
    statsd_name = 'swh_journal_content_replayer_%s_duration_seconds'
    with_retries = retry(max_retries)
    try:
        with statsd.timed(statsd_name % 'get'):
            obj = with_retries(src.get)(obj_id)
            logger.debug('retrieved %s', hash_to_hex(obj_id))

        with statsd.timed(statsd_name % 'put'):
            with_retries(dst.add)(obj, obj_id=obj_id, check_presence=False)
            logger.debug('copied %s', hash_to_hex(obj_id))
        statsd.increment(
            'swh_journal_content_replayer_bytes_total',
            len(obj))
    except Exception:
        logger.error('Failed to copy %s', hash_to_hex(obj_id))
        raise
    return len(obj)
▲ Show 20 Lines • Show All 80 Lines • Show Last 20 Lines |
Use logger.debug here; the CLI already logs enough at the info level.