diff --git a/README.md b/README.md index d8e6e8a..d2b8af3 100644 --- a/README.md +++ b/README.md @@ -1,50 +1,48 @@ swh-provenance ============== Provenance DB module to query the provenance of source code artifacts present in the Software Heritage archive. This project makes it possible to build such a provenance db from the Software Heritage Archive, and to query this database. ## Building a provenance database Building the provenance database requires read access to the Software Heritage archive, either via direct access to the database (preferred, for better performance), or via the RPC API of a Software Heritage Storage instance. It also needs a PostgreSQL database into which the provenance db will be written. A configuration file is needed with the access settings for both of these databases: ``` archive: cls: api storage: cls: remote url: http://uffizi.internal.softwareheritage.org:5002 provenance: cls: direct - db: - dbname: provenance - host: localhost + db: dbname=provenance host=localhost ``` Running in Docker ----------------- ### Build the image ``` docker build -t swh-provenance . ``` ### Run the services ``` docker-compose up -d docker-compose logs -f ``` diff --git a/docker/origin_client/config.yml b/docker/origin_client/config.yml index 59aae7c..27356a1 100644 --- a/docker/origin_client/config.yml +++ b/docker/origin_client/config.yml @@ -1,41 +1,37 @@ provenance: storage: cls: rabbitmq # client configuration url: amqp://rabbitmq:5672/%2f storage_config: cls: postgresql - db: - host: storage-db - dbname: provenance - user: provenance - password: provenancepassword + db: host=storage-db dbname=provenance user=provenance password=provenancepassword batch_size: 10000 prefetch_count: 100 wait_min: 60 wait_per_batch: 60 archive: cls: multiplexer archives: - cls: graph url: http://graph.internal.softwareheritage.org:5009/graph storage: cls: remote url: http://webapp1.internal.softwareheritage.org:5002 - cls: api storage: cls: remote url: http://webapp1.internal.softwareheritage.org:5002 # cls: direct - # db: + # db: # host: swh-storage-db # port: 5432 # dbname: swh # user: guest # cls: api org_server: # origin provider host: origin_server port: 5555 batch_size: 1 diff --git a/docker/storage/config.yml b/docker/storage/config.yml index 8ed06fd..4dab15b 100644 --- a/docker/storage/config.yml +++ b/docker/storage/config.yml @@ -1,12 +1,8 @@ provenance: rabbitmq: # remote storage server configuration url: amqp://rabbitmq:5672/%2f storage_config: cls: postgresql - db: - host: storage-db - dbname: provenance - user: provenance - password: provenancepassword + db: host=storage-db dbname=provenance user=provenance password=provenancepassword batch_size: 10000 prefetch_count: 100
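The recurring change across the configuration files above is that the ``db`` entry is now a single libpq-style connection string (DSN) rather than a mapping of connection parameters. As a quick sanity check, the sketch below (not part of the repository; it only assumes psycopg2, which the PostgreSQL backend already depends on) shows how the new string form and the old mapping form relate:

```python
from psycopg2.extensions import make_dsn, parse_dsn

# New-style value, as written under the "db" key in the configs above.
dsn = "host=storage-db dbname=provenance user=provenance password=provenancepassword"

# parse_dsn() recovers the old mapping form from the connection string...
params = parse_dsn(dsn)
assert params["dbname"] == "provenance"

# ...and make_dsn() goes the other way, so an existing parameter mapping can be
# converted to the single-string format now expected by these configurations.
assert parse_dsn(make_dsn(**params)) == params
```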
diff --git a/docs/index.rst b/docs/index.rst index cd9f407..89000c1 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,146 +1,143 @@ .. _swh-provenance: Software Heritage Provenance ============================ A provenance index database based on the Software Heritage Archive. This is an implementation of the paper `Software Provenance Tracking at the Scale of Public Source Code`_ published in `Empirical Software Engineering`_. This provenance index database is a tool to help answer the question "where does this source code artifact come from?", something the main Software Heritage Archive cannot easily do. Quick Start ----------- Database creation ~~~~~~~~~~~~~~~~~ Create a provenance index database (in this example we use pifpaf_ to easily set up a test Postgresql database. Adapt the example below to your Postgresql setup): .. code-block:: shell eval $(pifpaf run postgresql) swh db create -d provdb provenance swh db init-admin -d provdb provenance swh db init -d provdb provenance The provenance index DB comes with 2 feature flags, so there are 4 possible flavors. Feature flags are: - ``with-path`` / ``without-path``: whether the provenance index database will store file paths, - ``normalized`` / ``denormalized``: whether or not the main relation tables are normalized (see below). So the possible flavors are: - ``with-path`` - ``without-path`` - ``with-path-denormalized`` - ``without-path-denormalized`` Filling the provenance index database ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ This step requires access to the Software Heritage Archive to retrieve the actual data. It currently also needs an input CSV file of revisions and origins to insert in the provenance database. Examples of such files are available in the `provenance public dataset`_. .. _`provenance public dataset`: https://annex.softwareheritage.org/public/dataset/provenance .. code-block:: shell wget https://annex.softwareheritage.org/public/dataset/provenance/sample_10k.csv.bz2 bunzip2 sample_10k.csv.bz2 You need a configuration file, like: .. code-block:: yaml # config.yaml provenance: storage: cls: postgresql - db: - host: /tmp/tmpifn2ov_j - port: 9824 - dbname: provdb + db: host=/tmp/tmpifn2ov_j port=9824 dbname=provdb archive: cls: api storage: cls: remote url: http://storage:5002/ Note that you need access to the internal API of a :ref:`swh-storage` instance (here the machine named ``storage``) for this. Then you can feed the provenance index database using: .. code-block:: shell swh provenance -C config.yaml iter-revisions sample_10k.csv This may take a while to complete. Querying the provenance index database ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Using the same config file, you may look for the first known occurrence of a file content: .. code-block:: shell swh provenance -C config.yaml find-first 8a54694c92c944fcb06d73c17743ac72444a5b72 swh:1:cnt:8a54694c92c944fcb06d73c17743ac72444a5b72, swh:1:rev:6193fae0668d082d90207f6c9f33d6e8c98dd04a, 2008-10-06 18:32:23+00:00, None, lua/effects/bloodstream/init.lua Or all the known occurrences: .. code-block:: shell swh provenance -C config.yaml find-all 8a54694c92c944fcb06d73c17743ac72444a5b72 swh:1:cnt:8a54694c92c944fcb06d73c17743ac72444a5b72, swh:1:rev:6193fae0668d082d90207f6c9f33d6e8c98dd04a, 2008-10-06 18:32:23+00:00, None, lua/effects/bloodstream/init.lua swh:1:cnt:8a54694c92c944fcb06d73c17743ac72444a5b72, swh:1:rev:f0a5078eed8808323b93ed09cddb003dbe2a85e4, 2008-10-06 18:32:23+00:00, None, trunk/lua/effects/bloodstream/init.lua [...] (De)normalized database ----------------------- For some relation tables (like ``content_in_revision``, which stores, for each content object, in which revision it has been found), the default data schema is to store one row for each relation. For a big database, this can have a significant cost in terms of storage. So it is possible to store these relations using an array as the destination column (the ``revision`` column in the case of the ``content_in_revision`` table). This can drastically reduce the database storage size, possibly at the price of a slight performance hit. Warning: the denormalized version of the database is still under test and validation. Do not use for serious work. .. _`Empirical Software Engineering`: http://link.springer.com/journal/10664 ..
_`Software Provenance Tracking at the Scale of Public Source Code`: http://dx.doi.org/10.1007/s10664-020-09828-5 .. _pifpaf: https://github.com/jd/pifpaf .. toctree:: :maxdepth: 2 :caption: Contents: Indices and tables ================== * :ref:`genindex` * :ref:`modindex` * :ref:`search` diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index b9ff6bb..40e8a6e 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,795 +1,789 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control from datetime import datetime, timezone from functools import partial import os from typing import Any, Dict, Generator, Optional, Tuple import click from deprecated import deprecated import iso8601 import yaml try: from systemd.daemon import notify except ImportError: notify = None from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.model import Sha1Git # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILENAME" DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None) DEFAULT_CONFIG: Dict[str, Any] = { "provenance": { "archive": { # Storage API based Archive object # "cls": "api", # "storage": { # "cls": "remote", # "url": "http://uffizi.internal.softwareheritage.org:5002", # } # Direct access Archive object "cls": "direct", "db": { "host": "belvedere.internal.softwareheritage.org", "port": 5432, "dbname": "softwareheritage", "user": "guest", }, }, "storage": { # Local PostgreSQL Storage # "cls": "postgresql", # "db": { # "host": "localhost", # "user": "postgres", # "password": "postgres", # "dbname": "provenance", # }, # Remote RabbitMQ/PostgreSQL Storage "cls": "rabbitmq", "url": "amqp://localhost:5672/%2f", "storage_config": { "cls": "postgresql", - "db": { - "host": "localhost", - "user": "postgres", - "password": "postgres", - "dbname": "provenance", - }, + "db": "host=localhost user=postgres password=postgres dbname=provenance", }, "batch_size": 100, "prefetch_count": 100, }, } } CONFIG_FILE_HELP = f""" \b Configuration can be loaded from a yaml file given either as --config-file option or the {CONFIG_ENVVAR} environment variable. 
If no configuration file is specified, use the following default configuration:: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage provenance index database tools {CONFIG_FILE_HELP} """ @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=True, dir_okay=False, path_type=str), help="""YAML configuration file.""", ) @click.option( "-P", "--profile", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""Enable profiling to specified file.""", ) @click.pass_context def cli(ctx: click.core.Context, config_file: Optional[str], profile: str) -> None: if ( config_file is None and DEFAULT_PATH is not None and config.config_exists(DEFAULT_PATH) ): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not os.path.exists(config_file): raise FileNotFoundError(config_file) conf = yaml.safe_load(open(config_file, "rb")) ctx.ensure_object(dict) ctx.obj["config"] = conf if profile: import atexit import cProfile print("Profiling...") pr = cProfile.Profile() pr.enable() def exit() -> None: pr.disable() pr.dump_stats(profile) atexit.register(exit) @cli.command(name="replay") @click.option( "--stop-after-objects", "-n", default=None, type=int, help="Stop after processing this many objects. Default is to " "run forever.", ) @click.option( "--type", "-t", "object_types", default=[], type=click.Choice( [ "content", "directory", "revision", "location", "content_in_revision", "content_in_directory", "directory_in_revision", ] ), help="Object types to replay", multiple=True, ) @click.pass_context def replay(ctx: click.core.Context, stop_after_objects, object_types): """Fill a ProvenanceStorage by reading a Journal. This is typically used to replicate a Provenance database, reading the Software Heritage kafka journal to retrieve objects of the Software Heritage provenance storage to feed a replicate provenance storage. There can be several 'replayers' filling a ProvenanceStorage as long as they use the same `group-id`. The expected configuration file should have one 'provenance' section with 2 subsections: - storage: the configuration of the provenance storage in which to add objects received from the kafka journal, - journal_client: the configuration of access to the kafka journal. See the documentation of `swh.journal` for more details on the possible configuration entries in this section. https://docs.softwareheritage.org/devel/apidoc/swh.journal.client.html eg.:: provenance: storage: cls: postgresql - db: - [...] + db: [...] journal_client: cls: kafka prefix: swh.journal.provenance brokers: [...] [...] 
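Assuming the configuration above is saved as, say, ``replay.yml`` (an arbitrary file name used here for illustration), the replayer can then be started with ``swh provenance -C replay.yml replay``.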
""" import functools from swh.journal.client import get_journal_client from swh.provenance.storage import get_provenance_storage from swh.provenance.storage.replay import ( ProvenanceObjectDeserializer, process_replay_objects, ) conf = ctx.obj["config"]["provenance"] storage = get_provenance_storage(**conf.pop("storage")) client_cfg = conf.pop("journal_client") deserializer = ProvenanceObjectDeserializer() client_cfg["value_deserializer"] = deserializer.convert if object_types: client_cfg["object_types"] = object_types if stop_after_objects: client_cfg["stop_after_objects"] = stop_after_objects try: client = get_journal_client(**client_cfg) except ValueError as exc: ctx.fail(str(exc)) worker_fn = functools.partial(process_replay_objects, storage=storage) if notify: notify("READY=1") try: with storage: n = client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print(f"Done, processed {n} messages") finally: if notify: notify("STOPPING=1") client.close() @cli.group(name="origin") @click.pass_context def origin(ctx: click.core.Context): from . import get_provenance from .archive import get_archive archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) ctx.obj["provenance"] = provenance ctx.obj["archive"] = archive @origin.command(name="from-csv") @click.argument("filename", type=click.Path(exists=True)) @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (origins) to read from the input file.""", ) @click.pass_context def origin_from_csv(ctx: click.core.Context, filename: str, limit: Optional[int]): from swh.provenance.algos.origin import CSVOriginIterator, origin_add provenance = ctx.obj["provenance"] archive = ctx.obj["archive"] origins_provider = generate_origin_tuples(filename) origins = CSVOriginIterator(origins_provider, limit=limit) with provenance: for origin in origins: origin_add(provenance, archive, [origin]) @origin.command(name="from-journal") @click.pass_context def origin_from_journal(ctx: click.core.Context): from swh.journal.client import get_journal_client from .journal_client import process_journal_origins provenance = ctx.obj["provenance"] archive = ctx.obj["archive"] journal_cfg = ctx.obj["config"].get("journal_client", {}) worker_fn = partial( process_journal_origins, archive=archive, provenance=provenance, ) cls = journal_cfg.pop("cls", None) or "kafka" client = get_journal_client( cls, **{ **journal_cfg, "object_types": ["origin_visit_status"], }, ) if notify: notify("READY=1") try: with provenance: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: if notify: notify("STOPPING=1") client.close() @cli.group(name="revision") @click.pass_context def revision(ctx: click.core.Context): from . 
import get_provenance from .archive import get_archive archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) ctx.obj["provenance"] = provenance ctx.obj["archive"] = archive @revision.command(name="from-csv") @click.argument("filename", type=click.Path(exists=True)) @click.option( "-a", "--track-all", default=True, type=bool, help="""Index all occurrences of files in the development history.""", ) @click.option( "-f", "--flatten", default=True, type=bool, help="""Create flat models for directories in the isochrone frontier.""", ) @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (revisions) to read from the input file.""", ) @click.option( "-m", "--min-depth", default=1, type=int, help="""Set minimum depth (in the directory tree) at which an isochrone """ """frontier can be defined.""", ) @click.option( "-r", "--reuse", default=True, type=bool, help="""Prioritize the usage of previously defined isochrone frontiers """ """whenever possible.""", ) @click.option( "-s", "--min-size", default=0, type=int, help="""Set the minimum size (in bytes) of files to be indexed. """ """Any smaller file will be ignored.""", ) @click.option( "-d", "--max-directory-size", default=0, type=int, help="""Set the maximum recursive directory size of revisions to be indexed.""", ) @click.pass_context def revision_from_csv( ctx: click.core.Context, filename: str, track_all: bool, flatten: bool, limit: Optional[int], min_depth: int, reuse: bool, min_size: int, max_directory_size: int, ) -> None: from swh.provenance.algos.revision import CSVRevisionIterator, revision_add provenance = ctx.obj["provenance"] archive = ctx.obj["archive"] revisions_provider = generate_revision_tuples(filename) revisions = CSVRevisionIterator(revisions_provider, limit=limit) with provenance: for revision in revisions: revision_add( provenance, archive, [revision], trackall=track_all, flatten=flatten, lower=reuse, mindepth=min_depth, minsize=min_size, max_directory_size=max_directory_size, ) @revision.command(name="from-journal") @click.option( "-a", "--track-all", default=True, type=bool, help="""Index all occurrences of files in the development history.""", ) @click.option( "-f", "--flatten", default=True, type=bool, help="""Create flat models for directories in the isochrone frontier.""", ) @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (revisions) to read from the input file.""", ) @click.option( "-m", "--min-depth", default=1, type=int, help="""Set minimum depth (in the directory tree) at which an isochrone """ """frontier can be defined.""", ) @click.option( "-r", "--reuse", default=True, type=bool, help="""Prioritize the usage of previously defined isochrone frontiers """ """whenever possible.""", ) @click.option( "-s", "--min-size", default=0, type=int, help="""Set the minimum size (in bytes) of files to be indexed. 
""" """Any smaller file will be ignored.""", ) @click.option( "-d", "--max-directory-size", default=0, type=int, help="""Set the maximum recursive directory size of revisions to be indexed.""", ) @click.pass_context def revision_from_journal( ctx: click.core.Context, track_all: bool, flatten: bool, limit: Optional[int], min_depth: int, reuse: bool, min_size: int, max_directory_size: int, ) -> None: from swh.journal.client import get_journal_client from .journal_client import process_journal_revisions provenance = ctx.obj["provenance"] archive = ctx.obj["archive"] journal_cfg = ctx.obj["config"].get("journal_client", {}) worker_fn = partial( process_journal_revisions, archive=archive, provenance=provenance, minsize=min_size, max_directory_size=max_directory_size, ) cls = journal_cfg.pop("cls", None) or "kafka" client = get_journal_client( cls, **{ **journal_cfg, "object_types": ["revision"], }, ) if notify: notify("READY=1") try: with provenance: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: if notify: notify("STOPPING=1") client.close() @cli.group(name="directory") @click.pass_context def directory(ctx: click.core.Context): from . import get_provenance from .archive import get_archive archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) ctx.obj["provenance"] = provenance ctx.obj["archive"] = archive @directory.command(name="flatten") @click.option( "--range-from", type=str, help="start ID of the range of directories to flatten" ) @click.option( "--range-to", type=str, help="stop ID of the range of directories to flatten" ) @click.option( "-s", "--min-size", default=0, type=int, help="""Set the minimum size (in bytes) of files to be indexed. Any smaller file will be ignored.""", ) @click.pass_context def directory_flatten(ctx: click.core.Context, range_from, range_to, min_size): from swh.provenance.algos.directory import directory_flatten_range provenance = ctx.obj["provenance"] archive = ctx.obj["archive"] directory_flatten_range( provenance, archive, hash_to_bytes(range_from), hash_to_bytes(range_to), min_size, ) # old (deprecated) commands @cli.command(name="iter-frontiers") @click.argument("filename") @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (directories) to read from the input file.""", ) @click.option( "-s", "--min-size", default=0, type=int, help="""Set the minimum size (in bytes) of files to be indexed. 
""" """Any smaller file will be ignored.""", ) @click.pass_context def iter_frontiers( ctx: click.core.Context, filename: str, limit: Optional[int], min_size: int, ) -> None: """Process a provided list of directories in the isochrone frontier.""" from swh.provenance import get_provenance from swh.provenance.algos.directory import CSVDirectoryIterator, directory_add from swh.provenance.archive import get_archive archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) directories_provider = generate_directory_ids(filename) directories = CSVDirectoryIterator(directories_provider, limit=limit) with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: for directory in directories: directory_add( provenance, archive, [directory], minsize=min_size, ) def generate_directory_ids( filename: str, ) -> Generator[Sha1Git, None, None]: for line in open(filename, "r"): if line.strip(): yield hash_to_bytes(line.strip()) @cli.command(name="iter-revisions") @click.argument("filename") @click.option( "-a", "--track-all", default=True, type=bool, help="""Index all occurrences of files in the development history.""", ) @click.option( "-f", "--flatten", default=True, type=bool, help="""Create flat models for directories in the isochrone frontier.""", ) @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (revisions) to read from the input file.""", ) @click.option( "-m", "--min-depth", default=1, type=int, help="""Set minimum depth (in the directory tree) at which an isochrone """ """frontier can be defined.""", ) @click.option( "-r", "--reuse", default=True, type=bool, help="""Prioritize the usage of previously defined isochrone frontiers """ """whenever possible.""", ) @click.option( "-s", "--min-size", default=0, type=int, help="""Set the minimum size (in bytes) of files to be indexed. 
""" """Any smaller file will be ignored.""", ) @click.pass_context def iter_revisions( ctx: click.core.Context, filename: str, track_all: bool, flatten: bool, limit: Optional[int], min_depth: int, reuse: bool, min_size: int, ) -> None: """Process a provided list of revisions.""" from swh.provenance import get_provenance from swh.provenance.algos.revision import CSVRevisionIterator, revision_add from swh.provenance.archive import get_archive archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) revisions_provider = generate_revision_tuples(filename) revisions = CSVRevisionIterator(revisions_provider, limit=limit) with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: for revision in revisions: revision_add( provenance, archive, [revision], trackall=track_all, flatten=flatten, lower=reuse, mindepth=min_depth, minsize=min_size, ) def generate_revision_tuples( filename: str, ) -> Generator[Tuple[Sha1Git, datetime, Sha1Git], None, None]: for line in open(filename, "r"): if line.strip(): revision, date, root = line.strip().split(",") yield ( hash_to_bytes(revision), iso8601.parse_date(date, default_timezone=timezone.utc), hash_to_bytes(root), ) @cli.command(name="iter-origins") @click.argument("filename") @click.option( "-l", "--limit", type=int, help="""Limit the amount of entries (origins) to read from the input file.""", ) @click.pass_context @deprecated(version="0.0.1", reason="Use `swh provenance origin from-csv` instead") def iter_origins(ctx: click.core.Context, filename: str, limit: Optional[int]) -> None: """Process a provided list of origins.""" from swh.provenance import get_provenance from swh.provenance.algos.origin import CSVOriginIterator, origin_add from swh.provenance.archive import get_archive archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) origins_provider = generate_origin_tuples(filename) origins = CSVOriginIterator(origins_provider, limit=limit) with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: for origin in origins: origin_add(provenance, archive, [origin]) def generate_origin_tuples(filename: str) -> Generator[Tuple[str, bytes], None, None]: for line in open(filename, "r"): if line.strip(): url, snapshot = line.strip().split(",") yield (url, hash_to_bytes(snapshot)) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx: click.core.Context, swhid: str) -> None: """Find first occurrence of the requested blob.""" from . import get_provenance with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: occur = provenance.content_find_first(hash_to_bytes(swhid)) if occur is not None: print( f"swh:1:cnt:{hash_to_hex(occur.content)}, " f"swh:1:rev:{hash_to_hex(occur.revision)}, " f"{occur.date}, " f"{occur.origin}, " f"{os.fsdecode(occur.path)}" ) else: print(f"Cannot find a content with the id {swhid}") @cli.command(name="find-all") @click.argument("swhid") @click.option( "-l", "--limit", type=int, help="""Limit the amount results to be retrieved.""" ) @click.pass_context def find_all(ctx: click.core.Context, swhid: str, limit: Optional[int]) -> None: """Find all occurrences of the requested blob.""" from . 
import get_provenance with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance: for occur in provenance.content_find_all(hash_to_bytes(swhid), limit=limit): print( f"swh:1:cnt:{hash_to_hex(occur.content)}, " f"swh:1:rev:{hash_to_hex(occur.revision)}, " f"{occur.date}, " f"{occur.origin}, " f"{os.fsdecode(occur.path)}" ) diff --git a/swh/provenance/storage/__init__.py b/swh/provenance/storage/__init__.py index fdd2d34..9085f9f 100644 --- a/swh/provenance/storage/__init__.py +++ b/swh/provenance/storage/__init__.py @@ -1,61 +1,61 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from typing import TYPE_CHECKING import warnings from .interface import ProvenanceStorageInterface def get_provenance_storage(cls: str, **kwargs) -> ProvenanceStorageInterface: """Get an archive object of class ``cls`` with arguments ``args``. Args: cls: storage's class, only 'local' is currently supported args: dictionary of arguments passed to the storage class constructor Returns: an instance of storage object Raises: :cls:`ValueError` if passed an unknown archive class. """ if cls in ["local", "postgresql"]: from swh.provenance.storage.postgresql import ProvenanceStoragePostgreSql if cls == "local": warnings.warn( '"local" class is deprecated for provenance storage, please ' 'use "postgresql" class instead.', DeprecationWarning, ) raise_on_commit = kwargs.get("raise_on_commit", False) return ProvenanceStoragePostgreSql( - raise_on_commit=raise_on_commit, **kwargs["db"] + raise_on_commit=raise_on_commit, db=kwargs["db"] ) elif cls == "rabbitmq": from swh.provenance.storage.rabbitmq.client import ( ProvenanceStorageRabbitMQClient, ) rmq_storage = ProvenanceStorageRabbitMQClient(**kwargs) if TYPE_CHECKING: assert isinstance(rmq_storage, ProvenanceStorageInterface) return rmq_storage elif cls == "journal": from swh.journal.writer import get_journal_writer from swh.provenance.storage.journal import ProvenanceStorageJournal storage = get_provenance_storage(**kwargs["storage"]) journal = get_journal_writer(**kwargs["journal_writer"]) ret = ProvenanceStorageJournal(storage=storage, journal=journal) return ret raise ValueError diff --git a/swh/provenance/storage/postgresql.py b/swh/provenance/storage/postgresql.py index 9ce88c6..9d61efa 100644 --- a/swh/provenance/storage/postgresql.py +++ b/swh/provenance/storage/postgresql.py @@ -1,393 +1,396 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from contextlib import contextmanager from datetime import datetime from functools import wraps from hashlib import sha1 import itertools import logging from types import TracebackType from typing import Dict, Generator, Iterable, List, Optional, Set, Type import psycopg2.extensions import psycopg2.extras from swh.core.db import BaseDb from swh.core.statsd import statsd from swh.model.model import Sha1Git from swh.provenance.storage.interface import ( DirectoryData, EntityType, ProvenanceResult, ProvenanceStorageInterface, RelationData, RelationType, RevisionData, ) LOGGER = logging.getLogger(__name__) STORAGE_DURATION_METRIC = 
"swh_provenance_storage_postgresql_duration_seconds" def handle_raise_on_commit(f): @wraps(f) def handle(self, *args, **kwargs): try: return f(self, *args, **kwargs) except BaseException as ex: # Unexpected error occurred, rollback all changes and log message LOGGER.exception("Unexpected error") if self.raise_on_commit: raise ex return False return handle class ProvenanceStoragePostgreSql: current_version = 4 def __init__( - self, page_size: Optional[int] = None, raise_on_commit: bool = False, **kwargs + self, + page_size: Optional[int] = None, + raise_on_commit: bool = False, + db: str = "", ) -> None: self.conn: Optional[psycopg2.extensions.connection] = None - self.conn_args = kwargs + self.dsn = db self._flavor: Optional[str] = None self.page_size = page_size self.raise_on_commit = raise_on_commit def __enter__(self) -> ProvenanceStorageInterface: self.open() return self def __exit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: self.close() @contextmanager def transaction( self, readonly: bool = False ) -> Generator[psycopg2.extras.RealDictCursor, None, None]: if self.conn is None: raise RuntimeError( "Tried to access ProvenanceStoragePostgreSQL transaction() without opening it" ) self.conn.set_session(readonly=readonly) with self.conn: with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: yield cur @property def flavor(self) -> str: if self._flavor is None: with self.transaction(readonly=True) as cursor: cursor.execute("SELECT swh_get_dbflavor() AS flavor") flavor = cursor.fetchone() assert flavor # please mypy self._flavor = flavor["flavor"] assert self._flavor is not None return self._flavor @property def denormalized(self) -> bool: return "denormalized" in self.flavor @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"}) def close(self) -> None: assert self.conn is not None self.conn.close() @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"}) @handle_raise_on_commit def content_add(self, cnts: Dict[Sha1Git, datetime]) -> bool: if cnts: sql = """ INSERT INTO content(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,content.date) """ page_size = self.page_size or len(cnts) with self.transaction() as cursor: psycopg2.extras.execute_values( cursor, sql, argslist=cnts.items(), page_size=page_size ) return True @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_first"}) def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: sql = "SELECT * FROM swh_provenance_content_find_first(%s)" with self.transaction(readonly=True) as cursor: cursor.execute(query=sql, vars=(id,)) row = cursor.fetchone() return ProvenanceResult(**row) if row is not None else None @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_find_all"}) def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: sql = "SELECT * FROM swh_provenance_content_find_all(%s, %s)" with self.transaction(readonly=True) as cursor: cursor.execute(query=sql, vars=(id, limit)) yield from (ProvenanceResult(**row) for row in cursor) @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_get"}) def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: dates: Dict[Sha1Git, datetime] = {} sha1s = tuple(ids) if sha1s: # TODO: consider splitting this query in several ones if sha1s is too big! 
values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, date FROM content WHERE sha1 IN ({values}) AND date IS NOT NULL """ with self.transaction(readonly=True) as cursor: cursor.execute(query=sql, vars=sha1s) dates.update((row["sha1"], row["date"]) for row in cursor) return dates @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"}) @handle_raise_on_commit def directory_add(self, dirs: Dict[Sha1Git, DirectoryData]) -> bool: data = [(sha1, rev.date, rev.flat) for sha1, rev in dirs.items()] if data: sql = """ INSERT INTO directory(sha1, date, flat) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date, directory.date), flat=(EXCLUDED.flat OR directory.flat) """ page_size = self.page_size or len(data) with self.transaction() as cursor: psycopg2.extras.execute_values( cur=cursor, sql=sql, argslist=data, page_size=page_size ) return True @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_get"}) def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, DirectoryData]: result: Dict[Sha1Git, DirectoryData] = {} sha1s = tuple(ids) if sha1s: # TODO: consider splitting this query in several ones if sha1s is too big! values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, date, flat FROM directory WHERE sha1 IN ({values}) AND date IS NOT NULL """ with self.transaction(readonly=True) as cursor: cursor.execute(query=sql, vars=sha1s) result.update( (row["sha1"], DirectoryData(date=row["date"], flat=row["flat"])) for row in cursor ) return result @statsd.timed( metric=STORAGE_DURATION_METRIC, tags={"method": "directory_iter_not_flattened"} ) def directory_iter_not_flattened( self, limit: int, start_id: Sha1Git ) -> List[Sha1Git]: sql = """ SELECT sha1 FROM directory WHERE flat=false AND sha1>%s ORDER BY sha1 LIMIT %s """ with self.transaction(readonly=True) as cursor: cursor.execute(query=sql, vars=(start_id, limit)) return [row["sha1"] for row in cursor] @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "entity_get_all"}) def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: with self.transaction(readonly=True) as cursor: cursor.execute(f"SELECT sha1 FROM {entity.value}") return {row["sha1"] for row in cursor} @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_add"}) @handle_raise_on_commit def location_add(self, paths: Dict[Sha1Git, bytes]) -> bool: values = [(path,) for path in paths.values()] if values: sql = """ INSERT INTO location(path) VALUES %s ON CONFLICT DO NOTHING """ page_size = self.page_size or len(values) with self.transaction() as cursor: psycopg2.extras.execute_values( cursor, sql, argslist=values, page_size=page_size ) return True @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_get_all"}) def location_get_all(self) -> Dict[Sha1Git, bytes]: with self.transaction(readonly=True) as cursor: cursor.execute("SELECT location.path AS path FROM location") return {sha1(row["path"]).digest(): row["path"] for row in cursor} @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_add"}) @handle_raise_on_commit def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: if orgs: sql = """ INSERT INTO origin(sha1, url) VALUES %s ON CONFLICT DO NOTHING """ page_size = self.page_size or len(orgs) with self.transaction() as cursor: psycopg2.extras.execute_values( cur=cursor, sql=sql, argslist=orgs.items(), page_size=page_size, ) return True @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "open"}) def 
open(self) -> None: - self.conn = BaseDb.connect(**self.conn_args).conn + self.conn = BaseDb.connect(self.dsn).conn BaseDb.adapt_conn(self.conn) with self.transaction() as cursor: cursor.execute("SET timezone TO 'UTC'") @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "origin_get"}) def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: urls: Dict[Sha1Git, str] = {} sha1s = tuple(ids) if sha1s: # TODO: consider splitting this query in several ones if sha1s is too big! values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, url FROM origin WHERE sha1 IN ({values}) """ with self.transaction(readonly=True) as cursor: cursor.execute(query=sql, vars=sha1s) urls.update((row["sha1"], row["url"]) for row in cursor) return urls @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_add"}) @handle_raise_on_commit def revision_add(self, revs: Dict[Sha1Git, RevisionData]) -> bool: if revs: data = [(sha1, rev.date, rev.origin) for sha1, rev in revs.items()] sql = """ INSERT INTO revision(sha1, date, origin) (SELECT V.rev AS sha1, V.date::timestamptz AS date, O.id AS origin FROM (VALUES %s) AS V(rev, date, org) LEFT JOIN origin AS O ON (O.sha1=V.org::sha1_git)) ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date, revision.date), origin=COALESCE(EXCLUDED.origin, revision.origin) """ page_size = self.page_size or len(data) with self.transaction() as cursor: psycopg2.extras.execute_values( cur=cursor, sql=sql, argslist=data, page_size=page_size ) return True @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "revision_get"}) def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: result: Dict[Sha1Git, RevisionData] = {} sha1s = tuple(ids) if sha1s: # TODO: consider splitting this query in several ones if sha1s is too big! 
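# The LEFT JOIN below resolves the internal origin id stored in revision.origin
# back to that origin's sha1; rows with neither a date nor an origin are skipped.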
values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT R.sha1, R.date, O.sha1 AS origin FROM revision AS R LEFT JOIN origin AS O ON (O.id=R.origin) WHERE R.sha1 IN ({values}) AND (R.date is not NULL OR O.sha1 is not NULL) """ with self.transaction(readonly=True) as cursor: cursor.execute(query=sql, vars=sha1s) result.update( (row["sha1"], RevisionData(date=row["date"], origin=row["origin"])) for row in cursor ) return result @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_add"}) @handle_raise_on_commit def relation_add( self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]] ) -> bool: rows = [(src, rel.dst, rel.path) for src, dsts in data.items() for rel in dsts] if rows: rel_table = relation.value src_table, *_, dst_table = rel_table.split("_") page_size = self.page_size or len(rows) # Put the next three queries in a manual single transaction: # they use the same temp table with self.transaction() as cursor: cursor.execute("SELECT swh_mktemp_relation_add()") psycopg2.extras.execute_values( cur=cursor, sql="INSERT INTO tmp_relation_add(src, dst, path) VALUES %s", argslist=rows, page_size=page_size, ) sql = "SELECT swh_provenance_relation_add_from_temp(%s, %s, %s)" cursor.execute(query=sql, vars=(rel_table, src_table, dst_table)) return True @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get"}) def relation_get( self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False ) -> Dict[Sha1Git, Set[RelationData]]: return self._relation_get(relation, ids, reverse) @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "relation_get_all"}) def relation_get_all( self, relation: RelationType ) -> Dict[Sha1Git, Set[RelationData]]: return self._relation_get(relation, None) def _relation_get( self, relation: RelationType, ids: Optional[Iterable[Sha1Git]], reverse: bool = False, ) -> Dict[Sha1Git, Set[RelationData]]: result: Dict[Sha1Git, Set[RelationData]] = {} sha1s: List[Sha1Git] if ids is not None: sha1s = list(ids) filter = "filter-src" if not reverse else "filter-dst" else: sha1s = [] filter = "no-filter" if filter == "no-filter" or sha1s: rel_table = relation.value src_table, *_, dst_table = rel_table.split("_") sql = "SELECT * FROM swh_provenance_relation_get(%s, %s, %s, %s, %s)" with self.transaction(readonly=True) as cursor: cursor.execute( query=sql, vars=(rel_table, src_table, dst_table, filter, sha1s) ) for row in cursor: src = row.pop("src") result.setdefault(src, set()).add(RelationData(**row)) return result diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py index e2f719b..74f453c 100644 --- a/swh/provenance/tests/conftest.py +++ b/swh/provenance/tests/conftest.py @@ -1,84 +1,84 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from functools import partial -from typing import Dict, Generator +from typing import Generator from _pytest.fixtures import SubRequest import psycopg2.extensions import pytest from pytest_postgresql import factories from swh.core.db.db_utils import initialize_database_for_module from swh.provenance import get_provenance from swh.provenance.archive.interface import ArchiveInterface from swh.provenance.archive.storage import ArchiveStorage from swh.provenance.interface import ProvenanceInterface from swh.provenance.storage import 
get_provenance_storage from swh.provenance.storage.interface import ProvenanceStorageInterface from swh.provenance.storage.postgresql import ProvenanceStoragePostgreSql from swh.storage.interface import StorageInterface provenance_postgresql_proc = factories.postgresql_proc( load=[ partial( initialize_database_for_module, modname="provenance", flavor="normalized", version=ProvenanceStoragePostgreSql.current_version, ) ], ) postgres_provenance = factories.postgresql("provenance_postgresql_proc") @pytest.fixture() -def provenance_postgresqldb(request, postgres_provenance): - return postgres_provenance.get_dsn_parameters() +def provenance_postgresqldb(request, postgres_provenance) -> str: + return postgres_provenance.dsn @pytest.fixture() def provenance_storage( request: SubRequest, - provenance_postgresqldb: Dict[str, str], + provenance_postgresqldb: str, ) -> Generator[ProvenanceStorageInterface, None, None]: """Return a working and initialized ProvenanceStorageInterface object""" # in test sessions, we DO want to raise any exception occurring at commit time with get_provenance_storage( cls="postgresql", db=provenance_postgresqldb, raise_on_commit=True ) as storage: yield storage @pytest.fixture def provenance( postgres_provenance: psycopg2.extensions.connection, ) -> Generator[ProvenanceInterface, None, None]: """Return a working and initialized ProvenanceInterface object""" from swh.core.db.db_utils import ( init_admin_extensions, populate_database_for_package, ) init_admin_extensions("swh.provenance", postgres_provenance.dsn) populate_database_for_package( "swh.provenance", postgres_provenance.dsn, flavor="normalized" ) # in test sessions, we DO want to raise any exception occurring at commit time with get_provenance( cls="postgresql", - db=postgres_provenance.get_dsn_parameters(), + db=postgres_provenance.dsn, raise_on_commit=True, ) as provenance: yield provenance @pytest.fixture def archive(swh_storage: StorageInterface) -> ArchiveInterface: """Return an ArchiveStorage-based ArchiveInterface object""" return ArchiveStorage(swh_storage) diff --git a/swh/provenance/tests/test_cli.py b/swh/provenance/tests/test_cli.py index 1941ad2..2c49e3b 100644 --- a/swh/provenance/tests/test_cli.py +++ b/swh/provenance/tests/test_cli.py @@ -1,260 +1,259 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import logging import re from typing import Dict, List from _pytest.monkeypatch import MonkeyPatch from click.testing import CliRunner from confluent_kafka import Producer import psycopg2.extensions import pytest from swh.core.cli import swh as swhmain import swh.core.cli.db # noqa ; ensure cli is loaded from swh.core.db import BaseDb from swh.core.db.db_utils import init_admin_extensions from swh.journal.serializers import key_to_kafka, value_to_kafka from swh.model.hashutil import MultiHash import swh.provenance.cli # noqa ; ensure cli is loaded from swh.provenance.storage.interface import EntityType, RelationType from swh.storage.interface import StorageInterface from .utils import fill_storage, get_datafile, invoke, load_repo_data logger = logging.getLogger(__name__) def now(): return datetime.now(timezone.utc) def test_cli_swh_db_help() -> None: # swhmain.add_command(provenance_cli) result = CliRunner().invoke(swhmain, ["provenance", "-h"]) assert 
result.exit_code == 0 assert "Commands:" in result.output commands = result.output.split("Commands:")[1] for command in ( "find-all", "find-first", "iter-frontiers", "iter-origins", "iter-revisions", ): assert f" {command} " in commands TABLES = { "dbflavor", "dbmodule", "dbversion", "content", "content_in_revision", "content_in_directory", "directory", "directory_in_revision", "location", "origin", "revision", "revision_before_revision", "revision_in_origin", } @pytest.mark.parametrize("flavor", ("normalized", "denormalized")) def test_cli_db_create_and_init_db_with_flavor( monkeypatch: MonkeyPatch, postgresql: psycopg2.extensions.connection, flavor: str, ) -> None: """Test that 'swh db init provenance' works with flavors""" dbname = f"{flavor}-db" # DB creation using 'swh db create' db_params = postgresql.get_dsn_parameters() monkeypatch.setenv("PGHOST", db_params["host"]) monkeypatch.setenv("PGUSER", db_params["user"]) monkeypatch.setenv("PGPORT", db_params["port"]) result = CliRunner().invoke(swhmain, ["db", "create", "-d", dbname, "provenance"]) assert result.exit_code == 0, result.output # DB init using 'swh db init' result = CliRunner().invoke( swhmain, ["db", "init", "-d", dbname, "--flavor", flavor, "provenance"] ) assert result.exit_code == 0, result.output assert f"(flavor {flavor})" in result.output db_params["dbname"] = dbname cnx = BaseDb.connect(**db_params).conn # check the DB looks OK (check for db_flavor and expected tables) with cnx.cursor() as cur: cur.execute("select swh_get_dbflavor()") assert cur.fetchone() == (flavor,) cur.execute( "select table_name from information_schema.tables " "where table_schema = 'public' " f"and table_catalog = '{dbname}'" ) tables = set(x for (x,) in cur.fetchall()) assert tables == TABLES def test_cli_init_db_default_flavor(postgresql: psycopg2.extensions.connection) -> None: "Test that 'swh db init provenance' defaults to a normalized flavored DB" dbname = postgresql.dsn init_admin_extensions("swh.provenance", dbname) result = CliRunner().invoke(swhmain, ["db", "init", "-d", dbname, "provenance"]) assert result.exit_code == 0, result.output with postgresql.cursor() as cur: cur.execute("select swh_get_dbflavor()") assert cur.fetchone() == ("normalized",) @pytest.mark.origin_layer @pytest.mark.parametrize( "subcommand", (["origin", "from-csv"], ["iter-origins"]), ) def test_cli_origin_from_csv( swh_storage: StorageInterface, subcommand: List[str], swh_storage_backend_config: Dict, provenance, tmp_path, ): repo = "cmdbts2" origin_url = f"https://{repo}" data = load_repo_data(repo) fill_storage(swh_storage, data) assert len(data["origin"]) >= 1 assert origin_url in [o["url"] for o in data["origin"]] cfg = { "provenance": { "archive": { "cls": "api", "storage": swh_storage_backend_config, }, "storage": { "cls": "postgresql", - # "db": provenance.storage.conn.dsn, - "db": provenance.storage.conn.get_dsn_parameters(), + "db": provenance.storage.conn.dsn, }, }, } csv_filepath = get_datafile("origins.csv") subcommand = subcommand + [csv_filepath] result = invoke(subcommand, config=cfg) assert result.exit_code == 0, f"Unexpected result: {result.output}" origin_sha1 = MultiHash.from_data( origin_url.encode(), hash_names=["sha1"] ).digest()["sha1"] actual_result = provenance.storage.origin_get([origin_sha1]) assert actual_result == {origin_sha1: origin_url} @pytest.mark.kafka def test_replay( provenance_storage, - provenance_postgresqldb, + provenance_postgresqldb: str, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, ): kafka_prefix 
+= ".swh.journal.provenance" producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test-producer", "acks": "all", } ) for i in range(10): date = datetime.fromtimestamp(i, tz=timezone.utc) cntkey = (b"cnt:" + bytes([i])).ljust(20, b"\x00") dirkey = (b"dir:" + bytes([i])).ljust(20, b"\x00") revkey = (b"rev:" + bytes([i])).ljust(20, b"\x00") loc = f"dir/{i}".encode() producer.produce( topic=kafka_prefix + ".content_in_revision", key=key_to_kafka(cntkey), value=value_to_kafka({"src": cntkey, "dst": revkey, "path": loc}), ) producer.produce( topic=kafka_prefix + ".content_in_directory", key=key_to_kafka(cntkey), value=value_to_kafka({"src": cntkey, "dst": dirkey, "path": loc}), ) producer.produce( topic=kafka_prefix + ".directory_in_revision", key=key_to_kafka(dirkey), value=value_to_kafka({"src": dirkey, "dst": revkey, "path": loc}), ) # now add dates to entities producer.produce( topic=kafka_prefix + ".content", key=key_to_kafka(cntkey), value=value_to_kafka({"id": cntkey, "value": date}), ) producer.produce( topic=kafka_prefix + ".directory", key=key_to_kafka(dirkey), value=value_to_kafka({"id": dirkey, "value": date}), ) producer.produce( topic=kafka_prefix + ".revision", key=key_to_kafka(revkey), value=value_to_kafka({"id": revkey, "value": date}), ) producer.flush() logger.debug("Flushed producer") config = { "provenance": { "storage": { "cls": "postgresql", "db": provenance_postgresqldb, }, "journal_client": { "cls": "kafka", "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, "stop_on_eof": True, }, } } result = invoke(["replay"], config=config) expected = r"Done. processed 60 messages\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output assert len(provenance_storage.entity_get_all(EntityType.CONTENT)) == 10 assert len(provenance_storage.entity_get_all(EntityType.REVISION)) == 10 assert len(provenance_storage.entity_get_all(EntityType.DIRECTORY)) == 10 assert len(provenance_storage.location_get_all()) == 10 assert len(provenance_storage.relation_get_all(RelationType.CNT_EARLY_IN_REV)) == 10 assert len(provenance_storage.relation_get_all(RelationType.DIR_IN_REV)) == 10 assert len(provenance_storage.relation_get_all(RelationType.CNT_IN_DIR)) == 10 diff --git a/swh/provenance/tests/test_journal_client.py b/swh/provenance/tests/test_journal_client.py index a61db46..411594e 100644 --- a/swh/provenance/tests/test_journal_client.py +++ b/swh/provenance/tests/test_journal_client.py @@ -1,134 +1,134 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict from confluent_kafka import Consumer import pytest from swh.model.hashutil import MultiHash from swh.storage.interface import StorageInterface from .utils import fill_storage, invoke, load_repo_data @pytest.fixture def swh_storage_backend_config(swh_storage_backend_config, kafka_server, kafka_prefix): writer_config = { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, "anonymize": False, "auto_flush": False, } yield {**swh_storage_backend_config, "journal_writer": writer_config} @pytest.mark.origin_layer @pytest.mark.kafka def test_cli_origin_from_journal_client( swh_storage: StorageInterface, swh_storage_backend_config: Dict, kafka_prefix: str, kafka_server: str, 
consumer: Consumer, provenance, postgres_provenance, ) -> None: """Test origin journal client cli""" # Prepare storage data data = load_repo_data("cmdbts2") assert len(data["origin"]) >= 1 origin_url = data["origin"][0]["url"] fill_storage(swh_storage, data) # Prepare configuration for cli call swh_storage_backend_config.pop("journal_writer", None) # no need for that config storage_config_dict = swh_storage_backend_config cfg = { "journal_client": { "cls": "kafka", "brokers": [kafka_server], "group_id": "toto", "prefix": kafka_prefix, "stop_on_eof": True, }, "provenance": { "archive": { "cls": "api", "storage": storage_config_dict, }, "storage": { "cls": "postgresql", - "db": postgres_provenance.get_dsn_parameters(), + "db": postgres_provenance.dsn, }, }, } # call the cli 'swh provenance origin from-journal' cli_result = invoke(["origin", "from-journal"], config=cfg) assert cli_result.exit_code == 0, f"Unexpected result: {cli_result.output}" origin_sha1 = MultiHash.from_data( origin_url.encode(), hash_names=["sha1"] ).digest()["sha1"] actual_result = provenance.storage.origin_get([origin_sha1]) assert actual_result == {origin_sha1: origin_url} @pytest.mark.kafka def test_cli_revision_from_journal_client( swh_storage: StorageInterface, swh_storage_backend_config: Dict, kafka_prefix: str, kafka_server: str, consumer: Consumer, provenance, postgres_provenance, ) -> None: """Test revision journal client cli""" # Prepare storage data data = load_repo_data("cmdbts2") assert len(data["origin"]) >= 1 fill_storage(swh_storage, data) # Prepare configuration for cli call swh_storage_backend_config.pop("journal_writer", None) # no need for that config storage_config_dict = swh_storage_backend_config cfg = { "journal_client": { "cls": "kafka", "brokers": [kafka_server], "group_id": "toto", "prefix": kafka_prefix, "stop_on_eof": True, }, "provenance": { "archive": { "cls": "api", "storage": storage_config_dict, }, "storage": { "cls": "postgresql", - "db": postgres_provenance.get_dsn_parameters(), + "db": postgres_provenance.dsn, }, }, } revisions = [rev["id"] for rev in data["revision"]] result = provenance.storage.revision_get(revisions) assert not result # call the cli 'swh provenance revision from-journal' cli_result = invoke(["revision", "from-journal"], config=cfg) assert cli_result.exit_code == 0, f"Unexpected result: {cli_result.output}" result = provenance.storage.revision_get(revisions) assert set(result.keys()) == set(revisions) diff --git a/swh/provenance/tests/test_provenance_journal_writer.py b/swh/provenance/tests/test_provenance_journal_writer.py index edfa536..4a77690 100644 --- a/swh/provenance/tests/test_provenance_journal_writer.py +++ b/swh/provenance/tests/test_provenance_journal_writer.py @@ -1,187 +1,187 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Dict, Generator +from typing import Generator import pytest from swh.provenance import get_provenance_storage from swh.provenance.storage.interface import ( EntityType, ProvenanceStorageInterface, RelationType, ) from .test_provenance_storage import TestProvenanceStorage as _TestProvenanceStorage @pytest.fixture() def provenance_storage( - provenance_postgresqldb: Dict[str, str], + provenance_postgresqldb: str, ) -> Generator[ProvenanceStorageInterface, None, None]: cfg = { "storage": { "cls": "postgresql", "db": 
provenance_postgresqldb, "raise_on_commit": True, }, "journal_writer": { "cls": "memory", }, } with get_provenance_storage(cls="journal", **cfg) as storage: yield storage class TestProvenanceStorageJournal(_TestProvenanceStorage): def test_provenance_storage_content(self, provenance_storage): super().test_provenance_storage_content(provenance_storage) assert provenance_storage.journal objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} assert objtypes == {"content"} journal_objs = { obj.id for (objtype, obj) in provenance_storage.journal.objects if objtype == "content" } assert provenance_storage.entity_get_all(EntityType.CONTENT) == journal_objs def test_provenance_storage_directory(self, provenance_storage): super().test_provenance_storage_directory(provenance_storage) assert provenance_storage.journal objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} assert objtypes == {"directory"} journal_objs = { obj.id for (objtype, obj) in provenance_storage.journal.objects if objtype == "directory" } assert provenance_storage.entity_get_all(EntityType.DIRECTORY) == journal_objs def test_provenance_storage_origin(self, provenance_storage): super().test_provenance_storage_origin(provenance_storage) assert provenance_storage.journal objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} assert objtypes == {"origin"} journal_objs = { obj.id for (objtype, obj) in provenance_storage.journal.objects if objtype == "origin" } assert provenance_storage.entity_get_all(EntityType.ORIGIN) == journal_objs def test_provenance_storage_revision(self, provenance_storage): super().test_provenance_storage_revision(provenance_storage) assert provenance_storage.journal objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} assert objtypes == {"revision", "origin"} journal_objs = { obj.id for (objtype, obj) in provenance_storage.journal.objects if objtype == "revision" } all_revisions = provenance_storage.revision_get( provenance_storage.entity_get_all(EntityType.REVISION) ) assert { id for (id, value) in all_revisions.items() if value.date is not None } == journal_objs def test_provenance_storage_relation_revision_layer(self, provenance_storage): super().test_provenance_storage_relation_revision_layer(provenance_storage) assert provenance_storage.journal objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} assert objtypes == { "content", "directory", "content_in_revision", "content_in_directory", "directory_in_revision", } journal_rels = { tuple(obj.value[k] for k in ("src", "dst", "path")) for (objtype, obj) in provenance_storage.journal.objects if objtype == "content_in_revision" } prov_rels = { (k, relation.dst, relation.path) for k, v in provenance_storage.relation_get_all( RelationType.CNT_EARLY_IN_REV ).items() for relation in v } assert prov_rels == journal_rels journal_rels = { tuple(obj.value[k] for k in ("src", "dst", "path")) for (objtype, obj) in provenance_storage.journal.objects if objtype == "content_in_directory" } prov_rels = { (k, relation.dst, relation.path) for k, v in provenance_storage.relation_get_all( RelationType.CNT_IN_DIR ).items() for relation in v } assert prov_rels == journal_rels journal_rels = { tuple(obj.value[k] for k in ("src", "dst", "path")) for (objtype, obj) in provenance_storage.journal.objects if objtype == "directory_in_revision" } prov_rels = { (k, relation.dst, relation.path) for k, v in provenance_storage.relation_get_all( RelationType.DIR_IN_REV 
            ).items()
            for relation in v
        }
        assert prov_rels == journal_rels

    def test_provenance_storage_relation_origin_layer(self, provenance_storage):
        super().test_provenance_storage_relation_origin_layer(provenance_storage)
        assert provenance_storage.journal
        objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects}
        assert objtypes == {
            "origin",
            "revision_in_origin",
            "revision_before_revision",
        }
        journal_rels = {
            tuple(obj.value[k] for k in ("src", "dst", "path"))
            for (objtype, obj) in provenance_storage.journal.objects
            if objtype == "revision_in_origin"
        }
        prov_rels = {
            (k, relation.dst, relation.path)
            for k, v in provenance_storage.relation_get_all(
                RelationType.REV_IN_ORG
            ).items()
            for relation in v
        }
        assert prov_rels == journal_rels

        journal_rels = {
            tuple(obj.value[k] for k in ("src", "dst", "path"))
            for (objtype, obj) in provenance_storage.journal.objects
            if objtype == "revision_before_revision"
        }
        prov_rels = {
            (k, relation.dst, relation.path)
            for k, v in provenance_storage.relation_get_all(
                RelationType.REV_BEFORE_REV
            ).items()
            for relation in v
        }
        assert prov_rels == journal_rels
diff --git a/swh/provenance/tests/test_provenance_journal_writer_kafka.py b/swh/provenance/tests/test_provenance_journal_writer_kafka.py
index 6b70994..3c4b680 100644
--- a/swh/provenance/tests/test_provenance_journal_writer_kafka.py
+++ b/swh/provenance/tests/test_provenance_journal_writer_kafka.py
@@ -1,46 +1,46 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

-from typing import Dict, Generator
+from typing import Generator

from confluent_kafka import Consumer
import pytest

from swh.provenance import get_provenance_storage
from swh.provenance.storage.interface import ProvenanceStorageInterface

from .test_provenance_storage import (  # noqa
    TestProvenanceStorage as _TestProvenanceStorage,
)


@pytest.fixture()
def provenance_storage(
-    provenance_postgresqldb: Dict[str, str],
+    provenance_postgresqldb: str,
    kafka_prefix: str,
    kafka_server: str,
    consumer: Consumer,
) -> Generator[ProvenanceStorageInterface, None, None]:
    cfg = {
        "storage": {
            "cls": "postgresql",
            "db": provenance_postgresqldb,
            "raise_on_commit": True,
        },
        "journal_writer": {
            "cls": "kafka",
            "brokers": [kafka_server],
            "client_id": "kafka_writer",
            "prefix": "swh.provenance",
            "anonymize": False,
        },
    }
    with get_provenance_storage(cls="journal", **cfg) as storage:
        yield storage


@pytest.mark.kafka
class TestProvenanceStorageJournal(_TestProvenanceStorage):
    pass
diff --git a/swh/provenance/tests/test_provenance_storage_rabbitmq.py b/swh/provenance/tests/test_provenance_storage_rabbitmq.py
index bfe2ecb..e49fe73 100644
--- a/swh/provenance/tests/test_provenance_storage_rabbitmq.py
+++ b/swh/provenance/tests/test_provenance_storage_rabbitmq.py
@@ -1,48 +1,48 @@
# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from typing import Any, Dict, Generator

import pytest

from swh.provenance import get_provenance_storage
from swh.provenance.storage.interface import ProvenanceStorageInterface

from .test_provenance_storage import TestProvenanceStorage as _TestProvenanceStorage


@pytest.fixture()
def provenance_storage(
-    provenance_postgresqldb: Dict[str, str],
+    provenance_postgresqldb: str,
    rabbitmq,
) -> Generator[ProvenanceStorageInterface, None, None]:
    """Return a working and initialized ProvenanceStorageInterface object"""
    from swh.provenance.storage.rabbitmq.server import ProvenanceStorageRabbitMQServer

    host = rabbitmq.args["host"]
    port = rabbitmq.args["port"]
    rabbitmq_params: Dict[str, Any] = {
        "url": f"amqp://guest:guest@{host}:{port}/%2f",
        "storage_config": {
            "cls": "postgresql",
            "db": provenance_postgresqldb,
            "raise_on_commit": True,
        },
    }
    server = ProvenanceStorageRabbitMQServer(
        url=rabbitmq_params["url"], storage_config=rabbitmq_params["storage_config"]
    )
    server.start()
    try:
        with get_provenance_storage(cls="rabbitmq", **rabbitmq_params) as storage:
            yield storage
    finally:
        server.stop()


@pytest.mark.rabbitmq
class TestProvenanceStorageRabbitMQ(_TestProvenanceStorage):
    pass
diff --git a/swh/provenance/tests/test_replay.py b/swh/provenance/tests/test_replay.py
index 8b4f544..2d098d3 100644
--- a/swh/provenance/tests/test_replay.py
+++ b/swh/provenance/tests/test_replay.py
@@ -1,171 +1,172 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import functools
-from typing import Dict

import psycopg2
+from psycopg2.extensions import parse_dsn
import pytest

from swh.journal.client import JournalClient
from swh.provenance.algos.revision import revision_add
from swh.provenance.archive.interface import ArchiveInterface
from swh.provenance.model import RevisionEntry
from swh.provenance.provenance import Provenance
from swh.provenance.storage import get_provenance_storage
from swh.provenance.storage.interface import (
    EntityType,
    ProvenanceStorageInterface,
    RelationType,
)
from swh.provenance.storage.replay import (
    ProvenanceObjectDeserializer,
    process_replay_objects,
)

from .utils import fill_storage, load_repo_data, ts2dt


@pytest.fixture(scope="function")
def object_types():
    """Set of object types to precreate topics for."""
    return {
        # objects
        "revision",
        "directory",
        "content",
        "location",
        # relations
        "content_in_revision",
        "content_in_directory",
        "directory_in_revision",
    }


@pytest.fixture()
def replayer_storage_and_client(
-    provenance_postgresqldb: Dict[str, str],
+    provenance_postgresqldb: str,
    kafka_prefix: str,
    kafka_consumer_group: str,
    kafka_server: str,
):
    cfg = {
        "storage": {
            "cls": "postgresql",
            "db": provenance_postgresqldb,
            "raise_on_commit": True,
        },
        "journal_writer": {
            "cls": "kafka",
            "brokers": [kafka_server],
            "client_id": "kafka_writer",
            "prefix": kafka_prefix,
            "anonymize": False,
            "auto_flush": False,
        },
    }
    with get_provenance_storage(cls="journal", **cfg) as storage:
        deserializer = ProvenanceObjectDeserializer()

        replayer = JournalClient(
            brokers=kafka_server,
            group_id=kafka_consumer_group,
            prefix=kafka_prefix,
            stop_on_eof=True,
            value_deserializer=deserializer.convert,
        )
        yield storage, replayer


@pytest.fixture()
-def secondary_db(provenance_postgresqldb: Dict[str, str]):
+def secondary_db(provenance_postgresqldb: str):
    """Create a new test db.

    The new db is named after the db configured in provenance_postgresqldb
    and uses the same template as the latter.
    """
-    dsn = provenance_postgresqldb.copy()
+    dsn = parse_dsn(provenance_postgresqldb)
+
    conn = psycopg2.connect(
        dbname="postgres",
        user=dsn["user"],
        password=dsn.get("password"),
        host=dsn["host"],
        port=dsn["port"],
    )
    conn.autocommit = True
    with conn.cursor() as cur:
        dbname = dsn["dbname"]
        template_name = f"{dbname}_tmpl"
        secondary_dbname = f"{dbname}_dst"
        cur.execute(f'CREATE DATABASE "{secondary_dbname}" TEMPLATE "{template_name}"')
    try:
        dsn["dbname"] = secondary_dbname
-        yield dsn
+        yield " ".join(f"{k}={v}" for k, v in dsn.items())
    finally:
        with conn.cursor() as cur:
            cur.execute(f'DROP DATABASE "{secondary_dbname}"')


@pytest.mark.kafka
@pytest.mark.parametrize(
    "repo",
    (
        "cmdbts2",
        "out-of-order",
        "with-merges",
    ),
)
def test_provenance_replayer(
    provenance_storage: ProvenanceStorageInterface,
    archive: ArchiveInterface,
    replayer_storage_and_client,
-    secondary_db,
+    secondary_db: str,
    repo: str,
):
    """Optimal replayer scenario.

    This:
    - writes objects to a provenance storage (which has a journal writer)
    - the replayer consumes objects from the topic and replays them
    - a destination provenance storage is filled from this

    In the end, both storages should have the same content.
    """
    # load test data and fill a swh-storage
    data = load_repo_data(repo)
    fill_storage(archive.storage, data)

    prov_sto_src, replayer = replayer_storage_and_client

    # Fill Kafka by filling the source provenance storage
    revisions = [
        RevisionEntry(
            id=revision["id"],
            date=ts2dt(revision["date"]),
            root=revision["directory"],
        )
        for revision in data["revision"]
    ]
    revision_add(Provenance(prov_sto_src), archive, revisions)

    # now replay the kafka log in a new provenance storage
    with get_provenance_storage(
        cls="postgresql", db=secondary_db, raise_on_commit=True
    ) as prov_sto_dst:
        worker_fn = functools.partial(process_replay_objects, storage=prov_sto_dst)
        replayer.process(worker_fn)

    compare_provenance_storages(prov_sto_src, prov_sto_dst)


def compare_provenance_storages(sto1, sto2):
    entities1 = {etype: sto1.entity_get_all(etype) for etype in EntityType}
    entities2 = {etype: sto2.entity_get_all(etype) for etype in EntityType}
    assert entities1 == entities2

    relations1 = {rtype: sto1.relation_get_all(rtype) for rtype in RelationType}
    relations2 = {rtype: sto2.relation_get_all(rtype) for rtype in RelationType}
    assert relations1 == relations2