diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index 6b46269..b9ff6bb 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,688 +1,795 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control from datetime import datetime, timezone from functools import partial import os from typing import Any, Dict, Generator, Optional, Tuple import click from deprecated import deprecated import iso8601 import yaml try: from systemd.daemon import notify except ImportError: notify = None from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.model import Sha1Git # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILENAME" DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None) DEFAULT_CONFIG: Dict[str, Any] = { "provenance": { "archive": { # Storage API based Archive object # "cls": "api", # "storage": { # "cls": "remote", # "url": "http://uffizi.internal.softwareheritage.org:5002", # } # Direct access Archive object "cls": "direct", "db": { "host": "belvedere.internal.softwareheritage.org", "port": 5432, "dbname": "softwareheritage", "user": "guest", }, }, "storage": { # Local PostgreSQL Storage # "cls": "postgresql", # "db": { # "host": "localhost", # "user": "postgres", # "password": "postgres", # "dbname": "provenance", # }, # Remote RabbitMQ/PostgreSQL Storage "cls": "rabbitmq", "url": "amqp://localhost:5672/%2f", "storage_config": { "cls": "postgresql", "db": { "host": "localhost", "user": "postgres", "password": "postgres", "dbname": "provenance", }, }, "batch_size": 100, "prefetch_count": 100, }, } } 
@swh_cli_group.group(
    name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP
)
@click.option(
    "-C",
    "--config-file",
    default=None,
    type=click.Path(exists=True, dir_okay=False, path_type=str),
    help="""YAML configuration file.""",
)
@click.option(
    "-P",
    "--profile",
    default=None,
    type=click.Path(exists=False, dir_okay=False, path_type=str),
    help="""Enable profiling to specified file.""",
)
@click.pass_context
def cli(
    ctx: click.core.Context, config_file: Optional[str], profile: Optional[str]
) -> None:
    # Entry point of the `swh provenance` command group.
    #
    # Resolves the configuration (explicit --config-file, then the file named
    # by the SWH_CONFIG_FILENAME environment variable if it exists, then
    # DEFAULT_CONFIG), stores it under ctx.obj["config"] for the subcommands,
    # and optionally enables cProfile for the whole process.
    if (
        config_file is None
        and DEFAULT_PATH is not None
        and config.config_exists(DEFAULT_PATH)
    ):
        config_file = DEFAULT_PATH

    if config_file is None:
        conf = DEFAULT_CONFIG
    else:
        # Fail loudly on a missing file instead of silently falling back to
        # an empty configuration.
        if not os.path.exists(config_file):
            raise FileNotFoundError(config_file)
        # Use a context manager so the file handle is closed deterministically
        # rather than whenever the garbage collector gets to it.
        with open(config_file, "rb") as config_fd:
            conf = yaml.safe_load(config_fd)

    ctx.ensure_object(dict)
    ctx.obj["config"] = conf

    if profile:
        # Imported lazily to keep CLI startup time under control.
        import atexit
        import cProfile

        print("Profiling...")
        pr = cProfile.Profile()
        pr.enable()

        def _dump_profile() -> None:
            # Runs at interpreter exit: stop profiling and write the stats to
            # the file given with --profile.
            pr.disable()
            pr.dump_stats(profile)

        atexit.register(_dump_profile)
Default is to " "run forever.", +) +@click.option( + "--type", + "-t", + "object_types", + default=[], + type=click.Choice( + [ + "content", + "directory", + "revision", + "location", + "content_in_revision", + "content_in_directory", + "directory_in_revision", + ] + ), + help="Object types to replay", + multiple=True, +) +@click.pass_context +def replay(ctx: click.core.Context, stop_after_objects, object_types): + """Fill a ProvenanceStorage by reading a Journal. + + This is typically used to replicate a Provenance database, reading the + Software Heritage kafka journal to retrieve objects of the Software + Heritage provenance storage to feed a replicate provenance storage. There + can be several 'replayers' filling a ProvenanceStorage as long as they use + the same `group-id`. + + The expected configuration file should have one 'provenance' section with 2 + subsections: + + - storage: the configuration of the provenance storage in which to add + objects received from the kafka journal, + + - journal_client: the configuration of access to the kafka journal. See the + documentation of `swh.journal` for more details on the possible + configuration entries in this section. + + https://docs.softwareheritage.org/devel/apidoc/swh.journal.client.html + + eg.:: + + provenance: + storage: + cls: postgresql + db: + [...] + journal_client: + cls: kafka + prefix: swh.journal.provenance + brokers: [...] + [...] 
+ """ + import functools + + from swh.journal.client import get_journal_client + from swh.provenance.storage import get_provenance_storage + from swh.provenance.storage.replay import ( + ProvenanceObjectDeserializer, + process_replay_objects, + ) + + conf = ctx.obj["config"]["provenance"] + storage = get_provenance_storage(**conf.pop("storage")) + + client_cfg = conf.pop("journal_client") + + deserializer = ProvenanceObjectDeserializer() + + client_cfg["value_deserializer"] = deserializer.convert + if object_types: + client_cfg["object_types"] = object_types + if stop_after_objects: + client_cfg["stop_after_objects"] = stop_after_objects + + try: + client = get_journal_client(**client_cfg) + except ValueError as exc: + ctx.fail(str(exc)) + + worker_fn = functools.partial(process_replay_objects, storage=storage) + + if notify: + notify("READY=1") + + try: + with storage: + n = client.process(worker_fn) + except KeyboardInterrupt: + ctx.exit(0) + else: + print(f"Done, processed {n} messages") + finally: + if notify: + notify("STOPPING=1") + client.close() + + @cli.group(name="origin") @click.pass_context def origin(ctx: click.core.Context): from . 
@origin.command(name="from-journal")
@click.pass_context
def origin_from_journal(ctx: click.core.Context):
    # Feed the provenance index with origins read from a journal: every
    # "origin_visit_status" message is handed to process_journal_origins.
    from swh.journal.client import get_journal_client

    from .journal_client import process_journal_origins

    provenance = ctx.obj["provenance"]
    archive = ctx.obj["archive"]

    # Work on a private copy: popping "cls" from the dict stored in ctx.obj
    # would alter the shared configuration seen by any later reader. A missing
    # or null "journal_client" entry is treated as an empty configuration.
    journal_cfg = dict(ctx.obj["config"].get("journal_client") or {})

    worker_fn = partial(
        process_journal_origins,
        archive=archive,
        provenance=provenance,
    )

    # The journal client class defaults to "kafka" when not configured.
    cls = journal_cfg.pop("cls", None) or "kafka"
    client = get_journal_client(
        cls,
        **{
            **journal_cfg,
            "object_types": ["origin_visit_status"],
        },
    )

    # Tell systemd (when the optional binding is available) that we are up.
    if notify:
        notify("READY=1")

    try:
        with provenance:
            client.process(worker_fn)
    except KeyboardInterrupt:
        ctx.exit(0)
    else:
        print("Done.")
    finally:
        if notify:
            notify("STOPPING=1")
        client.close()
@revision.command(name="from-csv")
@click.argument("filename", type=click.Path(exists=True))
@click.option(
    "-a",
    "--track-all",
    default=True,
    type=bool,
    help="""Index all occurrences of files in the development history.""",
)
@click.option(
    "-f",
    "--flatten",
    default=True,
    type=bool,
    help="""Create flat models for directories in the isochrone frontier.""",
)
@click.option(
    "-l",
    "--limit",
    type=int,
    help="""Limit the amount of entries (revisions) to read from the input file.""",
)
@click.option(
    "-m",
    "--min-depth",
    default=1,
    type=int,
    help="""Set minimum depth (in the directory tree) at which an isochrone """
    """frontier can be defined.""",
)
@click.option(
    "-r",
    "--reuse",
    default=True,
    type=bool,
    help="""Prioritize the usage of previously defined isochrone frontiers """
    """whenever possible.""",
)
@click.option(
    "-s",
    "--min-size",
    default=0,
    type=int,
    help="""Set the minimum size (in bytes) of files to be indexed. """
    """Any smaller file will be ignored.""",
)
@click.option(
    "-d",
    "--max-directory-size",
    default=0,
    type=int,
    help="""Set the maximum recursive directory size of revisions to be indexed.""",
)
@click.pass_context
def revision_from_csv(
    ctx: click.core.Context,
    filename: str,
    track_all: bool,
    flatten: bool,
    limit: Optional[int],
    min_depth: int,
    reuse: bool,
    min_size: int,
    max_directory_size: int,
) -> None:
    # Index the revisions listed in FILENAME (one comma-separated entry per
    # line), adding them one at a time to the provenance storage set up by
    # the parent `revision` group.
    from swh.provenance.algos.revision import CSVRevisionIterator, revision_add

    provenance = ctx.obj["provenance"]
    archive = ctx.obj["archive"]

    revisions = CSVRevisionIterator(generate_revision_tuples(filename), limit=limit)

    # Bind everything that does not change between revisions once, so the
    # loop below only supplies the single-revision batch.
    add_one = partial(
        revision_add,
        provenance,
        archive,
        trackall=track_all,
        flatten=flatten,
        lower=reuse,
        mindepth=min_depth,
        minsize=min_size,
        max_directory_size=max_directory_size,
    )

    with provenance:
        for rev in revisions:
            add_one([rev])
""" """Any smaller file will be ignored.""", ) @click.option( "-d", "--max-directory-size", default=0, type=int, help="""Set the maximum recursive directory size of revisions to be indexed.""", ) @click.pass_context def revision_from_journal( ctx: click.core.Context, track_all: bool, flatten: bool, limit: Optional[int], min_depth: int, reuse: bool, min_size: int, max_directory_size: int, ) -> None: from swh.journal.client import get_journal_client from .journal_client import process_journal_revisions provenance = ctx.obj["provenance"] archive = ctx.obj["archive"] journal_cfg = ctx.obj["config"].get("journal_client", {}) worker_fn = partial( process_journal_revisions, archive=archive, provenance=provenance, minsize=min_size, max_directory_size=max_directory_size, ) cls = journal_cfg.pop("cls", None) or "kafka" client = get_journal_client( cls, **{ **journal_cfg, "object_types": ["revision"], }, ) if notify: notify("READY=1") try: with provenance: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: if notify: notify("STOPPING=1") client.close() @cli.group(name="directory") @click.pass_context def directory(ctx: click.core.Context): from . import get_provenance from .archive import get_archive archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) ctx.obj["provenance"] = provenance ctx.obj["archive"] = archive @directory.command(name="flatten") @click.option( "--range-from", type=str, help="start ID of the range of directories to flatten" ) @click.option( "--range-to", type=str, help="stop ID of the range of directories to flatten" ) @click.option( "-s", "--min-size", default=0, type=int, help="""Set the minimum size (in bytes) of files to be indexed. 
def generate_directory_ids(
    filename: str,
) -> Generator[Sha1Git, None, None]:
    """Yield the directory ids listed in `filename`, one hex id per line.

    Blank (or whitespace-only) lines are skipped; every other line is
    stripped and converted with `hash_to_bytes`.
    """
    # The context manager closes the file as soon as the generator is
    # exhausted or closed, instead of leaking the handle until collection.
    with open(filename, "r") as lines:
        for line in lines:
            entry = line.strip()
            if entry:
                yield hash_to_bytes(entry)
def generate_revision_tuples(
    filename: str,
) -> Generator[Tuple[Sha1Git, datetime, Sha1Git], None, None]:
    """Yield `(revision id, date, root directory id)` tuples read from `filename`.

    Each non-blank line must hold exactly three comma-separated fields:
    revision hex id, ISO 8601 date and root hex id. Dates without timezone
    information are interpreted as UTC. A line with a different number of
    fields raises `ValueError` from the tuple unpacking.
    """
    # The context manager closes the file as soon as the generator is
    # exhausted or closed, instead of leaking the handle until collection.
    with open(filename, "r") as lines:
        for line in lines:
            entry = line.strip()
            if entry:
                revision, date, root = entry.split(",")
                yield (
                    hash_to_bytes(revision),
                    iso8601.parse_date(date, default_timezone=timezone.utc),
                    hash_to_bytes(root),
                )
def generate_origin_tuples(filename: str) -> Generator[Tuple[str, bytes], None, None]:
    """Yield `(origin url, snapshot id)` tuples read from `filename`.

    Each non-blank line must hold exactly two comma-separated fields: the
    url and the snapshot hex id (converted with `hash_to_bytes`). A line
    with a different number of fields raises `ValueError` from the tuple
    unpacking.
    """
    # The context manager closes the file as soon as the generator is
    # exhausted or closed, instead of leaking the handle until collection.
    with open(filename, "r") as lines:
        for line in lines:
            entry = line.strip()
            if entry:
                url, snapshot = entry.split(",")
                yield (url, hash_to_bytes(snapshot))
@dataclass(eq=True, frozen=True)
class ProvenanceResult:
    """One occurrence of a blob, as returned by
    `ProvenanceStorageInterface.content_find_first` and `content_find_all`.

    Instances are immutable and compare by value.
    """

    # Id of the blob (the CLI renders it as a `swh:1:cnt:` identifier).
    content: Sha1Git
    # Id of the revision the blob was found in (rendered as `swh:1:rev:`).
    revision: Sha1Git
    # NOTE(review): presumably the date of `revision` -- confirm.
    date: datetime
    # None when no origin is associated with this occurrence.
    # NOTE(review): presumably an origin URL -- confirm.
    origin: Optional[str]
    # Raw (undecoded) path bytes; the CLI decodes them with os.fsdecode.
    path: bytes
""" - date: datetime + date: Optional[datetime] flat: bool @dataclass(eq=True, frozen=True) class RevisionData: """Object representing the data associated to a revision in the provenance model, where `date` is the optional date of the revision (specifying it acknowledges that the revision was already processed by the revision-content algorithm); and `origin` identifies the preferred origin for the revision, if any. """ date: Optional[datetime] origin: Optional[Sha1Git] @dataclass(eq=True, frozen=True) class RelationData: """Object representing a relation entry in the provenance model, where `src` and `dst` are the sha1 ids of the entities being related, and `path` is optional depending on the relation being represented. """ dst: Sha1Git path: Optional[bytes] @runtime_checkable class ProvenanceStorageInterface(Protocol): def __enter__(self) -> ProvenanceStorageInterface: ... def __exit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: ... @remote_api_endpoint("close") def close(self) -> None: """Close connection to the storage and release resources.""" ... @remote_api_endpoint("content_add") def content_add(self, cnts: Dict[Sha1Git, datetime]) -> bool: """Add blobs identified by sha1 ids, with an associated date (as paired in `cnts`) to the provenance storage. Return a boolean stating whether the information was successfully stored. """ ... @remote_api_endpoint("content_find_first") def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: """Retrieve the first occurrence of the blob identified by `id`.""" ... @remote_api_endpoint("content_find_all") def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: """Retrieve all the occurrences of the blob identified by `id`.""" ... 
    @remote_api_endpoint("directory_add")
    def directory_add(self, dirs: Dict[Sha1Git, DirectoryData]) -> bool:
        """Add directories identified by sha1 ids, with associated date and (optional)
        flatten flag (as paired in `dirs`) to the provenance storage. If the flatten
        flag is set to None, the previous value present in the storage is preserved.
        Return a boolean stating if the information was successfully stored.
        """
        # NOTE(review): `DirectoryData.flat` is annotated as `bool`, so the
        # "flatten flag is set to None" case described above may be stale --
        # confirm against the storage implementations.
        ...
@remote_api_endpoint("origin_add") def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: """Add origins identified by sha1 ids, with their corresponding url (as paired in `orgs`) to the provenance storage. Return a boolean stating if the information was successfully stored. """ ... @remote_api_endpoint("origin_get") def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: """Retrieve the associated url for each origin sha1 in `ids`.""" ... @remote_api_endpoint("revision_add") def revision_add(self, revs: Dict[Sha1Git, RevisionData]) -> bool: """Add revisions identified by sha1 ids, with optional associated date or origin (as paired in `revs`) to the provenance storage. Return a boolean stating if the information was successfully stored. """ ... @remote_api_endpoint("revision_get") def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: """Retrieve the associated date and origin for each revision sha1 in `ids`. If some revision has no associated date nor origin, it is not present in the resulting dictionary. """ ... @remote_api_endpoint("relation_add") def relation_add( self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]] ) -> bool: """Add entries in the selected `relation`. This method assumes all entities being related are already registered in the storage. See `content_add`, `directory_add`, `origin_add`, and `revision_add`. """ ... @remote_api_endpoint("relation_get") def relation_get( self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False ) -> Dict[Sha1Git, Set[RelationData]]: """Retrieve all entries in the selected `relation` whose source entities are identified by some sha1 id in `ids`. If `reverse` is set, destination entities are matched instead. """ ... @remote_api_endpoint("relation_get_all") def relation_get_all( self, relation: RelationType ) -> Dict[Sha1Git, Set[RelationData]]: """Retrieve all entries in the selected `relation` that are present in the provenance model. 
def resolve_dates(dates: Iterable[Tuple[Sha1Git, datetime]]) -> Dict[Sha1Git, datetime]:
    """Collapse `(sha1, date)` pairs into a mapping from each sha1 to the
    earliest date seen for it.
    """
    earliest: Dict[Sha1Git, datetime] = {}
    for sha1, date in dates:
        if sha1 not in earliest:
            # First time this id shows up: its date is the best candidate.
            earliest[sha1] = date
        elif date < earliest[sha1]:
            earliest[sha1] = date
    return earliest
def resolve_directory( data: Iterable[Tuple[Sha1Git, DirectoryData]] ) -> Dict[Sha1Git, DirectoryData]: result: Dict[Sha1Git, DirectoryData] = {} for sha1, dir in data: known = result.setdefault(sha1, dir) value = known + assert dir.date is not None + assert known.date is not None if dir.date < known.date: value = DirectoryData(date=dir.date, flat=value.flat) if dir.flat: value = DirectoryData(date=value.date, flat=dir.flat) if value != known: result[sha1] = value return result def resolve_revision( data: Iterable[Union[Tuple[Sha1Git, RevisionData], Tuple[Sha1Git]]] ) -> Dict[Sha1Git, RevisionData]: result: Dict[Sha1Git, RevisionData] = {} for row in data: sha1 = row[0] rev = ( cast(Tuple[Sha1Git, RevisionData], row)[1] if len(row) > 1 else RevisionData(date=None, origin=None) ) known = result.setdefault(sha1, RevisionData(date=None, origin=None)) value = known if rev.date is not None and (known.date is None or rev.date < known.date): value = RevisionData(date=rev.date, origin=value.origin) if rev.origin is not None: value = RevisionData(date=value.date, origin=rev.origin) if value != known: result[sha1] = value return result def resolve_relation( data: Iterable[Tuple[Sha1Git, Sha1Git, bytes]] ) -> Dict[Sha1Git, Set[RelationData]]: result: Dict[Sha1Git, Set[RelationData]] = {} for src, dst, path in data: result.setdefault(src, set()).add(RelationData(dst=dst, path=path)) return result class ProvenanceStorageRabbitMQWorker(multiprocessing.Process): EXCHANGE_TYPE = ExchangeType.direct extra_type_decoders = DECODERS extra_type_encoders = ENCODERS def __init__( self, url: str, exchange: str, range: int, storage_config: Dict[str, Any], batch_size: int = 100, prefetch_count: int = 100, ) -> None: """Setup the worker object, passing in the URL we will use to connect to RabbitMQ, the exchange to use, the range id on which to operate, and the connection information for the underlying local storage object. 
:param str url: The URL for connecting to RabbitMQ :param str exchange: The name of the RabbitMq exchange to use :param str range: The ID range to operate on :param dict storage_config: Configuration parameters for the underlying ``ProvenanceStorage`` object expected by ``swh.provenance.get_provenance_storage`` :param int batch_size: Max amount of elements call to the underlying storage :param int prefetch_count: Prefetch value for the RabbitMQ connection when receiving messaged """ super().__init__(name=f"{exchange}_{range:x}") self._connection = None self._channel = None self._closing = False self._consumer_tag: Dict[str, str] = {} self._consuming: Dict[str, bool] = {} self._prefetch_count = prefetch_count self._url = url self._exchange = exchange self._binding_keys = list( ProvenanceStorageRabbitMQServer.get_binding_keys(self._exchange, range) ) self._queues: Dict[str, str] = {} self._storage_config = storage_config self._batch_size = batch_size self.command: multiprocessing.Queue = multiprocessing.Queue() self.signal: multiprocessing.Queue = multiprocessing.Queue() def connect(self) -> pika.SelectConnection: LOGGER.info("Connecting to %s", self._url) return pika.SelectConnection( parameters=pika.URLParameters(self._url), on_open_callback=self.on_connection_open, on_open_error_callback=self.on_connection_open_error, on_close_callback=self.on_connection_closed, ) def close_connection(self) -> None: assert self._connection is not None self._consuming = {binding_key: False for binding_key in self._binding_keys} if self._connection.is_closing or self._connection.is_closed: LOGGER.info("Connection is closing or already closed") else: LOGGER.info("Closing connection") self._connection.close() def on_connection_open(self, _unused_connection: pika.SelectConnection) -> None: LOGGER.info("Connection opened") self.open_channel() def on_connection_open_error( self, _unused_connection: pika.SelectConnection, err: Exception ) -> None: LOGGER.error("Connection open failed, 
reopening in 5 seconds: %s", err) assert self._connection is not None self._connection.ioloop.call_later(5, self._connection.ioloop.stop) def on_connection_closed(self, _unused_connection: pika.SelectConnection, reason): assert self._connection is not None self._channel = None if self._closing: self._connection.ioloop.stop() else: LOGGER.warning("Connection closed, reopening in 5 seconds: %s", reason) self._connection.ioloop.call_later(5, self._connection.ioloop.stop) def open_channel(self) -> None: LOGGER.info("Creating a new channel") assert self._connection is not None self._connection.channel(on_open_callback=self.on_channel_open) def on_channel_open(self, channel: pika.channel.Channel) -> None: LOGGER.info("Channel opened") self._channel = channel LOGGER.info("Adding channel close callback") assert self._channel is not None self._channel.add_on_close_callback(callback=self.on_channel_closed) self.setup_exchange() def on_channel_closed( self, channel: pika.channel.Channel, reason: Exception ) -> None: LOGGER.warning("Channel %i was closed: %s", channel, reason) self.close_connection() def setup_exchange(self) -> None: LOGGER.info("Declaring exchange %s", self._exchange) assert self._channel is not None self._channel.exchange_declare( exchange=self._exchange, exchange_type=self.EXCHANGE_TYPE, callback=self.on_exchange_declare_ok, ) def on_exchange_declare_ok(self, _unused_frame: pika.frame.Method) -> None: LOGGER.info("Exchange declared: %s", self._exchange) self.setup_queues() def setup_queues(self) -> None: for binding_key in self._binding_keys: LOGGER.info("Declaring queue %s", binding_key) assert self._channel is not None callback = functools.partial( self.on_queue_declare_ok, binding_key=binding_key, ) self._channel.queue_declare(queue=binding_key, callback=callback) def on_queue_declare_ok(self, frame: pika.frame.Method, binding_key: str) -> None: LOGGER.info( "Binding queue %s to exchange %s with routing key %s", frame.method.queue, self._exchange, 
binding_key, ) assert self._channel is not None callback = functools.partial(self.on_bind_ok, queue_name=frame.method.queue) self._queues[binding_key] = frame.method.queue self._channel.queue_bind( queue=frame.method.queue, exchange=self._exchange, routing_key=binding_key, callback=callback, ) def on_bind_ok(self, _unused_frame: pika.frame.Method, queue_name: str) -> None: LOGGER.info("Queue bound: %s", queue_name) assert self._channel is not None self._channel.basic_qos( prefetch_count=self._prefetch_count, callback=self.on_basic_qos_ok ) def on_basic_qos_ok(self, _unused_frame: pika.frame.Method) -> None: LOGGER.info("QOS set to: %d", self._prefetch_count) self.start_consuming() def start_consuming(self) -> None: LOGGER.info("Issuing consumer related RPC commands") LOGGER.info("Adding consumer cancellation callback") assert self._channel is not None self._channel.add_on_cancel_callback(callback=self.on_consumer_cancelled) for binding_key in self._binding_keys: self._consumer_tag[binding_key] = self._channel.basic_consume( queue=self._queues[binding_key], on_message_callback=self.on_request ) self._consuming[binding_key] = True self.signal.put(ServerCommand.CONSUMING) def on_consumer_cancelled(self, method_frame: pika.frame.Method) -> None: LOGGER.info("Consumer was cancelled remotely, shutting down: %r", method_frame) if self._channel: self._channel.close() def on_request( self, channel: pika.channel.Channel, deliver: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, body: bytes, ) -> None: LOGGER.info( "Received message # %s from %s: %s", deliver.delivery_tag, properties.app_id, body, ) # XXX: for some reason this function is returning lists instead of tuples # (the client send tuples) batch = decode_data(data=body, extra_decoders=self.extra_type_decoders)["data"] for item in batch: self._request_queues[deliver.routing_key].put( (tuple(item), (properties.correlation_id, properties.reply_to)) ) LOGGER.info("Acknowledging message %s", 
deliver.delivery_tag) channel.basic_ack(delivery_tag=deliver.delivery_tag) def stop_consuming(self) -> None: if self._channel: LOGGER.info("Sending a Basic.Cancel RPC command to RabbitMQ") for binding_key in self._binding_keys: callback = functools.partial(self.on_cancel_ok, binding_key=binding_key) self._channel.basic_cancel( self._consumer_tag[binding_key], callback=callback ) def on_cancel_ok(self, _unused_frame: pika.frame.Method, binding_key: str) -> None: self._consuming[binding_key] = False LOGGER.info( "RabbitMQ acknowledged the cancellation of the consumer: %s", self._consuming[binding_key], ) LOGGER.info("Closing the channel") assert self._channel is not None self._channel.close() def run(self) -> None: self._command_thread = threading.Thread(target=self.run_command_thread) self._command_thread.start() self._request_queues: Dict[str, queue.Queue] = {} self._request_threads: Dict[str, threading.Thread] = {} for binding_key in self._binding_keys: meth_name, relation = ProvenanceStorageRabbitMQServer.get_meth_name( binding_key ) self._request_queues[binding_key] = queue.Queue() self._request_threads[binding_key] = threading.Thread( target=self.run_request_thread, args=(binding_key, meth_name, relation), ) self._request_threads[binding_key].start() while not self._closing: try: self._connection = self.connect() assert self._connection is not None self._connection.ioloop.start() except KeyboardInterrupt: LOGGER.info("Connection closed by keyboard interruption, reopening") if self._connection is not None: self._connection.ioloop.stop() except TerminateSignal as ex: LOGGER.info("Termination requested: %s", ex) self.stop() if self._connection is not None and not self._connection.is_closed: # Finish closing self._connection.ioloop.start() except BaseException as ex: LOGGER.warning("Unexpected exception, terminating: %s", ex) self.stop() if self._connection is not None and not self._connection.is_closed: # Finish closing self._connection.ioloop.start() for 
binding_key in self._binding_keys: self._request_queues[binding_key].put(TERMINATE) for binding_key in self._binding_keys: self._request_threads[binding_key].join() self._command_thread.join() LOGGER.info("Stopped") def run_command_thread(self) -> None: while True: try: command = self.command.get() if command == ServerCommand.TERMINATE: self.request_termination() break except queue.Empty: pass except BaseException as ex: self.request_termination(str(ex)) break def request_termination(self, reason: str = "Normal shutdown") -> None: assert self._connection is not None def termination_callback(): raise TerminateSignal(reason) self._connection.ioloop.add_callback_threadsafe(termination_callback) def run_request_thread( self, binding_key: str, meth_name: str, relation: Optional[RelationType] ) -> None: from swh.provenance import get_provenance_storage with get_provenance_storage(**self._storage_config) as storage: request_queue = self._request_queues[binding_key] merge_items = ProvenanceStorageRabbitMQWorker.get_conflicts_func(meth_name) while True: terminate = False elements = [] while True: try: # TODO: consider reducing this timeout or removing it elem = request_queue.get(timeout=0.1) if elem is TERMINATE: terminate = True break elements.append(elem) except queue.Empty: break if len(elements) >= self._batch_size: break if terminate: break if not elements: continue try: items, props = zip(*elements) acks_count: TCounter[Tuple[str, str]] = Counter(props) data = merge_items(items) args = (relation, data) if relation is not None else (data,) if getattr(storage, meth_name)(*args): for (correlation_id, reply_to), count in acks_count.items(): # FIXME: this is running in a different thread! Hence, if # self._connection drops, there is no guarantee that the # response can be sent for the current elements. This # situation should be handled properly. 
assert self._connection is not None self._connection.ioloop.add_callback_threadsafe( functools.partial( ProvenanceStorageRabbitMQWorker.respond, channel=self._channel, correlation_id=correlation_id, reply_to=reply_to, response=count, ) ) else: LOGGER.warning( "Unable to process elements for queue %s", binding_key ) for elem in elements: request_queue.put(elem) except BaseException as ex: self.request_termination(str(ex)) break def stop(self) -> None: assert self._connection is not None if not self._closing: self._closing = True LOGGER.info("Stopping") if any(self._consuming): self.stop_consuming() self._connection.ioloop.start() else: self._connection.ioloop.stop() LOGGER.info("Stopped") @staticmethod def get_conflicts_func(meth_name: str) -> Callable[[Iterable[Any]], Any]: if meth_name == "content_add": return resolve_dates elif meth_name == "directory_add": return resolve_directory elif meth_name == "location_add": return lambda data: dict(data) elif meth_name == "origin_add": return lambda data: dict(data) # last processed value is good enough elif meth_name == "revision_add": return resolve_revision elif meth_name == "relation_add": return resolve_relation else: LOGGER.warning( "Unexpected conflict resolution function request for method %s", meth_name, ) return lambda x: x @staticmethod def respond( channel: pika.channel.Channel, correlation_id: str, reply_to: str, response: Any, ): channel.basic_publish( exchange="", routing_key=reply_to, properties=pika.BasicProperties( content_type="application/msgpack", correlation_id=correlation_id, ), body=encode_data( response, extra_encoders=ProvenanceStorageRabbitMQServer.extra_type_encoders, ), ) class ProvenanceStorageRabbitMQServer: extra_type_decoders = DECODERS extra_type_encoders = ENCODERS queue_count = 16 def __init__( self, url: str, storage_config: Dict[str, Any], batch_size: int = 100, prefetch_count: int = 100, ) -> None: """Setup the server object, passing in the URL we will use to connect to RabbitMQ, and 
the connection information for the underlying local storage object. :param str url: The URL for connecting to RabbitMQ :param dict storage_config: Configuration parameters for the underlying ``ProvenanceStorage`` object expected by ``swh.provenance.get_provenance_storage`` :param int batch_size: Max amount of elements call to the underlying storage :param int prefetch_count: Prefetch value for the RabbitMQ connection when receiving messaged """ self._workers: List[ProvenanceStorageRabbitMQWorker] = [] for exchange in ProvenanceStorageRabbitMQServer.get_exchanges(): for range in ProvenanceStorageRabbitMQServer.get_ranges(exchange): worker = ProvenanceStorageRabbitMQWorker( url=url, exchange=exchange, range=range, storage_config=storage_config, batch_size=batch_size, prefetch_count=prefetch_count, ) self._workers.append(worker) self._running = False def start(self) -> None: if not self._running: self._running = True for worker in self._workers: worker.start() for worker in self._workers: try: signal = worker.signal.get(timeout=60) assert signal == ServerCommand.CONSUMING except queue.Empty: LOGGER.error( "Could not initialize worker %s. 
Leaving...", worker.name ) self.stop() return LOGGER.info("Start serving") def stop(self) -> None: if self._running: for worker in self._workers: worker.command.put(ServerCommand.TERMINATE) for worker in self._workers: worker.join() LOGGER.info("Stop serving") self._running = False @staticmethod def get_binding_keys(exchange: str, range: int) -> Iterator[str]: for meth_name, relation in ProvenanceStorageRabbitMQServer.get_meth_names( exchange ): if relation is None: assert ( meth_name != "relation_add" ), "'relation_add' requires 'relation' to be provided" yield f"{meth_name}.unknown.{range:x}".lower() else: assert ( meth_name == "relation_add" ), f"'{meth_name}' requires 'relation' to be None" yield f"{meth_name}.{relation.value}.{range:x}".lower() @staticmethod def get_exchange(meth_name: str, relation: Optional[RelationType] = None) -> str: if meth_name == "relation_add": assert ( relation is not None ), "'relation_add' requires 'relation' to be provided" split = relation.value else: assert relation is None, f"'{meth_name}' requires 'relation' to be None" split = meth_name exchange, *_ = split.split("_") return exchange @staticmethod def get_exchanges() -> Iterator[str]: yield from [entity.value for entity in EntityType] + ["location"] @staticmethod def get_meth_name( binding_key: str, ) -> Tuple[str, Optional[RelationType]]: meth_name, relation, *_ = binding_key.split(".") return meth_name, (RelationType(relation) if relation != "unknown" else None) @staticmethod def get_meth_names( exchange: str, ) -> Iterator[Tuple[str, Optional[RelationType]]]: if exchange == EntityType.CONTENT.value: yield from [ ("content_add", None), ("relation_add", RelationType.CNT_EARLY_IN_REV), ("relation_add", RelationType.CNT_IN_DIR), ] elif exchange == EntityType.DIRECTORY.value: yield from [ ("directory_add", None), ("relation_add", RelationType.DIR_IN_REV), ] elif exchange == EntityType.ORIGIN.value: yield from [("origin_add", None)] elif exchange == EntityType.REVISION.value: 
yield from [ ("revision_add", None), ("relation_add", RelationType.REV_BEFORE_REV), ("relation_add", RelationType.REV_IN_ORG), ] elif exchange == "location": yield "location_add", None @staticmethod def get_ranges(unused_exchange: str) -> Iterator[int]: # XXX: we might want to have a different range per exchange yield from range(ProvenanceStorageRabbitMQServer.queue_count) @staticmethod def get_routing_key( item: bytes, meth_name: str, relation: Optional[RelationType] = None ) -> str: hashid = ( path_id(item).hex() if meth_name.startswith("location") else hash_to_hex(item) ) idx = int(hashid[0], 16) % ProvenanceStorageRabbitMQServer.queue_count if relation is None: assert ( meth_name != "relation_add" ), "'relation_add' requires 'relation' to be provided" return f"{meth_name}.unknown.{idx:x}".lower() else: assert ( meth_name == "relation_add" ), f"'{meth_name}' requires 'relation' to be None" return f"{meth_name}.{relation.value}.{idx:x}".lower() @staticmethod def is_write_method(meth_name: str) -> bool: return "_add" in meth_name def load_and_check_config( config_path: Optional[str], type: str = "local" ) -> Dict[str, Any]: """Check the minimal configuration is set to run the api or raise an error explanation. Args: config_path (str): Path to the configuration file to load type (str): configuration type. For 'local' type, more checks are done. 
Raises: Error if the setup is not as expected Returns: configuration as a dict """ if config_path is None: raise EnvironmentError("Configuration file must be defined") if not os.path.exists(config_path): raise FileNotFoundError(f"Configuration file {config_path} does not exist") cfg = config.read(config_path) pcfg: Optional[Dict[str, Any]] = cfg.get("provenance") if pcfg is None: raise KeyError("Missing 'provenance' configuration") rcfg: Optional[Dict[str, Any]] = pcfg.get("rabbitmq") if rcfg is None: raise KeyError("Missing 'provenance.rabbitmq' configuration") scfg: Optional[Dict[str, Any]] = rcfg.get("storage_config") if scfg is None: raise KeyError("Missing 'provenance.rabbitmq.storage_config' configuration") if type == "local": cls = scfg.get("cls") if cls != "postgresql": raise ValueError( "The provenance backend can only be started with a 'postgresql' " "configuration" ) db = scfg.get("db") if not db: raise KeyError("Invalid configuration; missing 'db' config entry") return cfg def make_server_from_configfile() -> ProvenanceStorageRabbitMQServer: config_path = os.environ.get("SWH_CONFIG_FILENAME") server_cfg = load_and_check_config(config_path) return ProvenanceStorageRabbitMQServer(**server_cfg["provenance"]["rabbitmq"]) diff --git a/swh/provenance/tests/test_cli.py b/swh/provenance/tests/test_cli.py index fbf86ec..eb84fb6 100644 --- a/swh/provenance/tests/test_cli.py +++ b/swh/provenance/tests/test_cli.py @@ -1,160 +1,295 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from datetime import datetime, timezone +import logging +import re from typing import Dict, List from _pytest.monkeypatch import MonkeyPatch from click.testing import CliRunner +from confluent_kafka import Producer import psycopg2.extensions import pytest from swh.core.cli import swh as 
swhmain import swh.core.cli.db # noqa ; ensure cli is loaded from swh.core.db import BaseDb from swh.core.db.db_utils import init_admin_extensions +from swh.journal.serializers import key_to_kafka, value_to_kafka from swh.model.hashutil import MultiHash import swh.provenance.cli # noqa ; ensure cli is loaded +from swh.provenance.storage.interface import EntityType, RelationType from swh.provenance.tests.conftest import fill_storage, load_repo_data from swh.storage.interface import StorageInterface from .conftest import get_datafile -from .test_utils import invoke, write_configuration_path +from .test_utils import invoke + +logger = logging.getLogger(__name__) + + +def now(): + return datetime.now(timezone.utc) def test_cli_swh_db_help() -> None: # swhmain.add_command(provenance_cli) result = CliRunner().invoke(swhmain, ["provenance", "-h"]) assert result.exit_code == 0 assert "Commands:" in result.output commands = result.output.split("Commands:")[1] for command in ( "find-all", "find-first", "iter-frontiers", "iter-origins", "iter-revisions", ): assert f" {command} " in commands TABLES = { "dbflavor", "dbmodule", "dbversion", "content", "content_in_revision", "content_in_directory", "directory", "directory_in_revision", "location", "origin", "revision", "revision_before_revision", "revision_in_origin", } @pytest.mark.parametrize("flavor", ("normalized", "denormalized")) def test_cli_db_create_and_init_db_with_flavor( monkeypatch: MonkeyPatch, postgresql: psycopg2.extensions.connection, flavor: str, ) -> None: """Test that 'swh db init provenance' works with flavors""" dbname = f"{flavor}-db" # DB creation using 'swh db create' db_params = postgresql.get_dsn_parameters() monkeypatch.setenv("PGHOST", db_params["host"]) monkeypatch.setenv("PGUSER", db_params["user"]) monkeypatch.setenv("PGPORT", db_params["port"]) result = CliRunner().invoke(swhmain, ["db", "create", "-d", dbname, "provenance"]) assert result.exit_code == 0, result.output # DB init using 'swh db 
init' result = CliRunner().invoke( swhmain, ["db", "init", "-d", dbname, "--flavor", flavor, "provenance"] ) assert result.exit_code == 0, result.output assert f"(flavor {flavor})" in result.output db_params["dbname"] = dbname cnx = BaseDb.connect(**db_params).conn # check the DB looks OK (check for db_flavor and expected tables) with cnx.cursor() as cur: cur.execute("select swh_get_dbflavor()") assert cur.fetchone() == (flavor,) cur.execute( "select table_name from information_schema.tables " "where table_schema = 'public' " f"and table_catalog = '{dbname}'" ) tables = set(x for (x,) in cur.fetchall()) assert tables == TABLES def test_cli_init_db_default_flavor(postgresql: psycopg2.extensions.connection) -> None: "Test that 'swh db init provenance' defaults to a normalized flavored DB" dbname = postgresql.dsn init_admin_extensions("swh.provenance", dbname) result = CliRunner().invoke(swhmain, ["db", "init", "-d", dbname, "provenance"]) assert result.exit_code == 0, result.output with postgresql.cursor() as cur: cur.execute("select swh_get_dbflavor()") assert cur.fetchone() == ("normalized",) @pytest.mark.origin_layer @pytest.mark.parametrize( "subcommand", (["origin", "from-csv"], ["iter-origins"]), ) def test_cli_origin_from_csv( swh_storage: StorageInterface, subcommand: List[str], swh_storage_backend_config: Dict, provenance, tmp_path, ): repo = "cmdbts2" origin_url = f"https://{repo}" data = load_repo_data(repo) fill_storage(swh_storage, data) assert len(data["origin"]) >= 1 assert origin_url in [o["url"] for o in data["origin"]] cfg = { "provenance": { "archive": { "cls": "api", "storage": swh_storage_backend_config, }, "storage": { "cls": "postgresql", # "db": provenance.storage.conn.dsn, "db": provenance.storage.conn.get_dsn_parameters(), }, }, } - config_path = write_configuration_path(cfg, tmp_path) - csv_filepath = get_datafile("origins.csv") subcommand = subcommand + [csv_filepath] - result = invoke(subcommand, config_path) + result = invoke(subcommand, 
config=cfg) assert result.exit_code == 0, f"Unexpected result: {result.output}" origin_sha1 = MultiHash.from_data( origin_url.encode(), hash_names=["sha1"] ).digest()["sha1"] actual_result = provenance.storage.origin_get([origin_sha1]) assert actual_result == {origin_sha1: origin_url} + + +@pytest.mark.kafka +def test_replay( + provenance_storage, + provenance_postgresqldb, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: str, +): + kafka_prefix += ".swh.journal.provenance" + + producer = Producer( + { + "bootstrap.servers": kafka_server, + "client.id": "test-producer", + "acks": "all", + } + ) + + for i in range(10): + cntkey = (b"cnt:" + bytes([i])).ljust(20, b"\x00") + producer.produce( + topic=kafka_prefix + ".content", + key=key_to_kafka(cntkey), + value=value_to_kafka({"id": cntkey, "value": {"date": None}}), + ) + dirkey = (b"dir:" + bytes([i])).ljust(20, b"\x00") + producer.produce( + topic=kafka_prefix + ".directory", + key=key_to_kafka(dirkey), + value=value_to_kafka( + {"id": dirkey, "value": {"date": None, "flat": False}} + ), + ) + revkey = (b"rev:" + bytes([i])).ljust(20, b"\x00") + producer.produce( + topic=kafka_prefix + ".revision", + key=key_to_kafka(revkey), + value=value_to_kafka( + {"id": revkey, "value": {"date": None, "origin": None}} + ), + ) + loc = f"dir/{i}".encode() + lockey = (b"loc:" + bytes([i])).ljust(20, b"\x00") + producer.produce( + topic=kafka_prefix + ".location", + key=key_to_kafka(lockey), + value=value_to_kafka({"id": lockey, "value": loc}), + ) + + producer.produce( + topic=kafka_prefix + ".content_in_revision", + key=key_to_kafka(cntkey), + value=value_to_kafka( + {"id": cntkey, "value": [{"dst": revkey, "path": loc}]} + ), + ) + producer.produce( + topic=kafka_prefix + ".content_in_directory", + key=key_to_kafka(cntkey), + value=value_to_kafka( + {"id": cntkey, "value": [{"dst": dirkey, "path": loc}]} + ), + ) + producer.produce( + topic=kafka_prefix + ".directory_in_revision", + key=key_to_kafka(dirkey), 
+ value=value_to_kafka( + {"id": dirkey, "value": [{"dst": revkey, "path": loc}]} + ), + ) + + # now add dates to entities + producer.produce( + topic=kafka_prefix + ".content", + key=key_to_kafka(cntkey), + value=value_to_kafka({"id": cntkey, "value": {"date": now()}}), + ) + producer.produce( + topic=kafka_prefix + ".directory", + key=key_to_kafka(dirkey), + value=value_to_kafka( + {"id": dirkey, "value": {"date": now(), "flat": False}} + ), + ) + producer.produce( + topic=kafka_prefix + ".revision", + key=key_to_kafka(revkey), + value=value_to_kafka( + {"id": revkey, "value": {"date": now(), "origin": None}} + ), + ) + + producer.flush() + logger.debug("Flushed producer") + config = { + "provenance": { + "storage": { + "cls": "postgresql", + "db": provenance_postgresqldb, + }, + "journal_client": { + "cls": "kafka", + "brokers": [kafka_server], + "group_id": kafka_consumer_group, + "prefix": kafka_prefix, + "stop_on_eof": True, + }, + } + } + + result = invoke(["replay"], config=config) + expected = r"Done. 
processed 100 messages\n" + + assert result.exit_code == 0, result.output + assert re.fullmatch(expected, result.output, re.MULTILINE), result.output + + assert len(provenance_storage.entity_get_all(EntityType.CONTENT)) == 10 + assert len(provenance_storage.entity_get_all(EntityType.REVISION)) == 10 + assert len(provenance_storage.entity_get_all(EntityType.DIRECTORY)) == 10 + assert len(provenance_storage.location_get_all()) == 10 + assert len(provenance_storage.relation_get_all(RelationType.CNT_EARLY_IN_REV)) == 10 + assert len(provenance_storage.relation_get_all(RelationType.DIR_IN_REV)) == 10 + assert len(provenance_storage.relation_get_all(RelationType.CNT_IN_DIR)) == 10 diff --git a/swh/provenance/tests/test_journal_client.py b/swh/provenance/tests/test_journal_client.py index 865db1a..79d6490 100644 --- a/swh/provenance/tests/test_journal_client.py +++ b/swh/provenance/tests/test_journal_client.py @@ -1,138 +1,134 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict from confluent_kafka import Consumer import pytest from swh.model.hashutil import MultiHash from swh.provenance.tests.conftest import fill_storage, load_repo_data from swh.storage.interface import StorageInterface -from .test_utils import invoke, write_configuration_path +from .test_utils import invoke @pytest.fixture def swh_storage_backend_config(swh_storage_backend_config, kafka_server, kafka_prefix): writer_config = { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, "anonymize": False, } yield {**swh_storage_backend_config, "journal_writer": writer_config} @pytest.mark.origin_layer @pytest.mark.kafka def test_cli_origin_from_journal_client( swh_storage: StorageInterface, swh_storage_backend_config: Dict, kafka_prefix: str, 
kafka_server: str, consumer: Consumer, - tmp_path: str, provenance, postgres_provenance, ) -> None: """Test origin journal client cli""" # Prepare storage data data = load_repo_data("cmdbts2") assert len(data["origin"]) >= 1 origin_url = data["origin"][0]["url"] fill_storage(swh_storage, data) # Prepare configuration for cli call swh_storage_backend_config.pop("journal_writer", None) # no need for that config storage_config_dict = swh_storage_backend_config cfg = { "journal_client": { "cls": "kafka", "brokers": [kafka_server], "group_id": "toto", "prefix": kafka_prefix, "stop_on_eof": True, }, "provenance": { "archive": { "cls": "api", "storage": storage_config_dict, }, "storage": { "cls": "postgresql", "db": postgres_provenance.get_dsn_parameters(), }, }, } - config_path = write_configuration_path(cfg, tmp_path) # call the cli 'swh provenance origin from-journal' - result = invoke(["origin", "from-journal"], config_path) + result = invoke(["origin", "from-journal"], config=cfg) assert result.exit_code == 0, f"Unexpected result: {result.output}" origin_sha1 = MultiHash.from_data( origin_url.encode(), hash_names=["sha1"] ).digest()["sha1"] actual_result = provenance.storage.origin_get([origin_sha1]) assert actual_result == {origin_sha1: origin_url} @pytest.mark.kafka def test_cli_revision_from_journal_client( swh_storage: StorageInterface, swh_storage_backend_config: Dict, kafka_prefix: str, kafka_server: str, consumer: Consumer, - tmp_path: str, provenance, postgres_provenance, ) -> None: """Test revision journal client cli""" # Prepare storage data data = load_repo_data("cmdbts2") assert len(data["origin"]) >= 1 fill_storage(swh_storage, data) # Prepare configuration for cli call swh_storage_backend_config.pop("journal_writer", None) # no need for that config storage_config_dict = swh_storage_backend_config cfg = { "journal_client": { "cls": "kafka", "brokers": [kafka_server], "group_id": "toto", "prefix": kafka_prefix, "stop_on_eof": True, }, "provenance": { 
"archive": { "cls": "api", "storage": storage_config_dict, }, "storage": { "cls": "postgresql", "db": postgres_provenance.get_dsn_parameters(), }, }, } - config_path = write_configuration_path(cfg, tmp_path) revisions = [rev["id"] for rev in data["revision"]] result = provenance.storage.revision_get(revisions) assert not result # call the cli 'swh provenance revision from-journal' - cli_result = invoke(["revision", "from-journal"], config_path) + cli_result = invoke(["revision", "from-journal"], config=cfg) assert cli_result.exit_code == 0, f"Unexpected result: {result.output}" result = provenance.storage.revision_get(revisions) assert set(result.keys()) == set(revisions) diff --git a/swh/provenance/tests/test_revision_content_layer.py b/swh/provenance/tests/test_revision_content_layer.py index 958f7c4..d9759e0 100644 --- a/swh/provenance/tests/test_revision_content_layer.py +++ b/swh/provenance/tests/test_revision_content_layer.py @@ -1,466 +1,467 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple import pytest from typing_extensions import TypedDict from swh.model.hashutil import hash_to_bytes from swh.model.model import Sha1Git from swh.provenance.algos.directory import directory_add from swh.provenance.algos.revision import revision_add from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import DirectoryEntry, RevisionEntry from swh.provenance.storage.interface import EntityType, RelationType from swh.provenance.tests.conftest import ( fill_storage, get_datafile, load_repo_data, ts2dt, ) class SynthRelation(TypedDict): prefix: Optional[str] path: str src: Sha1Git dst: Sha1Git rel_ts: float class 
class SynthRevision(TypedDict):
    """A revision read from a synthetic data file, with the relations it adds."""

    sha1: Sha1Git
    date: float
    msg: str
    R_C: List[SynthRelation]
    R_D: List[SynthRelation]
    D_C: List[SynthRelation]


def synthetic_revision_content_result(filename: str) -> Iterator[SynthRevision]:
    """Generates dict representations of synthetic revisions found in the synthetic
    file (from the data/ directory) given as argument of the generator.

    Generated SynthRevision (typed dict) with the following elements:

      "sha1": (Sha1Git) sha1 of the revision,
      "date": (float) timestamp of the revision,
      "msg": (str) commit message of the revision,
      "R_C": (list) new R---C relations added by this revision
      "R_D": (list) new R-D relations added by this revision
      "D_C": (list) new D-C relations added by this revision

    Each relation above is a SynthRelation typed dict with:

      "prefix": (str or None) path of the enclosing R-D relation for D-C
        relations, None for the other relation types
      "path": (str) location
      "src": (Sha1Git) sha1 of the source of the relation
      "dst": (Sha1Git) sha1 of the destination of the relation
      "rel_ts": (float) timestamp of the target of the relation
        (related to the timestamp of the revision)

    """
    with open(get_datafile(filename), "r") as fobj:
        yield from _parse_synthetic_revision_content_file(fobj)


def _parse_synthetic_revision_content_file(
    fobj: Iterable[str],
) -> Iterator[SynthRevision]:
    """Read a 'synthetic' file and generate a dict representation of the synthetic
    revision for each revision listed in the synthetic file.

    Every useful line is a row of five ``|``-separated columns: an optional
    revision name, a relation type, a path, an object type followed by its sha1,
    and a timestamp; a trailing ``#`` comment is allowed. Lines that do not match
    this layout are ignored. A row carrying a revision name starts a new revision;
    the rows that follow are attached to it until the next named row.
    """
    # The group names are the keys read back from ``groupdict()`` below and in
    # ``_mk_synth_rev``; without them the pattern does not even compile.
    regs = [
        r"(?P<revname>R[0-9]{2,4})?",
        r"(?P<reltype>[^| ]*)",
        r"([+] )?(?P<path>[^| +]*?)[/]?",
        r"(?P<type>[RDC]) (?P<sha1>[0-9a-f]{40})",
        r"(?P<ts>-?[0-9]+(.[0-9]+)?)",
    ]
    regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$")
    current_rev: List[dict] = []
    for m in (regex.match(line) for line in fobj):
        if m:
            d = m.groupdict()
            if d["revname"]:
                # a named row opens a new revision: flush the pending one first
                if current_rev:
                    yield _mk_synth_rev(current_rev)
                current_rev.clear()
            current_rev.append(d)
    # flush the last revision of the file
    if current_rev:
        yield _mk_synth_rev(current_rev)


def _mk_synth_rev(synth_rev: List[Dict[str, str]]) -> SynthRevision:
    """Build a SynthRevision from the parsed rows of a single revision.

    The first row must describe the revision itself (type "R"); each following
    row is dispatched on its "reltype" into the R_C, R_D or D_C list. Rows with
    any other relation type are left out.
    """
    assert synth_rev[0]["type"] == "R"
    rev = SynthRevision(
        sha1=hash_to_bytes(synth_rev[0]["sha1"]),
        date=float(synth_rev[0]["ts"]),
        msg=synth_rev[0]["revname"],
        R_C=[],
        R_D=[],
        D_C=[],
    )
    current_path = None
    # path of the last R-D relation we parsed, used as a prefix for next D-C
    # relations

    for row in synth_rev[1:]:
        if row["reltype"] == "R---C":
            assert row["type"] == "C"
            rev["R_C"].append(
                SynthRelation(
                    prefix=None,
                    path=row["path"],
                    src=rev["sha1"],
                    dst=hash_to_bytes(row["sha1"]),
                    rel_ts=float(row["ts"]),
                )
            )
            current_path = None
        elif row["reltype"] == "R-D":
            assert row["type"] == "D"
            rev["R_D"].append(
                SynthRelation(
                    prefix=None,
                    path=row["path"],
                    src=rev["sha1"],
                    dst=hash_to_bytes(row["sha1"]),
                    rel_ts=float(row["ts"]),
                )
            )
            current_path = row["path"]
        elif row["reltype"] == "D-C":
            assert row["type"] == "C"
            rev["D_C"].append(
                SynthRelation(
                    prefix=current_path,
                    path=row["path"],
                    # a D-C row hangs off the most recently parsed R-D relation
                    src=rev["R_D"][-1]["dst"],
                    dst=hash_to_bytes(row["sha1"]),
                    rel_ts=float(row["ts"]),
                )
            )
    return rev
@pytest.mark.parametrize(
    "repo, lower, mindepth, flatten",
    (
        ("cmdbts2", True, 1, True),
        ("cmdbts2", True, 1, False),
        ("cmdbts2", False, 1, True),
        ("cmdbts2", False, 1, False),
        ("cmdbts2", True, 2, True),
        ("cmdbts2", True, 2, False),
        ("cmdbts2", False, 2, True),
        ("cmdbts2", False, 2, False),
        ("out-of-order", True, 1, True),
        ("out-of-order", True, 1, False),
    ),
)
def test_revision_content_result(
    provenance: ProvenanceInterface,
    archive: ArchiveInterface,
    repo: str,
    lower: bool,
    mindepth: int,
    flatten: bool,
) -> None:
    """Add revisions one at a time and compare the provenance storage with the
    matching synthetic file after each addition.

    ``rows`` accumulates, revision after revision, what the synthetic file says
    should be stored; every assertion compares that running expectation with
    the full content returned by the provenance storage, using the revision
    name as assertion message.
    """
    # read data/README.md for more details on how these datasets are generated
    data = load_repo_data(repo)
    fill_storage(archive.storage, data)
    syntheticfile = get_datafile(
        f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt"
    )
    revisions = {rev["id"]: rev for rev in data["revision"]}

    # expected content of the storage, grown incrementally in the loop below
    rows: Dict[str, Set[Any]] = {
        "content": set(),
        "content_in_directory": set(),
        "content_in_revision": set(),
        "directory": set(),
        "directory_in_revision": set(),
        "location": set(),
        "revision": set(),
    }

    for synth_rev in synthetic_revision_content_result(syntheticfile):
        revision = revisions[synth_rev["sha1"]]
        entry = RevisionEntry(
            id=revision["id"],
            date=ts2dt(revision["date"]),
            root=revision["directory"],
        )

        if flatten:
            revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth)
        else:
            # add the revision without flattening, check that the directories
            # it introduced are reported as not flattened, then pass them to
            # directory_add explicitly
            prev_directories = provenance.storage.entity_get_all(EntityType.DIRECTORY)
            revision_add(
                provenance,
                archive,
                [entry],
                lower=lower,
                mindepth=mindepth,
                flatten=False,
            )
            directories = [
                DirectoryEntry(id=sha1)
                for sha1 in provenance.storage.entity_get_all(
                    EntityType.DIRECTORY
                ).difference(prev_directories)
            ]
            for directory in directories:
                assert not provenance.directory_already_flattened(directory)
            directory_add(provenance, archive, directories)

        # each "entry" in the synth file is one new revision
        rows["revision"].add(synth_rev["sha1"])
        assert rows["revision"] == provenance.storage.entity_get_all(
            EntityType.REVISION
        ), synth_rev["msg"]
        # check the timestamp of the revision
        rev_ts = synth_rev["date"]
        rev_data = provenance.storage.revision_get([synth_rev["sha1"]])[
            synth_rev["sha1"]
        ]
        assert (
            rev_data.date is not None and rev_ts == rev_data.date.timestamp()
        ), synth_rev["msg"]

        # this revision might have added new content objects
        rows["content"] |= set(x["dst"] for x in synth_rev["R_C"])
        rows["content"] |= set(x["dst"] for x in synth_rev["D_C"])
        assert rows["content"] == provenance.storage.entity_get_all(
            EntityType.CONTENT
        ), synth_rev["msg"]

        # check for R-C (direct) entries
        # these are added directly in the content_early_in_rev table
        # NOTE: the synthetic relation's "dst" is matched against the key (src)
        # of the stored relation, and its "src" against the stored "dst"
        rows["content_in_revision"] |= set(
            (x["dst"], x["src"], x["path"].encode()) for x in synth_rev["R_C"]
        )
        assert rows["content_in_revision"] == {
            (src, rel.dst, rel.path)
            for src, rels in provenance.storage.relation_get_all(
                RelationType.CNT_EARLY_IN_REV
            ).items()
            for rel in rels
        }, synth_rev["msg"]
        # check timestamps
        for rc in synth_rev["R_C"]:
            assert (
                rev_ts + rc["rel_ts"]
                == provenance.storage.content_get([rc["dst"]])[rc["dst"]].timestamp()
            ), synth_rev["msg"]

        # check directories
        # each directory stored in the provenance index is an entry
        #      in the "directory" table...
        rows["directory"] |= set(x["dst"] for x in synth_rev["R_D"])
        assert rows["directory"] == provenance.storage.entity_get_all(
            EntityType.DIRECTORY
        ), synth_rev["msg"]

        # ... + a number of rows in the "directory_in_rev" table...
        # check for R-D entries
        rows["directory_in_revision"] |= set(
            (x["dst"], x["src"], x["path"].encode()) for x in synth_rev["R_D"]
        )
        assert rows["directory_in_revision"] == {
            (src, rel.dst, rel.path)
            for src, rels in provenance.storage.relation_get_all(
                RelationType.DIR_IN_REV
            ).items()
            for rel in rels
        }, synth_rev["msg"]
        # check timestamps
        for rd in synth_rev["R_D"]:
            dir_data = provenance.storage.directory_get([rd["dst"]])[rd["dst"]]
            assert dir_data.date is not None
            assert rev_ts + rd["rel_ts"] == dir_data.date.timestamp(), synth_rev["msg"]
            # whichever branch was taken above, the directory must be flat by now
            assert dir_data.flat, synth_rev["msg"]

        # ... + a number of rows in the "content_in_dir" table
        #     for content of the directory.
        # check for D-C entries
        rows["content_in_directory"] |= set(
            (x["dst"], x["src"], x["path"].encode()) for x in synth_rev["D_C"]
        )
        assert rows["content_in_directory"] == {
            (src, rel.dst, rel.path)
            for src, rels in provenance.storage.relation_get_all(
                RelationType.CNT_IN_DIR
            ).items()
            for rel in rels
        }, synth_rev["msg"]
        # check timestamps
        for dc in synth_rev["D_C"]:
            assert (
                rev_ts + dc["rel_ts"]
                == provenance.storage.content_get([dc["dst"]])[dc["dst"]].timestamp()
            ), synth_rev["msg"]

        # check for location entries
        rows["location"] |= set(x["path"].encode() for x in synth_rev["R_C"])
        rows["location"] |= set(x["path"].encode() for x in synth_rev["D_C"])
        rows["location"] |= set(x["path"].encode() for x in synth_rev["R_D"])
        assert rows["location"] == set(
            provenance.storage.location_get_all().values()
        ), synth_rev["msg"]
@pytest.mark.parametrize(
    "repo, lower, mindepth",
    (
        ("cmdbts2", True, 1),
        ("cmdbts2", False, 1),
        ("cmdbts2", True, 2),
        ("cmdbts2", False, 2),
        ("out-of-order", True, 1),
    ),
)
@pytest.mark.parametrize("batch", (True, False))
def test_provenance_heuristics_content_find_all(
    provenance: ProvenanceInterface,
    archive: ArchiveInterface,
    repo: str,
    lower: bool,
    mindepth: int,
    batch: bool,
) -> None:
    """Check content_find_all() against every occurrence listed in the synthetic
    file, with revisions added either in a single batch or one by one."""
    # read data/README.md for more details on how these datasets are generated
    data = load_repo_data(repo)
    fill_storage(archive.storage, data)

    entries = []
    for raw_rev in data["revision"]:
        entries.append(
            RevisionEntry(
                id=raw_rev["id"],
                date=ts2dt(raw_rev["date"]),
                root=raw_rev["directory"],
            )
        )

    # one call with everything, or one call per revision
    chunks = [entries] if batch else [[entry] for entry in entries]
    for chunk in chunks:
        revision_add(provenance, archive, chunk, lower=lower, mindepth=mindepth)

    side = "lower" if lower else "upper"
    syntheticfile = get_datafile(f"synthetic_{repo}_{side}_{mindepth}.txt")

    # content id (hex) -> [(revision id, revision timestamp, origin, path), ...]
    expected_occurrences: Dict[str, List[Tuple[str, float, Optional[str], str]]] = {}
    for synth_rev in synthetic_revision_content_result(syntheticfile):
        rev_hex = synth_rev["sha1"].hex()
        rev_date = synth_rev["date"]

        for direct in synth_rev["R_C"]:
            bucket = expected_occurrences.setdefault(direct["dst"].hex(), [])
            bucket.append((rev_hex, rev_date, None, direct["path"]))

        for nested in synth_rev["D_C"]:
            prefix = nested["prefix"]
            assert prefix is not None  # to please mypy
            bucket = expected_occurrences.setdefault(nested["dst"].hex(), [])
            # content reached through a directory is reported under that
            # directory's path
            bucket.append((rev_hex, rev_date, None, prefix + "/" + nested["path"]))

    for content_id, results in expected_occurrences.items():
        expected = [(content_id, *result) for result in results]
        db_occurrences = []
        for found in provenance.content_find_all(hash_to_bytes(content_id)):
            db_occurrences.append(
                (
                    found.content.hex(),
                    found.revision.hex(),
                    found.date.timestamp(),
                    found.origin,
                    found.path.decode(),
                )
            )
        assert len(db_occurrences) == len(expected)
        assert set(db_occurrences) == set(expected)
@pytest.mark.parametrize(
    "repo, lower, mindepth",
    (
        ("cmdbts2", True, 1),
        ("cmdbts2", False, 1),
        ("cmdbts2", True, 2),
        ("cmdbts2", False, 2),
        ("out-of-order", True, 1),
    ),
)
@pytest.mark.parametrize("batch", (True, False))
def test_provenance_heuristics_content_find_first(
    provenance: ProvenanceInterface,
    archive: ArchiveInterface,
    repo: str,
    lower: bool,
    mindepth: int,
    batch: bool,
) -> None:
    """Check content_find_first() against the earliest occurrence of each content
    listed in the synthetic file, with revisions added in a single batch or one
    by one."""
    # read data/README.md for more details on how these datasets are generated
    data = load_repo_data(repo)
    fill_storage(archive.storage, data)
    revisions = [
        RevisionEntry(
            id=revision["id"],
            date=ts2dt(revision["date"]),
            root=revision["directory"],
        )
        for revision in data["revision"]
    ]

    if batch:
        revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth)
    else:
        for revision in revisions:
            revision_add(
                provenance, archive, [revision], lower=lower, mindepth=mindepth
            )

    syntheticfile = get_datafile(
        f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt"
    )
    expected_first: Dict[str, Tuple[str, float, List[str]]] = {}
    # maps blob_id to a tuple (rev_id, rev_ts, [path, ...]); the last element
    # is a list because a content can be added at several places in a single
    # revision, in which case the result of content_find_first() is one of
    # those paths, but we have no guarantee which one it will return.
    for synth_rev in synthetic_revision_content_result(syntheticfile):
        rev_id = synth_rev["sha1"].hex()
        rev_ts = synth_rev["date"]

        for rc in synth_rev["R_C"]:
            sha1 = rc["dst"].hex()
            if sha1 not in expected_first:
                assert rc["rel_ts"] == 0
                expected_first[sha1] = (rev_id, rev_ts, [rc["path"]])
            else:
                if rev_ts == expected_first[sha1][1]:
                    expected_first[sha1][2].append(rc["path"])
                elif rev_ts < expected_first[sha1][1]:
                    expected_first[sha1] = (rev_id, rev_ts, [rc["path"]])

        for dc in synth_rev["D_C"]:
            # look at the D-C relation itself, not at the variable left over
            # from the R-C loop above
            sha1 = dc["dst"].hex()
            assert sha1 in expected_first
            # nothing to do there, this content cannot be a "first seen file"

    for content_id, (rev_id, ts, paths) in expected_first.items():
        occur = provenance.content_find_first(hash_to_bytes(content_id))
        assert occur is not None
        assert occur.content.hex() == content_id
        assert occur.revision.hex() == rev_id
        assert occur.date.timestamp() == ts
        assert occur.origin is None
        assert occur.path.decode() in paths
def invoke(
    args: List[str], config: Optional[Dict] = None, catch_exceptions: bool = False
) -> Result:
    """Invoke a ``swh provenance`` subcommand through click's test runner.

    Args:
        args: command line arguments passed to the ``cli`` group.
        config: optional configuration dict; when given it is dumped as YAML
            into a temporary file which is handed to the CLI with ``-C``.
        catch_exceptions: when false (the default), an exception recorded in
            the result is re-raised after printing the command output.

    Returns:
        the click ``Result`` of the invocation.
    """
    runner = CliRunner()
    # the temporary file must outlive the invocation: the CLI opens it by name
    with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
        if config is not None:
            safe_dump(config, config_fd)
            # make sure the YAML hits the disk before the CLI reads the file
            config_fd.flush()
            args = ["-C" + config_fd.name] + args

        result = runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=None)
        if not catch_exceptions and result.exception:
            print(result.output)
            raise result.exception
    return result