diff --git a/setup.py b/setup.py index 21817cc..df08cbc 100755 --- a/setup.py +++ b/setup.py @@ -1,73 +1,74 @@ #!/usr/bin/env python3 # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import open from os import path from setuptools import find_packages, setup here = path.abspath(path.dirname(__file__)) # Get the long description from the README file with open(path.join(here, "README.md"), encoding="utf-8") as f: long_description = f.read() def parse_requirements(name=None): if name: reqf = "requirements-%s.txt" % name else: reqf = "requirements.txt" requirements = [] if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() if not line or line.startswith("#"): continue requirements.append(line) return requirements setup( name="swh.scheduler", description="Software Heritage Scheduler", long_description=long_description, long_description_content_type="text/markdown", python_requires=">=3.7", author="Software Heritage developers", author_email="swh-devel@inria.fr", url="https://forge.softwareheritage.org/diffusion/DSCH/", packages=find_packages(), setup_requires=["setuptools-scm"], use_scm_version=True, install_requires=parse_requirements() + parse_requirements("swh"), extras_require={ "testing": parse_requirements("test") + parse_requirements("journal"), "journal": parse_requirements("journal"), }, include_package_data=True, entry_points=""" [swh.cli.subcommands] scheduler=swh.scheduler.cli + scheduler-journal=swh.scheduler.cli.journal """, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 5 - Production/Stable", ], project_urls={ "Bug 
Reports": "https://forge.softwareheritage.org/maniphest", "Funding": "https://www.softwareheritage.org/donate", "Source": "https://forge.softwareheritage.org/source/swh-scheduler", "Documentation": "https://docs.softwareheritage.org/devel/swh-scheduler/", }, ) diff --git a/swh/scheduler/cli/journal.py b/swh/scheduler/cli/journal.py index f3c562c..9551164 100644 --- a/swh/scheduler/cli/journal.py +++ b/swh/scheduler/cli/journal.py @@ -1,57 +1,59 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click -from . import cli +from . import cli as cli_scheduler_group -@cli.command("journal-client") +@cli_scheduler_group.command("journal-client") @click.pass_context @click.option( "--stop-after-objects", "-m", default=None, type=int, help="Maximum number of objects to replay. Default is to run forever.", ) def visit_stats_journal_client(ctx, stop_after_objects): + """Keep the origin visits stats table up to date from a swh kafka journal + """ from functools import partial from swh.journal.client import get_journal_client from swh.scheduler.journal_client import process_journal_objects if not ctx.obj["scheduler"]: raise ValueError("Scheduler class (local/remote) must be instantiated") scheduler = ctx.obj["scheduler"] config = ctx.obj["config"] if "journal" not in config: raise ValueError("Missing 'journal' configuration key") journal_cfg = config["journal"] journal_cfg["stop_after_objects"] = stop_after_objects or journal_cfg.get( "stop_after_objects" ) client = get_journal_client( cls="kafka", object_types=["origin_visit_status"], prefix="swh.journal.objects", **journal_cfg, ) worker_fn = partial(process_journal_objects, scheduler=scheduler,) nb_messages = 0 try: nb_messages = client.process(worker_fn) print(f"Processed {nb_messages} message(s).") except
KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: client.close()