Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9348161
D8657.id31274.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
16 KB
Subscribers
None
D8657.id31274.diff
View Options
diff --git a/swh/provenance/storage/__init__.py b/swh/provenance/storage/__init__.py
--- a/swh/provenance/storage/__init__.py
+++ b/swh/provenance/storage/__init__.py
@@ -48,5 +48,14 @@
if TYPE_CHECKING:
assert isinstance(rmq_storage, ProvenanceStorageInterface)
return rmq_storage
+ elif cls == "journal":
+ from swh.journal.writer import get_journal_writer
+ from swh.provenance.storage.journal import ProvenanceStorageJournal
+
+ storage = get_provenance_storage(**kwargs["storage"])
+ journal = get_journal_writer(**kwargs["journal_writer"])
+
+ ret = ProvenanceStorageJournal(storage=storage, journal=journal)
+ return ret
raise ValueError
diff --git a/swh/provenance/storage/journal.py b/swh/provenance/storage/journal.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/storage/journal.py
@@ -0,0 +1,152 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from __future__ import annotations
+
+from dataclasses import asdict
+from datetime import datetime
+from types import TracebackType
+from typing import Dict, Generator, Iterable, List, Optional, Set, Type
+
+from swh.model.model import Sha1Git
+from swh.provenance.storage.interface import (
+ DirectoryData,
+ EntityType,
+ ProvenanceResult,
+ ProvenanceStorageInterface,
+ RelationData,
+ RelationType,
+ RevisionData,
+)
+
+
+class JournalMessage:
+ def __init__(self, id, value):
+ self.id = id
+ self.value = value
+
+ def anonymize(self):
+ return None
+
+ def unique_key(self):
+ return self.id
+
+ def to_dict(self):
+ return {
+ "id": self.id,
+ "value": self.value,
+ }
+
+
+class ProvenanceStorageJournal:
+ def __init__(self, storage, journal):
+ self.storage = storage
+ self.journal = journal
+
+ def __enter__(self) -> ProvenanceStorageInterface:
+ self.storage.__enter__()
+ return self
+
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
+ return self.storage.__exit__(exc_type, exc_val, exc_tb)
+
+ def open(self) -> None:
+ self.storage.open()
+
+ def close(self) -> None:
+ self.storage.close()
+
+ def content_add(self, cnts: Dict[Sha1Git, datetime]) -> bool:
+ self.journal.write_additions(
+ "content", [JournalMessage(key, value) for (key, value) in cnts.items()]
+ )
+ return self.storage.content_add(cnts)
+
+ def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
+ return self.storage.content_find_first(id)
+
+ def content_find_all(
+ self, id: Sha1Git, limit: Optional[int] = None
+ ) -> Generator[ProvenanceResult, None, None]:
+ return self.storage.content_find_all(id, limit)
+
+ def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+ return self.storage.content_get(ids)
+
+ def directory_add(self, dirs: Dict[Sha1Git, DirectoryData]) -> bool:
+ self.journal.write_additions(
+ "directory",
+ [JournalMessage(key, asdict(value)) for (key, value) in dirs.items()],
+ )
+ return self.storage.directory_add(dirs)
+
+ def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, DirectoryData]:
+ return self.storage.directory_get(ids)
+
+ def directory_iter_not_flattened(
+ self, limit: int, start_id: Sha1Git
+ ) -> List[Sha1Git]:
+ return self.storage.directory_iter_not_flattened(limit, start_id)
+
+ def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
+ return self.storage.entity_get_all(entity)
+
+ def location_add(self, paths: Dict[Sha1Git, bytes]) -> bool:
+ self.journal.write_additions(
+ "location", [JournalMessage(key, value) for (key, value) in paths.items()]
+ )
+ return self.storage.location_add(paths)
+
+ def location_get_all(self) -> Dict[Sha1Git, bytes]:
+ return self.storage.location_get_all()
+
+ def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool:
+ self.journal.write_additions(
+ "origin", [JournalMessage(key, value) for (key, value) in orgs.items()]
+ )
+ return self.storage.origin_add(orgs)
+
+ def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
+ return self.storage.origin_get(ids)
+
+ def revision_add(self, revs: Dict[Sha1Git, RevisionData]) -> bool:
+ self.journal.write_additions(
+ "revision",
+ [JournalMessage(key, asdict(value)) for (key, value) in revs.items()],
+ )
+ return self.storage.revision_add(revs)
+
+ def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
+ return self.storage.revision_get(ids)
+
+ def relation_add(
+ self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]
+ ) -> bool:
+ self.journal.write_additions(
+ relation.value,
+ [
+ JournalMessage(key, [asdict(reldata) for reldata in value])
+ for (key, value) in data.items()
+ ],
+ )
+ return self.storage.relation_add(relation, data)
+
+ def relation_get(
+ self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False
+ ) -> Dict[Sha1Git, Set[RelationData]]:
+ return self.storage.relation_get(relation, ids, reverse)
+
+ def relation_get_all(
+ self, relation: RelationType
+ ) -> Dict[Sha1Git, Set[RelationData]]:
+ return self.storage.relation_get_all(relation)
+
+ def with_path(self) -> bool:
+ return self.storage.with_path()
diff --git a/swh/provenance/tests/test_journal_client.py b/swh/provenance/tests/test_journal_client.py
--- a/swh/provenance/tests/test_journal_client.py
+++ b/swh/provenance/tests/test_journal_client.py
@@ -28,6 +28,7 @@
@pytest.mark.origin_layer
+@pytest.mark.kafka
def test_cli_origin_from_journal_client(
swh_storage: StorageInterface,
swh_storage_backend_config: Dict,
@@ -82,6 +83,7 @@
assert actual_result == {origin_sha1: origin_url}
+@pytest.mark.kafka
def test_cli_revision_from_journal_client(
swh_storage: StorageInterface,
swh_storage_backend_config: Dict,
diff --git a/swh/provenance/tests/test_provenance_journal_writer.py b/swh/provenance/tests/test_provenance_journal_writer.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/tests/test_provenance_journal_writer.py
@@ -0,0 +1,193 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from dataclasses import asdict
+from typing import Dict, Generator
+
+import pytest
+
+from swh.provenance import get_provenance_storage
+from swh.provenance.storage.interface import (
+ EntityType,
+ ProvenanceStorageInterface,
+ RelationType,
+)
+
+from .test_provenance_storage import TestProvenanceStorage as _TestProvenanceStorage
+
+
+@pytest.fixture()
+def provenance_storage(
+ provenance_postgresqldb: Dict[str, str],
+) -> Generator[ProvenanceStorageInterface, None, None]:
+ cfg = {
+ "storage": {
+ "cls": "postgresql",
+ "db": provenance_postgresqldb,
+ "raise_on_commit": True,
+ },
+ "journal_writer": {
+ "cls": "memory",
+ },
+ }
+ with get_provenance_storage(cls="journal", **cfg) as storage:
+ yield storage
+
+
+class TestProvenanceStorageJournal(_TestProvenanceStorage):
+ def test_provenance_storage_content(self, provenance_storage):
+ super().test_provenance_storage_content(provenance_storage)
+ assert provenance_storage.journal
+ objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects}
+ assert objtypes == {"content"}
+
+ journal_objs = {
+ obj.id
+ for (objtype, obj) in provenance_storage.journal.objects
+ if objtype == "content"
+ }
+ assert provenance_storage.entity_get_all(EntityType.CONTENT) == journal_objs
+
+ def test_provenance_storage_directory(self, provenance_storage):
+ super().test_provenance_storage_directory(provenance_storage)
+ assert provenance_storage.journal
+ objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects}
+ assert objtypes == {"directory"}
+
+ journal_objs = {
+ obj.id
+ for (objtype, obj) in provenance_storage.journal.objects
+ if objtype == "directory"
+ }
+ assert provenance_storage.entity_get_all(EntityType.DIRECTORY) == journal_objs
+
+ def test_provenance_storage_location(self, provenance_storage):
+ super().test_provenance_storage_location(provenance_storage)
+ assert provenance_storage.journal
+ objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects}
+ assert objtypes == {"location"}
+
+ journal_objs = {
+ obj.id: obj.value
+ for (objtype, obj) in provenance_storage.journal.objects
+ if objtype == "location"
+ }
+ assert provenance_storage.location_get_all() == journal_objs
+
+ def test_provenance_storage_orign(self, provenance_storage):
+ super().test_provenance_storage_origin(provenance_storage)
+ assert provenance_storage.journal
+ objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects}
+ assert objtypes == {"origin"}
+
+ journal_objs = {
+ obj.id
+ for (objtype, obj) in provenance_storage.journal.objects
+ if objtype == "origin"
+ }
+ assert provenance_storage.entity_get_all(EntityType.ORIGIN) == journal_objs
+
+ def test_provenance_storage_revision(self, provenance_storage):
+ super().test_provenance_storage_revision(provenance_storage)
+ assert provenance_storage.journal
+ objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects}
+ assert objtypes == {"revision", "origin"}
+
+ journal_objs = {
+ obj.id
+ for (objtype, obj) in provenance_storage.journal.objects
+ if objtype == "revision"
+ }
+ assert provenance_storage.entity_get_all(EntityType.REVISION) == journal_objs
+
+ def test_provenance_storage_relation_revision_layer(self, provenance_storage):
+ super().test_provenance_storage_relation_revision_layer(provenance_storage)
+ assert provenance_storage.journal
+ objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects}
+ assert objtypes == {
+ "location",
+ "content",
+ "directory",
+ "revision",
+ "content_in_revision",
+ "content_in_directory",
+ "directory_in_revision",
+ }
+
+ journal_rels = {
+ obj.id: {tuple(v.items()) for v in obj.value}
+ for (objtype, obj) in provenance_storage.journal.objects
+ if objtype == "content_in_revision"
+ }
+ prov_rels = {
+ k: {tuple(asdict(reldata).items()) for reldata in v}
+ for k, v in provenance_storage.relation_get_all(
+ RelationType.CNT_EARLY_IN_REV
+ ).items()
+ }
+ assert prov_rels == journal_rels
+
+ journal_rels = {
+ obj.id: {tuple(v.items()) for v in obj.value}
+ for (objtype, obj) in provenance_storage.journal.objects
+ if objtype == "content_in_directory"
+ }
+ prov_rels = {
+ k: {tuple(asdict(reldata).items()) for reldata in v}
+ for k, v in provenance_storage.relation_get_all(
+ RelationType.CNT_IN_DIR
+ ).items()
+ }
+ assert prov_rels == journal_rels
+
+ journal_rels = {
+ obj.id: {tuple(v.items()) for v in obj.value}
+ for (objtype, obj) in provenance_storage.journal.objects
+ if objtype == "directory_in_revision"
+ }
+ prov_rels = {
+ k: {tuple(asdict(reldata).items()) for reldata in v}
+ for k, v in provenance_storage.relation_get_all(
+ RelationType.DIR_IN_REV
+ ).items()
+ }
+ assert prov_rels == journal_rels
+
+ def test_provenance_storage_relation_origin_layer(self, provenance_storage):
+ super().test_provenance_storage_relation_orign_layer(provenance_storage)
+ assert provenance_storage.journal
+ objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects}
+ assert objtypes == {
+ "origin",
+ "revision",
+ "revision_in_origin",
+ "revision_before_revision",
+ }
+
+ journal_rels = {
+ obj.id: {tuple(v.items()) for v in obj.value}
+ for (objtype, obj) in provenance_storage.journal.objects
+ if objtype == "revision_in_origin"
+ }
+ prov_rels = {
+ k: {tuple(asdict(reldata).items()) for reldata in v}
+ for k, v in provenance_storage.relation_get_all(
+ RelationType.REV_IN_ORG
+ ).items()
+ }
+ assert prov_rels == journal_rels
+
+ journal_rels = {
+ obj.id: {tuple(v.items()) for v in obj.value}
+ for (objtype, obj) in provenance_storage.journal.objects
+ if objtype == "revision_before_revision"
+ }
+ prov_rels = {
+ k: {tuple(asdict(reldata).items()) for reldata in v}
+ for k, v in provenance_storage.relation_get_all(
+ RelationType.REV_BEFORE_REV
+ ).items()
+ }
+ assert prov_rels == journal_rels
diff --git a/swh/provenance/tests/test_provenance_journal_writer_kafka.py b/swh/provenance/tests/test_provenance_journal_writer_kafka.py
new file mode 100644
--- /dev/null
+++ b/swh/provenance/tests/test_provenance_journal_writer_kafka.py
@@ -0,0 +1,46 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Dict, Generator
+
+from confluent_kafka import Consumer
+import pytest
+
+from swh.provenance import get_provenance_storage
+from swh.provenance.storage.interface import ProvenanceStorageInterface
+
+from .test_provenance_storage import ( # noqa
+ TestProvenanceStorage as _TestProvenanceStorage,
+)
+
+
+@pytest.fixture()
+def provenance_storage(
+ provenance_postgresqldb: Dict[str, str],
+ kafka_prefix: str,
+ kafka_server: str,
+ consumer: Consumer,
+) -> Generator[ProvenanceStorageInterface, None, None]:
+ cfg = {
+ "storage": {
+ "cls": "postgresql",
+ "db": provenance_postgresqldb,
+ "raise_on_commit": True,
+ },
+ "journal_writer": {
+ "cls": "kafka",
+ "brokers": [kafka_server],
+ "client_id": "kafka_writer",
+ "prefix": "swh.provenance",
+ "anonymize": False,
+ },
+ }
+ with get_provenance_storage(cls="journal", **cfg) as storage:
+ yield storage
+
+
+@pytest.mark.kafka
+class TestProvenanceStorageJournal(_TestProvenanceStorage):
+ pass
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -17,12 +17,12 @@
pytest {envsitepackagesdir}/swh/provenance \
--cov={envsitepackagesdir}/swh/provenance \
--cov-branch \
- -k 'journal' {posargs}
+ -m kafka {posargs}
pytest --doctest-modules \
{envsitepackagesdir}/swh/provenance \
--cov={envsitepackagesdir}/swh/provenance \
--cov-branch --cov-append \
- -k 'not journal' {posargs}
+ -m 'not kafka' {posargs}
passenv = HOME # required by pytest-rabbitmq
[testenv:black]
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Jul 3 2025, 6:15 PM (4 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3222350
Attached To
D8657: Add support for kafka journalization of the ProvenanceStorageInterface
Event Timeline
Log In to Comment