Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7124763
D8023.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
13 KB
Subscribers
None
D8023.diff
View Options
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -3,12 +3,14 @@
warn_unused_ignores = True
exclude = swh/provenance/tools/
-
# 3rd party libraries without stubs (yet)
[mypy-bson.*]
ignore_missing_imports = True
+[mypy-confluent_kafka.*]
+ignore_missing_imports = True
+
[mypy-iso8601.*]
ignore_missing_imports = True
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -3,3 +3,4 @@
swh.model >= 2.6.1
swh.storage
swh.graph
+swh.journal
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -5,3 +5,4 @@
swh.journal >= 0.8
swh.storage >= 0.40
swh.graph >= 0.3.2
+types-Deprecated
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,6 +2,7 @@
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
click
+deprecated
iso8601
methodtools
mongomock
diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -6,10 +6,12 @@
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
from datetime import datetime, timezone
+from functools import partial
import os
from typing import Any, Dict, Generator, Optional, Tuple
import click
+from deprecated import deprecated
import iso8601
import yaml
@@ -139,6 +141,83 @@
         atexit.register(exit)
+@cli.group(name="origin")
+@click.pass_context
+def origin(ctx: click.core.Context):
+    """Process origins, either from a CSV file or from the journal."""
+    from . import get_archive, get_provenance
+
+    archive = get_archive(**ctx.obj["config"]["provenance"]["archive"])
+    provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"])
+
+    ctx.obj["provenance"] = provenance
+    ctx.obj["archive"] = archive
+
+
+@origin.command(name="from-csv")
+@click.argument("filename", type=click.Path(exists=True))
+@click.option(
+    "-l",
+    "--limit",
+    type=int,
+    help="""Limit the amount of entries (origins) to read from the input file.""",
+)
+@click.pass_context
+def origin_from_csv(ctx: click.core.Context, filename: str, limit: Optional[int]):
+    """Process the origins listed in the CSV file FILENAME."""
+    from .origin import CSVOriginIterator, origin_add
+
+    provenance = ctx.obj["provenance"]
+    archive = ctx.obj["archive"]
+
+    origins_provider = generate_origin_tuples(filename)
+    origins = CSVOriginIterator(origins_provider, limit=limit)
+
+    with provenance:
+        for origin_entry in origins:
+            origin_add(provenance, archive, [origin_entry])
+
+
+@origin.command(name="from-journal")
+@click.pass_context
+def origin_from_journal(ctx: click.core.Context):
+    """Process the origin visit statuses consumed from the journal."""
+    from swh.journal.client import get_journal_client
+
+    from .journal_client import process_journal_objects
+
+    provenance = ctx.obj["provenance"]
+    archive = ctx.obj["archive"]
+
+    # Work on a copy: "cls" is popped below, and popping it from the section
+    # held in ctx.obj["config"] would alter the loaded configuration in place.
+    journal_cfg = dict(ctx.obj["config"].get("journal_client") or {})
+
+    worker_fn = partial(
+        process_journal_objects,
+        archive=archive,
+        provenance=provenance,
+    )
+
+    cls = journal_cfg.pop("cls", None) or "kafka"
+    client = get_journal_client(
+        cls,
+        **{
+            **journal_cfg,
+            "object_types": ["origin_visit_status"],
+        },
+    )
+
+    try:
+        client.process(worker_fn)
+    except KeyboardInterrupt:
+        ctx.exit(0)
+    else:
+        click.echo("Done.")
+    finally:
+        client.close()
+
+
@cli.command(name="iter-frontiers")
@click.argument("filename")
@click.option(
@@ -289,6 +363,7 @@
help="""Limit the amount of entries (origins) to read from the input file.""",
)
@click.pass_context
+@deprecated(version="0.0.1", reason="Use `swh provenance origin from-csv` instead")
def iter_origins(ctx: click.core.Context, filename: str, limit: Optional[int]) -> None:
"""Process a provided list of origins."""
from . import get_archive, get_provenance
diff --git a/swh/provenance/journal_client.py b/swh/provenance/journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/journal_client.py
@@ -0,0 +1,37 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.provenance.interface import ProvenanceInterface
+from swh.provenance.model import OriginEntry
+from swh.provenance.origin import origin_add
+from swh.storage.interface import StorageInterface
+
+
+def process_journal_objects(
+    messages, *, provenance: ProvenanceInterface, archive: StorageInterface
+) -> None:
+    """Worker function for `JournalClient.process(worker_fn)`.
+
+    Register in ``provenance`` the origins of the received origin visit
+    statuses, together with their snapshots. Statuses without a snapshot are
+    skipped, and a batch left empty by that filtering is a no-op.
+
+    Args:
+        messages: journal batch, keyed by object type; only
+            ``origin_visit_status`` is expected.
+        provenance: provenance storage, entered as a context manager around
+            the batch so that it is opened before ``origin_add`` uses it.
+        archive: archive storage, passed through to ``origin_add``.
+    """
+    assert set(messages) == {"origin_visit_status"}, set(messages)
+    origin_entries = [
+        OriginEntry(url=visit["origin"], snapshot=visit["snapshot"])
+        for visit in messages["origin_visit_status"]
+        if visit["snapshot"] is not None
+    ]
+    if not origin_entries:
+        return
+    with provenance:
+        origin_add(provenance, archive, origin_entries)
diff --git a/swh/provenance/postgresql/provenance.py b/swh/provenance/postgresql/provenance.py
--- a/swh/provenance/postgresql/provenance.py
+++ b/swh/provenance/postgresql/provenance.py
@@ -54,6 +54,7 @@
def __init__(
self, page_size: Optional[int] = None, raise_on_commit: bool = False, **kwargs
) -> None:
+ self.conn: Optional[psycopg2.extensions.connection] = None
self.conn_args = kwargs
self._flavor: Optional[str] = None
self.page_size = page_size
@@ -75,6 +76,9 @@
def transaction(
self, readonly: bool = False
) -> Generator[psycopg2.extras.RealDictCursor, None, None]:
+ if self.conn is None: # somehow, "implicit" __enter__ call did not happen
+ self.open()
+ assert self.conn is not None
self.conn.set_session(readonly=readonly)
with self.conn:
with self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
@@ -95,6 +99,9 @@
     @statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"})
     def close(self) -> None:
-        self.conn.close()
+        """Close the connection, if any; a later transaction() opens a new one."""
+        if self.conn is not None:
+            self.conn.close()
+            self.conn = None
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "content_add"})
diff --git a/swh/provenance/tests/data/origins.csv b/swh/provenance/tests/data/origins.csv
new file mode 100644
--- /dev/null
+++ b/swh/provenance/tests/data/origins.csv
@@ -0,0 +1 @@
+https://cmdbts2,5f577c4d4e5a1d0bca64f78facfb891933b17d94
diff --git a/swh/provenance/tests/test_cli.py b/swh/provenance/tests/test_cli.py
--- a/swh/provenance/tests/test_cli.py
+++ b/swh/provenance/tests/test_cli.py
@@ -1,9 +1,9 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from typing import Set
+from typing import Dict, List, Set
from _pytest.monkeypatch import MonkeyPatch
from click.testing import CliRunner
@@ -14,7 +14,13 @@
import swh.core.cli.db # noqa ; ensure cli is loaded
from swh.core.db import BaseDb
from swh.core.db.db_utils import init_admin_extensions
+from swh.model.hashutil import MultiHash
import swh.provenance.cli # noqa ; ensure cli is loaded
+from swh.provenance.tests.conftest import fill_storage, load_repo_data
+from swh.storage.interface import StorageInterface
+
+from .conftest import get_datafile
+from .test_utils import invoke, write_configuration_path
def test_cli_swh_db_help() -> None:
@@ -107,3 +113,52 @@
with postgresql.cursor() as cur:
cur.execute("select swh_get_dbflavor()")
assert cur.fetchone() == ("with-path",)
+
+
+@pytest.mark.parametrize(
+    "subcommand",
+    (["origin", "from-csv"], ["iter-origins"]),
+)
+def test_cli_origin_from_csv(
+    swh_storage: StorageInterface,
+    subcommand: List[str],
+    swh_storage_backend_config: Dict,
+    provenance,
+    tmp_path,
+):
+    """Both origin CSV commands store the listed origin in the provenance db."""
+    repo = "cmdbts2"
+    origin_url = f"https://{repo}"
+    data = load_repo_data(repo)
+    fill_storage(swh_storage, data)
+
+    assert len(data["origin"]) == 1
+    assert {"url": origin_url} in data["origin"]
+
+    cfg = {
+        "provenance": {
+            "archive": {
+                "cls": "api",
+                "storage": swh_storage_backend_config,
+            },
+            "storage": {
+                "cls": "postgresql",
+                "db": provenance.storage.conn.get_dsn_parameters(),
+            },
+        },
+    }
+
+    config_path = write_configuration_path(cfg, tmp_path)
+
+    csv_filepath = get_datafile("origins.csv")
+    args = subcommand + [csv_filepath]
+
+    result = invoke(args, config_path)
+    assert result.exit_code == 0, f"Unexpected result: {result.output}"
+
+    origin_sha1 = MultiHash.from_data(
+        origin_url.encode(), hash_names=["sha1"]
+    ).digest()["sha1"]
+    actual_result = provenance.storage.origin_get([origin_sha1])
+
+    assert actual_result == {origin_sha1: origin_url}
diff --git a/swh/provenance/tests/test_journal_client.py b/swh/provenance/tests/test_journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/tests/test_journal_client.py
@@ -0,0 +1,82 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Dict
+
+from confluent_kafka import Consumer
+import pytest
+
+from swh.model.hashutil import MultiHash
+from swh.provenance.tests.conftest import fill_storage, load_repo_data
+from swh.storage.interface import StorageInterface
+
+from .test_utils import invoke, write_configuration_path
+
+
+@pytest.fixture
+def swh_storage_backend_config(swh_storage_backend_config, kafka_server, kafka_prefix):
+    """Storage backend configuration extended with a Kafka journal writer."""
+    writer_config = {
+        "cls": "kafka",
+        "brokers": [kafka_server],
+        "client_id": "kafka_writer",
+        "prefix": kafka_prefix,
+        "anonymize": False,
+    }
+    yield {**swh_storage_backend_config, "journal_writer": writer_config}
+
+
+def test_cli_origin_from_journal_client(
+    swh_storage: StorageInterface,
+    swh_storage_backend_config: Dict,
+    kafka_prefix: str,
+    kafka_server: str,
+    consumer: Consumer,
+    tmp_path: str,
+    provenance,
+) -> None:
+    """Test origin journal client cli"""
+
+    # Prepare storage data
+    data = load_repo_data("cmdbts2")
+    assert len(data["origin"]) == 1
+    origin_url = data["origin"][0]["url"]
+    fill_storage(swh_storage, data)
+
+    # Prepare configuration for cli call
+    storage_config_dict = dict(swh_storage_backend_config)  # keep the fixture intact
+    storage_config_dict.pop("journal_writer", None)  # no need for that config
+    cfg = {
+        "journal_client": {
+            "cls": "kafka",
+            "brokers": [kafka_server],
+            "group_id": "toto",
+            "prefix": kafka_prefix,
+            "object_types": ["origin_visit_status"],
+            "stop_on_eof": True,
+        },
+        "provenance": {
+            "archive": {
+                "cls": "api",
+                "storage": storage_config_dict,
+            },
+            "storage": {
+                "cls": "postgresql",
+                "db": provenance.storage.conn.get_dsn_parameters(),
+            },
+        },
+    }
+    config_path = write_configuration_path(cfg, tmp_path)
+
+    # call the cli 'swh provenance origin from-journal'
+    result = invoke(["origin", "from-journal"], config_path)
+    assert result.exit_code == 0, f"Unexpected result: {result.output}"
+
+    origin_sha1 = MultiHash.from_data(
+        origin_url.encode(), hash_names=["sha1"]
+    ).digest()["sha1"]
+    actual_result = provenance.storage.origin_get([origin_sha1])
+
+    assert actual_result == {origin_sha1: origin_url}
diff --git a/swh/provenance/tests/test_utils.py b/swh/provenance/tests/test_utils.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/tests/test_utils.py
@@ -0,0 +1,31 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from os.path import join
+from typing import Dict, List
+
+from click.testing import CliRunner, Result
+from yaml import safe_dump
+
+from swh.provenance.cli import cli
+
+
+def invoke(args: List[str], config_path: str, catch_exceptions: bool = False) -> Result:
+    """Invoke ``swh provenance`` subcommands with the given configuration file."""
+    runner = CliRunner()
+    result = runner.invoke(cli, ["-C" + config_path] + args)
+    if not catch_exceptions and result.exception:
+        print(result.output)
+        raise result.exception
+    return result
+
+
+def write_configuration_path(config: Dict, tmp_path: str) -> str:
+    """Dump ``config`` as YAML to ``<tmp_path>/config.yml``; return that path."""
+    config_path = join(str(tmp_path), "config.yml")
+    with open(config_path, "w") as f:
+        safe_dump(config, f)
+    return config_path
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Dec 21 2024, 6:42 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3219824
Attached To
D8023: Install `swh provenance origin from-journal` cli and tests
Event Timeline
Log In to Comment