diff --git a/swh/provenance/tests/test_provenance_journal_writer.py b/swh/provenance/tests/test_provenance_journal_writer.py
index 9134819..ec11c4b 100644
--- a/swh/provenance/tests/test_provenance_journal_writer.py
+++ b/swh/provenance/tests/test_provenance_journal_writer.py
@@ -1,193 +1,193 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from dataclasses import asdict
 from typing import Dict, Generator

 import pytest

 from swh.provenance import get_provenance_storage
 from swh.provenance.storage.interface import (
     EntityType,
     ProvenanceStorageInterface,
     RelationType,
 )

 from .test_provenance_storage import TestProvenanceStorage as _TestProvenanceStorage


 @pytest.fixture()
 def provenance_storage(
     provenance_postgresqldb: Dict[str, str],
 ) -> Generator[ProvenanceStorageInterface, None, None]:
     cfg = {
         "storage": {
             "cls": "postgresql",
             "db": provenance_postgresqldb,
             "raise_on_commit": True,
         },
         "journal_writer": {
             "cls": "memory",
         },
     }
     with get_provenance_storage(cls="journal", **cfg) as storage:
         yield storage


 class TestProvenanceStorageJournal(_TestProvenanceStorage):
     def test_provenance_storage_content(self, provenance_storage):
         super().test_provenance_storage_content(provenance_storage)
         assert provenance_storage.journal
         objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects}
         assert objtypes == {"content"}

         journal_objs = {
             obj.id
             for (objtype, obj) in provenance_storage.journal.objects
             if objtype == "content"
         }
         assert provenance_storage.entity_get_all(EntityType.CONTENT) == journal_objs

     def test_provenance_storage_directory(self, provenance_storage):
         super().test_provenance_storage_directory(provenance_storage)
         assert provenance_storage.journal
         objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects}
         assert objtypes == {"directory"}

         journal_objs = {
             obj.id
             for (objtype, obj) in provenance_storage.journal.objects
             if objtype == "directory"
         }
         assert provenance_storage.entity_get_all(EntityType.DIRECTORY) == journal_objs

     def test_provenance_storage_location(self, provenance_storage):
         super().test_provenance_storage_location(provenance_storage)
         assert provenance_storage.journal
         objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects}
         assert objtypes == {"location"}

         journal_objs = {
             obj.id: obj.value
             for (objtype, obj) in provenance_storage.journal.objects
             if objtype == "location"
         }
         assert provenance_storage.location_get_all() == journal_objs

-    def test_provenance_storage_orign(self, provenance_storage):
+    def test_provenance_storage_origin(self, provenance_storage):
         super().test_provenance_storage_origin(provenance_storage)
         assert provenance_storage.journal
         objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects}
         assert objtypes == {"origin"}

         journal_objs = {
             obj.id
             for (objtype, obj) in provenance_storage.journal.objects
             if objtype == "origin"
         }
         assert provenance_storage.entity_get_all(EntityType.ORIGIN) == journal_objs

     def test_provenance_storage_revision(self, provenance_storage):
         super().test_provenance_storage_revision(provenance_storage)
         assert provenance_storage.journal
         objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects}
         assert objtypes == {"revision", "origin"}

         journal_objs = {
             obj.id
             for (objtype, obj) in provenance_storage.journal.objects
             if objtype == "revision"
         }
         assert provenance_storage.entity_get_all(EntityType.REVISION) == journal_objs

     def test_provenance_storage_relation_revision_layer(self, provenance_storage):
         super().test_provenance_storage_relation_revision_layer(provenance_storage)
         assert provenance_storage.journal
         objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects}
         assert objtypes == {
             "location",
             "content",
             "directory",
             "revision",
             "content_in_revision",
             "content_in_directory",
             "directory_in_revision",
         }

         journal_rels = {
             obj.id: {tuple(v.items()) for v in obj.value}
             for (objtype, obj) in provenance_storage.journal.objects
             if objtype == "content_in_revision"
         }
         prov_rels = {
             k: {tuple(asdict(reldata).items()) for reldata in v}
             for k, v in provenance_storage.relation_get_all(
                 RelationType.CNT_EARLY_IN_REV
             ).items()
         }
         assert prov_rels == journal_rels

         journal_rels = {
             obj.id: {tuple(v.items()) for v in obj.value}
             for (objtype, obj) in provenance_storage.journal.objects
             if objtype == "content_in_directory"
         }
         prov_rels = {
             k: {tuple(asdict(reldata).items()) for reldata in v}
             for k, v in provenance_storage.relation_get_all(
                 RelationType.CNT_IN_DIR
             ).items()
         }
         assert prov_rels == journal_rels

         journal_rels = {
             obj.id: {tuple(v.items()) for v in obj.value}
             for (objtype, obj) in provenance_storage.journal.objects
             if objtype == "directory_in_revision"
         }
         prov_rels = {
             k: {tuple(asdict(reldata).items()) for reldata in v}
             for k, v in provenance_storage.relation_get_all(
                 RelationType.DIR_IN_REV
             ).items()
         }
         assert prov_rels == journal_rels

     def test_provenance_storage_relation_origin_layer(self, provenance_storage):
-        super().test_provenance_storage_relation_orign_layer(provenance_storage)
+        super().test_provenance_storage_relation_origin_layer(provenance_storage)
         assert provenance_storage.journal
         objtypes = {objtype for (objtype, obj) in provenance_storage.journal.objects}
         assert objtypes == {
             "origin",
             "revision",
             "revision_in_origin",
             "revision_before_revision",
         }

         journal_rels = {
             obj.id: {tuple(v.items()) for v in obj.value}
             for (objtype, obj) in provenance_storage.journal.objects
             if objtype == "revision_in_origin"
         }
         prov_rels = {
             k: {tuple(asdict(reldata).items()) for reldata in v}
             for k, v in provenance_storage.relation_get_all(
                 RelationType.REV_IN_ORG
             ).items()
         }
         assert prov_rels == journal_rels

         journal_rels = {
             obj.id: {tuple(v.items()) for v in obj.value}
             for (objtype, obj) in provenance_storage.journal.objects
             if objtype == "revision_before_revision"
         }
         prov_rels = {
             k: {tuple(asdict(reldata).items()) for reldata in v}
             for k, v in provenance_storage.relation_get_all(
                 RelationType.REV_BEFORE_REV
             ).items()
         }
         assert prov_rels == journal_rels
diff --git a/swh/provenance/tests/test_provenance_storage.py b/swh/provenance/tests/test_provenance_storage.py
index 57d0cac..571787e 100644
--- a/swh/provenance/tests/test_provenance_storage.py
+++ b/swh/provenance/tests/test_provenance_storage.py
@@ -1,488 +1,488 @@
 # Copyright (C) 2021-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from datetime import datetime, timezone
 import hashlib
 import inspect
 import os
 from typing import Any, Dict, Iterable, Optional, Set, Tuple

 import pytest

 from swh.model.hashutil import hash_to_bytes
 from swh.model.model import Origin, Sha1Git
 from swh.provenance.algos.origin import origin_add
 from swh.provenance.algos.revision import revision_add
 from swh.provenance.archive import ArchiveInterface
 from swh.provenance.interface import ProvenanceInterface
 from swh.provenance.model import OriginEntry, RevisionEntry
 from swh.provenance.provenance import Provenance
 from swh.provenance.storage.interface import (
     DirectoryData,
     EntityType,
     ProvenanceResult,
     ProvenanceStorageInterface,
     RelationData,
     RelationType,
     RevisionData,
 )

 from .utils import fill_storage, load_repo_data, ts2dt


 class TestProvenanceStorage:
     def test_provenance_storage_content(
         self,
         provenance_storage: ProvenanceStorageInterface,
     ) -> None:
         """Tests content methods for every `ProvenanceStorageInterface`
         implementation."""

         # Read data/README.md for more details on how these datasets are generated.
         data = load_repo_data("cmdbts2")

         # Add all content present in the current repo to the storage, just assigning
         # their creation dates. Then check that the returned results when querying
         # are the same.
         cnt_dates = {
             cnt["sha1_git"]: cnt["ctime"] for idx, cnt in enumerate(data["content"])
         }
         assert provenance_storage.content_add(cnt_dates)
         assert provenance_storage.content_get(set(cnt_dates.keys())) == cnt_dates
         assert provenance_storage.entity_get_all(EntityType.CONTENT) == set(
             cnt_dates.keys()
         )

     def test_provenance_storage_directory(
         self,
         provenance_storage: ProvenanceStorageInterface,
     ) -> None:
         """Tests directory methods for every `ProvenanceStorageInterface`
         implementation."""

         # Read data/README.md for more details on how these datasets are generated.
         data = load_repo_data("cmdbts2")

         # Of all directories present in the current repo, only assign a date to those
         # containing blobs (picking the max date among the available ones). Then
         # check that the returned results when querying are the same.
         def getmaxdate(
             directory: Dict[str, Any], contents: Iterable[Dict[str, Any]]
         ) -> Optional[datetime]:
             dates = [
                 content["ctime"]
                 for entry in directory["entries"]
                 for content in contents
                 if entry["type"] == "file" and entry["target"] == content["sha1_git"]
             ]
             return max(dates) if dates else None

         flat_values = (False, True)
         dir_dates = {}
         for idx, dir in enumerate(data["directory"]):
             date = getmaxdate(dir, data["content"])
             if date is not None:
                 dir_dates[dir["id"]] = DirectoryData(
                     date=date, flat=flat_values[idx % 2]
                 )
         assert provenance_storage.directory_add(dir_dates)
         assert provenance_storage.directory_get(set(dir_dates.keys())) == dir_dates
         assert provenance_storage.entity_get_all(EntityType.DIRECTORY) == set(
             dir_dates.keys()
         )

     def test_provenance_storage_location(
         self,
         provenance_storage: ProvenanceStorageInterface,
     ) -> None:
         """Tests location methods for every `ProvenanceStorageInterface`
         implementation."""

         # Read data/README.md for more details on how these datasets are generated.
         data = load_repo_data("cmdbts2")

         # Add all names of entries present in the directories of the current repo as
         # paths to the storage. Then check that the returned results when querying
         # are the same.
         paths = {
             hashlib.sha1(entry["name"]).digest(): entry["name"]
             for dir in data["directory"]
             for entry in dir["entries"]
         }
         assert provenance_storage.location_add(paths)
         assert provenance_storage.location_get_all() == paths

     @pytest.mark.origin_layer
     def test_provenance_storage_origin(
         self,
         provenance_storage: ProvenanceStorageInterface,
     ) -> None:
         """Tests origin methods for every `ProvenanceStorageInterface`
         implementation."""

         # Read data/README.md for more details on how these datasets are generated.
         data = load_repo_data("cmdbts2")

         # Test origin methods.
         # Add all origins present in the current repo to the storage. Then check
         # that the returned results when querying are the same.
         orgs = {Origin(url=org["url"]).id: org["url"] for org in data["origin"]}
         assert orgs
         assert provenance_storage.origin_add(orgs)
         assert provenance_storage.origin_get(set(orgs.keys())) == orgs
         assert provenance_storage.entity_get_all(EntityType.ORIGIN) == set(orgs.keys())

     def test_provenance_storage_revision(
         self,
         provenance_storage: ProvenanceStorageInterface,
     ) -> None:
         """Tests revision methods for every `ProvenanceStorageInterface`
         implementation."""

         # Read data/README.md for more details on how these datasets are generated.
         data = load_repo_data("cmdbts2")

         # Test revision methods.
         # Add all revisions present in the current repo to the storage, assigning
         # their dates and an arbitrary origin to each one. Then check that the
         # returned results when querying are the same.
         origin = Origin(url=next(iter(data["origin"]))["url"])
         # Origin must be inserted in advance.
         assert provenance_storage.origin_add({origin.id: origin.url})

         revs = {rev["id"] for idx, rev in enumerate(data["revision"])}
         rev_data = {
             rev["id"]: RevisionData(
                 date=ts2dt(rev["date"]) if idx % 2 != 0 else None,
                 origin=origin.id if idx % 3 != 0 else None,
             )
             for idx, rev in enumerate(data["revision"])
         }
         assert revs
         assert provenance_storage.revision_add(rev_data)
         assert provenance_storage.revision_get(set(rev_data.keys())) == {
             k: v
             for (k, v) in rev_data.items()
             if v.date is not None or v.origin is not None
         }
         assert provenance_storage.entity_get_all(EntityType.REVISION) == set(rev_data)

     def test_provenance_storage_relation_revision_layer(
         self,
         provenance_storage: ProvenanceStorageInterface,
     ) -> None:
         """Tests relation methods for every `ProvenanceStorageInterface`
         implementation."""

         # Read data/README.md for more details on how these datasets are generated.
         data = load_repo_data("cmdbts2")

         # Test content-in-revision relation.
         # Create flat models of every root directory for the revisions in the
         # dataset.
         cnt_in_rev: Dict[Sha1Git, Set[RelationData]] = {}
         for rev in data["revision"]:
             root = next(
                 subdir
                 for subdir in data["directory"]
                 if subdir["id"] == rev["directory"]
             )
             for cnt, rel in dircontent(data, rev["id"], root):
                 cnt_in_rev.setdefault(cnt, set()).add(rel)
         relation_add_and_compare_result(
             provenance_storage, RelationType.CNT_EARLY_IN_REV, cnt_in_rev
         )

         # Test content-in-directory relation.
         # Create flat models for every directory in the dataset.
         cnt_in_dir: Dict[Sha1Git, Set[RelationData]] = {}
         for dir in data["directory"]:
             for cnt, rel in dircontent(data, dir["id"], dir):
                 cnt_in_dir.setdefault(cnt, set()).add(rel)
         relation_add_and_compare_result(
             provenance_storage, RelationType.CNT_IN_DIR, cnt_in_dir
         )

-        # Test content-in-directory relation.
-        # Add root directories to their correspondent revision in the dataset.
+        # Test directory-in-revision relation.
+        # Add root directories to their corresponding revision in the dataset.
         dir_in_rev: Dict[Sha1Git, Set[RelationData]] = {}
         for rev in data["revision"]:
             dir_in_rev.setdefault(rev["directory"], set()).add(
                 RelationData(dst=rev["id"], path=b".")
             )
         relation_add_and_compare_result(
             provenance_storage, RelationType.DIR_IN_REV, dir_in_rev
         )

     @pytest.mark.origin_layer
-    def test_provenance_storage_relation_orign_layer(
+    def test_provenance_storage_relation_origin_layer(
         self,
         provenance_storage: ProvenanceStorageInterface,
     ) -> None:
         """Tests relation methods for every `ProvenanceStorageInterface`
         implementation."""

         # Read data/README.md for more details on how these datasets are generated.
         data = load_repo_data("cmdbts2")

         # Test revision-in-origin relation.
         # Origins must be inserted in advance (cannot be done by `entity_add` inside
         # `relation_add_and_compare_result`).
         orgs = {Origin(url=org["url"]).id: org["url"] for org in data["origin"]}
         assert provenance_storage.origin_add(orgs)

         # Add all revisions that are head of some snapshot branch to the
         # corresponding origin.
         rev_in_org: Dict[Sha1Git, Set[RelationData]] = {}
         for status in data["origin_visit_status"]:
             if status["snapshot"] is not None:
                 for snapshot in data["snapshot"]:
                     if snapshot["id"] == status["snapshot"]:
                         for branch in snapshot["branches"].values():
                             if branch["target_type"] == "revision":
                                 rev_in_org.setdefault(branch["target"], set()).add(
                                     RelationData(
                                         dst=Origin(url=status["origin"]).id,
                                         path=None,
                                     )
                                 )
         relation_add_and_compare_result(
             provenance_storage, RelationType.REV_IN_ORG, rev_in_org
         )

         # Test revision-before-revision relation.
         # For each revision in the data set add an entry for each parent to the
         # relation.
         rev_before_rev: Dict[Sha1Git, Set[RelationData]] = {}
         for rev in data["revision"]:
             for parent in rev["parents"]:
                 rev_before_rev.setdefault(parent, set()).add(
                     RelationData(dst=rev["id"], path=None)
                 )
         relation_add_and_compare_result(
             provenance_storage, RelationType.REV_BEFORE_REV, rev_before_rev
         )

     def test_provenance_storage_find_revision_layer(
         self,
         provenance: ProvenanceInterface,
         provenance_storage: ProvenanceStorageInterface,
         archive: ArchiveInterface,
     ) -> None:
         """Tests `content_find_first` and `content_find_all` methods for every
         `ProvenanceStorageInterface` implementation.
         """

         # Read data/README.md for more details on how these datasets are generated.
         data = load_repo_data("cmdbts2")
         fill_storage(archive.storage, data)

         # Test content_find_first and content_find_all, first only executing the
         # revision-content algorithm, then adding the origin-revision layer.

         # Execute the revision-content algorithm on both storages.
         revisions = [
             RevisionEntry(id=rev["id"], date=ts2dt(rev["date"]), root=rev["directory"])
             for rev in data["revision"]
         ]
         revision_add(provenance, archive, revisions)
         revision_add(Provenance(provenance_storage), archive, revisions)

         assert ProvenanceResult(
             content=hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494"),
             revision=hash_to_bytes("c0d8929936631ecbcf9147be6b8aa13b13b014e4"),
             date=datetime.fromtimestamp(1000000000.0, timezone.utc),
             origin=None,
             path=b"A/B/C/a",
         ) == provenance_storage.content_find_first(
             hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494")
         )

         for cnt in {cnt["sha1_git"] for cnt in data["content"]}:
             assert provenance.storage.content_find_first(
                 cnt
             ) == provenance_storage.content_find_first(cnt)
             assert set(provenance.storage.content_find_all(cnt)) == set(
                 provenance_storage.content_find_all(cnt)
             )

     @pytest.mark.origin_layer
     def test_provenance_storage_find_origin_layer(
         self,
         provenance: ProvenanceInterface,
         provenance_storage: ProvenanceStorageInterface,
         archive: ArchiveInterface,
     ) -> None:
         """Tests `content_find_first` and `content_find_all` methods for every
         `ProvenanceStorageInterface` implementation.
         """

         # Read data/README.md for more details on how these datasets are generated.
         data = load_repo_data("cmdbts2")
         fill_storage(archive.storage, data)

         # Execute the revision-content algorithm on both storages.
         revisions = [
             RevisionEntry(id=rev["id"], date=ts2dt(rev["date"]), root=rev["directory"])
             for rev in data["revision"]
         ]
         revision_add(provenance, archive, revisions)
         revision_add(Provenance(provenance_storage), archive, revisions)

         # Test content_find_first and content_find_all, first only executing the
         # revision-content algorithm, then adding the origin-revision layer.

         # Execute the origin-revision algorithm on both storages.
         origins = [
             OriginEntry(url=sta["origin"], snapshot=sta["snapshot"])
             for sta in data["origin_visit_status"]
             if sta["snapshot"] is not None
         ]
         origin_add(provenance, archive, origins)
         origin_add(Provenance(provenance_storage), archive, origins)

         assert ProvenanceResult(
             content=hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494"),
             revision=hash_to_bytes("c0d8929936631ecbcf9147be6b8aa13b13b014e4"),
             date=datetime.fromtimestamp(1000000000.0, timezone.utc),
             origin="https://cmdbts2",
             path=b"A/B/C/a",
         ) == provenance_storage.content_find_first(
             hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494")
         )

         for cnt in {cnt["sha1_git"] for cnt in data["content"]}:
             assert provenance.storage.content_find_first(
                 cnt
             ) == provenance_storage.content_find_first(cnt)
             assert set(provenance.storage.content_find_all(cnt)) == set(
                 provenance_storage.content_find_all(cnt)
             )

     def test_types(self, provenance_storage: ProvenanceStorageInterface) -> None:
         """Checks all methods of ProvenanceStorageInterface are implemented by
         this backend, and that they have the same signature."""

         # Create an instance of the protocol (which cannot be instantiated
         # directly, so this creates a subclass, then instantiates it)
         interface = type("_", (ProvenanceStorageInterface,), {})()

         assert "content_find_first" in dir(interface)

         missing_methods = []

         for meth_name in dir(interface):
             if meth_name.startswith("_"):
                 continue
             interface_meth = getattr(interface, meth_name)
             try:
                 concrete_meth = getattr(provenance_storage, meth_name)
             except AttributeError:
                 if not getattr(interface_meth, "deprecated_endpoint", False):
                     # The backend is missing a (non-deprecated) endpoint
                     missing_methods.append(meth_name)
                 continue

             expected_signature = inspect.signature(interface_meth)
             actual_signature = inspect.signature(concrete_meth)

             assert expected_signature == actual_signature, meth_name

         assert missing_methods == []

         # If all the assertions above succeed, then this one should too.
         # But there's no harm in double-checking.
         # And we could replace the assertions above by this one, but unlike
         # the assertions above, it doesn't explain what is missing.
         assert isinstance(provenance_storage, ProvenanceStorageInterface)


 def dircontent(
     data: Dict[str, Any],
     ref: Sha1Git,
     dir: Dict[str, Any],
     prefix: bytes = b"",
 ) -> Iterable[Tuple[Sha1Git, RelationData]]:
     content = {
         (
             entry["target"],
             RelationData(dst=ref, path=os.path.join(prefix, entry["name"])),
         )
         for entry in dir["entries"]
         if entry["type"] == "file"
     }
     for entry in dir["entries"]:
         if entry["type"] == "dir":
             child = next(
                 subdir
                 for subdir in data["directory"]
                 if subdir["id"] == entry["target"]
             )
             content.update(
                 dircontent(data, ref, child, os.path.join(prefix, entry["name"]))
             )
     return content


 def entity_add(
     storage: ProvenanceStorageInterface, entity: EntityType, ids: Set[Sha1Git]
 ) -> bool:
     now = datetime.now(tz=timezone.utc)
     if entity == EntityType.CONTENT:
         return storage.content_add({sha1: now for sha1 in ids})
     elif entity == EntityType.DIRECTORY:
         return storage.directory_add(
             {sha1: DirectoryData(date=now, flat=False) for sha1 in ids}
         )
     else:  # entity == EntityType.REVISION:
         return storage.revision_add(
             {sha1: RevisionData(date=None, origin=None) for sha1 in ids}
         )


 def relation_add_and_compare_result(
     storage: ProvenanceStorageInterface,
     relation: RelationType,
     data: Dict[Sha1Git, Set[RelationData]],
 ) -> None:
     # Source, destinations and locations must be added in advance.
     src, *_, dst = relation.value.split("_")
     srcs = {sha1 for sha1 in data}
     if src != "origin":
         assert entity_add(storage, EntityType(src), srcs)
     dsts = {rel.dst for rels in data.values() for rel in rels}
     if dst != "origin":
         assert entity_add(storage, EntityType(dst), dsts)
     assert storage.location_add(
         {
             hashlib.sha1(rel.path).digest(): rel.path
             for rels in data.values()
             for rel in rels
             if rel.path is not None
         }
     )

     assert data
     assert storage.relation_add(relation, data)

     for src_sha1 in srcs:
         relation_compare_result(
             storage.relation_get(relation, [src_sha1]),
             {src_sha1: data[src_sha1]},
         )
     for dst_sha1 in dsts:
         relation_compare_result(
             storage.relation_get(relation, [dst_sha1], reverse=True),
             {
                 src_sha1: {
                     RelationData(dst=dst_sha1, path=rel.path)
                     for rel in rels
                     if dst_sha1 == rel.dst
                 }
                 for src_sha1, rels in data.items()
                 if dst_sha1 in {rel.dst for rel in rels}
             },
         )
     relation_compare_result(
         storage.relation_get_all(relation),
         data,
     )


 def relation_compare_result(
     computed: Dict[Sha1Git, Set[RelationData]],
     expected: Dict[Sha1Git, Set[RelationData]],
 ) -> None:
     assert {
         src_sha1: {RelationData(dst=rel.dst, path=rel.path) for rel in rels}
         for src_sha1, rels in expected.items()
     } == computed