diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -3,12 +3,14 @@ warn_unused_ignores = True exclude = swh/provenance/tools/ - # 3rd party libraries without stubs (yet) [mypy-bson.*] ignore_missing_imports = True +[mypy-confluent_kafka.*] +ignore_missing_imports = True + [mypy-iso8601.*] ignore_missing_imports = True diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -3,3 +3,4 @@ swh.model >= 2.6.1 swh.storage swh.graph +swh.journal diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -5,3 +5,4 @@ swh.journal >= 0.8 swh.storage >= 0.40 swh.graph >= 0.3.2 +types-Deprecated diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html click +deprecated iso8601 methodtools mongomock diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,4 +1,4 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -6,10 +6,12 @@ # WARNING: do not import unnecessary things here to keep cli startup time under # control from datetime import datetime, timezone +from functools import partial import os from typing import Any, Dict, Generator, Optional, Tuple import click +from deprecated import deprecated import iso8601 import yaml @@ -139,6 +141,78 @@ atexit.register(exit) +@cli.group(name="origin") +@click.pass_context +def origin(ctx: click.core.Context): + from . import get_archive, get_provenance + + archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) + provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) + + ctx.obj["provenance"] = provenance + ctx.obj["archive"] = archive + + +@origin.command(name="from-csv") +@click.argument("filename", type=click.Path(exists=True)) +@click.option( + "-l", + "--limit", + type=int, + help="""Limit the amount of entries (origins) to read from the input file.""", +) +@click.pass_context +def origin_from_csv(ctx: click.core.Context, filename: str, limit: Optional[int]): + from .origin import CSVOriginIterator, origin_add + + provenance = ctx.obj["provenance"] + archive = ctx.obj["archive"] + + origins_provider = generate_origin_tuples(filename) + origins = CSVOriginIterator(origins_provider, limit=limit) + + with provenance: + for origin in origins: + origin_add(provenance, archive, [origin]) + + +@origin.command(name="from-journal") +@click.pass_context +def origin_from_journal(ctx: click.core.Context): + from swh.journal.client import get_journal_client + + from .journal_client import process_journal_objects + + provenance = ctx.obj["provenance"] + archive = ctx.obj["archive"] + + journal_cfg = ctx.obj["config"].get("journal_client", {}) + + worker_fn = partial( + process_journal_objects, + archive=archive, + provenance=provenance, + ) + + cls = journal_cfg.pop("cls", None) or "kafka" + client = get_journal_client( + cls, + **{ + **journal_cfg, + "object_types": ["origin_visit_status"], + }, + ) + + try: + client.process(worker_fn) + except KeyboardInterrupt: + ctx.exit(0) + else: + print("Done.") + finally: + client.close() + + @cli.command(name="iter-frontiers") @click.argument("filename") @click.option( @@ -289,6 +363,7 @@ help="""Limit the amount of entries (origins) to read from the input file.""", ) @click.pass_context +@deprecated(version="0.0.1", reason="Use `swh provenance origin from-csv` instead") def iter_origins(ctx: click.core.Context, filename: str, limit: Optional[int]) -> None: """Process a provided list of origins.""" from . import get_archive, get_provenance diff --git a/swh/provenance/journal_client.py b/swh/provenance/journal_client.py new file mode 100644 --- /dev/null +++ b/swh/provenance/journal_client.py @@ -0,0 +1,22 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from swh.provenance.interface import ProvenanceInterface +from swh.provenance.model import OriginEntry +from swh.provenance.origin import origin_add +from swh.storage.interface import StorageInterface + + +def process_journal_objects( + messages, *, provenance: ProvenanceInterface, archive: StorageInterface +) -> None: + """Worker function for `JournalClient.process(worker_fn)`.""" + assert set(messages) == {"origin_visit_status"}, set(messages) + origin_entries = [ + OriginEntry(url=visit["origin"], snapshot=visit["snapshot"]) + for visit in messages["origin_visit_status"] + if visit["snapshot"] is not None + ] + origin_add(provenance, archive, origin_entries) diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py --- a/swh/provenance/postgresql/provenance.py +++ b/swh/provenance/postgresql/provenance.py @@ -54,6 +54,7 @@ def __init__( self, page_size: Optional[int] = None, raise_on_commit: bool = False, **kwargs ) -> None: + self.conn: Optional[psycopg2.extensions.connection] = None self.conn_args = kwargs self._flavor: Optional[str] = None self.page_size = page_size @@ -75,6 +76,9 @@ def transaction( self, readonly: bool = False ) -> Generator[psycopg2.extras.RealDictCursor, None, None]: + if self.conn is None: # somehow, "implicit" __enter__ call did not happen + self.open() + assert self.conn is not None self.conn.set_session(readonly=readonly) with self.conn: with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: @@ -95,6 +99,7 @@ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"}) def close(self) -> None: + assert self.conn is not None self.conn.close() @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"}) diff --git a/swh/provenance/tests/data/origins.csv b/swh/provenance/tests/data/origins.csv new file mode 100644 --- /dev/null +++ b/swh/provenance/tests/data/origins.csv @@ -0,0 +1 @@ +https://cmdbts2,5f577c4d4e5a1d0bca64f78facfb891933b17d94 diff --git a/swh/provenance/tests/test_cli.py b/swh/provenance/tests/test_cli.py --- a/swh/provenance/tests/test_cli.py +++ b/swh/provenance/tests/test_cli.py @@ -1,9 +1,9 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Set +from typing import Dict, List, Set from _pytest.monkeypatch import MonkeyPatch from click.testing import CliRunner @@ -14,7 +14,13 @@ import swh.core.cli.db # noqa ; ensure cli is loaded from swh.core.db import BaseDb from swh.core.db.db_utils import init_admin_extensions +from swh.model.hashutil import MultiHash import swh.provenance.cli # noqa ; ensure cli is loaded +from swh.provenance.tests.conftest import fill_storage, load_repo_data +from swh.storage.interface import StorageInterface + +from .conftest import get_datafile +from .test_utils import invoke, write_configuration_path def test_cli_swh_db_help() -> None: @@ -107,3 +113,52 @@ with postgresql.cursor() as cur: cur.execute("select swh_get_dbflavor()") assert cur.fetchone() == ("with-path",) + + +@pytest.mark.parametrize( + "subcommand", + (["origin", "from-csv"], ["iter-origins"]), +) +def test_cli_origin_from_csv( + swh_storage: StorageInterface, + subcommand: List[str], + swh_storage_backend_config: Dict, + provenance, + tmp_path, +): + repo = "cmdbts2" + origin_url = f"https://{repo}" + data = load_repo_data(repo) + fill_storage(swh_storage, data) + + assert len(data["origin"]) == 1 + assert {"url": origin_url} in data["origin"] + + cfg = { + "provenance": { + "archive": { + "cls": "api", + "storage": swh_storage_backend_config, + }, + "storage": { + "cls": "postgresql", + # "db": provenance.storage.conn.dsn, + "db": provenance.storage.conn.get_dsn_parameters(), + }, + }, + } + + config_path = write_configuration_path(cfg, tmp_path) + + csv_filepath = get_datafile("origins.csv") + subcommand = subcommand + [csv_filepath] + + result = invoke(subcommand, config_path) + assert result.exit_code == 0, f"Unexpected result: {result.output}" + + origin_sha1 = MultiHash.from_data( + origin_url.encode(), hash_names=["sha1"] + ).digest()["sha1"] + actual_result = provenance.storage.origin_get([origin_sha1]) + + assert actual_result == {origin_sha1: origin_url} diff --git a/swh/provenance/tests/test_journal_client.py b/swh/provenance/tests/test_journal_client.py new file mode 100644 --- /dev/null +++ b/swh/provenance/tests/test_journal_client.py @@ -0,0 +1,81 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Dict + +from confluent_kafka import Consumer +import pytest + +from swh.model.hashutil import MultiHash +from swh.provenance.tests.conftest import fill_storage, load_repo_data +from swh.storage.interface import StorageInterface + +from .test_utils import invoke, write_configuration_path + + +@pytest.fixture +def swh_storage_backend_config(swh_storage_backend_config, kafka_server, kafka_prefix): + writer_config = { + "cls": "kafka", + "brokers": [kafka_server], + "client_id": "kafka_writer", + "prefix": kafka_prefix, + "anonymize": False, + } + yield {**swh_storage_backend_config, "journal_writer": writer_config} + + +def test_cli_origin_from_journal_client( + swh_storage: StorageInterface, + swh_storage_backend_config: Dict, + kafka_prefix: str, + kafka_server: str, + consumer: Consumer, + tmp_path: str, + provenance, +) -> None: + """Test origin journal client cli""" + + # Prepare storage data + data = load_repo_data("cmdbts2") + assert len(data["origin"]) == 1 + origin_url = data["origin"][0]["url"] + fill_storage(swh_storage, data) + + # Prepare configuration for cli call + swh_storage_backend_config.pop("journal_writer", None) # no need for that config + storage_config_dict = swh_storage_backend_config + cfg = { + "journal_client": { + "cls": "kafka", + "brokers": [kafka_server], + "group_id": "toto", + "prefix": kafka_prefix, + "object_types": ["origin_visit_status"], + "stop_on_eof": True, + }, + "provenance": { + "archive": { + "cls": "api", + "storage": storage_config_dict, + }, + "storage": { + "cls": "postgresql", + "db": provenance.storage.conn.get_dsn_parameters(), + }, + }, + } + config_path = write_configuration_path(cfg, tmp_path) + + # call the cli 'swh provenance origin from-journal' + result = invoke(["origin", "from-journal"], config_path) + assert result.exit_code == 0, f"Unexpected result: {result.output}" + + origin_sha1 = MultiHash.from_data( + origin_url.encode(), hash_names=["sha1"] + ).digest()["sha1"] + actual_result = provenance.storage.origin_get([origin_sha1]) + + assert actual_result == {origin_sha1: origin_url} diff --git a/swh/provenance/tests/test_utils.py b/swh/provenance/tests/test_utils.py new file mode 100644 --- /dev/null +++ b/swh/provenance/tests/test_utils.py @@ -0,0 +1,31 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +from os.path import join +from typing import Dict, List + +from click.testing import CliRunner, Result +from yaml import safe_dump + +from swh.provenance.cli import cli + + +def invoke(args: List[str], config_path: str, catch_exceptions: bool = False) -> Result: + """Invoke swh journal subcommands""" + runner = CliRunner() + result = runner.invoke(cli, ["-C" + config_path] + args) + if not catch_exceptions and result.exception: + print(result.output) + raise result.exception + return result + + +def write_configuration_path(config: Dict, tmp_path: str) -> str: + """Serialize yaml dict on disk given a configuration dict and and a temporary path.""" + config_path = join(str(tmp_path), "config.yml") + with open(config_path, "w") as f: + f.write(safe_dump(config)) + return config_path