diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py
index 60ae464..e2f719b 100644
--- a/swh/provenance/tests/conftest.py
+++ b/swh/provenance/tests/conftest.py
@@ -1,179 +1,84 @@
 # Copyright (C) 2021-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-from contextlib import contextmanager
-from datetime import datetime
 from functools import partial
-import multiprocessing
-from os import path
-from pathlib import Path
-from typing import Any, Dict, Generator, List
+from typing import Dict, Generator

 from _pytest.fixtures import SubRequest
-from aiohttp.test_utils import TestClient, TestServer, loop_context
-import msgpack
 import psycopg2.extensions
 import pytest
 from pytest_postgresql import factories

 from swh.core.db.db_utils import initialize_database_for_module
-from swh.graph.http_rpc_server import make_app
-from swh.journal.serializers import msgpack_ext_hook
-from swh.model.model import BaseModel, TimestampWithTimezone
 from swh.provenance import get_provenance
 from swh.provenance.archive.interface import ArchiveInterface
 from swh.provenance.archive.storage import ArchiveStorage
 from swh.provenance.interface import ProvenanceInterface
 from swh.provenance.storage import get_provenance_storage
 from swh.provenance.storage.interface import ProvenanceStorageInterface
 from swh.provenance.storage.postgresql import ProvenanceStoragePostgreSql
 from swh.storage.interface import StorageInterface
-from swh.storage.replay import OBJECT_CONVERTERS, OBJECT_FIXERS, process_replay_objects

 provenance_postgresql_proc = factories.postgresql_proc(
     load=[
         partial(
             initialize_database_for_module,
             modname="provenance",
             flavor="normalized",
             version=ProvenanceStoragePostgreSql.current_version,
         )
     ],
 )

 postgres_provenance = factories.postgresql("provenance_postgresql_proc")


 @pytest.fixture()
 def provenance_postgresqldb(request, postgres_provenance):
     return postgres_provenance.get_dsn_parameters()


 @pytest.fixture()
 def provenance_storage(
     request: SubRequest,
     provenance_postgresqldb: Dict[str, str],
 ) -> Generator[ProvenanceStorageInterface, None, None]:
     """Return a working and initialized ProvenanceStorageInterface object"""
     # in test sessions, we DO want to raise any exception occurring at commit time
     with get_provenance_storage(
         cls="postgresql", db=provenance_postgresqldb, raise_on_commit=True
     ) as storage:
         yield storage


 @pytest.fixture
 def provenance(
     postgres_provenance: psycopg2.extensions.connection,
 ) -> Generator[ProvenanceInterface, None, None]:
     """Return a working and initialized ProvenanceInterface object"""
     from swh.core.db.db_utils import (
         init_admin_extensions,
         populate_database_for_package,
     )

     init_admin_extensions("swh.provenance", postgres_provenance.dsn)
     populate_database_for_package(
         "swh.provenance", postgres_provenance.dsn, flavor="normalized"
     )
     # in test sessions, we DO want to raise any exception occurring at commit time
     with get_provenance(
         cls="postgresql",
         db=postgres_provenance.get_dsn_parameters(),
         raise_on_commit=True,
     ) as provenance:
         yield provenance


 @pytest.fixture
 def archive(swh_storage: StorageInterface) -> ArchiveInterface:
     """Return an ArchiveStorage-based ArchiveInterface object"""
     return ArchiveStorage(swh_storage)
-
-
-def fill_storage(storage: StorageInterface, data: Dict[str, List[dict]]) -> None:
-    objects = {
-        objtype: [objs_from_dict(objtype, d) for d in dicts]
-
for objtype, dicts in data.items() - } - process_replay_objects(objects, storage=storage) - - -def get_datafile(fname: str) -> str: - return path.join(path.dirname(__file__), "data", fname) - - -# TODO: this should return Dict[str, List[BaseModel]] directly, but it requires -# refactoring several tests -def load_repo_data(repo: str) -> Dict[str, List[dict]]: - data: Dict[str, List[dict]] = {} - with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj: - unpacker = msgpack.Unpacker( - fobj, - raw=False, - ext_hook=msgpack_ext_hook, - strict_map_key=False, - timestamp=3, # convert Timestamp in datetime objects (tz UTC) - ) - for msg in unpacker: - if len(msg) == 2: # old format - objtype, objd = msg - else: # now we should have a triplet (type, key, value) - objtype, _, objd = msg - data.setdefault(objtype, []).append(objd) - return data - - -def objs_from_dict(object_type: str, dict_repr: dict) -> BaseModel: - if object_type in OBJECT_FIXERS: - dict_repr = OBJECT_FIXERS[object_type](dict_repr) - obj = OBJECT_CONVERTERS[object_type](dict_repr) - return obj - - -def ts2dt(ts: Dict[str, Any]) -> datetime: - return TimestampWithTimezone.from_dict(ts).to_datetime() - - -def run_grpc_server(queue, dataset_path): - try: - config = { - "graph": { - "cls": "local", - "grpc_server": {"path": dataset_path}, - "http_rpc_server": {"debug": True}, - } - } - with loop_context() as loop: - app = make_app(config=config) - client = TestClient(TestServer(app), loop=loop) - loop.run_until_complete(client.start_server()) - url = client.make_url("/graph/") - queue.put((url, app["rpc_url"])) - loop.run_forever() - except Exception as e: - queue.put(e) - - -@contextmanager -def grpc_server(dataset): - dataset_path = ( - Path(__file__).parents[0] / "data/swhgraph" / dataset / "compressed/example" - ) - queue = multiprocessing.Queue() - server = multiprocessing.Process( - target=run_grpc_server, kwargs={"queue": queue, "dataset_path": dataset_path} - ) - server.start() - res = queue.get() - if isinstance(res, Exception): - raise res - grpc_url = res[1] - try: - yield grpc_url - finally: - server.terminate() diff --git a/swh/provenance/tests/data/generate_graph_dataset.py b/swh/provenance/tests/data/generate_graph_dataset.py index 418303a..a74296a 100755 --- a/swh/provenance/tests/data/generate_graph_dataset.py +++ b/swh/provenance/tests/data/generate_graph_dataset.py @@ -1,56 +1,56 @@ #!/usr/bin/env python3 # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # type: ignore import argparse import logging from pathlib import Path import shutil from swh.dataset.exporters.edges import GraphEdgesExporter from swh.dataset.exporters.orc import ORCExporter from swh.graph.webgraph import compress -from swh.provenance.tests.conftest import load_repo_data +from swh.provenance.tests.utils import load_repo_data def main(): logging.basicConfig(level=logging.INFO) parser = argparse.ArgumentParser(description="Generate a test dataset") parser.add_argument( "--compress", action="store_true", default=False, help="Also compress the dataset", ) parser.add_argument("--output", help="output directory", default="swhgraph") parser.add_argument("dataset", help="dataset name", nargs="+") args = parser.parse_args() for repo in args.dataset: exporters = {"edges": GraphEdgesExporter, "orc": ORCExporter} config = {"test_unique_file_id": "all"} output_path 
= Path(args.output) / repo data = load_repo_data(repo) print(data.keys()) for name, exporter in exporters.items(): if (output_path / name).exists(): shutil.rmtree(output_path / name) with exporter(config, output_path / name) as e: for object_type, objs in data.items(): for obj_dict in objs: e.process_object(object_type, obj_dict) if args.compress: if (output_path / "compressed").exists(): shutil.rmtree(output_path / "compressed") compress("example", output_path / "orc", output_path / "compressed") if __name__ == "__main__": main() diff --git a/swh/provenance/tests/test_archive_interface.py b/swh/provenance/tests/test_archive_interface.py index 137aa32..018b30d 100644 --- a/swh/provenance/tests/test_archive_interface.py +++ b/swh/provenance/tests/test_archive_interface.py @@ -1,274 +1,275 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter from operator import itemgetter from typing import Any from typing import Counter as TCounter from typing import Dict, Iterable, List, Set, Tuple, Type, Union import pytest from swh.core.db import BaseDb from swh.model.model import ( SWH_MODEL_OBJECT_TYPES, BaseModel, Content, Directory, DirectoryEntry, ObjectType, Origin, OriginVisitStatus, Release, Revision, Sha1Git, Snapshot, SnapshotBranch, TargetType, ) from swh.model.swhids import CoreSWHID, ExtendedObjectType, ExtendedSWHID from swh.provenance.archive import ArchiveInterface from swh.provenance.archive.multiplexer import ArchiveMultiplexed from swh.provenance.archive.postgresql import ArchivePostgreSQL from swh.provenance.archive.storage import ArchiveStorage from swh.provenance.archive.swhgraph import ArchiveGraph -from swh.provenance.tests.conftest import fill_storage, grpc_server, load_repo_data from swh.storage.interface import StorageInterface from swh.storage.postgresql.storage import Storage +from .utils import fill_storage, grpc_server, load_repo_data + class ArchiveNoop: storage: StorageInterface def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]: return [] def revision_get_some_outbound_edges( self, revision_id: Sha1Git ) -> Iterable[Tuple[Sha1Git, Sha1Git]]: return [] def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: return [] def check_directory_ls( reference: ArchiveInterface, archive: ArchiveInterface, data: Dict[str, List[dict]] ) -> None: for directory in data["directory"]: entries_ref = sorted( reference.directory_ls(directory["id"]), key=itemgetter("name") ) entries = sorted(archive.directory_ls(directory["id"]), key=itemgetter("name")) assert entries_ref == entries def check_revision_get_some_outbound_edges( reference: ArchiveInterface, archive: ArchiveInterface, data: Dict[str, List[dict]] ) -> None: for revision in data["revision"]: parents_ref: TCounter[Tuple[Sha1Git, Sha1Git]] = Counter( reference.revision_get_some_outbound_edges(revision["id"]) ) parents: TCounter[Tuple[Sha1Git, Sha1Git]] = Counter( archive.revision_get_some_outbound_edges(revision["id"]) ) # Check that all the reference outbound edges are included in the other # archives's outbound edges assert set(parents_ref.items()) <= set(parents.items()) def check_snapshot_get_heads( reference: ArchiveInterface, archive: ArchiveInterface, data: Dict[str, List[dict]] ) -> None: for snapshot in data["snapshot"]: heads_ref: TCounter[Sha1Git] = 
Counter( reference.snapshot_get_heads(snapshot["id"]) ) heads: TCounter[Sha1Git] = Counter(archive.snapshot_get_heads(snapshot["id"])) assert heads_ref == heads def get_object_class(object_type: str) -> Type[BaseModel]: return SWH_MODEL_OBJECT_TYPES[object_type] def data_to_model(data: Dict[str, List[dict]]) -> Dict[str, List[BaseModel]]: model: Dict[str, List[BaseModel]] = {} for object_type, objects in data.items(): for object in objects: model.setdefault(object_type, []).append( get_object_class(object_type).from_dict(object) ) return model def add_link( edges: Set[ Tuple[ Union[CoreSWHID, ExtendedSWHID, str], Union[CoreSWHID, ExtendedSWHID, str] ] ], src_obj: Union[Content, Directory, Origin, Release, Revision, Snapshot], dst_id: bytes, dst_type: ExtendedObjectType, ) -> None: swhid = ExtendedSWHID(object_type=dst_type, object_id=dst_id) edges.add((src_obj.swhid(), swhid)) def get_graph_data( data: Dict[str, List[dict]] ) -> Tuple[ List[Union[CoreSWHID, ExtendedSWHID, str]], List[ Tuple[ Union[CoreSWHID, ExtendedSWHID, str], Union[CoreSWHID, ExtendedSWHID, str] ] ], ]: nodes: Set[Union[CoreSWHID, ExtendedSWHID, str]] = set() edges: Set[ Tuple[ Union[CoreSWHID, ExtendedSWHID, str], Union[CoreSWHID, ExtendedSWHID, str] ] ] = set() model = data_to_model(data) for origin in model["origin"]: assert isinstance(origin, Origin) nodes.add(origin.swhid()) for status in model["origin_visit_status"]: assert isinstance(status, OriginVisitStatus) if status.origin == origin.url and status.snapshot is not None: add_link(edges, origin, status.snapshot, ExtendedObjectType.SNAPSHOT) for snapshot in model["snapshot"]: assert isinstance(snapshot, Snapshot) nodes.add(snapshot.swhid()) for branch in snapshot.branches.values(): assert isinstance(branch, SnapshotBranch) if branch.target_type in [TargetType.RELEASE, TargetType.REVISION]: target_type = ( ExtendedObjectType.RELEASE if branch.target_type == TargetType.RELEASE else ExtendedObjectType.REVISION ) add_link(edges, snapshot, branch.target, target_type) for revision in model["revision"]: assert isinstance(revision, Revision) nodes.add(revision.swhid()) # root directory add_link(edges, revision, revision.directory, ExtendedObjectType.DIRECTORY) # parent for parent in revision.parents: add_link(edges, revision, parent, ExtendedObjectType.REVISION) dir_entry_types = { "file": ExtendedObjectType.CONTENT, "dir": ExtendedObjectType.DIRECTORY, "rev": ExtendedObjectType.REVISION, } for directory in model["directory"]: assert isinstance(directory, Directory) nodes.add(directory.swhid()) for entry in directory.entries: assert isinstance(entry, DirectoryEntry) add_link(edges, directory, entry.target, dir_entry_types[entry.type]) for content in model["content"]: assert isinstance(content, Content) nodes.add(content.swhid()) object_type = { ObjectType.CONTENT: ExtendedObjectType.CONTENT, ObjectType.DIRECTORY: ExtendedObjectType.DIRECTORY, ObjectType.REVISION: ExtendedObjectType.REVISION, ObjectType.RELEASE: ExtendedObjectType.RELEASE, ObjectType.SNAPSHOT: ExtendedObjectType.SNAPSHOT, } for release in model["release"]: assert isinstance(release, Release) nodes.add(release.swhid()) if release.target is not None: add_link(edges, release, release.target, object_type[release.target_type]) return list(nodes), list(edges) @pytest.mark.parametrize( "repo", ("cmdbts2", "out-of-order", "with-merges"), ) def test_archive_interface(repo: str, archive: ArchiveInterface) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) 
fill_storage(archive.storage, data) # test against ArchiveStorage archive_api = ArchiveStorage(archive.storage) check_directory_ls(archive, archive_api, data) check_revision_get_some_outbound_edges(archive, archive_api, data) check_snapshot_get_heads(archive, archive_api, data) # test against ArchivePostgreSQL assert isinstance(archive.storage, Storage) dsn = archive.storage.get_db().conn.dsn with BaseDb.connect(dsn).conn as conn: BaseDb.adapt_conn(conn) archive_direct = ArchivePostgreSQL(conn) check_directory_ls(archive, archive_direct, data) check_revision_get_some_outbound_edges(archive, archive_direct, data) check_snapshot_get_heads(archive, archive_direct, data) @pytest.mark.parametrize( "repo", ("cmdbts2", "out-of-order", "with-merges"), ) def test_archive_graph(repo: str, archive: ArchiveInterface) -> None: data = load_repo_data(repo) fill_storage(archive.storage, data) with grpc_server(repo) as url: # test against ArchiveGraph archive_graph = ArchiveGraph(url, archive.storage) with pytest.raises(NotImplementedError): check_directory_ls(archive, archive_graph, data) check_revision_get_some_outbound_edges(archive, archive_graph, data) check_snapshot_get_heads(archive, archive_graph, data) @pytest.mark.parametrize( "repo", ("cmdbts2", "out-of-order", "with-merges"), ) def test_archive_multiplexed(repo: str, archive: ArchiveInterface) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(archive.storage, data) # test against ArchiveMultiplexer with grpc_server(repo) as url: archive_api = ArchiveStorage(archive.storage) archive_graph = ArchiveGraph(url, archive.storage) archive_multiplexed = ArchiveMultiplexed( [("noop", ArchiveNoop()), ("graph", archive_graph), ("api", archive_api)] ) check_directory_ls(archive, archive_multiplexed, data) check_revision_get_some_outbound_edges(archive, archive_multiplexed, data) check_snapshot_get_heads(archive, archive_multiplexed, data) def test_noop_multiplexer(): archive = ArchiveMultiplexed([("noop", ArchiveNoop())]) assert not archive.directory_ls(Sha1Git(b"abcd")) assert not archive.revision_get_some_outbound_edges(Sha1Git(b"abcd")) assert not archive.snapshot_get_heads(Sha1Git(b"abcd")) diff --git a/swh/provenance/tests/test_cli.py b/swh/provenance/tests/test_cli.py index eb84fb6..7e40378 100644 --- a/swh/provenance/tests/test_cli.py +++ b/swh/provenance/tests/test_cli.py @@ -1,295 +1,293 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import logging import re from typing import Dict, List from _pytest.monkeypatch import MonkeyPatch from click.testing import CliRunner from confluent_kafka import Producer import psycopg2.extensions import pytest from swh.core.cli import swh as swhmain import swh.core.cli.db # noqa ; ensure cli is loaded from swh.core.db import BaseDb from swh.core.db.db_utils import init_admin_extensions from swh.journal.serializers import key_to_kafka, value_to_kafka from swh.model.hashutil import MultiHash import swh.provenance.cli # noqa ; ensure cli is loaded from swh.provenance.storage.interface import EntityType, RelationType -from swh.provenance.tests.conftest import fill_storage, load_repo_data from swh.storage.interface import StorageInterface -from .conftest import get_datafile -from .test_utils import invoke 
+from .utils import fill_storage, get_datafile, invoke, load_repo_data logger = logging.getLogger(__name__) def now(): return datetime.now(timezone.utc) def test_cli_swh_db_help() -> None: # swhmain.add_command(provenance_cli) result = CliRunner().invoke(swhmain, ["provenance", "-h"]) assert result.exit_code == 0 assert "Commands:" in result.output commands = result.output.split("Commands:")[1] for command in ( "find-all", "find-first", "iter-frontiers", "iter-origins", "iter-revisions", ): assert f" {command} " in commands TABLES = { "dbflavor", "dbmodule", "dbversion", "content", "content_in_revision", "content_in_directory", "directory", "directory_in_revision", "location", "origin", "revision", "revision_before_revision", "revision_in_origin", } @pytest.mark.parametrize("flavor", ("normalized", "denormalized")) def test_cli_db_create_and_init_db_with_flavor( monkeypatch: MonkeyPatch, postgresql: psycopg2.extensions.connection, flavor: str, ) -> None: """Test that 'swh db init provenance' works with flavors""" dbname = f"{flavor}-db" # DB creation using 'swh db create' db_params = postgresql.get_dsn_parameters() monkeypatch.setenv("PGHOST", db_params["host"]) monkeypatch.setenv("PGUSER", db_params["user"]) monkeypatch.setenv("PGPORT", db_params["port"]) result = CliRunner().invoke(swhmain, ["db", "create", "-d", dbname, "provenance"]) assert result.exit_code == 0, result.output # DB init using 'swh db init' result = CliRunner().invoke( swhmain, ["db", "init", "-d", dbname, "--flavor", flavor, "provenance"] ) assert result.exit_code == 0, result.output assert f"(flavor {flavor})" in result.output db_params["dbname"] = dbname cnx = BaseDb.connect(**db_params).conn # check the DB looks OK (check for db_flavor and expected tables) with cnx.cursor() as cur: cur.execute("select swh_get_dbflavor()") assert cur.fetchone() == (flavor,) cur.execute( "select table_name from information_schema.tables " "where table_schema = 'public' " f"and table_catalog = '{dbname}'" ) tables = set(x for (x,) in cur.fetchall()) assert tables == TABLES def test_cli_init_db_default_flavor(postgresql: psycopg2.extensions.connection) -> None: "Test that 'swh db init provenance' defaults to a normalized flavored DB" dbname = postgresql.dsn init_admin_extensions("swh.provenance", dbname) result = CliRunner().invoke(swhmain, ["db", "init", "-d", dbname, "provenance"]) assert result.exit_code == 0, result.output with postgresql.cursor() as cur: cur.execute("select swh_get_dbflavor()") assert cur.fetchone() == ("normalized",) @pytest.mark.origin_layer @pytest.mark.parametrize( "subcommand", (["origin", "from-csv"], ["iter-origins"]), ) def test_cli_origin_from_csv( swh_storage: StorageInterface, subcommand: List[str], swh_storage_backend_config: Dict, provenance, tmp_path, ): repo = "cmdbts2" origin_url = f"https://{repo}" data = load_repo_data(repo) fill_storage(swh_storage, data) assert len(data["origin"]) >= 1 assert origin_url in [o["url"] for o in data["origin"]] cfg = { "provenance": { "archive": { "cls": "api", "storage": swh_storage_backend_config, }, "storage": { "cls": "postgresql", # "db": provenance.storage.conn.dsn, "db": provenance.storage.conn.get_dsn_parameters(), }, }, } csv_filepath = get_datafile("origins.csv") subcommand = subcommand + [csv_filepath] result = invoke(subcommand, config=cfg) assert result.exit_code == 0, f"Unexpected result: {result.output}" origin_sha1 = MultiHash.from_data( origin_url.encode(), hash_names=["sha1"] ).digest()["sha1"] actual_result = 
provenance.storage.origin_get([origin_sha1]) assert actual_result == {origin_sha1: origin_url} @pytest.mark.kafka def test_replay( provenance_storage, provenance_postgresqldb, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, ): kafka_prefix += ".swh.journal.provenance" producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test-producer", "acks": "all", } ) for i in range(10): cntkey = (b"cnt:" + bytes([i])).ljust(20, b"\x00") producer.produce( topic=kafka_prefix + ".content", key=key_to_kafka(cntkey), value=value_to_kafka({"id": cntkey, "value": {"date": None}}), ) dirkey = (b"dir:" + bytes([i])).ljust(20, b"\x00") producer.produce( topic=kafka_prefix + ".directory", key=key_to_kafka(dirkey), value=value_to_kafka( {"id": dirkey, "value": {"date": None, "flat": False}} ), ) revkey = (b"rev:" + bytes([i])).ljust(20, b"\x00") producer.produce( topic=kafka_prefix + ".revision", key=key_to_kafka(revkey), value=value_to_kafka( {"id": revkey, "value": {"date": None, "origin": None}} ), ) loc = f"dir/{i}".encode() lockey = (b"loc:" + bytes([i])).ljust(20, b"\x00") producer.produce( topic=kafka_prefix + ".location", key=key_to_kafka(lockey), value=value_to_kafka({"id": lockey, "value": loc}), ) producer.produce( topic=kafka_prefix + ".content_in_revision", key=key_to_kafka(cntkey), value=value_to_kafka( {"id": cntkey, "value": [{"dst": revkey, "path": loc}]} ), ) producer.produce( topic=kafka_prefix + ".content_in_directory", key=key_to_kafka(cntkey), value=value_to_kafka( {"id": cntkey, "value": [{"dst": dirkey, "path": loc}]} ), ) producer.produce( topic=kafka_prefix + ".directory_in_revision", key=key_to_kafka(dirkey), value=value_to_kafka( {"id": dirkey, "value": [{"dst": revkey, "path": loc}]} ), ) # now add dates to entities producer.produce( topic=kafka_prefix + ".content", key=key_to_kafka(cntkey), value=value_to_kafka({"id": cntkey, "value": {"date": now()}}), ) producer.produce( topic=kafka_prefix + ".directory", key=key_to_kafka(dirkey), value=value_to_kafka( {"id": dirkey, "value": {"date": now(), "flat": False}} ), ) producer.produce( topic=kafka_prefix + ".revision", key=key_to_kafka(revkey), value=value_to_kafka( {"id": revkey, "value": {"date": now(), "origin": None}} ), ) producer.flush() logger.debug("Flushed producer") config = { "provenance": { "storage": { "cls": "postgresql", "db": provenance_postgresqldb, }, "journal_client": { "cls": "kafka", "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, "stop_on_eof": True, }, } } result = invoke(["replay"], config=config) expected = r"Done. 
processed 100 messages\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output assert len(provenance_storage.entity_get_all(EntityType.CONTENT)) == 10 assert len(provenance_storage.entity_get_all(EntityType.REVISION)) == 10 assert len(provenance_storage.entity_get_all(EntityType.DIRECTORY)) == 10 assert len(provenance_storage.location_get_all()) == 10 assert len(provenance_storage.relation_get_all(RelationType.CNT_EARLY_IN_REV)) == 10 assert len(provenance_storage.relation_get_all(RelationType.DIR_IN_REV)) == 10 assert len(provenance_storage.relation_get_all(RelationType.CNT_IN_DIR)) == 10 diff --git a/swh/provenance/tests/test_conftest.py b/swh/provenance/tests/test_conftest.py index 6c6d082..e3db9e5 100644 --- a/swh/provenance/tests/test_conftest.py +++ b/swh/provenance/tests/test_conftest.py @@ -1,27 +1,28 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.provenance.interface import ProvenanceInterface -from swh.provenance.tests.conftest import fill_storage, load_repo_data from swh.storage.interface import StorageInterface +from .utils import fill_storage, load_repo_data + def test_provenance_fixture(provenance: ProvenanceInterface) -> None: """Check the 'provenance' fixture produce a working ProvenanceStoragePostgreSql object""" assert provenance provenance.flush() # should be a noop def test_fill_storage(swh_storage: StorageInterface) -> None: """Check the 'fill_storage' test utility produces a working Storage object with at least some Content, Revision and Directory in it""" data = load_repo_data("cmdbts2") fill_storage(swh_storage, data) assert swh_storage assert swh_storage.content_get_random() assert swh_storage.directory_get_random() assert swh_storage.revision_get_random() diff --git a/swh/provenance/tests/test_consistency.py b/swh/provenance/tests/test_consistency.py index d14be0a..0b61789 100644 --- a/swh/provenance/tests/test_consistency.py +++ b/swh/provenance/tests/test_consistency.py @@ -1,92 +1,93 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.model.hashutil import hash_to_bytes from swh.provenance.algos.revision import revision_add from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import RevisionEntry from swh.provenance.storage.interface import DirectoryData, ProvenanceResult -from swh.provenance.tests.conftest import fill_storage, load_repo_data, ts2dt + +from .utils import fill_storage, load_repo_data, ts2dt def test_consistency( provenance: ProvenanceInterface, archive: ArchiveInterface, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data("cmdbts2") fill_storage(archive.storage, data) revisions = {rev["id"]: rev for rev in data["revision"]} # Process R00 first as expected rev_00 = revisions[hash_to_bytes("c0d8929936631ecbcf9147be6b8aa13b13b014e4")] r00 = RevisionEntry( id=rev_00["id"], date=ts2dt(rev_00["date"]), root=rev_00["directory"], ) revision_add(provenance, archive, [r00]) # Register contents A/B/C/b from R01 in the storage to simulate a 
crash rev_01 = revisions[hash_to_bytes("1444db96cbd8cd791abe83527becee73d3c64e86")] r01 = RevisionEntry( id=rev_01["id"], date=ts2dt(rev_01["date"]), root=rev_01["directory"], ) assert r01.date is not None # for mypy cnt_b_sha1 = hash_to_bytes("50e9cdb03f9719261dd39d7f2920b906db3711a3") provenance.storage.content_add({cnt_b_sha1: r01.date}) # Process R02 (this should set a frontier in directory C) rev_02 = revisions[hash_to_bytes("0d45f1ee524db8f6f0b5a267afac4e733b4b2cee")] r02 = RevisionEntry( id=rev_02["id"], date=ts2dt(rev_02["date"]), root=rev_02["directory"], ) assert r02.date is not None # for mypy revision_add(provenance, archive, [r02]) dir_C_sha1 = hash_to_bytes("c9cabe7f49012e3fdef6ac6b929efb5654f583cf") assert provenance.storage.directory_get([dir_C_sha1]) == { dir_C_sha1: DirectoryData(r01.date, True) } assert provenance.content_find_first(cnt_b_sha1) is None # No first occurrence assert set(provenance.content_find_all(cnt_b_sha1)) == { ProvenanceResult( content=cnt_b_sha1, revision=r02.id, date=r02.date, origin=None, path=b"A/B/C/b", ) } # Process R01 out of order (frontier in C should not be reused to guarantee that the # first occurrence of A/B/C/b is in the CNT_EARLY_IN_REV relation) revision_add(provenance, archive, [r01]) assert provenance.content_find_first(cnt_b_sha1) == ProvenanceResult( content=cnt_b_sha1, revision=r01.id, date=r01.date, origin=None, path=b"A/B/C/b" ) assert set(provenance.content_find_all(cnt_b_sha1)) == { ProvenanceResult( content=cnt_b_sha1, revision=r01.id, date=r01.date, origin=None, path=b"A/B/C/b", ), ProvenanceResult( content=cnt_b_sha1, revision=r02.id, date=r02.date, origin=None, path=b"A/B/C/b", ), } diff --git a/swh/provenance/tests/test_directory_flatten.py b/swh/provenance/tests/test_directory_flatten.py index 84b10fd..cc283f5 100644 --- a/swh/provenance/tests/test_directory_flatten.py +++ b/swh/provenance/tests/test_directory_flatten.py @@ -1,101 +1,102 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone from typing import Tuple from swh.model.hashutil import hash_to_bytes from swh.provenance.algos.directory import directory_add, directory_flatten_range from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import DirectoryEntry, FileEntry from swh.provenance.storage.interface import DirectoryData, RelationData, RelationType -from swh.provenance.tests.conftest import fill_storage, load_repo_data + +from .utils import fill_storage, load_repo_data def prepare( provenance: ProvenanceInterface, archive: ArchiveInterface ) -> Tuple[datetime, DirectoryEntry, FileEntry, FileEntry]: """Prepare the provenance database with some content suitable for flattening tests""" # read data/README.md for more details on how these datasets are generated data = load_repo_data("cmdbts2") fill_storage(archive.storage, data) # just take a directory that is known to exists in cmdbts2 directory = DirectoryEntry( id=hash_to_bytes("48007c961cc734d1f63886d0413a6dc605e3e2ea") ) content1 = FileEntry( id=hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494"), name=b"a" ) content2 = FileEntry( id=hash_to_bytes("50e9cdb03f9719261dd39d7f2920b906db3711a3"), name=b"b" ) date = datetime.fromtimestamp(1000000010, timezone.utc) # directory_add and the 
internal directory_flatten require the directory and its # content to be known by the provenance object. Otherwise, they do nothing provenance.directory_set_date_in_isochrone_frontier(directory, date) provenance.content_set_early_date(content1, date) provenance.content_set_early_date(content2, date) provenance.flush() assert provenance.storage.directory_get([directory.id]) == { directory.id: DirectoryData(date=date, flat=False) } assert provenance.storage.content_get([content1.id, content2.id]) == { content1.id: date, content2.id: date, } # this query forces the directory date to be retrieved from the storage and cached # (otherwise, the flush below won't update the directory flatten flag) flattened = provenance.directory_already_flattened(directory) assert flattened is not None and not flattened return date, directory, content1, content2 def test_directory_add( provenance: ProvenanceInterface, archive: ArchiveInterface, ) -> None: date, directory, content1, content2 = prepare(provenance, archive) # flatten the directory and check the expected result directory_add(provenance, archive, [directory]) assert provenance.storage.directory_get([directory.id]) == { directory.id: DirectoryData(date=date, flat=True) } assert provenance.storage.relation_get_all(RelationType.CNT_IN_DIR) == { content1.id: { RelationData(dst=directory.id, path=b"a"), RelationData(dst=directory.id, path=b"C/a"), }, content2.id: {RelationData(dst=directory.id, path=b"C/b")}, } def test_directory_flatten_range( provenance: ProvenanceInterface, archive: ArchiveInterface, ) -> None: date, directory, content1, content2 = prepare(provenance, archive) # flatten the directory and check the expected result directory_flatten_range(provenance, archive, directory.id[:-1], directory.id) assert provenance.storage.directory_get([directory.id]) == { directory.id: DirectoryData(date=date, flat=True) } assert provenance.storage.relation_get_all(RelationType.CNT_IN_DIR) == { content1.id: { RelationData(dst=directory.id, path=b"a"), RelationData(dst=directory.id, path=b"C/a"), }, content2.id: {RelationData(dst=directory.id, path=b"C/b")}, } diff --git a/swh/provenance/tests/test_directory_iterator.py b/swh/provenance/tests/test_directory_iterator.py index 9601aa5..fcfab87 100644 --- a/swh/provenance/tests/test_directory_iterator.py +++ b/swh/provenance/tests/test_directory_iterator.py @@ -1,29 +1,30 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.provenance.algos.directory import CSVDirectoryIterator -from swh.provenance.tests.conftest import fill_storage, load_repo_data from swh.storage.interface import StorageInterface +from .utils import fill_storage, load_repo_data + @pytest.mark.parametrize( "repo", ( "cmdbts2", "out-of-order", ), ) def test_revision_iterator(swh_storage: StorageInterface, repo: str) -> None: """Test CSVDirectoryIterator""" data = load_repo_data(repo) fill_storage(swh_storage, data) directories_ids = [dir["id"] for dir in data["directory"]] directories = list(CSVDirectoryIterator(directories_ids)) assert directories assert len(directories) == len(data["directory"]) diff --git a/swh/provenance/tests/test_history_graph.py b/swh/provenance/tests/test_history_graph.py index 5612c0c..4e2a824 100644 --- a/swh/provenance/tests/test_history_graph.py +++ b/swh/provenance/tests/test_history_graph.py @@ 
-1,54 +1,55 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest import yaml from swh.model.hashutil import hash_to_bytes from swh.provenance.algos.origin import HistoryGraph, origin_add_revision from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import OriginEntry, RevisionEntry -from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data + +from .utils import fill_storage, get_datafile, load_repo_data @pytest.mark.origin_layer @pytest.mark.parametrize( "repo, visit", (("with-merges", "visits-01"),), ) @pytest.mark.parametrize("batch", (True, False)) def test_history_graph( provenance: ProvenanceInterface, archive: ArchiveInterface, repo: str, visit: str, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(archive.storage, data) filename = f"history_graphs_{repo}_{visit}.yaml" with open(get_datafile(filename)) as file: for expected in yaml.full_load(file): entry = OriginEntry(expected["origin"], hash_to_bytes(expected["snapshot"])) provenance.origin_add(entry) for expected_graph_as_dict in expected["graphs"]: print("Expected graph:", expected_graph_as_dict) computed_graph = HistoryGraph( archive, RevisionEntry(hash_to_bytes(expected_graph_as_dict["head"])), ) print("Computed graph:", computed_graph.as_dict()) assert computed_graph.as_dict() == expected_graph_as_dict origin_add_revision(provenance, entry, computed_graph) if not batch: provenance.flush() diff --git a/swh/provenance/tests/test_isochrone_graph.py b/swh/provenance/tests/test_isochrone_graph.py index 81202fc..28d4ff8 100644 --- a/swh/provenance/tests/test_isochrone_graph.py +++ b/swh/provenance/tests/test_isochrone_graph.py @@ -1,152 +1,148 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy from datetime import datetime, timezone from typing import Any, Dict import pytest import yaml from swh.model.hashutil import hash_to_bytes from swh.provenance.algos.isochrone_graph import ( DirectoryTooLarge, IsochroneNode, build_isochrone_graph, ) from swh.provenance.algos.revision import revision_add from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import DirectoryEntry, RevisionEntry -from swh.provenance.tests.conftest import ( - fill_storage, - get_datafile, - load_repo_data, - ts2dt, -) + +from .utils import fill_storage, get_datafile, load_repo_data, ts2dt def isochrone_graph_from_dict(d: Dict[str, Any], depth: int = 0) -> IsochroneNode: """Takes a dictionary representing a tree of IsochroneNode objects, and recursively builds the corresponding graph.""" d = deepcopy(d) d["entry"]["id"] = hash_to_bytes(d["entry"]["id"]) d["entry"]["name"] = bytes(d["entry"]["name"], encoding="utf-8") dbdate = d.get("dbdate", None) if dbdate is not None: dbdate = datetime.fromtimestamp(d["dbdate"], timezone.utc) children = d.get("children", []) node = IsochroneNode( entry=DirectoryEntry(**d["entry"]), dbdate=dbdate, depth=depth, ) 
node.maxdate = datetime.fromtimestamp(d["maxdate"], timezone.utc) node.invalid = d.get("invalid", False) node.path = bytes(d["path"], encoding="utf-8") node.children = set( isochrone_graph_from_dict(child, depth=depth + 1) for child in children ) return node @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_isochrone_graph( provenance: ProvenanceInterface, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(archive.storage, data) revisions = {rev["id"]: rev for rev in data["revision"]} filename = f"graphs_{repo}_{'lower' if lower else 'upper'}_{mindepth}.yaml" with open(get_datafile(filename)) as file: for expected in yaml.full_load(file): print("# Processing revision", expected["rev"]) revision = revisions[hash_to_bytes(expected["rev"])] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) expected_graph = isochrone_graph_from_dict(expected["graph"]) print("Expected graph:", expected_graph) # Create graph for current revision and check it has the expected structure. assert entry.root is not None computed_graph = build_isochrone_graph( provenance, archive, entry, DirectoryEntry(entry.root), ) print("Computed graph:", computed_graph) assert computed_graph == expected_graph # Add current revision so that provenance info is kept up to date for the # following ones. revision_add( provenance, archive, [entry], lower=lower, mindepth=mindepth, commit=not batch, ) def test_isochrone_graph_max_dir_size( provenance: ProvenanceInterface, archive: ArchiveInterface, ): data = load_repo_data("git-bomb") fill_storage(archive.storage, data) rev = archive.storage.revision_get( [hash_to_bytes("7af99c9e7d4768fa681f4fe4ff61259794cf719b")] )[0] assert rev is not None assert rev.date is not None with pytest.raises(DirectoryTooLarge, match="Max directory size exceeded"): build_isochrone_graph( provenance, archive, RevisionEntry(id=rev.id, date=rev.date.to_datetime(), root=rev.directory), DirectoryEntry(rev.directory), max_directory_size=1000, ) pass # from this directory, there should be only ~1k recursive entries, so the # call to build_isochrone_graph with max_directory_size=1200 should succeed dir_id = hash_to_bytes("3e50041e82b225ca9e9b2641548b0c1b81eb971b") build_isochrone_graph( provenance, archive, RevisionEntry(id=rev.id, date=rev.date.to_datetime(), root=dir_id), DirectoryEntry(dir_id), max_directory_size=1200, ) diff --git a/swh/provenance/tests/test_journal_client.py b/swh/provenance/tests/test_journal_client.py index 7be5e17..33c11d5 100644 --- a/swh/provenance/tests/test_journal_client.py +++ b/swh/provenance/tests/test_journal_client.py @@ -1,134 +1,133 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict from confluent_kafka import Consumer import pytest from swh.model.hashutil import MultiHash -from swh.provenance.tests.conftest import fill_storage, load_repo_data from swh.storage.interface import StorageInterface -from .test_utils import invoke +from .utils import fill_storage, invoke, load_repo_data 
@pytest.fixture def swh_storage_backend_config(swh_storage_backend_config, kafka_server, kafka_prefix): writer_config = { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, "anonymize": False, } yield {**swh_storage_backend_config, "journal_writer": writer_config} @pytest.mark.origin_layer @pytest.mark.kafka def test_cli_origin_from_journal_client( swh_storage: StorageInterface, swh_storage_backend_config: Dict, kafka_prefix: str, kafka_server: str, consumer: Consumer, provenance, postgres_provenance, ) -> None: """Test origin journal client cli""" # Prepare storage data data = load_repo_data("cmdbts2") assert len(data["origin"]) >= 1 origin_url = data["origin"][0]["url"] fill_storage(swh_storage, data) # Prepare configuration for cli call swh_storage_backend_config.pop("journal_writer", None) # no need for that config storage_config_dict = swh_storage_backend_config cfg = { "journal_client": { "cls": "kafka", "brokers": [kafka_server], "group_id": "toto", "prefix": kafka_prefix, "stop_on_eof": True, }, "provenance": { "archive": { "cls": "api", "storage": storage_config_dict, }, "storage": { "cls": "postgresql", "db": postgres_provenance.get_dsn_parameters(), }, }, } # call the cli 'swh provenance origin from-journal' cli_result = invoke(["origin", "from-journal"], config=cfg) assert cli_result.exit_code == 0, f"Unexpected result: {cli_result.output}" origin_sha1 = MultiHash.from_data( origin_url.encode(), hash_names=["sha1"] ).digest()["sha1"] actual_result = provenance.storage.origin_get([origin_sha1]) assert actual_result == {origin_sha1: origin_url} @pytest.mark.kafka def test_cli_revision_from_journal_client( swh_storage: StorageInterface, swh_storage_backend_config: Dict, kafka_prefix: str, kafka_server: str, consumer: Consumer, provenance, postgres_provenance, ) -> None: """Test revision journal client cli""" # Prepare storage data data = load_repo_data("cmdbts2") assert len(data["origin"]) >= 1 fill_storage(swh_storage, data) # Prepare configuration for cli call swh_storage_backend_config.pop("journal_writer", None) # no need for that config storage_config_dict = swh_storage_backend_config cfg = { "journal_client": { "cls": "kafka", "brokers": [kafka_server], "group_id": "toto", "prefix": kafka_prefix, "stop_on_eof": True, }, "provenance": { "archive": { "cls": "api", "storage": storage_config_dict, }, "storage": { "cls": "postgresql", "db": postgres_provenance.get_dsn_parameters(), }, }, } revisions = [rev["id"] for rev in data["revision"]] result = provenance.storage.revision_get(revisions) assert not result # call the cli 'swh provenance revision from-journal' cli_result = invoke(["revision", "from-journal"], config=cfg) assert cli_result.exit_code == 0, f"Unexpected result: {cli_result.output}" result = provenance.storage.revision_get(revisions) assert set(result.keys()) == set(revisions) diff --git a/swh/provenance/tests/test_origin_iterator.py b/swh/provenance/tests/test_origin_iterator.py index 6492109..93aafcb 100644 --- a/swh/provenance/tests/test_origin_iterator.py +++ b/swh/provenance/tests/test_origin_iterator.py @@ -1,47 +1,48 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.provenance.algos.origin import CSVOriginIterator -from swh.provenance.tests.conftest import fill_storage, load_repo_data from 
swh.storage.algos.origin import ( iter_origin_visit_statuses, iter_origin_visits, iter_origins, ) from swh.storage.interface import StorageInterface +from .utils import fill_storage, load_repo_data + @pytest.mark.origin_layer @pytest.mark.parametrize( "repo", ( "cmdbts2", "out-of-order", ), ) def test_origin_iterator(swh_storage: StorageInterface, repo: str) -> None: """Test CSVOriginIterator""" data = load_repo_data(repo) fill_storage(swh_storage, data) origins_csv = [] for origin in iter_origins(swh_storage): for visit in iter_origin_visits(swh_storage, origin.url): if visit.visit is not None: for status in iter_origin_visit_statuses( swh_storage, origin.url, visit.visit ): if status.snapshot is not None: origins_csv.append((status.origin, status.snapshot)) origins = list(CSVOriginIterator(origins_csv)) assert origins # there can be more origins, depending on the additional extra visits.yaml # file used during dataset generation (see data/generate_storage_from_git) assert len(origins) >= len(data["origin"]) # but we can check it's a subset assert set(o.url for o in origins) <= set(o["url"] for o in data["origin"]) diff --git a/swh/provenance/tests/test_origin_revision_layer.py b/swh/provenance/tests/test_origin_revision_layer.py index 1948fed..292ee8c 100644 --- a/swh/provenance/tests/test_origin_revision_layer.py +++ b/swh/provenance/tests/test_origin_revision_layer.py @@ -1,196 +1,197 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re from typing import Any, Dict, Iterable, Iterator, List, Set import pytest from typing_extensions import TypedDict from swh.model.hashutil import hash_to_bytes from swh.model.model import Sha1Git from swh.provenance.algos.origin import origin_add from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import OriginEntry from swh.provenance.storage.interface import EntityType, RelationType -from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data + +from .utils import fill_storage, get_datafile, load_repo_data class SynthRelation(TypedDict): src: Sha1Git dst: Sha1Git name: str class SynthOrigin(TypedDict): sha1: Sha1Git url: str snap: Sha1Git O_R: List[SynthRelation] R_R: List[SynthRelation] def synthetic_origin_revision_result(filename: str) -> Iterator[SynthOrigin]: """Generates dict representations of synthetic origin visits found in the synthetic file (from the data/ directory) given as argument of the generator. Generated SynthOrigin (typed dict) with the following elements: "sha1": (Sha1Git) sha1 of the origin, "url": (str) url of the origin, "snap": (Sha1Git) sha1 of the visit's snapshot, "O_R": (list) new O-R relations added by this origin visit "R_R": (list) new R-R relations added by this origin visit Each relation above is a SynthRelation typed dict with: "src": (Sha1Git) sha1 of the source of the relation "dst": (Sha1Git) sha1 of the destination of the relation """ with open(get_datafile(filename), "r") as fobj: yield from _parse_synthetic_origin_revision_file(fobj) def _parse_synthetic_origin_revision_file(fobj: Iterable[str]) -> Iterator[SynthOrigin]: """Read a 'synthetic' file and generate a dict representation of the synthetic origin visit for each snapshot listed in the synthetic file. 
""" regs = [ "(?P[^ ]+)?", "(?P[^| ]*)", "(?PR[0-9]{2,4})?", "(?P[ORS]) (?P[0-9a-f]{40})", ] regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$") current_org: List[dict] = [] for m in (regex.match(line) for line in fobj): if m: d = m.groupdict() if d["url"]: if current_org: yield _mk_synth_org(current_org) current_org.clear() current_org.append(d) if current_org: yield _mk_synth_org(current_org) def _mk_synth_org(synth_org: List[Dict[str, str]]) -> SynthOrigin: assert synth_org[0]["type"] == "O" assert synth_org[1]["type"] == "S" org = SynthOrigin( sha1=hash_to_bytes(synth_org[0]["sha1"]), url=synth_org[0]["url"], snap=hash_to_bytes(synth_org[1]["sha1"]), O_R=[], R_R=[], ) for row in synth_org[2:]: if row["reltype"] == "O-R": assert row["type"] == "R" org["O_R"].append( SynthRelation( src=org["sha1"], dst=hash_to_bytes(row["sha1"]), name=row["revname"], ) ) elif row["reltype"] == "R-R": assert row["type"] == "R" org["R_R"].append( SynthRelation( src=org["O_R"][-1]["dst"], dst=hash_to_bytes(row["sha1"]), name=row["revname"], ) ) return org @pytest.mark.origin_layer @pytest.mark.parametrize( "repo, visit", (("with-merges", "visits-01"),), ) def test_origin_revision_layer( provenance: ProvenanceInterface, archive: ArchiveInterface, repo: str, visit: str, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(archive.storage, data) syntheticfile = get_datafile(f"origin-revision_{repo}_{visit}.txt") origins = [ {"url": status["origin"], "snap": status["snapshot"]} for status in data["origin_visit_status"] if status["snapshot"] is not None ] rows: Dict[str, Set[Any]] = { "origin": set(), "revision_in_origin": set(), "revision_before_revision": set(), "revision": set(), } for synth_org in synthetic_origin_revision_result(syntheticfile): for origin in ( org for org in origins if org["url"] == synth_org["url"] and org["snap"] == synth_org["snap"] ): entry = OriginEntry(url=origin["url"], snapshot=origin["snap"]) origin_add(provenance, archive, [entry]) # each "entry" in the synth file is one new origin visit rows["origin"].add(synth_org["sha1"]) assert rows["origin"] == provenance.storage.entity_get_all( EntityType.ORIGIN ), synth_org["url"] # check the url of the origin assert ( provenance.storage.origin_get([synth_org["sha1"]])[synth_org["sha1"]] == synth_org["url"] ), synth_org["snap"] # this origin visit might have added new revision objects rows["revision"] |= set(x["dst"] for x in synth_org["O_R"]) rows["revision"] |= set(x["dst"] for x in synth_org["R_R"]) assert rows["revision"] == provenance.storage.entity_get_all( EntityType.REVISION ), synth_org["snap"] # check for O-R (head) entries # these are added in the revision_in_origin relation rows["revision_in_origin"] |= set( (x["dst"], x["src"], None) for x in synth_org["O_R"] ) assert rows["revision_in_origin"] == { (src, rel.dst, rel.path) for src, rels in provenance.storage.relation_get_all( RelationType.REV_IN_ORG ).items() for rel in rels }, synth_org["snap"] # check for R-R entries # these are added in the revision_before_revision relation rows["revision_before_revision"] |= set( (x["dst"], x["src"], None) for x in synth_org["R_R"] ) assert rows["revision_before_revision"] == { (src, rel.dst, rel.path) for src, rels in provenance.storage.relation_get_all( RelationType.REV_BEFORE_REV ).items() for rel in rels }, synth_org["snap"] diff --git a/swh/provenance/tests/test_provenance_storage.py b/swh/provenance/tests/test_provenance_storage.py index 
0f85103..57d0cac 100644 --- a/swh/provenance/tests/test_provenance_storage.py +++ b/swh/provenance/tests/test_provenance_storage.py @@ -1,487 +1,488 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import hashlib import inspect import os from typing import Any, Dict, Iterable, Optional, Set, Tuple import pytest from swh.model.hashutil import hash_to_bytes from swh.model.model import Origin, Sha1Git from swh.provenance.algos.origin import origin_add from swh.provenance.algos.revision import revision_add from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import OriginEntry, RevisionEntry from swh.provenance.provenance import Provenance from swh.provenance.storage.interface import ( DirectoryData, EntityType, ProvenanceResult, ProvenanceStorageInterface, RelationData, RelationType, RevisionData, ) -from swh.provenance.tests.conftest import fill_storage, load_repo_data, ts2dt + +from .utils import fill_storage, load_repo_data, ts2dt class TestProvenanceStorage: def test_provenance_storage_content( self, provenance_storage: ProvenanceStorageInterface, ) -> None: """Tests content methods for every `ProvenanceStorageInterface` implementation.""" # Read data/README.md for more details on how these datasets are generated. data = load_repo_data("cmdbts2") # Add all content present in the current repo to the storage, just assigning their # creation dates. Then check that the returned results when querying are the same. cnt_dates = { cnt["sha1_git"]: cnt["ctime"] for idx, cnt in enumerate(data["content"]) } assert provenance_storage.content_add(cnt_dates) assert provenance_storage.content_get(set(cnt_dates.keys())) == cnt_dates assert provenance_storage.entity_get_all(EntityType.CONTENT) == set( cnt_dates.keys() ) def test_provenance_storage_directory( self, provenance_storage: ProvenanceStorageInterface, ) -> None: """Tests directory methods for every `ProvenanceStorageInterface` implementation.""" # Read data/README.md for more details on how these datasets are generated. data = load_repo_data("cmdbts2") # Of all directories present in the current repo, only assign a date to those # containing blobs (picking the max date among the available ones). Then check that # the returned results when querying are the same. 
def getmaxdate( directory: Dict[str, Any], contents: Iterable[Dict[str, Any]] ) -> Optional[datetime]: dates = [ content["ctime"] for entry in directory["entries"] for content in contents if entry["type"] == "file" and entry["target"] == content["sha1_git"] ] return max(dates) if dates else None flat_values = (False, True) dir_dates = {} for idx, dir in enumerate(data["directory"]): date = getmaxdate(dir, data["content"]) if date is not None: dir_dates[dir["id"]] = DirectoryData( date=date, flat=flat_values[idx % 2] ) assert provenance_storage.directory_add(dir_dates) assert provenance_storage.directory_get(set(dir_dates.keys())) == dir_dates assert provenance_storage.entity_get_all(EntityType.DIRECTORY) == set( dir_dates.keys() ) def test_provenance_storage_location( self, provenance_storage: ProvenanceStorageInterface, ) -> None: """Tests location methods for every `ProvenanceStorageInterface` implementation.""" # Read data/README.md for more details on how these datasets are generated. data = load_repo_data("cmdbts2") # Add all names of entries present in the directories of the current repo as paths # to the storage. Then check that the returned results when querying are the same. paths = { hashlib.sha1(entry["name"]).digest(): entry["name"] for dir in data["directory"] for entry in dir["entries"] } assert provenance_storage.location_add(paths) assert provenance_storage.location_get_all() == paths @pytest.mark.origin_layer def test_provenance_storage_origin( self, provenance_storage: ProvenanceStorageInterface, ) -> None: """Tests origin methods for every `ProvenanceStorageInterface` implementation.""" # Read data/README.md for more details on how these datasets are generated. data = load_repo_data("cmdbts2") # Test origin methods. # Add all origins present in the current repo to the storage. Then check that the # returned results when querying are the same. orgs = {Origin(url=org["url"]).id: org["url"] for org in data["origin"]} assert orgs assert provenance_storage.origin_add(orgs) assert provenance_storage.origin_get(set(orgs.keys())) == orgs assert provenance_storage.entity_get_all(EntityType.ORIGIN) == set(orgs.keys()) def test_provenance_storage_revision( self, provenance_storage: ProvenanceStorageInterface, ) -> None: """Tests revision methods for every `ProvenanceStorageInterface` implementation.""" # Read data/README.md for more details on how these datasets are generated. data = load_repo_data("cmdbts2") # Test revision methods. # Add all revisions present in the current repo to the storage, assigning their # dates and an arbitrary origin to each one. Then check that the returned results # when querying are the same. origin = Origin(url=next(iter(data["origin"]))["url"]) # Origin must be inserted in advance. 
assert provenance_storage.origin_add({origin.id: origin.url}) revs = {rev["id"] for idx, rev in enumerate(data["revision"])} rev_data = { rev["id"]: RevisionData( date=ts2dt(rev["date"]) if idx % 2 != 0 else None, origin=origin.id if idx % 3 != 0 else None, ) for idx, rev in enumerate(data["revision"]) } assert revs assert provenance_storage.revision_add(rev_data) assert provenance_storage.revision_get(set(rev_data.keys())) == { k: v for (k, v) in rev_data.items() if v.date is not None or v.origin is not None } assert provenance_storage.entity_get_all(EntityType.REVISION) == set(rev_data) def test_provenance_storage_relation_revision_layer( self, provenance_storage: ProvenanceStorageInterface, ) -> None: """Tests relation methods for every `ProvenanceStorageInterface` implementation.""" # Read data/README.md for more details on how these datasets are generated. data = load_repo_data("cmdbts2") # Test content-in-revision relation. # Create flat models of every root directory for the revisions in the dataset. cnt_in_rev: Dict[Sha1Git, Set[RelationData]] = {} for rev in data["revision"]: root = next( subdir for subdir in data["directory"] if subdir["id"] == rev["directory"] ) for cnt, rel in dircontent(data, rev["id"], root): cnt_in_rev.setdefault(cnt, set()).add(rel) relation_add_and_compare_result( provenance_storage, RelationType.CNT_EARLY_IN_REV, cnt_in_rev ) # Test content-in-directory relation. # Create flat models for every directory in the dataset. cnt_in_dir: Dict[Sha1Git, Set[RelationData]] = {} for dir in data["directory"]: for cnt, rel in dircontent(data, dir["id"], dir): cnt_in_dir.setdefault(cnt, set()).add(rel) relation_add_and_compare_result( provenance_storage, RelationType.CNT_IN_DIR, cnt_in_dir ) # Test directory-in-revision relation. # Add root directories to their corresponding revision in the dataset. dir_in_rev: Dict[Sha1Git, Set[RelationData]] = {} for rev in data["revision"]: dir_in_rev.setdefault(rev["directory"], set()).add( RelationData(dst=rev["id"], path=b".") ) relation_add_and_compare_result( provenance_storage, RelationType.DIR_IN_REV, dir_in_rev ) @pytest.mark.origin_layer def test_provenance_storage_relation_orign_layer( self, provenance_storage: ProvenanceStorageInterface, ) -> None: """Tests relation methods for every `ProvenanceStorageInterface` implementation.""" # Read data/README.md for more details on how these datasets are generated. data = load_repo_data("cmdbts2") # Test revision-in-origin relation. # Origins must be inserted in advance (cannot be done by `entity_add` inside # `relation_add_and_compare_result`). orgs = {Origin(url=org["url"]).id: org["url"] for org in data["origin"]} assert provenance_storage.origin_add(orgs) # Add all revisions that are head of some snapshot branch to the corresponding # origin. rev_in_org: Dict[Sha1Git, Set[RelationData]] = {} for status in data["origin_visit_status"]: if status["snapshot"] is not None: for snapshot in data["snapshot"]: if snapshot["id"] == status["snapshot"]: for branch in snapshot["branches"].values(): if branch["target_type"] == "revision": rev_in_org.setdefault(branch["target"], set()).add( RelationData( dst=Origin(url=status["origin"]).id, path=None, ) ) relation_add_and_compare_result( provenance_storage, RelationType.REV_IN_ORG, rev_in_org ) # Test revision-before-revision relation. # For each revision in the data set add an entry for each parent to the relation.
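        # Orientation note: in the mapping built below each parent revision is the
        # source of the relation and the revisions that come after it are the
        # destinations (RelationData.dst).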
rev_before_rev: Dict[Sha1Git, Set[RelationData]] = {} for rev in data["revision"]: for parent in rev["parents"]: rev_before_rev.setdefault(parent, set()).add( RelationData(dst=rev["id"], path=None) ) relation_add_and_compare_result( provenance_storage, RelationType.REV_BEFORE_REV, rev_before_rev ) def test_provenance_storage_find_revision_layer( self, provenance: ProvenanceInterface, provenance_storage: ProvenanceStorageInterface, archive: ArchiveInterface, ) -> None: """Tests `content_find_first` and `content_find_all` methods for every `ProvenanceStorageInterface` implementation. """ # Read data/README.md for more details on how these datasets are generated. data = load_repo_data("cmdbts2") fill_storage(archive.storage, data) # Test content_find_first and content_find_all, first only executing the # revision-content algorithm, then adding the origin-revision layer. # Execute the revision-content algorithm on both storages. revisions = [ RevisionEntry(id=rev["id"], date=ts2dt(rev["date"]), root=rev["directory"]) for rev in data["revision"] ] revision_add(provenance, archive, revisions) revision_add(Provenance(provenance_storage), archive, revisions) assert ProvenanceResult( content=hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494"), revision=hash_to_bytes("c0d8929936631ecbcf9147be6b8aa13b13b014e4"), date=datetime.fromtimestamp(1000000000.0, timezone.utc), origin=None, path=b"A/B/C/a", ) == provenance_storage.content_find_first( hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494") ) for cnt in {cnt["sha1_git"] for cnt in data["content"]}: assert provenance.storage.content_find_first( cnt ) == provenance_storage.content_find_first(cnt) assert set(provenance.storage.content_find_all(cnt)) == set( provenance_storage.content_find_all(cnt) ) @pytest.mark.origin_layer def test_provenance_storage_find_origin_layer( self, provenance: ProvenanceInterface, provenance_storage: ProvenanceStorageInterface, archive: ArchiveInterface, ) -> None: """Tests `content_find_first` and `content_find_all` methods for every `ProvenanceStorageInterface` implementation. """ # Read data/README.md for more details on how these datasets are generated. data = load_repo_data("cmdbts2") fill_storage(archive.storage, data) # Execute the revision-content algorithm on both storages. revisions = [ RevisionEntry(id=rev["id"], date=ts2dt(rev["date"]), root=rev["directory"]) for rev in data["revision"] ] revision_add(provenance, archive, revisions) revision_add(Provenance(provenance_storage), archive, revisions) # Test content_find_first and content_find_all, first only executing the # revision-content algorithm, then adding the origin-revision layer. # Execute the origin-revision algorithm on both storages. 
origins = [ OriginEntry(url=sta["origin"], snapshot=sta["snapshot"]) for sta in data["origin_visit_status"] if sta["snapshot"] is not None ] origin_add(provenance, archive, origins) origin_add(Provenance(provenance_storage), archive, origins) assert ProvenanceResult( content=hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494"), revision=hash_to_bytes("c0d8929936631ecbcf9147be6b8aa13b13b014e4"), date=datetime.fromtimestamp(1000000000.0, timezone.utc), origin="https://cmdbts2", path=b"A/B/C/a", ) == provenance_storage.content_find_first( hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494") ) for cnt in {cnt["sha1_git"] for cnt in data["content"]}: assert provenance.storage.content_find_first( cnt ) == provenance_storage.content_find_first(cnt) assert set(provenance.storage.content_find_all(cnt)) == set( provenance_storage.content_find_all(cnt) ) def test_types(self, provenance_storage: ProvenanceStorageInterface) -> None: """Checks all methods of ProvenanceStorageInterface are implemented by this backend, and that they have the same signature.""" # Create an instance of the protocol (which cannot be instantiated # directly, so this creates a subclass, then instantiates it) interface = type("_", (ProvenanceStorageInterface,), {})() assert "content_find_first" in dir(interface) missing_methods = [] for meth_name in dir(interface): if meth_name.startswith("_"): continue interface_meth = getattr(interface, meth_name) try: concrete_meth = getattr(provenance_storage, meth_name) except AttributeError: if not getattr(interface_meth, "deprecated_endpoint", False): # The backend is missing a (non-deprecated) endpoint missing_methods.append(meth_name) continue expected_signature = inspect.signature(interface_meth) actual_signature = inspect.signature(concrete_meth) assert expected_signature == actual_signature, meth_name assert missing_methods == [] # If all the assertions above succeed, then this one should too. # But there's no harm in double-checking. # And we could replace the assertions above by this one, but unlike # the assertions above, it doesn't explain what is missing. assert isinstance(provenance_storage, ProvenanceStorageInterface) def dircontent( data: Dict[str, Any], ref: Sha1Git, dir: Dict[str, Any], prefix: bytes = b"", ) -> Iterable[Tuple[Sha1Git, RelationData]]: content = { ( entry["target"], RelationData(dst=ref, path=os.path.join(prefix, entry["name"])), ) for entry in dir["entries"] if entry["type"] == "file" } for entry in dir["entries"]: if entry["type"] == "dir": child = next( subdir for subdir in data["directory"] if subdir["id"] == entry["target"] ) content.update( dircontent(data, ref, child, os.path.join(prefix, entry["name"])) ) return content def entity_add( storage: ProvenanceStorageInterface, entity: EntityType, ids: Set[Sha1Git] ) -> bool: now = datetime.now(tz=timezone.utc) if entity == EntityType.CONTENT: return storage.content_add({sha1: now for sha1 in ids}) elif entity == EntityType.DIRECTORY: return storage.directory_add( {sha1: DirectoryData(date=now, flat=False) for sha1 in ids} ) else: # entity == EntityType.REVISION: return storage.revision_add( {sha1: RevisionData(date=None, origin=None) for sha1 in ids} ) def relation_add_and_compare_result( storage: ProvenanceStorageInterface, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]], ) -> None: # Source, destinations and locations must be added in advance. 
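    # RelationType values follow a "<src>_..._<dst>" naming scheme (e.g.
    # "content_in_revision", "directory_in_revision"), so the entity types at both
    # ends of the relation can be recovered from the enum value itself, which is
    # what the split("_") below relies on.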
src, *_, dst = relation.value.split("_") srcs = {sha1 for sha1 in data} if src != "origin": assert entity_add(storage, EntityType(src), srcs) dsts = {rel.dst for rels in data.values() for rel in rels} if dst != "origin": assert entity_add(storage, EntityType(dst), dsts) assert storage.location_add( { hashlib.sha1(rel.path).digest(): rel.path for rels in data.values() for rel in rels if rel.path is not None } ) assert data assert storage.relation_add(relation, data) for src_sha1 in srcs: relation_compare_result( storage.relation_get(relation, [src_sha1]), {src_sha1: data[src_sha1]}, ) for dst_sha1 in dsts: relation_compare_result( storage.relation_get(relation, [dst_sha1], reverse=True), { src_sha1: { RelationData(dst=dst_sha1, path=rel.path) for rel in rels if dst_sha1 == rel.dst } for src_sha1, rels in data.items() if dst_sha1 in {rel.dst for rel in rels} }, ) relation_compare_result( storage.relation_get_all(relation), data, ) def relation_compare_result( computed: Dict[Sha1Git, Set[RelationData]], expected: Dict[Sha1Git, Set[RelationData]], ) -> None: assert { src_sha1: {RelationData(dst=rel.dst, path=rel.path) for rel in rels} for src_sha1, rels in expected.items() } == computed diff --git a/swh/provenance/tests/test_replay.py b/swh/provenance/tests/test_replay.py index 4ec0fb5..6253202 100644 --- a/swh/provenance/tests/test_replay.py +++ b/swh/provenance/tests/test_replay.py @@ -1,170 +1,170 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools from typing import Dict import psycopg2 import pytest from swh.journal.client import JournalClient from swh.provenance.algos.revision import revision_add from swh.provenance.archive.interface import ArchiveInterface from swh.provenance.model import RevisionEntry from swh.provenance.provenance import Provenance from swh.provenance.storage import get_provenance_storage from swh.provenance.storage.interface import ( EntityType, ProvenanceStorageInterface, RelationType, ) from swh.provenance.storage.replay import ( ProvenanceObjectDeserializer, process_replay_objects, ) -from .conftest import fill_storage, load_repo_data, ts2dt +from .utils import fill_storage, load_repo_data, ts2dt @pytest.fixture(scope="function") def object_types(): """Set of object types to precreate topics for.""" return { # objects "revision", "directory", "content", "location", # relations "content_in_revision", "content_in_directory", "directory_in_revision", } @pytest.fixture() def replayer_storage_and_client( provenance_postgresqldb: Dict[str, str], kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, ): cfg = { "storage": { "cls": "postgresql", "db": provenance_postgresqldb, "raise_on_commit": True, }, "journal_writer": { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, "anonymize": False, }, } with get_provenance_storage(cls="journal", **cfg) as storage: deserializer = ProvenanceObjectDeserializer() replayer = JournalClient( brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, value_deserializer=deserializer.convert, ) yield storage, replayer @pytest.fixture() def secondary_db(provenance_postgresqldb: Dict[str, str]): """Create a new test db the new db is named after the db configured in provenance_postgresqldb and is using the same template as this later. 
""" dsn = provenance_postgresqldb.copy() conn = psycopg2.connect( dbname="postgres", user=dsn["user"], password=dsn.get("password"), host=dsn["host"], port=dsn["port"], ) conn.autocommit = True with conn.cursor() as cur: dbname = dsn["dbname"] template_name = f"{dbname}_tmpl" secondary_dbname = f"{dbname}_dst" cur.execute(f'CREATE DATABASE "{secondary_dbname}" TEMPLATE "{template_name}"') try: dsn["dbname"] = secondary_dbname yield dsn finally: with conn.cursor() as cur: cur.execute(f'DROP DATABASE "{secondary_dbname}"') @pytest.mark.kafka @pytest.mark.parametrize( "repo", ( "cmdbts2", "out-of-order", "with-merges", ), ) def test_provenance_replayer( provenance_storage: ProvenanceStorageInterface, archive: ArchiveInterface, replayer_storage_and_client, secondary_db, repo: str, ): """Optimal replayer scenario. This: - writes objects to a provenance storage (which have a journal writer) - replayer consumes objects from the topic and replays them - a destination provenance storage is filled from this In the end, both storages should have the same content. """ # load test data and fill a swh-storage data = load_repo_data(repo) fill_storage(archive.storage, data) prov_sto_src, replayer = replayer_storage_and_client # Fill Kafka by filling the source provenance storage revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] revision_add(Provenance(prov_sto_src), archive, revisions) # now replay the kafka log in a new provenance storage with get_provenance_storage( cls="postgresql", db=secondary_db, raise_on_commit=True ) as prov_sto_dst: worker_fn = functools.partial(process_replay_objects, storage=prov_sto_dst) replayer.process(worker_fn) compare_provenance_storages(prov_sto_src, prov_sto_dst) def compare_provenance_storages(sto1, sto2): entities1 = {etype: sto1.entity_get_all(etype) for etype in EntityType} entities2 = {etype: sto2.entity_get_all(etype) for etype in EntityType} assert entities1 == entities2 relations1 = {rtype: sto1.relation_get_all(rtype) for rtype in RelationType} relations2 = {rtype: sto2.relation_get_all(rtype) for rtype in RelationType} assert relations1 == relations2 diff --git a/swh/provenance/tests/test_revision_content_layer.py b/swh/provenance/tests/test_revision_content_layer.py index d9759e0..3e8b365 100644 --- a/swh/provenance/tests/test_revision_content_layer.py +++ b/swh/provenance/tests/test_revision_content_layer.py @@ -1,467 +1,463 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple import pytest from typing_extensions import TypedDict from swh.model.hashutil import hash_to_bytes from swh.model.model import Sha1Git from swh.provenance.algos.directory import directory_add from swh.provenance.algos.revision import revision_add from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import DirectoryEntry, RevisionEntry from swh.provenance.storage.interface import EntityType, RelationType -from swh.provenance.tests.conftest import ( - fill_storage, - get_datafile, - load_repo_data, - ts2dt, -) + +from .utils import fill_storage, get_datafile, load_repo_data, ts2dt class SynthRelation(TypedDict): prefix: Optional[str] 
path: str src: Sha1Git dst: Sha1Git rel_ts: float class SynthRevision(TypedDict): sha1: Sha1Git date: float msg: str R_C: List[SynthRelation] R_D: List[SynthRelation] D_C: List[SynthRelation] def synthetic_revision_content_result(filename: str) -> Iterator[SynthRevision]: """Generates dict representations of synthetic revisions found in the synthetic file (from the data/ directory) given as argument of the generator. Generated SynthRevision (typed dict) with the following elements: "sha1": (Sha1Git) sha1 of the revision, "date": (float) timestamp of the revision, "msg": (str) commit message of the revision, "R_C": (list) new R---C relations added by this revision "R_D": (list) new R-D relations added by this revision "D_C": (list) new D-C relations added by this revision Each relation above is a SynthRelation typed dict with: "path": (str) location "src": (Sha1Git) sha1 of the source of the relation "dst": (Sha1Git) sha1 of the destination of the relation "rel_ts": (float) timestamp of the target of the relation (related to the timestamp of the revision) """ with open(get_datafile(filename), "r") as fobj: yield from _parse_synthetic_revision_content_file(fobj) def _parse_synthetic_revision_content_file( fobj: Iterable[str], ) -> Iterator[SynthRevision]: """Read a 'synthetic' file and generate a dict representation of the synthetic revision for each revision listed in the synthetic file. """ regs = [ "(?P<revname>R[0-9]{2,4})?", "(?P<reltype>[^| ]*)", "([+] )?(?P<path>[^| +]*?)[/]?", "(?P<type>[RDC]) (?P<sha1>[0-9a-f]{40})", "(?P<ts>-?[0-9]+(.[0-9]+)?)", ] regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$") current_rev: List[dict] = [] for m in (regex.match(line) for line in fobj): if m: d = m.groupdict() if d["revname"]: if current_rev: yield _mk_synth_rev(current_rev) current_rev.clear() current_rev.append(d) if current_rev: yield _mk_synth_rev(current_rev) def _mk_synth_rev(synth_rev: List[Dict[str, str]]) -> SynthRevision: assert synth_rev[0]["type"] == "R" rev = SynthRevision( sha1=hash_to_bytes(synth_rev[0]["sha1"]), date=float(synth_rev[0]["ts"]), msg=synth_rev[0]["revname"], R_C=[], R_D=[], D_C=[], ) current_path = None # path of the last R-D relation we parsed, used as a prefix for the next D-C # relations for row in synth_rev[1:]: if row["reltype"] == "R---C": assert row["type"] == "C" rev["R_C"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = None elif row["reltype"] == "R-D": assert row["type"] == "D" rev["R_D"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = row["path"] elif row["reltype"] == "D-C": assert row["type"] == "C" rev["D_C"].append( SynthRelation( prefix=current_path, path=row["path"], src=rev["R_D"][-1]["dst"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) return rev @pytest.mark.parametrize( "repo, lower, mindepth, flatten", ( ("cmdbts2", True, 1, True), ("cmdbts2", True, 1, False), ("cmdbts2", False, 1, True), ("cmdbts2", False, 1, False), ("cmdbts2", True, 2, True), ("cmdbts2", True, 2, False), ("cmdbts2", False, 2, True), ("cmdbts2", False, 2, False), ("out-of-order", True, 1, True), ("out-of-order", True, 1, False), ), ) def test_revision_content_result( provenance: ProvenanceInterface, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, flatten: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo)
fill_storage(archive.storage, data) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) revisions = {rev["id"]: rev for rev in data["revision"]} rows: Dict[str, Set[Any]] = { "content": set(), "content_in_directory": set(), "content_in_revision": set(), "directory": set(), "directory_in_revision": set(), "location": set(), "revision": set(), } for synth_rev in synthetic_revision_content_result(syntheticfile): revision = revisions[synth_rev["sha1"]] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) if flatten: revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth) else: prev_directories = provenance.storage.entity_get_all(EntityType.DIRECTORY) revision_add( provenance, archive, [entry], lower=lower, mindepth=mindepth, flatten=False, ) directories = [ DirectoryEntry(id=sha1) for sha1 in provenance.storage.entity_get_all( EntityType.DIRECTORY ).difference(prev_directories) ] for directory in directories: assert not provenance.directory_already_flattened(directory) directory_add(provenance, archive, directories) # each "entry" in the synth file is one new revision rows["revision"].add(synth_rev["sha1"]) assert rows["revision"] == provenance.storage.entity_get_all( EntityType.REVISION ), synth_rev["msg"] # check the timestamp of the revision rev_ts = synth_rev["date"] rev_data = provenance.storage.revision_get([synth_rev["sha1"]])[ synth_rev["sha1"] ] assert ( rev_data.date is not None and rev_ts == rev_data.date.timestamp() ), synth_rev["msg"] # this revision might have added new content objects rows["content"] |= set(x["dst"] for x in synth_rev["R_C"]) rows["content"] |= set(x["dst"] for x in synth_rev["D_C"]) assert rows["content"] == provenance.storage.entity_get_all( EntityType.CONTENT ), synth_rev["msg"] # check for R-C (direct) entries # these are added directly in the content_early_in_rev table rows["content_in_revision"] |= set( (x["dst"], x["src"], x["path"].encode()) for x in synth_rev["R_C"] ) assert rows["content_in_revision"] == { (src, rel.dst, rel.path) for src, rels in provenance.storage.relation_get_all( RelationType.CNT_EARLY_IN_REV ).items() for rel in rels }, synth_rev["msg"] # check timestamps for rc in synth_rev["R_C"]: assert ( rev_ts + rc["rel_ts"] == provenance.storage.content_get([rc["dst"]])[rc["dst"]].timestamp() ), synth_rev["msg"] # check directories # each directory stored in the provenance index is an entry # in the "directory" table... rows["directory"] |= set(x["dst"] for x in synth_rev["R_D"]) assert rows["directory"] == provenance.storage.entity_get_all( EntityType.DIRECTORY ), synth_rev["msg"] # ... + a number of rows in the "directory_in_rev" table... # check for R-D entries rows["directory_in_revision"] |= set( (x["dst"], x["src"], x["path"].encode()) for x in synth_rev["R_D"] ) assert rows["directory_in_revision"] == { (src, rel.dst, rel.path) for src, rels in provenance.storage.relation_get_all( RelationType.DIR_IN_REV ).items() for rel in rels }, synth_rev["msg"] # check timestamps for rd in synth_rev["R_D"]: dir_data = provenance.storage.directory_get([rd["dst"]])[rd["dst"]] assert dir_data.date is not None assert rev_ts + rd["rel_ts"] == dir_data.date.timestamp(), synth_rev["msg"] assert dir_data.flat, synth_rev["msg"] # ... + a number of rows in the "content_in_dir" table # for content of the directory. 
# check for D-C entries rows["content_in_directory"] |= set( (x["dst"], x["src"], x["path"].encode()) for x in synth_rev["D_C"] ) assert rows["content_in_directory"] == { (src, rel.dst, rel.path) for src, rels in provenance.storage.relation_get_all( RelationType.CNT_IN_DIR ).items() for rel in rels }, synth_rev["msg"] # check timestamps for dc in synth_rev["D_C"]: assert ( rev_ts + dc["rel_ts"] == provenance.storage.content_get([dc["dst"]])[dc["dst"]].timestamp() ), synth_rev["msg"] # check for location entries rows["location"] |= set(x["path"].encode() for x in synth_rev["R_C"]) rows["location"] |= set(x["path"].encode() for x in synth_rev["D_C"]) rows["location"] |= set(x["path"].encode() for x in synth_rev["R_D"]) assert rows["location"] == set( provenance.storage.location_get_all().values() ), synth_rev["msg"] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_provenance_heuristics_content_find_all( provenance: ProvenanceInterface, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(archive.storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] if batch: revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) else: for revision in revisions: revision_add( provenance, archive, [revision], lower=lower, mindepth=mindepth ) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_occurrences: Dict[str, List[Tuple[str, float, Optional[str], str]]] = {} for synth_rev in synthetic_revision_content_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: expected_occurrences.setdefault(rc["dst"].hex(), []).append( (rev_id, rev_ts, None, rc["path"]) ) for dc in synth_rev["D_C"]: assert dc["prefix"] is not None # to please mypy expected_occurrences.setdefault(dc["dst"].hex(), []).append( (rev_id, rev_ts, None, dc["prefix"] + "/" + dc["path"]) ) for content_id, results in expected_occurrences.items(): expected = [(content_id, *result) for result in results] db_occurrences = [ ( occur.content.hex(), occur.revision.hex(), occur.date.timestamp(), occur.origin, occur.path.decode(), ) for occur in provenance.content_find_all(hash_to_bytes(content_id)) ] assert len(db_occurrences) == len(expected) assert set(db_occurrences) == set(expected) @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_provenance_heuristics_content_find_first( provenance: ProvenanceInterface, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(archive.storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] if batch: revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) else: for revision in revisions: revision_add( 
provenance, archive, [revision], lower=lower, mindepth=mindepth ) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_first: Dict[str, Tuple[str, float, List[str]]] = {} # dict mapping blob_id to a tuple (rev_id, rev_ts, [path, ...]); the third element # is a list because a content can be added at several places in a single # revision, in which case the result of content_find_first() is one of # those paths, but we have no guarantee which one it will return. for synth_rev in synthetic_revision_content_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: sha1 = rc["dst"].hex() if sha1 not in expected_first: assert rc["rel_ts"] == 0 expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) else: if rev_ts == expected_first[sha1][1]: expected_first[sha1][2].append(rc["path"]) elif rev_ts < expected_first[sha1][1]: expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) for dc in synth_rev["D_C"]: sha1 = dc["dst"].hex() assert sha1 in expected_first # nothing to do there, this content cannot be a "first seen file" for content_id, (rev_id, ts, paths) in expected_first.items(): occur = provenance.content_find_first(hash_to_bytes(content_id)) assert occur is not None assert occur.content.hex() == content_id assert occur.revision.hex() == rev_id assert occur.date.timestamp() == ts assert occur.origin is None assert occur.path.decode() in paths diff --git a/swh/provenance/tests/test_revision_iterator.py b/swh/provenance/tests/test_revision_iterator.py index 9151fe4..c7c4c8a 100644 --- a/swh/provenance/tests/test_revision_iterator.py +++ b/swh/provenance/tests/test_revision_iterator.py @@ -1,31 +1,32 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.provenance.algos.revision import CSVRevisionIterator -from swh.provenance.tests.conftest import fill_storage, load_repo_data, ts2dt from swh.storage.interface import StorageInterface +from .utils import fill_storage, load_repo_data, ts2dt + @pytest.mark.parametrize( "repo", ( "cmdbts2", "out-of-order", ), ) def test_revision_iterator(swh_storage: StorageInterface, repo: str) -> None: """Test CSVRevisionIterator""" data = load_repo_data(repo) fill_storage(swh_storage, data) revisions_csv = [ (rev["id"], ts2dt(rev["date"]), rev["directory"]) for rev in data["revision"] ] revisions = list(CSVRevisionIterator(revisions_csv)) assert revisions assert len(revisions) == len(data["revision"]) diff --git a/swh/provenance/tests/test_utils.py b/swh/provenance/tests/test_utils.py deleted file mode 100644 index a8361c3..0000000 --- a/swh/provenance/tests/test_utils.py +++ /dev/null @@ -1,32 +0,0 @@ -# Copyright (C) 2022 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - - -import logging -import tempfile -from typing import Dict, List, Optional - -from click.testing import CliRunner, Result -from yaml import safe_dump - -from swh.provenance.cli import cli - - -def invoke( - args: List[str], config: Optional[Dict] = None, catch_exceptions: bool = False -) -> Result: - """Invoke swh journal subcommands""" - runner = CliRunner() - with tempfile.NamedTemporaryFile("a",
suffix=".yml") as config_fd: - if config is not None: - safe_dump(config, config_fd) - config_fd.seek(0) - args = ["-C" + config_fd.name] + args - - result = runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=None) - if not catch_exceptions and result.exception: - print(result.output) - raise result.exception - return result diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/utils.py similarity index 55% copy from swh/provenance/tests/conftest.py copy to swh/provenance/tests/utils.py index 60ae464..21e624a 100644 --- a/swh/provenance/tests/conftest.py +++ b/swh/provenance/tests/utils.py @@ -1,179 +1,128 @@ -# Copyright (C) 2021-2022 The Software Heritage developers +# Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information + from contextlib import contextmanager from datetime import datetime -from functools import partial +import logging import multiprocessing from os import path from pathlib import Path -from typing import Any, Dict, Generator, List +import tempfile +from typing import Any, Dict, List, Optional -from _pytest.fixtures import SubRequest from aiohttp.test_utils import TestClient, TestServer, loop_context +from click.testing import CliRunner, Result import msgpack -import psycopg2.extensions -import pytest -from pytest_postgresql import factories +from yaml import safe_dump -from swh.core.db.db_utils import initialize_database_for_module from swh.graph.http_rpc_server import make_app from swh.journal.serializers import msgpack_ext_hook from swh.model.model import BaseModel, TimestampWithTimezone -from swh.provenance import get_provenance -from swh.provenance.archive.interface import ArchiveInterface -from swh.provenance.archive.storage import ArchiveStorage -from swh.provenance.interface import ProvenanceInterface -from swh.provenance.storage import get_provenance_storage -from swh.provenance.storage.interface import ProvenanceStorageInterface -from swh.provenance.storage.postgresql import ProvenanceStoragePostgreSql +from swh.provenance.cli import cli from swh.storage.interface import StorageInterface from swh.storage.replay import OBJECT_CONVERTERS, OBJECT_FIXERS, process_replay_objects -provenance_postgresql_proc = factories.postgresql_proc( - load=[ - partial( - initialize_database_for_module, - modname="provenance", - flavor="normalized", - version=ProvenanceStoragePostgreSql.current_version, - ) - ], -) - -postgres_provenance = factories.postgresql("provenance_postgresql_proc") - - -@pytest.fixture() -def provenance_postgresqldb(request, postgres_provenance): - return postgres_provenance.get_dsn_parameters() - - -@pytest.fixture() -def provenance_storage( - request: SubRequest, - provenance_postgresqldb: Dict[str, str], -) -> Generator[ProvenanceStorageInterface, None, None]: - """Return a working and initialized ProvenanceStorageInterface object""" - # in test sessions, we DO want to raise any exception occurring at commit time - with get_provenance_storage( - cls="postgresql", db=provenance_postgresqldb, raise_on_commit=True - ) as storage: - yield storage - - -@pytest.fixture -def provenance( - postgres_provenance: psycopg2.extensions.connection, -) -> Generator[ProvenanceInterface, None, None]: - """Return a working and initialized ProvenanceInterface object""" - - from swh.core.db.db_utils import ( - init_admin_extensions, - 
populate_database_for_package, - ) - - init_admin_extensions("swh.provenance", postgres_provenance.dsn) - populate_database_for_package( - "swh.provenance", postgres_provenance.dsn, flavor="normalized" - ) - # in test sessions, we DO want to raise any exception occurring at commit time - with get_provenance( - cls="postgresql", - db=postgres_provenance.get_dsn_parameters(), - raise_on_commit=True, - ) as provenance: - yield provenance - - -@pytest.fixture -def archive(swh_storage: StorageInterface) -> ArchiveInterface: - """Return an ArchiveStorage-based ArchiveInterface object""" - return ArchiveStorage(swh_storage) +def invoke( + args: List[str], config: Optional[Dict] = None, catch_exceptions: bool = False +) -> Result: + """Invoke swh journal subcommands""" + runner = CliRunner() + with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: + if config is not None: + safe_dump(config, config_fd) + config_fd.seek(0) + args = ["-C" + config_fd.name] + args + + result = runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=None) + if not catch_exceptions and result.exception: + print(result.output) + raise result.exception + return result def fill_storage(storage: StorageInterface, data: Dict[str, List[dict]]) -> None: objects = { objtype: [objs_from_dict(objtype, d) for d in dicts] for objtype, dicts in data.items() } process_replay_objects(objects, storage=storage) def get_datafile(fname: str) -> str: return path.join(path.dirname(__file__), "data", fname) # TODO: this should return Dict[str, List[BaseModel]] directly, but it requires # refactoring several tests def load_repo_data(repo: str) -> Dict[str, List[dict]]: data: Dict[str, List[dict]] = {} with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj: unpacker = msgpack.Unpacker( fobj, raw=False, ext_hook=msgpack_ext_hook, strict_map_key=False, timestamp=3, # convert Timestamp in datetime objects (tz UTC) ) for msg in unpacker: if len(msg) == 2: # old format objtype, objd = msg else: # now we should have a triplet (type, key, value) objtype, _, objd = msg data.setdefault(objtype, []).append(objd) return data def objs_from_dict(object_type: str, dict_repr: dict) -> BaseModel: if object_type in OBJECT_FIXERS: dict_repr = OBJECT_FIXERS[object_type](dict_repr) obj = OBJECT_CONVERTERS[object_type](dict_repr) return obj def ts2dt(ts: Dict[str, Any]) -> datetime: return TimestampWithTimezone.from_dict(ts).to_datetime() def run_grpc_server(queue, dataset_path): try: config = { "graph": { "cls": "local", "grpc_server": {"path": dataset_path}, "http_rpc_server": {"debug": True}, } } with loop_context() as loop: app = make_app(config=config) client = TestClient(TestServer(app), loop=loop) loop.run_until_complete(client.start_server()) url = client.make_url("/graph/") queue.put((url, app["rpc_url"])) loop.run_forever() except Exception as e: queue.put(e) @contextmanager def grpc_server(dataset): dataset_path = ( Path(__file__).parents[0] / "data/swhgraph" / dataset / "compressed/example" ) queue = multiprocessing.Queue() server = multiprocessing.Process( target=run_grpc_server, kwargs={"queue": queue, "dataset_path": dataset_path} ) server.start() res = queue.get() if isinstance(res, Exception): raise res grpc_url = res[1] try: yield grpc_url finally: server.terminate()
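For reference, a minimal sketch of how the helpers consolidated into swh/provenance/tests/utils.py could be exercised together from a test. The dataset name passed to grpc_server and the config layout handed to invoke are illustrative assumptions, not something this patch defines:

    # Hypothetical usage sketch; "cmdbts2" as a swh-graph dataset name and the
    # "provenance.archive" config layout are assumptions, not taken from this patch.
    from swh.provenance.tests.utils import grpc_server, invoke, load_repo_data, ts2dt

    def example() -> None:
        data = load_repo_data("cmdbts2")  # msgpack-backed test dataset
        dates = [ts2dt(rev["date"]) for rev in data["revision"]]
        assert all(d.tzinfo is not None for d in dates)  # ts2dt yields tz-aware datetimes

        # Spawn the swh-graph gRPC server and run a trivial CLI invocation against
        # the temporary YAML configuration file written by invoke().
        with grpc_server("cmdbts2") as grpc_url:
            result = invoke(
                ["--help"],
                config={"provenance": {"archive": {"cls": "graph", "url": grpc_url}}},
            )
            assert result.exit_code == 0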