Page Menu | Home | Software Heritage

D8023.diff
No One | Temporary

D8023.diff

diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -3,12 +3,14 @@
warn_unused_ignores = True
exclude = swh/provenance/tools/
-
# 3rd party libraries without stubs (yet)
[mypy-bson.*]
ignore_missing_imports = True
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
[mypy-iso8601.*]
ignore_missing_imports = True
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -3,3 +3,4 @@
swh.model >= 2.6.1
swh.storage
swh.graph
+swh.journal
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -5,3 +5,4 @@
swh.journal >= 0.8
swh.storage >= 0.40
swh.graph >= 0.3.2
+types-Deprecated
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,6 +2,7 @@
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
click
+deprecated
iso8601
methodtools
mongomock
diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -6,10 +6,12 @@
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
from datetime import datetime, timezone
+from functools import partial
import os
from typing import Any, Dict, Generator, Optional, Tuple
import click
+from deprecated import deprecated
import iso8601
import yaml
@@ -139,6 +141,78 @@
atexit.register(exit)
+@cli.group(name="origin")
+@click.pass_context
+def origin(ctx: click.core.Context):
+ from . import get_archive, get_provenance
+
+ archive = get_archive(**ctx.obj["config"]["provenance"]["archive"])
+ provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"])
+
+ ctx.obj["provenance"] = provenance
+ ctx.obj["archive"] = archive
+
+
+@origin.command(name="from-csv")
+@click.argument("filename", type=click.Path(exists=True))
+@click.option(
+ "-l",
+ "--limit",
+ type=int,
+ help="""Limit the amount of entries (origins) to read from the input file.""",
+)
+@click.pass_context
+def origin_from_csv(ctx: click.core.Context, filename: str, limit: Optional[int]):
+ from .origin import CSVOriginIterator, origin_add
+
+ provenance = ctx.obj["provenance"]
+ archive = ctx.obj["archive"]
+
+ origins_provider = generate_origin_tuples(filename)
+ origins = CSVOriginIterator(origins_provider, limit=limit)
+
+ with provenance:
+ for origin in origins:
+ origin_add(provenance, archive, [origin])
+
+
+@origin.command(name="from-journal")
+@click.pass_context
+def origin_from_journal(ctx: click.core.Context):
+ from swh.journal.client import get_journal_client
+
+ from .journal_client import process_journal_objects
+
+ provenance = ctx.obj["provenance"]
+ archive = ctx.obj["archive"]
+
+ journal_cfg = ctx.obj["config"].get("journal_client", {})
+
+ worker_fn = partial(
+ process_journal_objects,
+ archive=archive,
+ provenance=provenance,
+ )
+
+ cls = journal_cfg.pop("cls", None) or "kafka"
+ client = get_journal_client(
+ cls,
+ **{
+ **journal_cfg,
+ "object_types": ["origin_visit_status"],
+ },
+ )
+
+ try:
+ client.process(worker_fn)
+ except KeyboardInterrupt:
+ ctx.exit(0)
+ else:
+ print("Done.")
+ finally:
+ client.close()
+
+
@cli.command(name="iter-frontiers")
@click.argument("filename")
@click.option(
@@ -289,6 +363,7 @@
help="""Limit the amount of entries (origins) to read from the input file.""",
)
@click.pass_context
+@deprecated(version="0.0.1", reason="Use `swh provenance origin from-csv` instead")
def iter_origins(ctx: click.core.Context, filename: str, limit: Optional[int]) -> None:
"""Process a provided list of origins."""
from . import get_archive, get_provenance
diff --git a/swh/provenance/journal_client.py b/swh/provenance/journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/journal_client.py
@@ -0,0 +1,22 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.provenance.interface import ProvenanceInterface
+from swh.provenance.model import OriginEntry
+from swh.provenance.origin import origin_add
+from swh.storage.interface import StorageInterface
+
+
+def process_journal_objects(
+ messages, *, provenance: ProvenanceInterface, archive: StorageInterface
+) -> None:
+ """Worker function for `JournalClient.process(worker_fn)`."""
+ assert set(messages) == {"origin_visit_status"}, set(messages)
+ origin_entries = [
+ OriginEntry(url=visit["origin"], snapshot=visit["snapshot"])
+ for visit in messages["origin_visit_status"]
+ if visit["snapshot"] is not None
+ ]
+ origin_add(provenance, archive, origin_entries)
diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py
--- a/swh/provenance/postgresql/provenance.py
+++ b/swh/provenance/postgresql/provenance.py
@@ -54,6 +54,7 @@
def __init__(
self, page_size: Optional[int] = None, raise_on_commit: bool = False, **kwargs
) -> None:
+ self.conn: Optional[psycopg2.extensions.connection] = None
self.conn_args = kwargs
self._flavor: Optional[str] = None
self.page_size = page_size
@@ -75,6 +76,9 @@
def transaction(
self, readonly: bool = False
) -> Generator[psycopg2.extras.RealDictCursor, None, None]:
+ if self.conn is None: # somehow, "implicit" __enter__ call did not happen
+ self.open()
+ assert self.conn is not None
self.conn.set_session(readonly=readonly)
with self.conn:
with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
@@ -95,6 +99,7 @@
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"})
def close(self) -> None:
+ assert self.conn is not None
self.conn.close()
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"})
diff --git a/swh/provenance/tests/data/origins.csv b/swh/provenance/tests/data/origins.csv
new file mode 100644
--- /dev/null
+++ b/swh/provenance/tests/data/origins.csv
@@ -0,0 +1 @@
+https://cmdbts2,5f577c4d4e5a1d0bca64f78facfb891933b17d94
diff --git a/swh/provenance/tests/test_cli.py b/swh/provenance/tests/test_cli.py
--- a/swh/provenance/tests/test_cli.py
+++ b/swh/provenance/tests/test_cli.py
@@ -1,9 +1,9 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import Set
+from typing import Dict, List, Set
from _pytest.monkeypatch import MonkeyPatch
from click.testing import CliRunner
@@ -14,7 +14,13 @@
import swh.core.cli.db # noqa ; ensure cli is loaded
from swh.core.db import BaseDb
from swh.core.db.db_utils import init_admin_extensions
+from swh.model.hashutil import MultiHash
import swh.provenance.cli # noqa ; ensure cli is loaded
+from swh.provenance.tests.conftest import fill_storage, load_repo_data
+from swh.storage.interface import StorageInterface
+
+from .conftest import get_datafile
+from .test_utils import invoke, write_configuration_path
def test_cli_swh_db_help() -> None:
@@ -107,3 +113,52 @@
with postgresql.cursor() as cur:
cur.execute("select swh_get_dbflavor()")
assert cur.fetchone() == ("with-path",)
+
+
+@pytest.mark.parametrize(
+ "subcommand",
+ (["origin", "from-csv"], ["iter-origins"]),
+)
+def test_cli_origin_from_csv(
+ swh_storage: StorageInterface,
+ subcommand: List[str],
+ swh_storage_backend_config: Dict,
+ provenance,
+ tmp_path,
+):
+ repo = "cmdbts2"
+ origin_url = f"https://{repo}"
+ data = load_repo_data(repo)
+ fill_storage(swh_storage, data)
+
+ assert len(data["origin"]) == 1
+ assert {"url": origin_url} in data["origin"]
+
+ cfg = {
+ "provenance": {
+ "archive": {
+ "cls": "api",
+ "storage": swh_storage_backend_config,
+ },
+ "storage": {
+ "cls": "postgresql",
+ # "db": provenance.storage.conn.dsn,
+ "db": provenance.storage.conn.get_dsn_parameters(),
+ },
+ },
+ }
+
+ config_path = write_configuration_path(cfg, tmp_path)
+
+ csv_filepath = get_datafile("origins.csv")
+ subcommand = subcommand + [csv_filepath]
+
+ result = invoke(subcommand, config_path)
+ assert result.exit_code == 0, f"Unexpected result: {result.output}"
+
+ origin_sha1 = MultiHash.from_data(
+ origin_url.encode(), hash_names=["sha1"]
+ ).digest()["sha1"]
+ actual_result = provenance.storage.origin_get([origin_sha1])
+
+ assert actual_result == {origin_sha1: origin_url}
diff --git a/swh/provenance/tests/test_journal_client.py b/swh/provenance/tests/test_journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/tests/test_journal_client.py
@@ -0,0 +1,81 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Dict
+
+from confluent_kafka import Consumer
+import pytest
+
+from swh.model.hashutil import MultiHash
+from swh.provenance.tests.conftest import fill_storage, load_repo_data
+from swh.storage.interface import StorageInterface
+
+from .test_utils import invoke, write_configuration_path
+
+
+@pytest.fixture
+def swh_storage_backend_config(swh_storage_backend_config, kafka_server, kafka_prefix):
+ writer_config = {
+ "cls": "kafka",
+ "brokers": [kafka_server],
+ "client_id": "kafka_writer",
+ "prefix": kafka_prefix,
+ "anonymize": False,
+ }
+ yield {**swh_storage_backend_config, "journal_writer": writer_config}
+
+
+def test_cli_origin_from_journal_client(
+ swh_storage: StorageInterface,
+ swh_storage_backend_config: Dict,
+ kafka_prefix: str,
+ kafka_server: str,
+ consumer: Consumer,
+ tmp_path: str,
+ provenance,
+) -> None:
+ """Test origin journal client cli"""
+
+ # Prepare storage data
+ data = load_repo_data("cmdbts2")
+ assert len(data["origin"]) == 1
+ origin_url = data["origin"][0]["url"]
+ fill_storage(swh_storage, data)
+
+ # Prepare configuration for cli call
+ swh_storage_backend_config.pop("journal_writer", None) # no need for that config
+ storage_config_dict = swh_storage_backend_config
+ cfg = {
+ "journal_client": {
+ "cls": "kafka",
+ "brokers": [kafka_server],
+ "group_id": "toto",
+ "prefix": kafka_prefix,
+ "object_types": ["origin_visit_status"],
+ "stop_on_eof": True,
+ },
+ "provenance": {
+ "archive": {
+ "cls": "api",
+ "storage": storage_config_dict,
+ },
+ "storage": {
+ "cls": "postgresql",
+ "db": provenance.storage.conn.get_dsn_parameters(),
+ },
+ },
+ }
+ config_path = write_configuration_path(cfg, tmp_path)
+
+ # call the cli 'swh provenance origin from-journal'
+ result = invoke(["origin", "from-journal"], config_path)
+ assert result.exit_code == 0, f"Unexpected result: {result.output}"
+
+ origin_sha1 = MultiHash.from_data(
+ origin_url.encode(), hash_names=["sha1"]
+ ).digest()["sha1"]
+ actual_result = provenance.storage.origin_get([origin_sha1])
+
+ assert actual_result == {origin_sha1: origin_url}
diff --git a/swh/provenance/tests/test_utils.py b/swh/provenance/tests/test_utils.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/tests/test_utils.py
@@ -0,0 +1,31 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from os.path import join
+from typing import Dict, List
+
+from click.testing import CliRunner, Result
+from yaml import safe_dump
+
+from swh.provenance.cli import cli
+
+
+def invoke(args: List[str], config_path: str, catch_exceptions: bool = False) -> Result:
+ """Invoke swh provenance subcommands"""
+ runner = CliRunner()
+ result = runner.invoke(cli, ["-C" + config_path] + args)
+ if not catch_exceptions and result.exception:
+ print(result.output)
+ raise result.exception
+ return result
+
+
+def write_configuration_path(config: Dict, tmp_path: str) -> str:
+ """Serialize a configuration dict as YAML into a temporary path and return the file path."""
+ config_path = join(str(tmp_path), "config.yml")
+ with open(config_path, "w") as f:
+ f.write(safe_dump(config))
+ return config_path

File Metadata

Mime Type
text/plain
Expires
Dec 21 2024, 6:42 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3219824

Event Timeline