Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/cli.py
# Copyright (C) 2016-2019 The Software Heritage developers | # Copyright (C) 2016-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import functools | |||||
import logging | import logging | ||||
import mmap | |||||
import os | |||||
import warnings | |||||
import click | import click | ||||
try: | |||||
from systemd.daemon import notify | |||||
except ImportError: | |||||
notify = None | |||||
from swh.core import config | |||||
from swh.core.cli import CONTEXT_SETTINGS | from swh.core.cli import CONTEXT_SETTINGS | ||||
from swh.model.model import SHA1_SIZE | |||||
from swh.objstorage import get_objstorage | |||||
from swh.journal.client import get_journal_client as get_client | |||||
from swh.journal.replay import is_hash_in_bytearray | |||||
from swh.journal.replay import process_replay_objects_content | |||||
@click.group(name="journal", context_settings=CONTEXT_SETTINGS) | @click.group(name="journal", context_settings=CONTEXT_SETTINGS) | ||||
@click.option( | @click.option( | ||||
"--config-file", | "--config-file", | ||||
"-C", | "-C", | ||||
default=None, | default=None, | ||||
type=click.Path(exists=True, dir_okay=False,), | type=click.Path(exists=True, dir_okay=False,), | ||||
help="Configuration file.", | help="Configuration file.", | ||||
) | ) | ||||
@click.pass_context | @click.pass_context | ||||
def cli(ctx, config_file): | def cli(ctx, config_file): | ||||
"""Software Heritage Journal tools. | """DEPRECATED Software Heritage Journal tools. | ||||
The journal is a persistent logger of changes to the archive, with | |||||
publish-subscribe support. | |||||
""" | """ | ||||
if not config_file: | pass | ||||
config_file = os.environ.get("SWH_CONFIG_FILENAME") | |||||
if config_file: | |||||
if not os.path.exists(config_file): | |||||
raise ValueError("%s does not exist" % config_file) | |||||
conf = config.read(config_file) | |||||
else: | |||||
conf = {} | |||||
ctx.ensure_object(dict) | |||||
ctx.obj["config"] = conf | |||||
def get_journal_client(ctx, **kwargs): | |||||
conf = ctx.obj["config"].copy() | |||||
if "journal" in conf: | |||||
warnings.warn( | |||||
"Journal client configuration should now be under the " | |||||
"`journal_client` field and have a `cls` argument.", | |||||
DeprecationWarning, | |||||
) | |||||
conf["journal_client"] = {"cls": "kafka", **conf.pop("journal")} | |||||
client_conf = conf.get("journal_client").copy() | |||||
client_conf.update(kwargs) | |||||
try: | |||||
return get_client(**client_conf) | |||||
except ValueError as exc: | |||||
ctx.fail(exc) | |||||
@cli.command() | @cli.command() | ||||
@click.option( | @click.option( | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
"-n", | "-n", | ||||
default=None, | default=None, | ||||
type=int, | type=int, | ||||
Show All 39 Lines | |||||
) | ) | ||||
@click.option( | @click.option( | ||||
"--check-dst/--no-check-dst", | "--check-dst/--no-check-dst", | ||||
default=True, | default=True, | ||||
help="Check whether the destination contains the object before " "copying.", | help="Check whether the destination contains the object before " "copying.", | ||||
) | ) | ||||
@click.pass_context | @click.pass_context | ||||
def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst): | def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst): | ||||
"""Fill a destination Object Storage (typically a mirror) by reading a Journal | """DEPRECATED: use `swh objstorage replay` instead. | ||||
and retrieving objects from an existing source ObjStorage. | |||||
There can be several 'replayers' filling a given ObjStorage as long as they | |||||
use the same `group-id`. You can use the `KAFKA_GROUP_INSTANCE_ID` | |||||
environment variable to use KIP-345 static group membership. | |||||
This service retrieves object ids to copy from the 'content' topic. It will | |||||
only copy object's content if the object's description in the kafka | |||||
nmessage has the status:visible set. | |||||
`--exclude-sha1-file` may be used to exclude some hashes to speed-up the | |||||
replay in case many of the contents are already in the destination | |||||
objstorage. It must contain a concatenation of all (sha1) hashes, | |||||
and it must be sorted. | |||||
This file will not be fully loaded into memory at any given time, | |||||
so it can be arbitrarily large. | |||||
`--check-dst` sets whether the replayer should check in the destination | |||||
ObjStorage before copying an object. You can turn that off if you know | |||||
you're copying to an empty ObjStorage. | |||||
""" | |||||
conf = ctx.obj["config"] | |||||
try: | |||||
objstorage_src = get_objstorage(**conf.pop("objstorage_src")) | |||||
except KeyError: | |||||
ctx.fail("You must have a source objstorage configured in " "your config file.") | |||||
try: | |||||
objstorage_dst = get_objstorage(**conf.pop("objstorage_dst")) | |||||
except KeyError: | |||||
ctx.fail( | |||||
"You must have a destination objstorage configured " "in your config file." | |||||
) | |||||
if exclude_sha1_file: | |||||
map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ) | |||||
if map_.size() % SHA1_SIZE != 0: | |||||
ctx.fail( | |||||
"--exclude-sha1 must link to a file whose size is an " | |||||
"exact multiple of %d bytes." % SHA1_SIZE | |||||
) | |||||
nb_excluded_hashes = int(map_.size() / SHA1_SIZE) | |||||
def exclude_fn(obj): | |||||
return is_hash_in_bytearray(obj["sha1"], map_, nb_excluded_hashes) | |||||
else: | |||||
exclude_fn = None | |||||
client = get_journal_client( | This needs the swh.objstorage.replayer package.""" | ||||
ctx, stop_after_objects=stop_after_objects, object_types=("content",) | ctx.fail("DEPRECATED") | ||||
) | |||||
worker_fn = functools.partial( | |||||
process_replay_objects_content, | |||||
src=objstorage_src, | |||||
dst=objstorage_dst, | |||||
exclude_fn=exclude_fn, | |||||
check_dst=check_dst, | |||||
) | |||||
if notify: | |||||
notify("READY=1") | |||||
try: | |||||
client.process(worker_fn) | |||||
except KeyboardInterrupt: | |||||
ctx.exit(0) | |||||
else: | |||||
print("Done.") | |||||
finally: | |||||
if notify: | |||||
notify("STOPPING=1") | |||||
client.close() | |||||
def main(): | def main(): | ||||
logging.basicConfig() | logging.basicConfig() | ||||
return cli(auto_envvar_prefix="SWH_JOURNAL") | return cli(auto_envvar_prefix="SWH_JOURNAL") | ||||
if __name__ == "__main__": | if __name__ == "__main__": | ||||
main() | main() |