diff --git a/swh/provenance/storage/__init__.py b/swh/provenance/storage/__init__.py index 9be2f19..fdd2d34 100644 --- a/swh/provenance/storage/__init__.py +++ b/swh/provenance/storage/__init__.py @@ -1,52 +1,61 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from typing import TYPE_CHECKING import warnings from .interface import ProvenanceStorageInterface def get_provenance_storage(cls: str, **kwargs) -> ProvenanceStorageInterface: """Get an archive object of class ``cls`` with arguments ``args``. Args: cls: storage's class, only 'local' is currently supported args: dictionary of arguments passed to the storage class constructor Returns: an instance of storage object Raises: :cls:`ValueError` if passed an unknown archive class. """ if cls in ["local", "postgresql"]: from swh.provenance.storage.postgresql import ProvenanceStoragePostgreSql if cls == "local": warnings.warn( '"local" class is deprecated for provenance storage, please ' 'use "postgresql" class instead.', DeprecationWarning, ) raise_on_commit = kwargs.get("raise_on_commit", False) return ProvenanceStoragePostgreSql( raise_on_commit=raise_on_commit, **kwargs["db"] ) elif cls == "rabbitmq": from swh.provenance.storage.rabbitmq.client import ( ProvenanceStorageRabbitMQClient, ) rmq_storage = ProvenanceStorageRabbitMQClient(**kwargs) if TYPE_CHECKING: assert isinstance(rmq_storage, ProvenanceStorageInterface) return rmq_storage + elif cls == "journal": + from swh.journal.writer import get_journal_writer + from swh.provenance.storage.journal import ProvenanceStorageJournal + + storage = get_provenance_storage(**kwargs["storage"]) + journal = get_journal_writer(**kwargs["journal_writer"]) + + ret = ProvenanceStorageJournal(storage=storage, journal=journal) + return ret raise ValueError diff --git a/swh/provenance/storage/journal.py b/swh/provenance/storage/journal.py new file mode 100644 index 0000000..b05ba88 --- /dev/null +++ b/swh/provenance/storage/journal.py @@ -0,0 +1,152 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from __future__ import annotations + +from dataclasses import asdict +from datetime import datetime +from types import TracebackType +from typing import Dict, Generator, Iterable, List, Optional, Set, Type + +from swh.model.model import Sha1Git +from swh.provenance.storage.interface import ( + DirectoryData, + EntityType, + ProvenanceResult, + ProvenanceStorageInterface, + RelationData, + RelationType, + RevisionData, +) + + +class JournalMessage: + def __init__(self, id, value): + self.id = id + self.value = value + + def anonymize(self): + return None + + def unique_key(self): + return self.id + + def to_dict(self): + return { + "id": self.id, + "value": self.value, + } + + +class ProvenanceStorageJournal: + def __init__(self, storage, journal): + self.storage = storage + self.journal = journal + + def __enter__(self) -> ProvenanceStorageInterface: + self.storage.__enter__() + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + return self.storage.__exit__(exc_type, exc_val, exc_tb) + + def open(self) -> None: + self.storage.open() + + def close(self) -> None: + self.storage.close() + + def content_add(self, cnts: Dict[Sha1Git, datetime]) -> bool: + self.journal.write_additions( + "content", [JournalMessage(key, value) for (key, value) in cnts.items()] + ) + return self.storage.content_add(cnts) + + def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: + return self.storage.content_find_first(id) + + def content_find_all( + self, id: Sha1Git, limit: Optional[int] = None + ) -> Generator[ProvenanceResult, None, None]: + return self.storage.content_find_all(id, limit) + + def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: + return self.storage.content_get(ids) + + def directory_add(self, dirs: Dict[Sha1Git, DirectoryData]) -> bool: + self.journal.write_additions( + "directory", + [JournalMessage(key, asdict(value)) for (key, value) in dirs.items()], + ) + return self.storage.directory_add(dirs) + + def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, DirectoryData]: + return self.storage.directory_get(ids) + + def directory_iter_not_flattened( + self, limit: int, start_id: Sha1Git + ) -> List[Sha1Git]: + return self.storage.directory_iter_not_flattened(limit, start_id) + + def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: + return self.storage.entity_get_all(entity) + + def location_add(self, paths: Dict[Sha1Git, bytes]) -> bool: + self.journal.write_additions( + "location", [JournalMessage(key, value) for (key, value) in paths.items()] + ) + return self.storage.location_add(paths) + + def location_get_all(self) -> Dict[Sha1Git, bytes]: + return self.storage.location_get_all() + + def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: + self.journal.write_additions( + "origin", [JournalMessage(key, value) for (key, value) in orgs.items()] + ) + return self.storage.origin_add(orgs) + + def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: + return self.storage.origin_get(ids) + + def revision_add(self, revs: Dict[Sha1Git, RevisionData]) -> bool: + self.journal.write_additions( + "revision", + [JournalMessage(key, asdict(value)) for (key, value) in revs.items()], + ) + return self.storage.revision_add(revs) + + def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: + return self.storage.revision_get(ids) + + def relation_add( + self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]] + ) -> bool: + self.journal.write_additions( + relation.value, + [ + JournalMessage(key, [asdict(reldata) for reldata in value]) + for (key, value) in data.items() + ], + ) + return self.storage.relation_add(relation, data) + + def relation_get( + self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False + ) -> Dict[Sha1Git, Set[RelationData]]: + return self.storage.relation_get(relation, ids, reverse) + + def relation_get_all( + self, relation: RelationType + ) -> Dict[Sha1Git, Set[RelationData]]: + return self.storage.relation_get_all(relation) + + def with_path(self) -> bool: + return self.storage.with_path() diff --git a/swh/provenance/tests/test_journal_client.py b/swh/provenance/tests/test_journal_client.py index 27bb7af..865db1a 100644 --- a/swh/provenance/tests/test_journal_client.py +++ b/swh/provenance/tests/test_journal_client.py @@ -1,136 +1,138 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict from confluent_kafka import Consumer import pytest from swh.model.hashutil import MultiHash from swh.provenance.tests.conftest import fill_storage, load_repo_data from swh.storage.interface import StorageInterface from .test_utils import invoke, write_configuration_path @pytest.fixture def swh_storage_backend_config(swh_storage_backend_config, kafka_server, kafka_prefix): writer_config = { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, "anonymize": False, } yield {**swh_storage_backend_config, "journal_writer": writer_config} @pytest.mark.origin_layer +@pytest.mark.kafka def test_cli_origin_from_journal_client( swh_storage: StorageInterface, swh_storage_backend_config: Dict, kafka_prefix: str, kafka_server: str, consumer: Consumer, tmp_path: str, provenance, postgres_provenance, ) -> None: """Test origin journal client cli""" # Prepare storage data data = load_repo_data("cmdbts2") assert len(data["origin"]) >= 1 origin_url = data["origin"][0]["url"] fill_storage(swh_storage, data) # Prepare configuration for cli call swh_storage_backend_config.pop("journal_writer", None) # no need for that config storage_config_dict = swh_storage_backend_config cfg = { "journal_client": { "cls": "kafka", "brokers": [kafka_server], "group_id": "toto", "prefix": kafka_prefix, "stop_on_eof": True, }, "provenance": { "archive": { "cls": "api", "storage": storage_config_dict, }, "storage": { "cls": "postgresql", "db": postgres_provenance.get_dsn_parameters(), }, }, } config_path = write_configuration_path(cfg, tmp_path) # call the cli 'swh provenance origin from-journal' result = invoke(["origin", "from-journal"], config_path) assert result.exit_code == 0, f"Unexpected result: {result.output}" origin_sha1 = MultiHash.from_data( origin_url.encode(), hash_names=["sha1"] ).digest()["sha1"] actual_result = provenance.storage.origin_get([origin_sha1]) assert actual_result == {origin_sha1: origin_url} +@pytest.mark.kafka def test_cli_revision_from_journal_client( swh_storage: StorageInterface, swh_storage_backend_config: Dict, kafka_prefix: str, kafka_server: str, consumer: Consumer, tmp_path: str, provenance, postgres_provenance, ) -> None: """Test revision journal client cli""" # Prepare storage data data = load_repo_data("cmdbts2") assert len(data["origin"]) >= 1 fill_storage(swh_storage, data) # Prepare configuration for cli call swh_storage_backend_config.pop("journal_writer", None) # no need for that config storage_config_dict = swh_storage_backend_config cfg = { "journal_client": { "cls": "kafka", "brokers": [kafka_server], "group_id": "toto", "prefix": kafka_prefix, "stop_on_eof": True, }, "provenance": { "archive": { "cls": "api", "storage": storage_config_dict, }, "storage": { "cls": "postgresql", "db": postgres_provenance.get_dsn_parameters(), }, }, } config_path = write_configuration_path(cfg, tmp_path) revisions = [rev["id"] for rev in data["revision"]] result = provenance.storage.revision_get(revisions) assert not result # call the cli 'swh provenance revision from-journal' cli_result = invoke(["revision", "from-journal"], config_path) assert cli_result.exit_code == 0, f"Unexpected result: {result.output}" result = provenance.storage.revision_get(revisions) assert set(result.keys()) == set(revisions) diff --git a/swh/provenance/tests/test_provenance_journal_writer.py b/swh/provenance/tests/test_provenance_journal_writer.py new file mode 100644 index 0000000..9134819 --- /dev/null +++ b/swh/provenance/tests/test_provenance_journal_writer.py @@ -0,0 +1,193 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from dataclasses import asdict +from typing import Dict, Generator + +import pytest + +from swh.provenance import get_provenance_storage +from swh.provenance.storage.interface import ( + EntityType, + ProvenanceStorageInterface, + RelationType, +) + +from .test_provenance_storage import TestProvenanceStorage as _TestProvenanceStorage + + +@pytest.fixture() +def provenance_storage( + provenance_postgresqldb: Dict[str, str], +) -> Generator[ProvenanceStorageInterface, None, None]: + cfg = { + "storage": { + "cls": "postgresql", + "db": provenance_postgresqldb, + "raise_on_commit": True, + }, + "journal_writer": { + "cls": "memory", + }, + } + with get_provenance_storage(cls="journal", **cfg) as storage: + yield storage + + +class TestProvenanceStorageJournal(_TestProvenanceStorage): + def test_provenance_storage_content(self, provenance_storage): + super().test_provenance_storage_content(provenance_storage) + assert provenance_storage.journal + objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} + assert objtypes == {"content"} + + journal_objs = { + obj.id + for (objtype, obj) in provenance_storage.journal.objects + if objtype == "content" + } + assert provenance_storage.entity_get_all(EntityType.CONTENT) == journal_objs + + def test_provenance_storage_directory(self, provenance_storage): + super().test_provenance_storage_directory(provenance_storage) + assert provenance_storage.journal + objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} + assert objtypes == {"directory"} + + journal_objs = { + obj.id + for (objtype, obj) in provenance_storage.journal.objects + if objtype == "directory" + } + assert provenance_storage.entity_get_all(EntityType.DIRECTORY) == journal_objs + + def test_provenance_storage_location(self, provenance_storage): + super().test_provenance_storage_location(provenance_storage) + assert provenance_storage.journal + objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} + assert objtypes == {"location"} + + journal_objs = { + obj.id: obj.value + for (objtype, obj) in provenance_storage.journal.objects + if objtype == "location" + } + assert provenance_storage.location_get_all() == journal_objs + + def test_provenance_storage_orign(self, provenance_storage): + super().test_provenance_storage_origin(provenance_storage) + assert provenance_storage.journal + objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} + assert objtypes == {"origin"} + + journal_objs = { + obj.id + for (objtype, obj) in provenance_storage.journal.objects + if objtype == "origin" + } + assert provenance_storage.entity_get_all(EntityType.ORIGIN) == journal_objs + + def test_provenance_storage_revision(self, provenance_storage): + super().test_provenance_storage_revision(provenance_storage) + assert provenance_storage.journal + objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} + assert objtypes == {"revision", "origin"} + + journal_objs = { + obj.id + for (objtype, obj) in provenance_storage.journal.objects + if objtype == "revision" + } + assert provenance_storage.entity_get_all(EntityType.REVISION) == journal_objs + + def test_provenance_storage_relation_revision_layer(self, provenance_storage): + super().test_provenance_storage_relation_revision_layer(provenance_storage) + assert provenance_storage.journal + objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} + assert objtypes == { + "location", + "content", + "directory", + "revision", + "content_in_revision", + "content_in_directory", + "directory_in_revision", + } + + journal_rels = { + obj.id: {tuple(v.items()) for v in obj.value} + for (objtype, obj) in provenance_storage.journal.objects + if objtype == "content_in_revision" + } + prov_rels = { + k: {tuple(asdict(reldata).items()) for reldata in v} + for k, v in provenance_storage.relation_get_all( + RelationType.CNT_EARLY_IN_REV + ).items() + } + assert prov_rels == journal_rels + + journal_rels = { + obj.id: {tuple(v.items()) for v in obj.value} + for (objtype, obj) in provenance_storage.journal.objects + if objtype == "content_in_directory" + } + prov_rels = { + k: {tuple(asdict(reldata).items()) for reldata in v} + for k, v in provenance_storage.relation_get_all( + RelationType.CNT_IN_DIR + ).items() + } + assert prov_rels == journal_rels + + journal_rels = { + obj.id: {tuple(v.items()) for v in obj.value} + for (objtype, obj) in provenance_storage.journal.objects + if objtype == "directory_in_revision" + } + prov_rels = { + k: {tuple(asdict(reldata).items()) for reldata in v} + for k, v in provenance_storage.relation_get_all( + RelationType.DIR_IN_REV + ).items() + } + assert prov_rels == journal_rels + + def test_provenance_storage_relation_origin_layer(self, provenance_storage): + super().test_provenance_storage_relation_orign_layer(provenance_storage) + assert provenance_storage.journal + objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} + assert objtypes == { + "origin", + "revision", + "revision_in_origin", + "revision_before_revision", + } + + journal_rels = { + obj.id: {tuple(v.items()) for v in obj.value} + for (objtype, obj) in provenance_storage.journal.objects + if objtype == "revision_in_origin" + } + prov_rels = { + k: {tuple(asdict(reldata).items()) for reldata in v} + for k, v in provenance_storage.relation_get_all( + RelationType.REV_IN_ORG + ).items() + } + assert prov_rels == journal_rels + + journal_rels = { + obj.id: {tuple(v.items()) for v in obj.value} + for (objtype, obj) in provenance_storage.journal.objects + if objtype == "revision_before_revision" + } + prov_rels = { + k: {tuple(asdict(reldata).items()) for reldata in v} + for k, v in provenance_storage.relation_get_all( + RelationType.REV_BEFORE_REV + ).items() + } + assert prov_rels == journal_rels diff --git a/swh/provenance/tests/test_provenance_journal_writer_kafka.py b/swh/provenance/tests/test_provenance_journal_writer_kafka.py new file mode 100644 index 0000000..6b70994 --- /dev/null +++ b/swh/provenance/tests/test_provenance_journal_writer_kafka.py @@ -0,0 +1,46 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Dict, Generator + +from confluent_kafka import Consumer +import pytest + +from swh.provenance import get_provenance_storage +from swh.provenance.storage.interface import ProvenanceStorageInterface + +from .test_provenance_storage import ( # noqa + TestProvenanceStorage as _TestProvenanceStorage, +) + + +@pytest.fixture() +def provenance_storage( + provenance_postgresqldb: Dict[str, str], + kafka_prefix: str, + kafka_server: str, + consumer: Consumer, +) -> Generator[ProvenanceStorageInterface, None, None]: + cfg = { + "storage": { + "cls": "postgresql", + "db": provenance_postgresqldb, + "raise_on_commit": True, + }, + "journal_writer": { + "cls": "kafka", + "brokers": [kafka_server], + "client_id": "kafka_writer", + "prefix": "swh.provenance", + "anonymize": False, + }, + } + with get_provenance_storage(cls="journal", **cfg) as storage: + yield storage + + +@pytest.mark.kafka +class TestProvenanceStorageJournal(_TestProvenanceStorage): + pass diff --git a/tox.ini b/tox.ini index a632e51..7581c2e 100644 --- a/tox.ini +++ b/tox.ini @@ -1,87 +1,87 @@ [tox] envlist = black,flake8,mypy,py3 [testenv] extras = testing deps = pytest-cov commands = # Under certain circumstances (e.g. in jenkins), the swh.journal kafka server # fixture (which makes librdkafka spawn internal threads, that can't be torn # down), and the swh.provenance rabbitmq storage server fixture (which is # brought up by multiprocessing within the pytest runner context) poorly # interact, causing the Python interpreter to SIGABRT. # Running the tests for the journal client separately from the rest allow to # isolate the fixtures from one another and lets the tests run to completion. pytest {envsitepackagesdir}/swh/provenance \ --cov={envsitepackagesdir}/swh/provenance \ --cov-branch \ - -k 'journal' {posargs} + -m kafka {posargs} pytest --doctest-modules \ {envsitepackagesdir}/swh/provenance \ --cov={envsitepackagesdir}/swh/provenance \ --cov-branch --cov-append \ - -k 'not journal' {posargs} + -m 'not kafka' {posargs} passenv = HOME # required by pytest-rabbitmq [testenv:black] skip_install = true deps = black==22.3.0 commands = {envpython} -m black --check swh [testenv:flake8] skip_install = true deps = flake8==4.0.1 flake8-bugbear==22.3.23 commands = {envpython} -m flake8 [testenv:mypy] extras = testing deps = mypy==0.942 commands = mypy swh # build documentation outside swh-environment using the current # git HEAD of swh-docs, is executed on CI for each diff to prevent # breaking doc build [testenv:sphinx] whitelist_externals = make usedevelop = true extras = testing deps = # fetch and install swh-docs in develop mode -e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs # build documentation only inside swh-environment using local state # of swh-docs package [testenv:sphinx-dev] whitelist_externals = make usedevelop = true extras = testing deps = # install swh-docs in develop mode -e ../swh-docs setenv = SWH_PACKAGE_DOC_TOX_BUILD = 1 # turn warnings into errors SPHINXOPTS = -W commands = make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs