Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/replay.py
Show All 11 Lines | |||||
from swh.model.identifiers import normalize_timestamp | from swh.model.identifiers import normalize_timestamp | ||||
from swh.model.hashutil import hash_to_hex | from swh.model.hashutil import hash_to_hex | ||||
from swh.model.model import SHA1_SIZE | from swh.model.model import SHA1_SIZE | ||||
from swh.objstorage.objstorage import ID_HASH_ALGO | from swh.objstorage.objstorage import ID_HASH_ALGO | ||||
from swh.storage import HashCollision | from swh.storage import HashCollision | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" | |||||
GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" | |||||
CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" | |||||
CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" | |||||
CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds" | |||||
def process_replay_objects(all_objects, *, storage): | def process_replay_objects(all_objects, *, storage): | ||||
for (object_type, objects) in all_objects.items(): | for (object_type, objects) in all_objects.items(): | ||||
logger.debug("Inserting %s %s objects", len(objects), object_type) | logger.debug("Inserting %s %s objects", len(objects), object_type) | ||||
statsd_name = 'swh_journal_graph_replayer_%s_total_duration_seconds' | with statsd.timed(GRAPH_DURATION_METRIC, | ||||
with statsd.timed(statsd_name % 'object_type'): | tags={'object_type': object_type}): | ||||
_insert_objects(object_type, objects, storage) | _insert_objects(object_type, objects, storage) | ||||
statsd.increment( | statsd.increment(GRAPH_OPERATIONS_METRIC, len(objects), | ||||
'swh_journal_graph_replayer_%s_total' % object_type, | tags={'object_type': object_type}) | ||||
len(objects)) | |||||
def _fix_revision_pypi_empty_string(rev): | def _fix_revision_pypi_empty_string(rev): | ||||
"""PyPI loader failed to encode empty strings as bytes, see: | """PyPI loader failed to encode empty strings as bytes, see: | ||||
swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 | swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 | ||||
or https://forge.softwareheritage.org/D1772 | or https://forge.softwareheritage.org/D1772 | ||||
""" | """ | ||||
rev = { | rev = { | ||||
▲ Show 20 Lines • Show All 259 Lines • ▼ Show 20 Lines | for i in range(max_retries): | ||||
break | break | ||||
except Exception as exc: | except Exception as exc: | ||||
lasterror = exc | lasterror = exc | ||||
else: | else: | ||||
raise lasterror | raise lasterror | ||||
def copy_object(obj_id, src, dst, max_retries=3): | def copy_object(obj_id, src, dst, max_retries=3): | ||||
statsd_name = 'swh_journal_content_replayer_%s_duration_seconds' | |||||
try: | try: | ||||
with statsd.timed(statsd_name % 'get'): | with statsd.timed(CONTENT_DURATION_METRIC, tags={'request': 'get'}): | ||||
with retry(max_retries): | with retry(max_retries): | ||||
obj = src.get(obj_id) | obj = src.get(obj_id) | ||||
logger.debug('retrieved %s', hash_to_hex(obj_id)) | logger.debug('retrieved %s', hash_to_hex(obj_id)) | ||||
with statsd.timed(statsd_name % 'put'): | with statsd.timed(CONTENT_DURATION_METRIC, tags={'request': 'put'}): | ||||
with retry(max_retries): | with retry(max_retries): | ||||
dst.add(obj, obj_id=obj_id, check_presence=False) | dst.add(obj, obj_id=obj_id, check_presence=False) | ||||
logger.debug('copied %s', hash_to_hex(obj_id)) | logger.debug('copied %s', hash_to_hex(obj_id)) | ||||
statsd.increment( | statsd.increment(CONTENT_OPERATIONS_METRIC) | ||||
'swh_journal_content_replayer_bytes_total', | statsd.increment(CONTENT_BYTES_METRIC, len(obj)) | ||||
len(obj)) | |||||
except Exception: | except Exception: | ||||
obj = '' | obj = '' | ||||
logger.error('Failed to copy %s', hash_to_hex(obj_id)) | logger.error('Failed to copy %s', hash_to_hex(obj_id)) | ||||
raise | raise | ||||
return len(obj) | return len(obj) | ||||
def process_replay_objects_content(all_objects, *, src, dst, | def process_replay_objects_content(all_objects, *, src, dst, | ||||
▲ Show 20 Lines • Show All 77 Lines • Show Last 20 Lines |