diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py index c0f519b..8663ac7 100644 --- a/swh/provenance/tests/conftest.py +++ b/swh/provenance/tests/conftest.py @@ -1,237 +1,249 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from os import path import re -from typing import Iterable, Iterator, List, Optional +from typing import Any, Dict, Iterable, Iterator, List, Optional import msgpack +import psycopg2 import pytest from typing_extensions import TypedDict from swh.core.db import BaseDb from swh.journal.serializers import msgpack_ext_hook from swh.model.hashutil import hash_to_bytes from swh.model.model import Sha1Git from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.provenance import get_provenance +from swh.provenance.archive import ArchiveInterface from swh.provenance.postgresql.archive import ArchivePostgreSQL +from swh.provenance.postgresql.provenancedb_base import ProvenanceDBBase +from swh.provenance.provenance import ProvenanceInterface from swh.provenance.storage.archive import ArchiveStorage +from swh.storage.postgresql.storage import Storage from swh.storage.replay import process_replay_objects @pytest.fixture(params=["with-path", "without-path"]) -def provenance(request, postgresql): +def provenance( + request, # TODO: add proper type annotation + postgresql: psycopg2.extensions.connection, +) -> ProvenanceInterface: """return a working and initialized provenance db""" from swh.core.cli.db import populate_database_for_package flavor = request.param populate_database_for_package("swh.provenance", postgresql.dsn, flavor=flavor) BaseDb.adapt_conn(postgresql) - args = dict(tuple(item.split("=")) for item in postgresql.dsn.split()) - args.pop("options") + args: Dict[str, str] = { + item.split("=")[0]: 
item.split("=")[1] + for item in postgresql.dsn.split() + if item.split("=")[0] != "options" + } prov = get_provenance(cls="local", db=args) + assert isinstance(prov.storage, ProvenanceDBBase) assert prov.storage.flavor == flavor # in test sessions, we DO want to raise any exception occurring at commit time prov.storage.raise_on_commit = True return prov @pytest.fixture -def swh_storage_with_objects(swh_storage): +def swh_storage_with_objects(swh_storage: Storage) -> Storage: """return a Storage object (postgresql-based by default) with a few of each object type in it The inserted content comes from swh.model.tests.swh_model_data. """ for obj_type in ( "content", "skipped_content", "directory", "revision", "release", "snapshot", "origin", "origin_visit", "origin_visit_status", ): getattr(swh_storage, f"{obj_type}_add")(TEST_OBJECTS[obj_type]) return swh_storage @pytest.fixture -def archive_direct(swh_storage_with_objects): +def archive_direct(swh_storage_with_objects: Storage) -> ArchiveInterface: return ArchivePostgreSQL(swh_storage_with_objects.get_db().conn) @pytest.fixture -def archive_api(swh_storage_with_objects): +def archive_api(swh_storage_with_objects: Storage) -> ArchiveInterface: return ArchiveStorage(swh_storage_with_objects) @pytest.fixture(params=["archive", "db"]) -def archive(request, swh_storage_with_objects): +def archive(request, swh_storage_with_objects: Storage) -> Iterator[ArchiveInterface]: """Return a ArchivePostgreSQL based StorageInterface object""" # this is a workaround to prevent tests from hanging because of an unclosed # transaction. 
# TODO: refactor the ArchivePostgreSQL to properly deal with - # transactions and get rif of this fixture + # transactions and get rid of this fixture if request.param == "db": archive = ArchivePostgreSQL(conn=swh_storage_with_objects.get_db().conn) yield archive archive.conn.rollback() else: yield ArchiveStorage(swh_storage_with_objects) -def get_datafile(fname): +def get_datafile(fname: str) -> str: return path.join(path.dirname(__file__), "data", fname) -def load_repo_data(repo): - data = {} +def load_repo_data(repo: str) -> Dict[str, Any]: + data: Dict[str, Any] = {} with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj: unpacker = msgpack.Unpacker( fobj, raw=False, ext_hook=msgpack_ext_hook, strict_map_key=False, timestamp=3, # convert Timestamp in datetime objects (tz UTC) ) for objtype, objd in unpacker: data.setdefault(objtype, []).append(objd) return data -def filter_dict(d, keys): +def filter_dict(d: Dict[Any, Any], keys: Iterable[Any]) -> Dict[Any, Any]: return {k: v for (k, v) in d.items() if k in keys} -def fill_storage(storage, data): +def fill_storage(storage: Storage, data: Dict[str, Any]) -> None: process_replay_objects(data, storage=storage) class SynthRelation(TypedDict): prefix: Optional[str] path: str src: Sha1Git dst: Sha1Git rel_ts: float class SynthRevision(TypedDict): sha1: Sha1Git date: float msg: str R_C: List[SynthRelation] R_D: List[SynthRelation] D_C: List[SynthRelation] def synthetic_result(filename: str) -> Iterator[SynthRevision]: """Generates dict representations of synthetic revisions found in the synthetic file (from the data/ directory) given as argument of the generator. 
Generated SynthRevision (typed dict) with the following elements: "sha1": (Sha1Git) sha1 of the revision, "date": (float) timestamp of the revision, "msg": (str) commit message of the revision, "R_C": (list) new R---C relations added by this revision "R_D": (list) new R-D relations added by this revision "D_C": (list) new D-C relations added by this revision Each relation above is a SynthRelation typed dict with: "path": (str) location "src": (Sha1Git) sha1 of the source of the relation "dst": (Sha1Git) sha1 of the destination of the relation "rel_ts": (float) timestamp of the target of the relation (related to the timestamp of the revision) """ with open(get_datafile(filename), "r") as fobj: yield from _parse_synthetic_file(fobj) def _parse_synthetic_file(fobj: Iterable[str]) -> Iterator[SynthRevision]: """Read a 'synthetic' file and generate a dict representation of the synthetic revision for each revision listed in the synthetic file. """ regs = [ "(?P<revname>R[0-9]{2,4})?", "(?P<reltype>[^| ]*)", "([+] )?(?P<path>[^| +]*?)[/]?", "(?P<type>[RDC]) (?P<sha1>[0-9a-z]{40})", "(?P<ts>-?[0-9]+(.[0-9]+)?)", ] regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$") current_rev: List[dict] = [] for m in (regex.match(line) for line in fobj): if m: d = m.groupdict() if d["revname"]: if current_rev: yield _mk_synth_rev(current_rev) current_rev.clear() current_rev.append(d) if current_rev: yield _mk_synth_rev(current_rev) -def _mk_synth_rev(synth_rev) -> SynthRevision: +def _mk_synth_rev(synth_rev: List[Dict[str, str]]) -> SynthRevision: assert synth_rev[0]["type"] == "R" rev = SynthRevision( sha1=hash_to_bytes(synth_rev[0]["sha1"]), date=float(synth_rev[0]["ts"]), msg=synth_rev[0]["revname"], R_C=[], R_D=[], D_C=[], ) current_path = None # path of the last R-D relation we parsed, used a prefix for next D-C # relations for row in synth_rev[1:]: if row["reltype"] == "R---C": assert row["type"] == "C" rev["R_C"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"],
dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = None elif row["reltype"] == "R-D": assert row["type"] == "D" rev["R_D"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = row["path"] elif row["reltype"] == "D-C": assert row["type"] == "C" rev["D_C"].append( SynthRelation( prefix=current_path, path=row["path"], src=rev["R_D"][-1]["dst"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) return rev diff --git a/swh/provenance/tests/test_archive_interface.py b/swh/provenance/tests/test_archive_interface.py index 53775d2..71d1c32 100644 --- a/swh/provenance/tests/test_archive_interface.py +++ b/swh/provenance/tests/test_archive_interface.py @@ -1,50 +1,51 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter from operator import itemgetter import pytest from swh.core.db import BaseDb from swh.provenance.postgresql.archive import ArchivePostgreSQL from swh.provenance.storage.archive import ArchiveStorage from swh.provenance.tests.conftest import fill_storage, load_repo_data +from swh.storage.postgresql.storage import Storage @pytest.mark.parametrize( "repo", ("cmdbts2", "out-of-order", "with-merges"), ) -def test_archive_interface(repo, swh_storage): +def test_archive_interface(repo: str, swh_storage: Storage) -> None: archive_api = ArchiveStorage(swh_storage) dsn = swh_storage.get_db().conn.dsn with BaseDb.connect(dsn).conn as conn: BaseDb.adapt_conn(conn) archive_direct = ArchivePostgreSQL(conn) # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) for directory in data["directory"]: entries_api = sorted( 
archive_api.directory_ls(directory["id"]), key=itemgetter("name") ) entries_direct = sorted( archive_direct.directory_ls(directory["id"]), key=itemgetter("name") ) assert entries_api == entries_direct for revision in data["revision"]: parents_api = Counter(archive_api.revision_get_parents(revision["id"])) parents_direct = Counter( archive_direct.revision_get_parents(revision["id"]) ) assert parents_api == parents_direct for snapshot in data["snapshot"]: heads_api = Counter(archive_api.snapshot_get_heads(snapshot["id"])) heads_direct = Counter(archive_direct.snapshot_get_heads(snapshot["id"])) assert heads_api == heads_direct diff --git a/swh/provenance/tests/test_cli.py b/swh/provenance/tests/test_cli.py index 51ebefe..eb8c1b3 100644 --- a/swh/provenance/tests/test_cli.py +++ b/swh/provenance/tests/test_cli.py @@ -1,97 +1,103 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from typing import Set + from click.testing import CliRunner +import psycopg2 import pytest from swh.core.cli import swh as swhmain import swh.core.cli.db # noqa ; ensure cli is loaded from swh.core.db import BaseDb import swh.provenance.cli # noqa ; ensure cli is loaded -def test_cli_swh_db_help(): +def test_cli_swh_db_help() -> None: # swhmain.add_command(provenance_cli) result = CliRunner().invoke(swhmain, ["provenance", "-h"]) assert result.exit_code == 0 assert "Commands:" in result.output commands = result.output.split("Commands:")[1] for command in ( "find-all", "find-first", "iter-origins", "iter-revisions", ): assert f" {command} " in commands TABLES = { "dbflavor", "dbversion", "content", "content_in_revision", "content_in_directory", "directory", "directory_in_revision", "location", "origin", "revision", "revision_before_revision", "revision_in_origin", } @pytest.mark.parametrize( 
"flavor, dbtables", (("with-path", TABLES | {"location"}), ("without-path", TABLES)) ) def test_cli_db_create_and_init_db_with_flavor( - monkeypatch, postgresql, flavor, dbtables -): + monkeypatch, # TODO: add proper type annotation + postgresql: psycopg2.extensions.connection, + flavor: str, + dbtables: Set[str], +) -> None: """Test that 'swh db init provenance' works with flavors for both with-path and without-path flavors""" dbname = f"{flavor}-db" # DB creation using 'swh db create' db_params = postgresql.get_dsn_parameters() monkeypatch.setenv("PGHOST", db_params["host"]) monkeypatch.setenv("PGUSER", db_params["user"]) monkeypatch.setenv("PGPORT", db_params["port"]) result = CliRunner().invoke(swhmain, ["db", "create", "-d", dbname, "provenance"]) assert result.exit_code == 0, result.output # DB init using 'swh db init' result = CliRunner().invoke( swhmain, ["db", "init", "-d", dbname, "--flavor", flavor, "provenance"] ) assert result.exit_code == 0, result.output assert f"(flavor {flavor})" in result.output db_params["dbname"] = dbname cnx = BaseDb.connect(**db_params).conn # check the DB looks OK (check for db_flavor and expected tables) with cnx.cursor() as cur: cur.execute("select swh_get_dbflavor()") assert cur.fetchone() == (flavor,) cur.execute( "select table_name from information_schema.tables " "where table_schema = 'public' " f"and table_catalog = '{dbname}'" ) tables = set(x for (x,) in cur.fetchall()) assert tables == dbtables -def test_cli_init_db_default_flavor(postgresql): +def test_cli_init_db_default_flavor(postgresql: psycopg2.extensions.connection) -> None: "Test that 'swh db init provenance' defaults to a with-path flavored DB" dbname = postgresql.dsn result = CliRunner().invoke(swhmain, ["db", "init", "-d", dbname, "provenance"]) assert result.exit_code == 0, result.output with postgresql.cursor() as cur: cur.execute("select swh_get_dbflavor()") assert cur.fetchone() == ("with-path",) diff --git a/swh/provenance/tests/test_conftest.py 
b/swh/provenance/tests/test_conftest.py index 8b9d23a..8690698 100644 --- a/swh/provenance/tests/test_conftest.py +++ b/swh/provenance/tests/test_conftest.py @@ -1,19 +1,22 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from swh.provenance.provenance import ProvenanceInterface +from swh.storage.postgresql.storage import Storage -def test_provenance_fixture(provenance): + +def test_provenance_fixture(provenance: ProvenanceInterface) -> None: """Check the 'provenance' fixture produce a working ProvenanceDB object""" assert provenance provenance.flush() # should be a noop -def test_storage(swh_storage_with_objects): +def test_storage(swh_storage_with_objects: Storage) -> None: """Check the 'swh_storage_with_objects' fixture produce a working Storage object with at least some Content, Revision and Directory in it""" assert swh_storage_with_objects assert swh_storage_with_objects.content_get_random() assert swh_storage_with_objects.directory_get_random() assert swh_storage_with_objects.revision_get_random() diff --git a/swh/provenance/tests/test_history_graph.py b/swh/provenance/tests/test_history_graph.py index 091201a..1062502 100644 --- a/swh/provenance/tests/test_history_graph.py +++ b/swh/provenance/tests/test_history_graph.py @@ -1,62 +1,74 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from typing import Any, Dict + import pytest import yaml from swh.model.hashutil import hash_to_bytes +from swh.provenance.archive import ArchiveInterface from swh.provenance.graph import HistoryNode, build_history_graph from swh.provenance.model import OriginEntry, RevisionEntry from 
swh.provenance.origin import origin_add_revision +from swh.provenance.provenance import ProvenanceInterface from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data +from swh.storage.postgresql.storage import Storage -def history_graph_from_dict(d) -> HistoryNode: +def history_graph_from_dict(d: Dict[str, Any]) -> HistoryNode: """Takes a dictionary representing a tree of HistoryNode objects, and recursively builds the corresponding graph.""" node = HistoryNode( entry=RevisionEntry(hash_to_bytes(d["rev"])), visited=d.get("visited", False), in_history=d.get("in_history", False), ) node.parents = set( history_graph_from_dict(parent) for parent in d.get("parents", []) ) return node @pytest.mark.parametrize( "repo, visit", (("with-merges", "visits-01"),), ) @pytest.mark.parametrize("batch", (True, False)) -def test_history_graph(provenance, swh_storage, archive, repo, visit, batch): +def test_history_graph( + provenance: ProvenanceInterface, + swh_storage: Storage, + archive: ArchiveInterface, + repo: str, + visit: str, + batch: bool, +) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) filename = f"history_graphs_{repo}_{visit}.yaml" with open(get_datafile(filename)) as file: for expected in yaml.full_load(file): entry = OriginEntry(expected["origin"], hash_to_bytes(expected["snapshot"])) provenance.origin_add(entry) for graph_as_dict in expected["graphs"]: expected_graph = history_graph_from_dict(graph_as_dict) print("Expected graph:", expected_graph) computed_graph = build_history_graph( archive, provenance, RevisionEntry(hash_to_bytes(graph_as_dict["rev"])), ) print("Computed graph:", computed_graph) assert computed_graph == expected_graph origin_add_revision(provenance, entry, computed_graph) if not batch: provenance.flush() diff --git a/swh/provenance/tests/test_isochrone_graph.py b/swh/provenance/tests/test_isochrone_graph.py index 
7822460..e0adc7b 100644 --- a/swh/provenance/tests/test_isochrone_graph.py +++ b/swh/provenance/tests/test_isochrone_graph.py @@ -1,101 +1,112 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy from datetime import datetime, timezone +from typing import Any, Dict import pytest import yaml from swh.model.hashutil import hash_to_bytes +from swh.provenance.archive import ArchiveInterface from swh.provenance.graph import IsochroneNode, build_isochrone_graph from swh.provenance.model import DirectoryEntry, RevisionEntry +from swh.provenance.provenance import ProvenanceInterface from swh.provenance.revision import revision_add from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data from swh.provenance.tests.test_provenance_db import ts2dt +from swh.storage.postgresql.storage import Storage -def isochrone_graph_from_dict(d, depth=0) -> IsochroneNode: +def isochrone_graph_from_dict(d: Dict[str, Any], depth: int = 0) -> IsochroneNode: """Takes a dictionary representing a tree of IsochroneNode objects, and recursively builds the corresponding graph.""" d = deepcopy(d) d["entry"]["id"] = hash_to_bytes(d["entry"]["id"]) d["entry"]["name"] = bytes(d["entry"]["name"], encoding="utf-8") dbdate = d.get("dbdate", None) if dbdate is not None: dbdate = datetime.fromtimestamp(d["dbdate"], timezone.utc) children = d.get("children", []) node = IsochroneNode( entry=DirectoryEntry(**d["entry"]), dbdate=dbdate, depth=depth, ) node.maxdate = datetime.fromtimestamp(d["maxdate"], timezone.utc) node.known = d.get("known", False) node.invalid = d.get("invalid", False) node.path = bytes(d["path"], encoding="utf-8") node.children = set( isochrone_graph_from_dict(child, depth=depth + 1) for child in children ) return node @pytest.mark.parametrize( 
"repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_isochrone_graph( - provenance, swh_storage, archive, repo, lower, mindepth, batch -): + provenance: ProvenanceInterface, + swh_storage: Storage, + archive: ArchiveInterface, + repo: str, + lower: bool, + mindepth: int, + batch: bool, +) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = {rev["id"]: rev for rev in data["revision"]} filename = f"graphs_{repo}_{'lower' if lower else 'upper'}_{mindepth}.yaml" with open(get_datafile(filename)) as file: for expected in yaml.full_load(file): print("# Processing revision", expected["rev"]) revision = revisions[hash_to_bytes(expected["rev"])] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) expected_graph = isochrone_graph_from_dict(expected["graph"]) print("Expected graph:", expected_graph) # Create graph for current revision and check it has the expected structure. + assert entry.root is not None computed_graph = build_isochrone_graph( archive, provenance, entry, DirectoryEntry(entry.root), ) print("Computed graph:", computed_graph) assert computed_graph == expected_graph # Add current revision so that provenance info is kept up to date for the # following ones. 
revision_add( provenance, archive, [entry], lower=lower, mindepth=mindepth, commit=not batch, ) diff --git a/swh/provenance/tests/test_origin_iterator.py b/swh/provenance/tests/test_origin_iterator.py index 1c2eaa1..9c590dd 100644 --- a/swh/provenance/tests/test_origin_iterator.py +++ b/swh/provenance/tests/test_origin_iterator.py @@ -1,35 +1,38 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from swh.model.model import OriginVisitStatus from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.provenance.origin import CSVOriginIterator from swh.storage.algos.origin import ( iter_origin_visit_statuses, iter_origin_visits, iter_origins, ) +from swh.storage.postgresql.storage import Storage -def test_origin_iterator(swh_storage_with_objects): +def test_origin_iterator(swh_storage_with_objects: Storage) -> None: """Test CSVOriginIterator""" origins_csv = [] for origin in iter_origins(swh_storage_with_objects): for visit in iter_origin_visits(swh_storage_with_objects, origin.url): - for status in iter_origin_visit_statuses( - swh_storage_with_objects, origin.url, visit.visit - ): - if status.snapshot is not None: - origins_csv.append((status.origin, status.snapshot)) + if visit.visit is not None: + for status in iter_origin_visit_statuses( + swh_storage_with_objects, origin.url, visit.visit + ): + if status.snapshot is not None: + origins_csv.append((status.origin, status.snapshot)) origins = list(CSVOriginIterator(origins_csv)) assert origins assert len(origins) == len( list( { status.origin for status in TEST_OBJECTS["origin_visit_status"] - if status.snapshot is not None + if isinstance(status, OriginVisitStatus) and status.snapshot is not None } ) ) diff --git a/swh/provenance/tests/test_provenance_db.py b/swh/provenance/tests/test_provenance_db.py index 
3120cc2..ab55763 100644 --- a/swh/provenance/tests/test_provenance_db.py +++ b/swh/provenance/tests/test_provenance_db.py @@ -1,40 +1,51 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import datetime +from datetime import datetime, timedelta, timezone +from typing import Type +from swh.model.model import OriginVisitStatus from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.provenance.model import OriginEntry from swh.provenance.origin import origin_add +from swh.provenance.postgresql.provenancedb_base import ProvenanceDBBase from swh.provenance.postgresql.provenancedb_with_path import ProvenanceWithPathDB from swh.provenance.postgresql.provenancedb_without_path import ProvenanceWithoutPathDB +from swh.provenance.provenance import ProvenanceInterface, ProvenanceStorageInterface from swh.provenance.storage.archive import ArchiveStorage +from swh.storage.postgresql.storage import Storage -def ts2dt(ts: dict) -> datetime.datetime: - timestamp = datetime.datetime.fromtimestamp( - ts["timestamp"]["seconds"], - datetime.timezone(datetime.timedelta(minutes=ts["offset"])), +# TODO: remove this function in favour of TimestampWithTimezone.to_datetime +# from swh.model.model +def ts2dt(ts: dict) -> datetime: + timestamp = datetime.fromtimestamp( + ts["timestamp"]["seconds"], timezone(timedelta(minutes=ts["offset"])) ) return timestamp.replace(microsecond=ts["timestamp"]["microseconds"]) -def test_provenance_origin_add(provenance, swh_storage_with_objects): +def test_provenance_origin_add( + provenance: ProvenanceInterface, swh_storage_with_objects: Storage +) -> None: """Test the origin_add function""" archive = ArchiveStorage(swh_storage_with_objects) for status in TEST_OBJECTS["origin_visit_status"]: + assert isinstance(status, OriginVisitStatus) if 
status.snapshot is not None: entry = OriginEntry(url=status.origin, snapshot=status.snapshot) origin_add(provenance, archive, [entry]) # TODO: check some facts here -def test_provenance_flavor(provenance): +def test_provenance_flavor(provenance: ProvenanceInterface) -> None: + assert isinstance(provenance.storage, ProvenanceDBBase) assert provenance.storage.flavor in ("with-path", "without-path") + backend_class: Type[ProvenanceStorageInterface] if provenance.storage.flavor == "with-path": backend_class = ProvenanceWithPathDB else: backend_class = ProvenanceWithoutPathDB assert isinstance(provenance.storage, backend_class) diff --git a/swh/provenance/tests/test_provenance_heuristics.py b/swh/provenance/tests/test_provenance_heuristics.py index f6134fb..f047b0c 100644 --- a/swh/provenance/tests/test_provenance_heuristics.py +++ b/swh/provenance/tests/test_provenance_heuristics.py @@ -1,350 +1,382 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Dict, List, Tuple +from datetime import datetime +from typing import Any, Dict, List, Optional, Set, Tuple +import psycopg2 import pytest from swh.model.hashutil import hash_to_bytes +from swh.model.model import Sha1Git +from swh.provenance.archive import ArchiveInterface from swh.provenance.model import RevisionEntry +from swh.provenance.postgresql.provenancedb_base import ProvenanceDBBase +from swh.provenance.provenance import ProvenanceInterface from swh.provenance.revision import revision_add from swh.provenance.tests.conftest import ( fill_storage, get_datafile, load_repo_data, synthetic_result, ) from swh.provenance.tests.test_provenance_db import ts2dt +from swh.storage.postgresql.storage import Storage -def sha1s(cur, table): +def sha1s(cur: psycopg2.extensions.cursor, table: str) -> Set[Sha1Git]: 
"""return the 'sha1' column from the DB 'table' (as hex) 'cur' is a cursor to the provenance index DB. """ cur.execute(f"SELECT sha1 FROM {table}") return set(row["sha1"].hex() for row in cur.fetchall()) -def locations(cur): +def locations(cur: psycopg2.extensions.cursor) -> Set[bytes]: """return the 'path' column from the DB location table 'cur' is a cursor to the provenance index DB. """ cur.execute("SELECT encode(location.path::bytea, 'escape') AS path FROM location") return set(row["path"] for row in cur.fetchall()) -def relations(cur, src, dst): +def relations( + cur: psycopg2.extensions.cursor, src: str, dst: str +) -> Set[Tuple[Sha1Git, Sha1Git, bytes]]: """return the triplets ('sha1', 'sha1', 'path') from the DB for the relation between 'src' table and 'dst' table (i.e. for C-R, C-D and D-R relations). 'cur' is a cursor to the provenance index DB. """ relation = f"{src}_in_{dst}" cur.execute("SELECT swh_get_dbflavor() AS flavor") with_path = cur.fetchone()["flavor"] == "with-path" # note that the columns have the same name as the relations they refer to, # so we can write things like "rel.{dst}=src.id" in the query below if with_path: cur.execute( f""" SELECT encode(src.sha1::bytea, 'hex') AS src, encode(dst.sha1::bytea, 'hex') AS dst, encode(location.path::bytea, 'escape') AS path FROM {relation} as relation INNER JOIN {src} AS src ON (relation.{src} = src.id) INNER JOIN {dst} AS dst ON (relation.{dst} = dst.id) INNER JOIN location ON (relation.location = location.id) """ ) else: cur.execute( f""" SELECT encode(src.sha1::bytea, 'hex') AS src, encode(dst.sha1::bytea, 'hex') AS dst, '' AS path FROM {relation} as relation INNER JOIN {src} AS src ON (src.id = relation.{src}) INNER JOIN {dst} AS dst ON (dst.id = relation.{dst}) """ ) return set((row["src"], row["dst"], row["path"]) for row in cur.fetchall()) -def get_timestamp(cur, table, sha1): +def get_timestamp( + cur: psycopg2.extensions.cursor, table: str, sha1: Sha1Git +) -> List[datetime]: """return the 
date for the 'sha1' from the DB 'table' (as hex) 'cur' is a cursor to the provenance index DB. """ - if isinstance(sha1, str): - sha1 = hash_to_bytes(sha1) cur.execute(f"SELECT date FROM {table} WHERE sha1=%s", (sha1,)) return [row["date"].timestamp() for row in cur.fetchall()] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) -def test_provenance_heuristics(provenance, swh_storage, archive, repo, lower, mindepth): +def test_provenance_heuristics( + provenance: ProvenanceInterface, + swh_storage: Storage, + archive: ArchiveInterface, + repo: str, + lower: bool, + mindepth: int, +) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) revisions = {rev["id"]: rev for rev in data["revision"]} - rows = { + rows: Dict[str, Set[Any]] = { "content": set(), "content_in_directory": set(), "content_in_revision": set(), "directory": set(), "directory_in_revision": set(), "location": set(), "revision": set(), } + assert isinstance(provenance.storage, ProvenanceDBBase) cursor = provenance.storage.cursor def maybe_path(path: str) -> str: + assert isinstance(provenance.storage, ProvenanceDBBase) if provenance.storage.with_path: return path return "" for synth_rev in synthetic_result(syntheticfile): revision = revisions[synth_rev["sha1"]] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth) # each "entry" in the synth file is one new revision rows["revision"].add(synth_rev["sha1"].hex()) assert rows["revision"] == sha1s(cursor, "revision"), synth_rev["msg"] # check the timestamp of the revision rev_ts = synth_rev["date"] - assert get_timestamp(cursor, 
"revision", synth_rev["sha1"].hex()) == [ + assert get_timestamp(cursor, "revision", synth_rev["sha1"]) == [ rev_ts ], synth_rev["msg"] # this revision might have added new content objects rows["content"] |= set(x["dst"].hex() for x in synth_rev["R_C"]) rows["content"] |= set(x["dst"].hex() for x in synth_rev["D_C"]) assert rows["content"] == sha1s(cursor, "content"), synth_rev["msg"] # check for R-C (direct) entries # these are added directly in the content_early_in_rev table rows["content_in_revision"] |= set( (x["dst"].hex(), x["src"].hex(), maybe_path(x["path"])) for x in synth_rev["R_C"] ) assert rows["content_in_revision"] == relations( cursor, "content", "revision" ), synth_rev["msg"] # check timestamps for rc in synth_rev["R_C"]: assert get_timestamp(cursor, "content", rc["dst"]) == [ rev_ts + rc["rel_ts"] ], synth_rev["msg"] # check directories # each directory stored in the provenance index is an entry # in the "directory" table... rows["directory"] |= set(x["dst"].hex() for x in synth_rev["R_D"]) assert rows["directory"] == sha1s(cursor, "directory"), synth_rev["msg"] # ... + a number of rows in the "directory_in_rev" table... # check for R-D entries rows["directory_in_revision"] |= set( (x["dst"].hex(), x["src"].hex(), maybe_path(x["path"])) for x in synth_rev["R_D"] ) assert rows["directory_in_revision"] == relations( cursor, "directory", "revision" ), synth_rev["msg"] # check timestamps for rd in synth_rev["R_D"]: assert get_timestamp(cursor, "directory", rd["dst"]) == [ rev_ts + rd["rel_ts"] ], synth_rev["msg"] # ... + a number of rows in the "content_in_dir" table # for content of the directory. 
# check for D-C entries rows["content_in_directory"] |= set( (x["dst"].hex(), x["src"].hex(), maybe_path(x["path"])) for x in synth_rev["D_C"] ) assert rows["content_in_directory"] == relations( cursor, "content", "directory" ), synth_rev["msg"] # check timestamps for dc in synth_rev["D_C"]: assert get_timestamp(cursor, "content", dc["dst"]) == [ rev_ts + dc["rel_ts"] ], synth_rev["msg"] if provenance.storage.with_path: # check for location entries rows["location"] |= set(x["path"] for x in synth_rev["R_C"]) rows["location"] |= set(x["path"] for x in synth_rev["D_C"]) rows["location"] |= set(x["path"] for x in synth_rev["R_D"]) assert rows["location"] == locations(cursor), synth_rev["msg"] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_provenance_heuristics_content_find_all( - provenance, swh_storage, archive, repo, lower, mindepth -): + provenance: ProvenanceInterface, + swh_storage: Storage, + archive: ArchiveInterface, + repo: str, + lower: bool, + mindepth: int, +) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] def maybe_path(path: str) -> str: + assert isinstance(provenance.storage, ProvenanceDBBase) if provenance.storage.with_path: return path return "" # XXX adding all revisions at once should be working just fine, but it does not... 
# revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) # ...so add revisions one at a time for now for revision in revisions: revision_add(provenance, archive, [revision], lower=lower, mindepth=mindepth) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) - expected_occurrences = {} + expected_occurrences: Dict[str, List[Tuple[str, float, Optional[str], str]]] = {} for synth_rev in synthetic_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: expected_occurrences.setdefault(rc["dst"].hex(), []).append( (rev_id, rev_ts, None, maybe_path(rc["path"])) ) for dc in synth_rev["D_C"]: assert dc["prefix"] is not None # to please mypy expected_occurrences.setdefault(dc["dst"].hex(), []).append( (rev_id, rev_ts, None, maybe_path(dc["prefix"] + "/" + dc["path"])) ) + assert isinstance(provenance.storage, ProvenanceDBBase) for content_id, results in expected_occurrences.items(): expected = [(content_id, *result) for result in results] db_occurrences = [ ( occur.content.hex(), occur.revision.hex(), occur.date.timestamp(), occur.origin, occur.path.decode(), ) for occur in provenance.content_find_all(hash_to_bytes(content_id)) ] if provenance.storage.with_path: # this is not true if the db stores no path, because a same content # that appears several times in a given revision may be reported # only once by content_find_all() assert len(db_occurrences) == len(expected) assert set(db_occurrences) == set(expected) @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_provenance_heuristics_content_find_first( - provenance, swh_storage, archive, repo, lower, mindepth -): + provenance: ProvenanceInterface, + swh_storage: Storage, + archive: ArchiveInterface, + repo: str, + lower: bool, + mindepth: int, +) -> None: # read 
data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] # XXX adding all revisions at once should be working just fine, but it does not... # revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) # ...so add revisions one at a time for now for revision in revisions: revision_add(provenance, archive, [revision], lower=lower, mindepth=mindepth) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) - expected_first: Dict[str, Tuple[str, str, List[str]]] = {} + expected_first: Dict[str, Tuple[str, float, List[str]]] = {} # dict of tuples (blob_id, rev_id, [path, ...]) the third element for path # is a list because a content can be added at several places in a single # revision, in which case the result of content_find_first() is one of # those path, but we have no guarantee which one it will return. 
for synth_rev in synthetic_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: sha1 = rc["dst"].hex() if sha1 not in expected_first: assert rc["rel_ts"] == 0 expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) else: if rev_ts == expected_first[sha1][1]: expected_first[sha1][2].append(rc["path"]) elif rev_ts < expected_first[sha1][1]: - expected_first[sha1] = (rev_id, rev_ts, rc["path"]) + expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) for dc in synth_rev["D_C"]: sha1 = rc["dst"].hex() assert sha1 in expected_first # nothing to do there, this content cannot be a "first seen file" + assert isinstance(provenance.storage, ProvenanceDBBase) for content_id, (rev_id, ts, paths) in expected_first.items(): occur = provenance.content_find_first(hash_to_bytes(content_id)) + assert occur is not None assert occur.content.hex() == content_id assert occur.revision.hex() == rev_id assert occur.date.timestamp() == ts assert occur.origin is None if provenance.storage.with_path: assert occur.path.decode() in paths diff --git a/swh/provenance/tests/test_revision_iterator.py b/swh/provenance/tests/test_revision_iterator.py index 72409dd..a07fb63 100644 --- a/swh/provenance/tests/test_revision_iterator.py +++ b/swh/provenance/tests/test_revision_iterator.py @@ -1,29 +1,30 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.provenance.revision import CSVRevisionIterator from swh.provenance.tests.conftest import fill_storage, load_repo_data from swh.provenance.tests.test_provenance_db import ts2dt +from swh.storage.postgresql.storage import Storage @pytest.mark.parametrize( "repo", ( "cmdbts2", "out-of-order", ), ) -def test_archive_direct_revision_iterator(swh_storage, repo): +def 
test_archive_direct_revision_iterator(swh_storage: Storage, repo: str) -> None: """Test CSVRevisionIterator""" data = load_repo_data(repo) fill_storage(swh_storage, data) revisions_csv = [ (rev["id"], ts2dt(rev["date"]), rev["directory"]) for rev in data["revision"] ] revisions = list(CSVRevisionIterator(revisions_csv)) assert revisions assert len(revisions) == len(data["revision"])