diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -19,3 +19,6 @@ [mypy-pytest_kafka.*] ignore_missing_imports = True + +[mypy-systemd.daemon.*] +ignore_missing_imports = True diff --git a/swh/journal/cli.py b/swh/journal/cli.py --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -11,6 +11,11 @@ import click +try: + from systemd.daemon import notify +except ImportError: + notify = None + from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.model.model import SHA1_SIZE @@ -93,11 +98,16 @@ max_messages=max_messages) worker_fn = functools.partial(process_replay_objects, storage=storage) + if notify: + notify('READY=1') + try: nb_messages = 0 last_log_time = 0 while not max_messages or nb_messages < max_messages: nb_messages += client.process(worker_fn) + if notify: + notify('WATCHDOG=1') if time.time() - last_log_time >= 60: # Log at most once per minute. logger.info('Processed %d messages.' % nb_messages) @@ -107,6 +117,8 @@ else: print('Done.') finally: + if notify: + notify('STOPPING=1') client.close() @@ -135,12 +147,18 @@ """ conf = ctx.obj['config'] backfiller = JournalBackfiller(conf) + + if notify: + notify('READY=1') + try: backfiller.run( object_type=object_type, start_object=start_object, end_object=end_object, dry_run=dry_run) except KeyboardInterrupt: + if notify: + notify('STOPPING=1') ctx.exit(0) @@ -212,11 +230,16 @@ dst=objstorage_dst, exclude_fn=exclude_fn) + if notify: + notify('READY=1') + try: nb_messages = 0 last_log_time = 0 while not max_messages or nb_messages < max_messages: nb_messages += client.process(worker_fn) + if notify: + notify('WATCHDOG=1') if time.time() - last_log_time >= 60: # Log at most once per minute. logger.info('Processed %d messages.' % nb_messages) @@ -225,6 +248,10 @@ ctx.exit(0) else: print('Done.') + finally: + if notify: + notify('STOPPING=1') + client.close() def main(): diff --git a/swh/journal/replay.py b/swh/journal/replay.py --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -8,6 +8,11 @@ import logging from contextlib import contextmanager +try: + from systemd.daemon import notify +except ImportError: + notify = None + from swh.core.statsd import statsd from swh.model.identifiers import normalize_timestamp from swh.model.hashutil import hash_to_hex @@ -32,6 +37,8 @@ _insert_objects(object_type, objects, storage) statsd.increment(GRAPH_OPERATIONS_METRIC, len(objects), tags={'object_type': object_type}) + if notify: + notify('WATCHDOG=1') def _fix_revision_pypi_empty_string(rev): @@ -404,3 +411,6 @@ sum(vol)/1024/1024/dt, len([x for x in vol if not x]), nb_skipped) + + if notify: + notify('WATCHDOG=1')