Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/tests/test_provenance_storage.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from datetime import datetime | from datetime import datetime, timezone | ||||
import inspect | import inspect | ||||
import os | import os | ||||
from typing import Any, Dict, Iterable, Optional, Set | from typing import Any, Dict, Iterable, Optional, Set | ||||
import pytest | import pytest | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.model.identifiers import origin_identifier | from swh.model.identifiers import origin_identifier | ||||
from swh.model.model import Sha1Git | from swh.model.model import Sha1Git | ||||
from swh.provenance.archive import ArchiveInterface | |||||
from swh.provenance.interface import ( | from swh.provenance.interface import ( | ||||
EntityType, | EntityType, | ||||
ProvenanceInterface, | ProvenanceInterface, | ||||
ProvenanceResult, | ProvenanceResult, | ||||
ProvenanceStorageInterface, | ProvenanceStorageInterface, | ||||
RelationData, | RelationData, | ||||
RelationType, | RelationType, | ||||
RevisionData, | |||||
UnsupportedEntityError, | |||||
) | ) | ||||
from swh.provenance.tests.conftest import load_repo_data, ts2dt | from swh.provenance.model import OriginEntry, RevisionEntry | ||||
from swh.provenance.mongo.backend import ProvenanceStorageMongoDb | |||||
from swh.provenance.origin import origin_add | |||||
from swh.provenance.provenance import Provenance | |||||
from swh.provenance.revision import revision_add | |||||
from swh.provenance.tests.conftest import fill_storage, load_repo_data, ts2dt | |||||
@pytest.mark.parametrize(
    "repo",
    ("cmdbts2",),
)
def test_provenance_storage_content(
    provenance_storage: ProvenanceStorageInterface,
    repo: str,
) -> None:
    """Tests content methods for every `ProvenanceStorageInterface` implementation."""

    # Read data/README.md for more details on how these datasets are generated.
    data = load_repo_data(repo)

    # Add all content present in the current repo to the storage, just assigning their
    # creation dates. Then check that the returned results when querying are the same.
    dates = {cnt["sha1_git"]: cnt["ctime"] for cnt in data["content"]}
    assert dates
    assert provenance_storage.content_set_date(dates)
    assert provenance_storage.content_get(set(dates.keys())) == dates
    assert provenance_storage.entity_get_all(EntityType.CONTENT) == set(dates.keys())
@pytest.mark.parametrize(
    "repo",
    ("cmdbts2",),
)
def test_provenance_storage_directory(
    provenance_storage: ProvenanceStorageInterface,
    repo: str,
) -> None:
    """Tests directory methods for every `ProvenanceStorageInterface` implementation."""

    # Read data/README.md for more details on how these datasets are generated.
    data = load_repo_data(repo)

    # Of all directories present in the current repo, only assign a date to those
    # containing blobs (picking the max date among the available ones). Then check that
    # the returned results when querying are the same.
    def getmaxdate(
        directory: Dict[str, Any], contents: Iterable[Dict[str, Any]]
    ) -> datetime:
        # Collect the ctime of every blob reachable directly from this directory's
        # entries; fall back to "now" (timezone-aware) for blob-less directories.
        dates = [
            content["ctime"]
            for entry in directory["entries"]
            for content in contents
            if entry["type"] == "file" and entry["target"] == content["sha1_git"]
        ]
        return max(dates) if dates else datetime.now(tz=timezone.utc)

    dates = {dir["id"]: getmaxdate(dir, data["content"]) for dir in data["directory"]}
    assert dates
    assert provenance_storage.directory_set_date(dates)
    assert provenance_storage.directory_get(set(dates.keys())) == dates
    assert provenance_storage.entity_get_all(EntityType.DIRECTORY) == set(dates.keys())
@pytest.mark.parametrize(
    "repo",
    ("cmdbts2",),
)
def test_provenance_storage_entity(
    provenance_storage: ProvenanceStorageInterface,
    repo: str,
) -> None:
    """Tests entity methods for every `ProvenanceStorageInterface` implementation."""

    # Read data/README.md for more details on how these datasets are generated.
    data = load_repo_data(repo)

    # Test EntityType.CONTENT
    # Add all contents present in the current repo to the storage. Then check that the
    # returned results when querying are the same.
    sha1s = {cnt["sha1_git"] for cnt in data["content"]}
    assert sha1s
    assert provenance_storage.entity_add(EntityType.CONTENT, sha1s)
    assert provenance_storage.entity_get_all(EntityType.CONTENT) == sha1s

    # Test EntityType.DIRECTORY
    # Add all directories present in the current repo to the storage. Then check that
    # the returned directories when querying are the same.
    sha1s = {dir["id"] for dir in data["directory"]}
    assert sha1s
    assert provenance_storage.entity_add(EntityType.DIRECTORY, sha1s)
    assert provenance_storage.entity_get_all(EntityType.DIRECTORY) == sha1s

    # Test EntityType.REVISION
    # Add all revisions present in the current repo to the storage. Then check that the
    # returned revisions when querying are the same.
    sha1s = {rev["id"] for rev in data["revision"]}
    assert sha1s
    assert provenance_storage.entity_add(EntityType.REVISION, sha1s)
    assert provenance_storage.entity_get_all(EntityType.REVISION) == sha1s

    # Test EntityType.ORIGIN
    # Add all origins present in the current repo. It should fail with a
    # `UnsupportedEntityError`. Then check that indeed nothing was inserted.
    sha1s = {hash_to_bytes(origin_identifier(org)) for org in data["origin"]}
    assert sha1s
    with pytest.raises(UnsupportedEntityError) as error:
        provenance_storage.entity_add(EntityType.ORIGIN, sha1s)
    assert "Unsupported entity: origin" in str(error.value)
    assert provenance_storage.entity_get_all(EntityType.ORIGIN) == set()
@pytest.mark.parametrize(
    "repo",
    ("cmdbts2",),
)
def test_provenance_storage_location(
    provenance_storage: ProvenanceStorageInterface,
    repo: str,
) -> None:
    """Tests location methods for every `ProvenanceStorageInterface` implementation."""

    # Read data/README.md for more details on how these datasets are generated.
    data = load_repo_data(repo)

    # Add all names of entries present in the directories of the current repo as paths
    # to the storage. Then check that the returned results when querying are the same.
    paths = {entry["name"] for dir in data["directory"] for entry in dir["entries"]}
    assert provenance_storage.location_add(paths)

    if isinstance(provenance_storage, ProvenanceStorageMongoDb):
        # TODO: remove this when `location_add` is properly implemented for MongoDb.
        return

    if provenance_storage.with_path():
        assert provenance_storage.location_get() == paths
    else:
        # Without-path flavors discard locations, so nothing should be retrievable.
        assert provenance_storage.location_get() == set()
@pytest.mark.parametrize(
    "repo",
    ("cmdbts2",),
)
def test_provenance_storage_origin(
    provenance_storage: ProvenanceStorageInterface,
    repo: str,
) -> None:
    """Tests origin methods for every `ProvenanceStorageInterface` implementation."""

    # Read data/README.md for more details on how these datasets are generated.
    data = load_repo_data(repo)

    # Test origin methods.
    # Add all origins present in the current repo to the storage. Then check that the
    # returned results when querying are the same.
    urls = {hash_to_bytes(origin_identifier(org)): org["url"] for org in data["origin"]}
    assert urls
    assert provenance_storage.origin_set_url(urls)
    assert provenance_storage.origin_get(set(urls.keys())) == urls
    assert provenance_storage.entity_get_all(EntityType.ORIGIN) == set(urls.keys())
@pytest.mark.parametrize(
    "repo",
    ("cmdbts2",),
)
def test_provenance_storage_revision(
    provenance_storage: ProvenanceStorageInterface,
    repo: str,
) -> None:
    """Tests revision methods for every `ProvenanceStorageInterface` implementation."""

    # Read data/README.md for more details on how these datasets are generated.
    data = load_repo_data(repo)

    # Test revision methods.
    # Add all revisions present in the current repo to the storage, assigning their
    # dates and an arbitrary origin to each one. Then check that the returned results
    # when querying are the same.
    origin = next(iter(data["origin"]))
    org_sha1 = hash_to_bytes(origin_identifier(origin))
    # Origin must be inserted in advance.
    assert provenance_storage.origin_set_url({org_sha1: origin["url"]})

    dates = {rev["id"]: ts2dt(rev["date"]) for rev in data["revision"]}
    orgs = {rev["id"]: org_sha1 for rev in data["revision"]}
    assert set(dates.keys()) == set(orgs.keys())

    # Both dicts are keyed by revision sha1, so pair them up directly (O(n) rather
    # than a quadratic cross-join over both dicts).
    revs = {sha1: RevisionData(date, orgs[sha1]) for sha1, date in dates.items()}
    assert dates
    assert orgs
    assert provenance_storage.revision_set_date(dates)
    assert provenance_storage.revision_set_origin(orgs)
    assert provenance_storage.revision_get(set(revs.keys())) == revs
    assert provenance_storage.entity_get_all(EntityType.REVISION) == set(revs.keys())
def dircontent( | def dircontent( | ||||
data: Dict[str, Any], | data: Dict[str, Any], | ||||
ref: Sha1Git, | ref: Sha1Git, | ||||
dir: Dict[str, Any], | dir: Dict[str, Any], | ||||
prefix: bytes = b"", | prefix: bytes = b"", | ||||
) -> Iterable[RelationData]: | ) -> Iterable[RelationData]: | ||||
Show All 10 Lines | for entry in dir["entries"]: | ||||
if subdir["id"] == entry["target"] | if subdir["id"] == entry["target"] | ||||
) | ) | ||||
content.update( | content.update( | ||||
dircontent(data, ref, child, os.path.join(prefix, entry["name"])) | dircontent(data, ref, child, os.path.join(prefix, entry["name"])) | ||||
) | ) | ||||
return content | return content | ||||
def relation_add_and_compare_result(
    relation: RelationType, data: Set[RelationData], storage: ProvenanceStorageInterface
) -> None:
    """Insert `data` rows for `relation` into `storage`, then check that forward,
    reverse and full queries all return exactly those rows.
    """
    # Source, destinations and locations must be added in advance.
    # Relation values look like "<src>_..._<dst>" (e.g. "content_in_revision").
    src, *_, dst = relation.value.split("_")
    if src != "origin":
        assert storage.entity_add(EntityType(src), {entry.src for entry in data})
    if dst != "origin":
        assert storage.entity_add(EntityType(dst), {entry.dst for entry in data})
    if storage.with_path():
        assert storage.location_add(
            {entry.path for entry in data if entry.path is not None}
        )

    assert data
    assert storage.relation_add(relation, data)
    for row in data:
        # Forward query: all rows sharing this row's source.
        assert relation_compare_result(
            storage.relation_get(relation, [row.src]),
            {entry for entry in data if entry.src == row.src},
            storage.with_path(),
        )
        # Reverse query: all rows sharing this row's destination.
        assert relation_compare_result(
            storage.relation_get(
                relation,
                [row.dst],
                reverse=True,
            ),
            {entry for entry in data if entry.dst == row.dst},
            storage.with_path(),
        )
    assert relation_compare_result(
        storage.relation_get_all(relation), data, storage.with_path()
    )
def relation_compare_result(
    computed: Set[RelationData], expected: Set[RelationData], with_path: bool
) -> bool:
    """Return True iff `computed` equals `expected`, after erasing paths from
    `expected` when the storage under test is a without-path flavor.
    """
    return {
        RelationData(row.src, row.dst, row.path if with_path else None)
        for row in expected
    } == computed
@pytest.mark.parametrize(
    "repo",
    ("cmdbts2",),
)
def test_provenance_storage_relation(
    provenance_storage: ProvenanceStorageInterface,
    repo: str,
) -> None:
    """Tests relation methods for every `ProvenanceStorageInterface` implementation."""

    # Read data/README.md for more details on how these datasets are generated.
    data = load_repo_data(repo)

    # Test content-in-revision relation.
    # Create flat models of every root directory for the revisions in the dataset.
    cnt_in_rev: Set[RelationData] = set()
    for rev in data["revision"]:
        root = next(
            subdir for subdir in data["directory"] if subdir["id"] == rev["directory"]
        )
        cnt_in_rev.update(dircontent(data, rev["id"], root))
    relation_add_and_compare_result(
        RelationType.CNT_EARLY_IN_REV, cnt_in_rev, provenance_storage
    )

    # Test content-in-directory relation.
    # Create flat models for every directory in the dataset.
    cnt_in_dir: Set[RelationData] = set()
    for dir in data["directory"]:
        cnt_in_dir.update(dircontent(data, dir["id"], dir))
    relation_add_and_compare_result(
        RelationType.CNT_IN_DIR, cnt_in_dir, provenance_storage
    )

    # Test directory-in-revision relation.
    # Add root directories to their correspondent revision in the dataset.
    dir_in_rev = {
        RelationData(rev["directory"], rev["id"], b".") for rev in data["revision"]
    }
    relation_add_and_compare_result(
        RelationType.DIR_IN_REV, dir_in_rev, provenance_storage
    )

    # Test revision-in-origin relation.
    # Add all revisions that are head of some snapshot branch to the corresponding
    # origin.
    rev_in_org = {
        RelationData(
            branch["target"],
            hash_to_bytes(origin_identifier({"url": status["origin"]})),
            None,
        )
        for status in data["origin_visit_status"]
        if status["snapshot"] is not None
        for snapshot in data["snapshot"]
        if snapshot["id"] == status["snapshot"]
        for _, branch in snapshot["branches"].items()
        if branch["target_type"] == "revision"
    }
    # Origins must be inserted in advance (cannot be done by `entity_add` inside
    # `relation_add_and_compare_result`).
    urls = {
        hash_to_bytes(origin_identifier(origin)): origin["url"]
        for origin in data["origin"]
    }
    assert provenance_storage.origin_set_url(urls)
    relation_add_and_compare_result(
        RelationType.REV_IN_ORG, rev_in_org, provenance_storage
    )

    # Test revision-before-revision relation.
    # For each revision in the data set add an entry for each parent to the relation.
    rev_before_rev = {
        RelationData(parent, rev["id"], None)
        for rev in data["revision"]
        for parent in rev["parents"]
    }
    relation_add_and_compare_result(
        RelationType.REV_BEFORE_REV, rev_before_rev, provenance_storage
    )
@pytest.mark.parametrize(
    "repo",
    ("cmdbts2",),
)
def test_provenance_storage_find(
    archive: ArchiveInterface,
    provenance: ProvenanceInterface,
    provenance_storage: ProvenanceStorageInterface,
    repo: str,
) -> None:
    """Tests `content_find_first` and `content_find_all` methods for every
    `ProvenanceStorageInterface` implementation.
    """

    # Read data/README.md for more details on how these datasets are generated.
    data = load_repo_data(repo)
    fill_storage(archive.storage, data)

    # Execute the origin-revision algorithm on both storages.
    origins = [
        OriginEntry(url=sta["origin"], snapshot=sta["snapshot"])
        for sta in data["origin_visit_status"]
        if sta["snapshot"] is not None
    ]
    origin_add(provenance, archive, origins)
    origin_add(Provenance(provenance_storage), archive, origins)

    # Execute the revision-content algorithm on both storages.
    revisions = [
        RevisionEntry(id=rev["id"], date=ts2dt(rev["date"]), root=rev["directory"])
        for rev in data["revision"]
    ]
    revision_add(provenance, archive, revisions)
    revision_add(Provenance(provenance_storage), archive, revisions)

    # Test content_find_first and content_find_all.
    def adapt_result(
        result: Optional[ProvenanceResult], with_path: bool
    ) -> Optional[ProvenanceResult]:
        # Erase the path from the reference result when comparing against a
        # without-path storage flavor.
        if result is not None:
            return ProvenanceResult(
                result.content,
                result.revision,
                result.date,
                result.origin,
                result.path if with_path else b"",
            )
        return result

    for cnt in {cnt["sha1_git"] for cnt in data["content"]}:
        assert adapt_result(
            provenance.storage.content_find_first(cnt), provenance_storage.with_path()
        ) == provenance_storage.content_find_first(cnt)
        assert {
            adapt_result(occur, provenance_storage.with_path())
            for occur in provenance.storage.content_find_all(cnt)
        } == set(provenance_storage.content_find_all(cnt))
def test_types(provenance_storage: ProvenanceStorageInterface) -> None:
    """Checks all methods of ProvenanceStorageInterface are implemented by this
    backend, and that they have the same signature."""
    # Create an instance of the protocol (which cannot be instantiated
    # directly, so this creates a subclass, then instantiates it)
    interface = type("_", (ProvenanceStorageInterface,), {})()

    assert "content_find_first" in dir(interface)

    missing_methods = []

    for meth_name in dir(interface):
        if meth_name.startswith("_"):
            continue
        interface_meth = getattr(interface, meth_name)
        try:
            concrete_meth = getattr(provenance_storage, meth_name)
        except AttributeError:
            if not getattr(interface_meth, "deprecated_endpoint", False):
                # The backend is missing a (non-deprecated) endpoint
                missing_methods.append(meth_name)
            continue

        expected_signature = inspect.signature(interface_meth)
        actual_signature = inspect.signature(concrete_meth)

        assert expected_signature == actual_signature, meth_name

    assert missing_methods == []

    # If all the assertions above succeed, then this one should too.
    # But there's no harm in double-checking.
    # And we could replace the assertions above by this one, but unlike
    # the assertions above, it doesn't explain what is missing.
    assert isinstance(provenance_storage, ProvenanceStorageInterface)