Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/replay.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import copy | import copy | ||||
from time import time | from time import time | ||||
import logging | import logging | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
try: | |||||
from systemd.daemon import notify | |||||
except ImportError: | |||||
notify = None | |||||
from swh.core.statsd import statsd | from swh.core.statsd import statsd | ||||
from swh.model.identifiers import normalize_timestamp | from swh.model.identifiers import normalize_timestamp | ||||
from swh.model.hashutil import hash_to_hex | from swh.model.hashutil import hash_to_hex | ||||
from swh.model.model import SHA1_SIZE | from swh.model.model import SHA1_SIZE | ||||
from swh.objstorage.objstorage import ID_HASH_ALGO | from swh.objstorage.objstorage import ID_HASH_ALGO | ||||
from swh.storage import HashCollision | from swh.storage import HashCollision | ||||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# statsd metric names reported while replaying graph objects.
GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total"
GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds"

# statsd metric names for the content replayer.
# NOTE(review): their use sites are outside the visible part of this file.
CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total"
CONTENT_BYTES_METRIC = "swh_content_replayer_bytes"
CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds"
def process_replay_objects(all_objects, *, storage):
    """Insert every batch of replayed objects into ``storage``.

    ``all_objects`` maps an object type name to a sized collection of
    objects of that type.  Each batch is passed to ``_insert_objects``;
    the call is timed under ``GRAPH_DURATION_METRIC`` and the batch size
    is added to ``GRAPH_OPERATIONS_METRIC``, both tagged with the object
    type.  If the optional systemd ``notify`` binding was importable, a
    watchdog keep-alive is sent as well.
    """
    for kind, batch in all_objects.items():
        logger.debug("Inserting %s %s objects", len(batch), kind)
        metric_tags = {'object_type': kind}
        with statsd.timed(GRAPH_DURATION_METRIC, tags=metric_tags):
            _insert_objects(kind, batch, storage)
        statsd.increment(GRAPH_OPERATIONS_METRIC, len(batch),
                         tags=metric_tags)
    # NOTE(review): the diff rendering dropped indentation; the watchdog
    # ping is assumed to run once per call, after the loop -- confirm.
    # ``notify`` is None when systemd bindings are not installed.
    if notify:
        notify('WATCHDOG=1')
def _fix_revision_pypi_empty_string(rev): | def _fix_revision_pypi_empty_string(rev): | ||||
"""PyPI loader failed to encode empty strings as bytes, see: | """PyPI loader failed to encode empty strings as bytes, see: | ||||
swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 | swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 | ||||
or https://forge.softwareheritage.org/D1772 | or https://forge.softwareheritage.org/D1772 | ||||
""" | """ | ||||
rev = { | rev = { | ||||
▲ Show 20 Lines • Show All 356 Lines • ▼ Show 20 Lines | def process_replay_objects_content(all_objects, *, src, dst, | ||||
logger.info( | logger.info( | ||||
'processed %s content objects in %.1fsec ' | 'processed %s content objects in %.1fsec ' | ||||
'(%.1f obj/sec, %.1fMB/sec) - %d failures - %d skipped', | '(%.1f obj/sec, %.1fMB/sec) - %d failures - %d skipped', | ||||
len(vol), dt, | len(vol), dt, | ||||
len(vol)/dt, | len(vol)/dt, | ||||
sum(vol)/1024/1024/dt, | sum(vol)/1024/1024/dt, | ||||
len([x for x in vol if not x]), | len([x for x in vol if not x]), | ||||
nb_skipped) | nb_skipped) | ||||
if notify: | |||||
notify('WATCHDOG=1') |