diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -3,12 +3,14 @@ warn_unused_ignores = True exclude = swh/provenance/tools/ - # 3rd party libraries without stubs (yet) [mypy-bson.*] ignore_missing_imports = True +[mypy-confluent_kafka.*] +ignore_missing_imports = True + [mypy-iso8601.*] ignore_missing_imports = True diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -3,3 +3,4 @@ swh.model >= 2.6.1 swh.storage swh.graph +swh.journal diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -5,3 +5,4 @@ swh.journal >= 0.8 swh.storage >= 0.40 swh.graph >= 0.3.2 +types-Deprecated diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html click +deprecated iso8601 methodtools mongomock diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py --- a/swh/provenance/__init__.py +++ b/swh/provenance/__init__.py @@ -1,4 +1,4 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -81,17 +81,18 @@ def get_provenance_storage(cls: str, **kwargs) -> ProvenanceStorageInterface: - """Get an archive object of class ``cls`` with arguments ``args``. + """Get a storage object of class ``cls`` with arguments ``kwargs``. 
Args: cls: storage's class, only 'local' is currently supported - args: dictionary of arguments passed to the storage class constructor + kwargs: dictionary of arguments passed to the storage class constructor Returns: an instance of storage object Raises: - :cls:`ValueError` if passed an unknown archive class. + :cls:`ValueError` if passed an unknown storage class. + """ if cls in ["local", "postgresql"]: from .postgresql.provenance import ProvenanceStoragePostgreSql @@ -122,4 +123,6 @@ assert isinstance(rmq_storage, ProvenanceStorageInterface) return rmq_storage - raise ValueError + raise ValueError( + f"Unknown '{cls}' class, must be one of ['postgres', 'mongodb', 'rabbitmq']" + ) diff --git a/swh/provenance/api/server.py b/swh/provenance/api/server.py --- a/swh/provenance/api/server.py +++ b/swh/provenance/api/server.py @@ -402,63 +402,64 @@ def run_request_thread( self, binding_key: str, meth_name: str, relation: Optional[RelationType] ) -> None: - with get_provenance_storage(**self._storage_config) as storage: - request_queue = self._request_queues[binding_key] - merge_items = ProvenanceStorageRabbitMQWorker.get_conflicts_func(meth_name) - while True: - terminate = False - elements = [] - while True: - try: - # TODO: consider reducing this timeout or removing it - elem = request_queue.get(timeout=0.1) - if elem is TERMINATE: - terminate = True - break - elements.append(elem) - except queue.Empty: - break + provenance_storage = get_provenance_storage(**self._storage_config) - if len(elements) >= self._batch_size: + request_queue = self._request_queues[binding_key] + merge_items = ProvenanceStorageRabbitMQWorker.get_conflicts_func(meth_name) + while True: + terminate = False + elements = [] + while True: + try: + # TODO: consider reducing this timeout or removing it + elem = request_queue.get(timeout=0.1) + if elem is TERMINATE: + terminate = True break + elements.append(elem) + except queue.Empty: + break - if terminate: + if len(elements) >= self._batch_size: 
break - if not elements: - continue + if terminate: + break + + if not elements: + continue - try: - items, props = zip(*elements) - acks_count: TCounter[Tuple[str, str]] = Counter(props) - data = merge_items(items) - - args = (relation, data) if relation is not None else (data,) - if getattr(storage, meth_name)(*args): - for (correlation_id, reply_to), count in acks_count.items(): - # FIXME: this is running in a different thread! Hence, if - # self._connection drops, there is no guarantee that the - # response can be sent for the current elements. This - # situation should be handled properly. - assert self._connection is not None - self._connection.ioloop.add_callback_threadsafe( - functools.partial( - ProvenanceStorageRabbitMQWorker.respond, - channel=self._channel, - correlation_id=correlation_id, - reply_to=reply_to, - response=count, - ) + try: + items, props = zip(*elements) + acks_count: TCounter[Tuple[str, str]] = Counter(props) + data = merge_items(items) + + args = (relation, data) if relation is not None else (data,) + if getattr(provenance_storage, meth_name)(*args): + for (correlation_id, reply_to), count in acks_count.items(): + # FIXME: this is running in a different thread! Hence, if + # self._connection drops, there is no guarantee that the + # response can be sent for the current elements. This + # situation should be handled properly. 
+ assert self._connection is not None + self._connection.ioloop.add_callback_threadsafe( + functools.partial( + ProvenanceStorageRabbitMQWorker.respond, + channel=self._channel, + correlation_id=correlation_id, + reply_to=reply_to, + response=count, ) - else: - LOGGER.warning( - "Unable to process elements for queue %s", binding_key ) - for elem in elements: - request_queue.put(elem) - except BaseException as ex: - self.request_termination(str(ex)) - break + else: + LOGGER.warning( + "Unable to process elements for queue %s", binding_key + ) + for elem in elements: + request_queue.put(elem) + except BaseException as ex: + self.request_termination(str(ex)) + break def stop(self) -> None: assert self._connection is not None diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,4 +1,4 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -6,10 +6,12 @@ # WARNING: do not import unnecessary things here to keep cli startup time under # control from datetime import datetime, timezone +from functools import partial import os from typing import Any, Dict, Generator, Optional, Tuple import click +from deprecated import deprecated import iso8601 import yaml @@ -139,6 +141,78 @@ atexit.register(exit) +@cli.group(name="origin") +@click.pass_context +def origin(ctx: click.core.Context): + from . 
import get_archive, get_provenance + + archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) + provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) + + ctx.obj["provenance"] = provenance + ctx.obj["archive"] = archive + + +@origin.command(name="from-csv") +@click.argument("filename", type=click.Path(exists=True)) +@click.option( + "-l", + "--limit", + type=int, + help="""Limit the amount of entries (origins) to read from the input file.""", +) +@click.pass_context +def origin_from_csv(ctx: click.core.Context, filename: str, limit: Optional[int]): + from .origin import CSVOriginIterator, origin_add + + provenance = ctx.obj["provenance"] + archive = ctx.obj["archive"] + + origins_provider = generate_origin_tuples(filename) + origins = CSVOriginIterator(origins_provider, limit=limit) + + with provenance: + for origin in origins: + origin_add(provenance, archive, [origin]) + + +@origin.command(name="from-journal") +@click.pass_context +def origin_from_journal(ctx: click.core.Context): + from swh.journal.client import get_journal_client + + from .journal_client import process_journal_objects + + provenance = ctx.obj["provenance"] + archive = ctx.obj["archive"] + + journal_cfg = ctx.obj["config"].get("journal_client", {}) + + worker_fn = partial( + process_journal_objects, + archive=archive, + provenance=provenance, + ) + + cls = journal_cfg.pop("cls", None) or "kafka" + client = get_journal_client( + cls, + **{ + **journal_cfg, + "object_types": ["origin_visit_status"], + }, + ) + + try: + client.process(worker_fn) + except KeyboardInterrupt: + ctx.exit(0) + else: + print("Done.") + finally: + client.close() + + @cli.command(name="iter-frontiers") @click.argument("filename") @click.option( @@ -289,6 +363,7 @@ help="""Limit the amount of entries (origins) to read from the input file.""", ) @click.pass_context +@deprecated(version="0.0.1", reason="Use `swh provenance origin from-csv` instead") def iter_origins(ctx: click.core.Context, 
filename: str, limit: Optional[int]) -> None: """Process a provided list of origins.""" from . import get_archive, get_provenance diff --git a/swh/provenance/journal_client.py b/swh/provenance/journal_client.py new file mode 100644 --- /dev/null +++ b/swh/provenance/journal_client.py @@ -0,0 +1,22 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from swh.provenance.interface import ProvenanceInterface +from swh.provenance.model import OriginEntry +from swh.provenance.origin import origin_add +from swh.storage.interface import StorageInterface + + +def process_journal_objects( + messages, *, provenance: ProvenanceInterface, archive: StorageInterface +) -> None: + """Worker function for `JournalClient.process(worker_fn)`.""" + assert set(messages) == {"origin_visit_status"}, set(messages) + origin_entries = [ + OriginEntry(url=visit["origin"], snapshot=visit["snapshot"]) + for visit in messages["origin_visit_status"] + if visit["snapshot"] is not None + ] + origin_add(provenance, archive, origin_entries) diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py --- a/swh/provenance/postgresql/provenance.py +++ b/swh/provenance/postgresql/provenance.py @@ -54,6 +54,7 @@ def __init__( self, page_size: Optional[int] = None, raise_on_commit: bool = False, **kwargs ) -> None: + self.conn: Optional[psycopg2.extensions.connection] = None self.conn_args = kwargs self._flavor: Optional[str] = None self.page_size = page_size @@ -75,6 +76,9 @@ def transaction( self, readonly: bool = False ) -> Generator[psycopg2.extras.RealDictCursor, None, None]: + if self.conn is None: # somehow, "implicit" __enter__ call did not happen + self.open() + assert self.conn is not None self.conn.set_session(readonly=readonly) with self.conn: with 
self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: @@ -95,6 +99,7 @@ @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"}) def close(self) -> None: + assert self.conn is not None self.conn.close() @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"}) diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py --- a/swh/provenance/tests/conftest.py +++ b/swh/provenance/tests/conftest.py @@ -1,11 +1,11 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta, timezone from os import path -from typing import Any, Dict, Generator, List +from typing import Any, Dict, Generator, Iterator, List from _pytest.fixtures import SubRequest import mongomock.database @@ -54,7 +54,7 @@ request: SubRequest, provenance_postgresqldb: Dict[str, str], mongodb: mongomock.database.Database, -) -> Generator[ProvenanceStorageInterface, None, None]: +) -> Iterator[ProvenanceStorageInterface]: """Return a working and initialized ProvenanceStorageInterface object""" if request.param == "mongodb": @@ -86,7 +86,7 @@ url=rabbitmq_params["url"], storage_config=rabbitmq_params["storage_config"] ) server.start() - with get_provenance_storage(cls=request.param, **rabbitmq_params) as storage: + with get_provenance_storage(cls="rabbitmq", **rabbitmq_params) as storage: yield storage server.stop() @@ -97,6 +97,8 @@ ) as storage: yield storage + storage.close() + provenance_postgresql = postgresql("postgresql_proc", dbname="provenance_tests") diff --git a/swh/provenance/tests/data/origins.csv b/swh/provenance/tests/data/origins.csv new file mode 100644 --- /dev/null +++ b/swh/provenance/tests/data/origins.csv @@ -0,0 +1 @@ 
+https://cmdbts2,5f577c4d4e5a1d0bca64f78facfb891933b17d94 diff --git a/swh/provenance/tests/test_cli.py b/swh/provenance/tests/test_cli.py --- a/swh/provenance/tests/test_cli.py +++ b/swh/provenance/tests/test_cli.py @@ -1,9 +1,9 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Set +from typing import Dict, List, Set from _pytest.monkeypatch import MonkeyPatch from click.testing import CliRunner @@ -14,7 +14,13 @@ import swh.core.cli.db # noqa ; ensure cli is loaded from swh.core.db import BaseDb from swh.core.db.db_utils import init_admin_extensions +from swh.model.hashutil import MultiHash import swh.provenance.cli # noqa ; ensure cli is loaded +from swh.provenance.tests.conftest import fill_storage, load_repo_data +from swh.storage.interface import StorageInterface + +from .conftest import get_datafile +from .test_utils import invoke, write_configuration_path def test_cli_swh_db_help() -> None: @@ -107,3 +113,52 @@ with postgresql.cursor() as cur: cur.execute("select swh_get_dbflavor()") assert cur.fetchone() == ("with-path",) + + +@pytest.mark.parametrize( + "subcommand", + (["origin", "from-csv"], ["iter-origins"]), +) +def test_cli_origin_from_csv( + swh_storage: StorageInterface, + subcommand: List[str], + swh_storage_backend_config: Dict, + provenance, + tmp_path, +): + repo = "cmdbts2" + origin_url = f"https://{repo}" + data = load_repo_data(repo) + fill_storage(swh_storage, data) + + assert len(data["origin"]) == 1 + assert {"url": origin_url} in data["origin"] + + cfg = { + "provenance": { + "archive": { + "cls": "api", + "storage": swh_storage_backend_config, + }, + "storage": { + "cls": "postgresql", + # "db": provenance.storage.conn.dsn, + "db": 
provenance.storage.conn.get_dsn_parameters(), + }, + }, + } + + config_path = write_configuration_path(cfg, tmp_path) + + csv_filepath = get_datafile("origins.csv") + subcommand = subcommand + [csv_filepath] + + result = invoke(subcommand, config_path) + assert result.exit_code == 0, f"Unexpected result: {result.output}" + + origin_sha1 = MultiHash.from_data( + origin_url.encode(), hash_names=["sha1"] + ).digest()["sha1"] + actual_result = provenance.storage.origin_get([origin_sha1]) + + assert actual_result == {origin_sha1: origin_url} diff --git a/swh/provenance/tests/test_journal_client.py b/swh/provenance/tests/test_journal_client.py new file mode 100644 --- /dev/null +++ b/swh/provenance/tests/test_journal_client.py @@ -0,0 +1,81 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Dict + +from confluent_kafka import Consumer +import pytest + +from swh.model.hashutil import MultiHash +from swh.provenance.tests.conftest import fill_storage, load_repo_data +from swh.storage.interface import StorageInterface + +from .test_utils import invoke, write_configuration_path + + +@pytest.fixture +def swh_storage_backend_config(swh_storage_backend_config, kafka_server, kafka_prefix): + writer_config = { + "cls": "kafka", + "brokers": [kafka_server], + "client_id": "kafka_writer", + "prefix": kafka_prefix, + "anonymize": False, + } + yield {**swh_storage_backend_config, "journal_writer": writer_config} + + +def test_cli_origin_from_journal_client( + swh_storage: StorageInterface, + swh_storage_backend_config: Dict, + kafka_prefix: str, + kafka_server: str, + consumer: Consumer, + tmp_path: str, + provenance, +) -> None: + """Test origin journal client cli""" + + # Prepare storage data + data = load_repo_data("cmdbts2") + assert len(data["origin"]) == 1 + 
+ origin_url = data["origin"][0]["url"] + fill_storage(swh_storage, data) + + # Prepare configuration for cli call + swh_storage_backend_config.pop("journal_writer", None) # no need for that config + storage_config_dict = swh_storage_backend_config + cfg = { + "journal_client": { + "cls": "kafka", + "brokers": [kafka_server], + "group_id": "toto", + "prefix": kafka_prefix, + "object_types": ["origin_visit_status"], + "stop_on_eof": True, + }, + "provenance": { + "archive": { + "cls": "api", + "storage": storage_config_dict, + }, + "storage": { + "cls": "postgresql", + "db": provenance.storage.conn.get_dsn_parameters(), + }, + }, + } + config_path = write_configuration_path(cfg, tmp_path) + + # call the cli 'swh provenance origin from-journal' + result = invoke(["origin", "from-journal"], config_path) + assert result.exit_code == 0, f"Unexpected result: {result.output}" + + origin_sha1 = MultiHash.from_data( + origin_url.encode(), hash_names=["sha1"] + ).digest()["sha1"] + actual_result = provenance.storage.origin_get([origin_sha1]) + + assert actual_result == {origin_sha1: origin_url} diff --git a/swh/provenance/tests/test_utils.py b/swh/provenance/tests/test_utils.py new file mode 100644 --- /dev/null +++ b/swh/provenance/tests/test_utils.py @@ -0,0 +1,31 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +from os.path import join +from typing import Dict, List + +from click.testing import CliRunner, Result +from yaml import safe_dump + +from swh.provenance.cli import cli + + +def invoke(args: List[str], config_path: str, catch_exceptions: bool = False) -> Result: + """Invoke swh provenance subcommands""" + runner = CliRunner() + result = runner.invoke(cli, ["-C" + config_path] + args) + if not catch_exceptions and result.exception: + print(result.output) + raise 
+ result.exception + return result + + +def write_configuration_path(config: Dict, tmp_path: str) -> str: + """Serialize yaml dict on disk given a configuration dict and a temporary path.""" + config_path = join(str(tmp_path), "config.yml") + with open(config_path, "w") as f: + f.write(safe_dump(config)) + return config_path