diff --git a/swh/provenance/storage/journal.py b/swh/provenance/storage/journal.py index 8e96bd6..fac6e08 100644 --- a/swh/provenance/storage/journal.py +++ b/swh/provenance/storage/journal.py @@ -1,149 +1,162 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations -from dataclasses import asdict from datetime import datetime +import hashlib from types import TracebackType from typing import Dict, Generator, Iterable, List, Optional, Set, Type from swh.model.model import Sha1Git from swh.provenance.storage.interface import ( DirectoryData, EntityType, ProvenanceResult, ProvenanceStorageInterface, RelationData, RelationType, RevisionData, ) class JournalMessage: - def __init__(self, id, value): + def __init__(self, id, value, add_id=True): self.id = id self.value = value + self.add_id = add_id def anonymize(self): return None def unique_key(self): return self.id def to_dict(self): - return { - "id": self.id, - "value": self.value, - } + if self.add_id: + return { + "id": self.id, + "value": self.value, + } + else: + return self.value class ProvenanceStorageJournal: def __init__(self, storage, journal): self.storage = storage self.journal = journal def __enter__(self) -> ProvenanceStorageInterface: self.storage.__enter__() return self def __exit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: return self.storage.__exit__(exc_type, exc_val, exc_tb) def open(self) -> None: self.storage.open() def close(self) -> None: self.storage.close() def content_add(self, cnts: Dict[Sha1Git, datetime]) -> bool: self.journal.write_additions( "content", [JournalMessage(key, value) for (key, value) in cnts.items()] ) return self.storage.content_add(cnts) def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: return self.storage.content_find_first(id) def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: return self.storage.content_find_all(id, limit) def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self.storage.content_get(ids) def directory_add(self, dirs: Dict[Sha1Git, DirectoryData]) -> bool: self.journal.write_additions( "directory", - [JournalMessage(key, asdict(value)) for (key, value) in dirs.items()], + [ + JournalMessage(key, value.date) + for (key, value) in dirs.items() + if value.date is not None + ], ) return self.storage.directory_add(dirs) def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, DirectoryData]: return self.storage.directory_get(ids) def directory_iter_not_flattened( self, limit: int, start_id: Sha1Git ) -> List[Sha1Git]: return self.storage.directory_iter_not_flattened(limit, start_id) def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: return self.storage.entity_get_all(entity) def location_add(self, paths: Dict[Sha1Git, bytes]) -> bool: - self.journal.write_additions( - "location", [JournalMessage(key, value) for (key, value) in paths.items()] - ) return self.storage.location_add(paths) def location_get_all(self) -> Dict[Sha1Git, bytes]: return self.storage.location_get_all() def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool: self.journal.write_additions( "origin", [JournalMessage(key, value) for (key, value) in orgs.items()] ) return self.storage.origin_add(orgs) def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: return self.storage.origin_get(ids) def revision_add(self, revs: Dict[Sha1Git, RevisionData]) -> bool: self.journal.write_additions( "revision", - [JournalMessage(key, asdict(value)) for (key, value) in revs.items()], + [ + JournalMessage(key, value.date) + for (key, value) in revs.items() + if value.date is not None + ], ) return self.storage.revision_add(revs) def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: return self.storage.revision_get(ids) def relation_add( self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]] ) -> bool: - self.journal.write_additions( - relation.value, - [ - JournalMessage(key, [asdict(reldata) for reldata in value]) - for (key, value) in data.items() - ], - ) + for src, relations in data.items(): + for reldata in relations: + key = hashlib.sha1(src + reldata.dst + (reldata.path or b"")).digest() + self.journal.write_addition( + relation.value, + JournalMessage( + key, + {"src": src, "dst": reldata.dst, "path": reldata.path}, + add_id=False, + ), + ) return self.storage.relation_add(relation, data) def relation_get( self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False ) -> Dict[Sha1Git, Set[RelationData]]: return self.storage.relation_get(relation, ids, reverse) def relation_get_all( self, relation: RelationType ) -> Dict[Sha1Git, Set[RelationData]]: return self.storage.relation_get_all(relation) diff --git a/swh/provenance/storage/replay.py b/swh/provenance/storage/replay.py index 78e2a7f..0f19e7c 100644 --- a/swh/provenance/storage/replay.py +++ b/swh/provenance/storage/replay.py @@ -1,107 +1,122 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from collections import defaultdict +from datetime import datetime import logging -from typing import Any, Callable, Dict, List, Optional, Tuple +from typing import Any, Callable, Dict, List, Optional, Tuple, Union try: from systemd.daemon import notify except ImportError: notify = None from swh.core.statsd import statsd from swh.journal.serializers import kafka_to_value from swh.provenance.storage.interface import ( DirectoryData, RelationData, RelationType, RevisionData, + Sha1Git, ) from .interface import ProvenanceStorageInterface logger = logging.getLogger(__name__) REPLAY_OPERATIONS_METRIC = "swh_provenance_replayer_operations_total" REPLAY_DURATION_METRIC = "swh_provenance_replayer_duration_seconds" def cvrt_directory(msg_d): - return (msg_d["id"], DirectoryData(**msg_d["value"])) + return (msg_d["id"], DirectoryData(date=msg_d["value"], flat=False)) def cvrt_revision(msg_d): - return (msg_d["id"], RevisionData(**msg_d["value"])) + return (msg_d["id"], RevisionData(date=msg_d["value"], origin=None)) def cvrt_default(msg_d): return (msg_d["id"], msg_d["value"]) def cvrt_relation(msg_d): - return (msg_d["id"], {RelationData(**v) for v in msg_d["value"]}) + return (msg_d["src"], RelationData(dst=msg_d["dst"], path=msg_d["path"])) OBJECT_CONVERTERS: Dict[str, Callable[[Dict], Tuple[bytes, Any]]] = { "directory": cvrt_directory, "revision": cvrt_revision, "content": cvrt_default, - "location": cvrt_default, "content_in_revision": cvrt_relation, "content_in_directory": cvrt_relation, "directory_in_revision": cvrt_relation, } class ProvenanceObjectDeserializer: def __init__( self, raise_on_error: bool = False, reporter: Optional[Callable[[str, bytes], None]] = None, ): self.reporter = reporter self.raise_on_error = raise_on_error def convert(self, object_type: str, msg: bytes) -> Optional[Tuple[bytes, Any]]: dict_repr = kafka_to_value(msg) obj = OBJECT_CONVERTERS[object_type](dict_repr) return obj def report_failure(self, msg: bytes, obj: Dict): if self.reporter: self.reporter(obj["id"].hex(), msg) def process_replay_objects( all_objects: Dict[str, List[Tuple[bytes, Any]]], *, storage: ProvenanceStorageInterface, ) -> None: for object_type, objects in all_objects.items(): logger.debug("Inserting %s %s objects", len(objects), object_type) with statsd.timed(REPLAY_DURATION_METRIC, tags={"object_type": object_type}): _insert_objects(object_type, objects, storage) statsd.increment( REPLAY_OPERATIONS_METRIC, len(objects), tags={"object_type": object_type} ) if notify: notify("WATCHDOG=1") def _insert_objects( object_type: str, objects: List[Tuple[bytes, Any]], storage: ProvenanceStorageInterface, ) -> None: """Insert objects of type object_type in the storage.""" if object_type not in OBJECT_CONVERTERS: logger.warning("Received a series of %s, this should not happen", object_type) return - data = dict(objects) if "_in_" in object_type: - storage.relation_add(relation=RelationType(object_type), data=data) + reldata = defaultdict(set) + for k, v in objects: + reldata[k].add(v) + storage.relation_add(relation=RelationType(object_type), data=reldata) + elif object_type in ("revision", "directory"): + entitydata: Dict[Sha1Git, Union[RevisionData, DirectoryData]] = {} + for k, v in objects: + if k not in entitydata or entitydata[k].date > v.date: + entitydata[k] = v + getattr(storage, f"{object_type}_add")(entitydata) else: + data: Dict[Sha1Git, datetime] = {} + for k, v in objects: + assert isinstance(v, datetime) + if k not in data or data[k] > v: + data[k] = v getattr(storage, f"{object_type}_add")(data) diff --git a/swh/provenance/tests/test_provenance_journal_writer.py b/swh/provenance/tests/test_provenance_journal_writer.py index ec11c4b..edfa536 100644 --- a/swh/provenance/tests/test_provenance_journal_writer.py +++ b/swh/provenance/tests/test_provenance_journal_writer.py @@ -1,193 +1,187 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from dataclasses import asdict from typing import Dict, Generator import pytest from swh.provenance import get_provenance_storage from swh.provenance.storage.interface import ( EntityType, ProvenanceStorageInterface, RelationType, ) from .test_provenance_storage import TestProvenanceStorage as _TestProvenanceStorage @pytest.fixture() def provenance_storage( provenance_postgresqldb: Dict[str, str], ) -> Generator[ProvenanceStorageInterface, None, None]: cfg = { "storage": { "cls": "postgresql", "db": provenance_postgresqldb, "raise_on_commit": True, }, "journal_writer": { "cls": "memory", }, } with get_provenance_storage(cls="journal", **cfg) as storage: yield storage class TestProvenanceStorageJournal(_TestProvenanceStorage): def test_provenance_storage_content(self, provenance_storage): super().test_provenance_storage_content(provenance_storage) assert provenance_storage.journal objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} assert objtypes == {"content"} journal_objs = { obj.id for (objtype, obj) in provenance_storage.journal.objects if objtype == "content" } assert provenance_storage.entity_get_all(EntityType.CONTENT) == journal_objs def test_provenance_storage_directory(self, provenance_storage): super().test_provenance_storage_directory(provenance_storage) assert provenance_storage.journal objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} assert objtypes == {"directory"} journal_objs = { obj.id for (objtype, obj) in provenance_storage.journal.objects if objtype == "directory" } assert provenance_storage.entity_get_all(EntityType.DIRECTORY) == journal_objs - def test_provenance_storage_location(self, provenance_storage): - super().test_provenance_storage_location(provenance_storage) - assert provenance_storage.journal - objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} - assert objtypes == {"location"} - - journal_objs = { - obj.id: obj.value - for (objtype, obj) in provenance_storage.journal.objects - if objtype == "location" - } - assert provenance_storage.location_get_all() == journal_objs - def test_provenance_storage_origin(self, provenance_storage): super().test_provenance_storage_origin(provenance_storage) assert provenance_storage.journal objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} assert objtypes == {"origin"} journal_objs = { obj.id for (objtype, obj) in provenance_storage.journal.objects if objtype == "origin" } assert provenance_storage.entity_get_all(EntityType.ORIGIN) == journal_objs def test_provenance_storage_revision(self, provenance_storage): super().test_provenance_storage_revision(provenance_storage) assert provenance_storage.journal objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} assert objtypes == {"revision", "origin"} journal_objs = { obj.id for (objtype, obj) in provenance_storage.journal.objects if objtype == "revision" } - assert provenance_storage.entity_get_all(EntityType.REVISION) == journal_objs + all_revisions = provenance_storage.revision_get( + provenance_storage.entity_get_all(EntityType.REVISION) + ) + + assert { + id for (id, value) in all_revisions.items() if value.date is not None + } == journal_objs def test_provenance_storage_relation_revision_layer(self, provenance_storage): super().test_provenance_storage_relation_revision_layer(provenance_storage) assert provenance_storage.journal objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} assert objtypes == { - "location", "content", "directory", - "revision", "content_in_revision", "content_in_directory", "directory_in_revision", } journal_rels = { - obj.id: {tuple(v.items()) for v in obj.value} + tuple(obj.value[k] for k in ("src", "dst", "path")) for (objtype, obj) in provenance_storage.journal.objects if objtype == "content_in_revision" } prov_rels = { - k: {tuple(asdict(reldata).items()) for reldata in v} + (k, relation.dst, relation.path) for k, v in provenance_storage.relation_get_all( RelationType.CNT_EARLY_IN_REV ).items() + for relation in v } assert prov_rels == journal_rels journal_rels = { - obj.id: {tuple(v.items()) for v in obj.value} + tuple(obj.value[k] for k in ("src", "dst", "path")) for (objtype, obj) in provenance_storage.journal.objects if objtype == "content_in_directory" } prov_rels = { - k: {tuple(asdict(reldata).items()) for reldata in v} + (k, relation.dst, relation.path) for k, v in provenance_storage.relation_get_all( RelationType.CNT_IN_DIR ).items() + for relation in v } assert prov_rels == journal_rels journal_rels = { - obj.id: {tuple(v.items()) for v in obj.value} + tuple(obj.value[k] for k in ("src", "dst", "path")) for (objtype, obj) in provenance_storage.journal.objects if objtype == "directory_in_revision" } prov_rels = { - k: {tuple(asdict(reldata).items()) for reldata in v} + (k, relation.dst, relation.path) for k, v in provenance_storage.relation_get_all( RelationType.DIR_IN_REV ).items() + for relation in v } assert prov_rels == journal_rels def test_provenance_storage_relation_origin_layer(self, provenance_storage): - super().test_provenance_storage_relation_orign_layer(provenance_storage) + super().test_provenance_storage_relation_origin_layer(provenance_storage) assert provenance_storage.journal objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects} assert objtypes == { "origin", - "revision", "revision_in_origin", "revision_before_revision", } journal_rels = { - obj.id: {tuple(v.items()) for v in obj.value} + tuple(obj.value[k] for k in ("src", "dst", "path")) for (objtype, obj) in provenance_storage.journal.objects if objtype == "revision_in_origin" } prov_rels = { - k: {tuple(asdict(reldata).items()) for reldata in v} + (k, relation.dst, relation.path) for k, v in provenance_storage.relation_get_all( RelationType.REV_IN_ORG ).items() + for relation in v } assert prov_rels == journal_rels journal_rels = { - obj.id: {tuple(v.items()) for v in obj.value} + tuple(obj.value[k] for k in ("src", "dst", "path")) for (objtype, obj) in provenance_storage.journal.objects if objtype == "revision_before_revision" } prov_rels = { - k: {tuple(asdict(reldata).items()) for reldata in v} + (k, relation.dst, relation.path) for k, v in provenance_storage.relation_get_all( RelationType.REV_BEFORE_REV ).items() + for relation in v } assert prov_rels == journal_rels