diff --git a/swh/provenance/archive.py b/swh/provenance/archive.py index 7209c75..d391411 100644 --- a/swh/provenance/archive.py +++ b/swh/provenance/archive.py @@ -1,51 +1,54 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, Iterable from typing_extensions import Protocol, runtime_checkable from swh.model.model import Sha1Git +from swh.storage.interface import StorageInterface @runtime_checkable class ArchiveInterface(Protocol): + storage: StorageInterface + def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]: """List entries for one directory. Args: id: sha1 id of the directory to list entries from. Yields: dictionaries of entries in that directory, containing only the keys "name", "target" and "type". """ ... def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: """List parents of one revision. Args: id: sha1 id of the revision to list parents from. Yields: sha1 ids for the parents of that revision. """ ... def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: """List all revisions targeted by one snapshot. Args: id: sha1 id of the snapshot. Yields: sha1 ids of revisions that are a target of that snapshot. Revisions are guaranteed to be retrieved in chronological order. """ ... diff --git a/swh/provenance/postgresql/archive.py b/swh/provenance/postgresql/archive.py index 69f5012..e67ea20 100644 --- a/swh/provenance/postgresql/archive.py +++ b/swh/provenance/postgresql/archive.py @@ -1,115 +1,117 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, Iterable, List from methodtools import lru_cache import psycopg2.extensions from swh.model.model import Sha1Git -from swh.storage.postgresql.storage import Storage +from swh.storage import get_storage class ArchivePostgreSQL: def __init__(self, conn: psycopg2.extensions.connection) -> None: + self.storage = get_storage( + "postgresql", db=conn.dsn, objstorage={"cls": "memory"} + ) self.conn = conn - self.storage = Storage(conn, objstorage={"cls": "memory"}) def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]: entries = self._directory_ls(id) yield from entries @lru_cache(maxsize=100000) def _directory_ls(self, id: Sha1Git) -> List[Dict[str, Any]]: # TODO: add file size filtering with self.conn.cursor() as cursor: cursor.execute( """ WITH dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries FROM directory WHERE id=%s), ls_d AS (SELECT dir_id, UNNEST(dir_entries) AS entry_id FROM dir), ls_f AS (SELECT dir_id, UNNEST(file_entries) AS entry_id FROM dir), ls_r AS (SELECT dir_id, UNNEST(rev_entries) AS entry_id FROM dir) (SELECT 'dir'::directory_entry_type AS type, e.target, e.name, NULL::sha1_git FROM ls_d LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) UNION (WITH known_contents AS (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id INNER JOIN content c ON e.target=c.sha1_git) SELECT * FROM known_contents UNION (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git FROM ls_f LEFT JOIN directory_entry_file e ON
ls_f.entry_id=e.id LEFT JOIN skipped_content c ON e.target=c.sha1_git WHERE NOT EXISTS ( SELECT 1 FROM known_contents WHERE known_contents.sha1_git=e.target ) ) ) """, (id,), ) return [ {"type": row[0], "target": row[1], "name": row[2]} for row in cursor.fetchall() ] def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: with self.conn.cursor() as cursor: cursor.execute( """ SELECT RH.parent_id::bytea FROM revision_history AS RH WHERE RH.id=%s ORDER BY RH.parent_rank """, (id,), ) # A revision may have several parents (e.g. merge commits); parent_rank preserves their order yield from (row[0] for row in cursor.fetchall()) def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: with self.conn.cursor() as cursor: cursor.execute( """ WITH snaps AS (SELECT object_id FROM snapshot WHERE snapshot.id=%s), heads AS ((SELECT R.id, R.date FROM snaps JOIN snapshot_branches AS BS ON (snaps.object_id=BS.snapshot_id) JOIN snapshot_branch AS B ON (BS.branch_id=B.object_id) JOIN revision AS R ON (B.target=R.id) WHERE B.target_type='revision'::snapshot_target) UNION (SELECT RV.id, RV.date FROM snaps JOIN snapshot_branches AS BS ON (snaps.object_id=BS.snapshot_id) JOIN snapshot_branch AS B ON (BS.branch_id=B.object_id) JOIN release AS RL ON (B.target=RL.id) JOIN revision AS RV ON (RL.target=RV.id) WHERE B.target_type='release'::snapshot_target AND RL.target_type='revision'::object_type) ORDER BY date, id) SELECT id FROM heads """, (id,), ) yield from (row[0] for row in cursor.fetchall()) diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py index db16d0b..9e7749a 100644 --- a/swh/provenance/tests/conftest.py +++ b/swh/provenance/tests/conftest.py @@ -1,153 +1,120 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from datetime import datetime, timedelta, timezone from os import path from typing import Any, Dict, Iterable, Iterator from _pytest.fixtures import SubRequest import msgpack import psycopg2.extensions import pytest from swh.journal.serializers import msgpack_ext_hook -from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.provenance import get_provenance, get_provenance_storage from swh.provenance.api.client import RemoteProvenanceStorage import swh.provenance.api.server as server from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface -from swh.provenance.postgresql.archive import ArchivePostgreSQL from swh.provenance.storage.archive import ArchiveStorage -from swh.storage.postgresql.storage import Storage +from swh.storage.interface import StorageInterface from swh.storage.replay import process_replay_objects @pytest.fixture( params=[ "with-path", "without-path", "with-path-denormalized", "without-path-denormalized", ] ) def populated_db( request: SubRequest, postgresql: psycopg2.extensions.connection, ) -> Dict[str, str]: """Return a working and initialized provenance db""" from swh.core.cli.db import populate_database_for_package populate_database_for_package( "swh.provenance", postgresql.dsn, flavor=request.param ) return postgresql.get_dsn_parameters() # the Flask app used as server in these tests @pytest.fixture def app(populated_db: Dict[str, str]) -> Iterator[server.ProvenanceStorageServerApp]: assert hasattr(server, "storage") server.storage = get_provenance_storage(cls="local", db=populated_db) yield server.app
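Aside (illustrative sketch, not part of the patch): the new storage attribute on ArchiveInterface, combined with @runtime_checkable, means conformance can be checked structurally at runtime. A minimal sketch of that contract, assuming swh.storage's in-memory backend ("memory") is available in the test environment:

from swh.provenance.archive import ArchiveInterface
from swh.provenance.storage.archive import ArchiveStorage
from swh.storage import get_storage

storage = get_storage("memory")  # in-memory backend, assumed available in tests
archive = ArchiveStorage(storage)
# runtime_checkable protocols verify that the members exist, not their signatures
assert isinstance(archive, ArchiveInterface)
assert archive.storage is storage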
# the RPCClient class used as client in these tests @pytest.fixture def swh_rpc_client_class() -> type: return RemoteProvenanceStorage @pytest.fixture(params=["local", "remote"]) def provenance( request: SubRequest, populated_db: Dict[str, str], swh_rpc_client: RemoteProvenanceStorage, ) -> ProvenanceInterface: """Return a working and initialized ProvenanceInterface object""" if request.param == "remote": from swh.provenance.provenance import Provenance assert isinstance(swh_rpc_client, ProvenanceStorageInterface) return Provenance(swh_rpc_client) else: # in test sessions, we DO want to raise any exception occurring at commit time prov = get_provenance(cls=request.param, db=populated_db, raise_on_commit=True) return prov @pytest.fixture -def swh_storage_with_objects(swh_storage: Storage) -> Storage: - """return a Storage object (postgresql-based by default) with a few of each - object type in it - - The inserted content comes from swh.model.tests.swh_model_data. - """ - for obj_type in ( - "content", - "skipped_content", - "directory", - "revision", - "release", - "snapshot", - "origin", - "origin_visit", - "origin_visit_status", - ): - getattr(swh_storage, f"{obj_type}_add")(TEST_OBJECTS[obj_type]) - return swh_storage - - -@pytest.fixture -def archive_direct(swh_storage_with_objects: Storage) -> ArchiveInterface: - return ArchivePostgreSQL(swh_storage_with_objects.get_db().conn) - - -@pytest.fixture -def archive_api(swh_storage_with_objects: Storage) -> ArchiveInterface: - return ArchiveStorage(swh_storage_with_objects) - - -@pytest.fixture(params=["archive", "db"]) -def archive(request, swh_storage_with_objects: Storage) -> Iterator[ArchiveInterface]: - """Return a ArchivePostgreSQL based StorageInterface object""" - # this is a workaround to prevent tests from hanging because of an unclosed - # transaction.
- # TODO: refactor the ArchivePostgreSQL to properly deal with - # transactions and get rid of this fixture - if request.param == "db": - archive = ArchivePostgreSQL(conn=swh_storage_with_objects.get_db().conn) - yield archive - archive.conn.rollback() - else: - yield ArchiveStorage(swh_storage_with_objects) +def archive(swh_storage: StorageInterface) -> ArchiveInterface: + """Return an ArchiveStorage-based ArchiveInterface object""" + return ArchiveStorage(swh_storage) def get_datafile(fname: str) -> str: return path.join(path.dirname(__file__), "data", fname) def load_repo_data(repo: str) -> Dict[str, Any]: data: Dict[str, Any] = {} with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj: unpacker = msgpack.Unpacker( fobj, raw=False, ext_hook=msgpack_ext_hook, strict_map_key=False, timestamp=3, # convert Timestamp into datetime objects (tz UTC) ) for objtype, objd in unpacker: data.setdefault(objtype, []).append(objd) return data def filter_dict(d: Dict[Any, Any], keys: Iterable[Any]) -> Dict[Any, Any]: return {k: v for (k, v) in d.items() if k in keys} -def fill_storage(storage: Storage, data: Dict[str, Any]) -> None: +def fill_storage(storage: StorageInterface, data: Dict[str, Any]) -> None: process_replay_objects(data, storage=storage) + + +# TODO: remove this function in favour of TimestampWithTimezone.to_datetime +# from swh.model.model +def ts2dt(ts: Dict[str, Any]) -> datetime: + timestamp = datetime.fromtimestamp( + ts["timestamp"]["seconds"], timezone(timedelta(minutes=ts["offset"])) + ) + return timestamp.replace(microsecond=ts["timestamp"]["microseconds"]) diff --git a/swh/provenance/tests/test_archive_interface.py b/swh/provenance/tests/test_archive_interface.py index 71d1c32..04727d0 100644 --- a/swh/provenance/tests/test_archive_interface.py +++ b/swh/provenance/tests/test_archive_interface.py @@ -1,51 +1,53 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter from operator import itemgetter import pytest from swh.core.db import BaseDb from swh.provenance.postgresql.archive import ArchivePostgreSQL from swh.provenance.storage.archive import ArchiveStorage from swh.provenance.tests.conftest import fill_storage, load_repo_data +from swh.storage.interface import StorageInterface from swh.storage.postgresql.storage import Storage @pytest.mark.parametrize( "repo", ("cmdbts2", "out-of-order", "with-merges"), ) -def test_archive_interface(repo: str, swh_storage: Storage) -> None: +def test_archive_interface(repo: str, swh_storage: StorageInterface) -> None: archive_api = ArchiveStorage(swh_storage) + assert isinstance(swh_storage, Storage) dsn = swh_storage.get_db().conn.dsn with BaseDb.connect(dsn).conn as conn: BaseDb.adapt_conn(conn) archive_direct = ArchivePostgreSQL(conn) # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) fill_storage(swh_storage, data) for directory in data["directory"]: entries_api = sorted( archive_api.directory_ls(directory["id"]), key=itemgetter("name") ) entries_direct = sorted( archive_direct.directory_ls(directory["id"]), key=itemgetter("name") ) assert entries_api == entries_direct for revision in data["revision"]: parents_api = Counter(archive_api.revision_get_parents(revision["id"])) parents_direct = Counter( archive_direct.revision_get_parents(revision["id"]) ) assert
parents_api == parents_direct for snapshot in data["snapshot"]: heads_api = Counter(archive_api.snapshot_get_heads(snapshot["id"])) heads_direct = Counter(archive_direct.snapshot_get_heads(snapshot["id"])) assert heads_api == heads_direct diff --git a/swh/provenance/tests/test_conftest.py b/swh/provenance/tests/test_conftest.py index 2fd66ec..d1fe3de 100644 --- a/swh/provenance/tests/test_conftest.py +++ b/swh/provenance/tests/test_conftest.py @@ -1,22 +1,26 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.provenance.interface import ProvenanceInterface -from swh.storage.postgresql.storage import Storage +from swh.provenance.tests.conftest import fill_storage, load_repo_data +from swh.storage.interface import StorageInterface def test_provenance_fixture(provenance: ProvenanceInterface) -> None: """Check the 'provenance' fixture produces a working ProvenanceDB object""" assert provenance provenance.flush() # should be a noop -def test_storage(swh_storage_with_objects: Storage) -> None: - """Check the 'swh_storage_with_objects' fixture produce a working Storage +def test_fill_storage(swh_storage: StorageInterface) -> None: + """Check the 'fill_storage' test utility produces a working Storage object with at least some Content, Revision and Directory in it""" - assert swh_storage_with_objects - assert swh_storage_with_objects.content_get_random() - assert swh_storage_with_objects.directory_get_random() - assert swh_storage_with_objects.revision_get_random() + data = load_repo_data("cmdbts2") + fill_storage(swh_storage, data) + + assert swh_storage + assert swh_storage.content_get_random() + assert swh_storage.directory_get_random() + assert swh_storage.revision_get_random() diff --git a/swh/provenance/tests/test_history_graph.py b/swh/provenance/tests/test_history_graph.py index adbbd5a..0aadb9f 100644 --- a/swh/provenance/tests/test_history_graph.py +++ b/swh/provenance/tests/test_history_graph.py @@ -1,74 +1,72 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict import pytest import yaml from swh.model.hashutil import hash_to_bytes from swh.provenance.archive import ArchiveInterface from swh.provenance.graph import HistoryNode, build_history_graph from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import OriginEntry, RevisionEntry from swh.provenance.origin import origin_add_revision from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data -from swh.storage.postgresql.storage import Storage def history_graph_from_dict(d: Dict[str, Any]) -> HistoryNode: """Takes a dictionary representing a tree of HistoryNode objects, and recursively builds the corresponding graph.""" node = HistoryNode( entry=RevisionEntry(hash_to_bytes(d["rev"])), visited=d.get("visited", False), in_history=d.get("in_history", False), ) node.parents = set( history_graph_from_dict(parent) for parent in d.get("parents", []) ) return node @pytest.mark.parametrize( "repo, visit", (("with-merges", "visits-01"),), ) @pytest.mark.parametrize("batch", (True, False)) def test_history_graph( provenance: ProvenanceInterface, - swh_storage: Storage,
archive: ArchiveInterface, repo: str, visit: str, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) - fill_storage(swh_storage, data) + fill_storage(archive.storage, data) filename = f"history_graphs_{repo}_{visit}.yaml" with open(get_datafile(filename)) as file: for expected in yaml.full_load(file): entry = OriginEntry(expected["origin"], hash_to_bytes(expected["snapshot"])) provenance.origin_add(entry) for graph_as_dict in expected["graphs"]: expected_graph = history_graph_from_dict(graph_as_dict) print("Expected graph:", expected_graph) computed_graph = build_history_graph( archive, provenance, RevisionEntry(hash_to_bytes(graph_as_dict["rev"])), ) print("Computed graph:", computed_graph) assert computed_graph == expected_graph origin_add_revision(provenance, entry, computed_graph) if not batch: provenance.flush() diff --git a/swh/provenance/tests/test_isochrone_graph.py b/swh/provenance/tests/test_isochrone_graph.py index 9b65cd5..79374d1 100644 --- a/swh/provenance/tests/test_isochrone_graph.py +++ b/swh/provenance/tests/test_isochrone_graph.py @@ -1,112 +1,114 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy from datetime import datetime, timezone from typing import Any, Dict import pytest import yaml from swh.model.hashutil import hash_to_bytes from swh.provenance.archive import ArchiveInterface from swh.provenance.graph import IsochroneNode, build_isochrone_graph from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import DirectoryEntry, RevisionEntry from swh.provenance.revision import revision_add -from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data -from swh.provenance.tests.test_provenance_db import ts2dt -from swh.storage.postgresql.storage import Storage +from swh.provenance.tests.conftest import ( + fill_storage, + get_datafile, + load_repo_data, + ts2dt, +) def isochrone_graph_from_dict(d: Dict[str, Any], depth: int = 0) -> IsochroneNode: """Takes a dictionary representing a tree of IsochroneNode objects, and recursively builds the corresponding graph.""" d = deepcopy(d) d["entry"]["id"] = hash_to_bytes(d["entry"]["id"]) d["entry"]["name"] = bytes(d["entry"]["name"], encoding="utf-8") dbdate = d.get("dbdate", None) if dbdate is not None: dbdate = datetime.fromtimestamp(d["dbdate"], timezone.utc) children = d.get("children", []) node = IsochroneNode( entry=DirectoryEntry(**d["entry"]), dbdate=dbdate, depth=depth, ) node.maxdate = datetime.fromtimestamp(d["maxdate"], timezone.utc) node.known = d.get("known", False) node.invalid = d.get("invalid", False) node.path = bytes(d["path"], encoding="utf-8") node.children = set( isochrone_graph_from_dict(child, depth=depth + 1) for child in children ) return node @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_isochrone_graph( provenance: ProvenanceInterface, - swh_storage: Storage, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) - 
fill_storage(swh_storage, data) + fill_storage(archive.storage, data) revisions = {rev["id"]: rev for rev in data["revision"]} filename = f"graphs_{repo}_{'lower' if lower else 'upper'}_{mindepth}.yaml" with open(get_datafile(filename)) as file: for expected in yaml.full_load(file): print("# Processing revision", expected["rev"]) revision = revisions[hash_to_bytes(expected["rev"])] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) expected_graph = isochrone_graph_from_dict(expected["graph"]) print("Expected graph:", expected_graph) # Create graph for current revision and check it has the expected structure. assert entry.root is not None computed_graph = build_isochrone_graph( archive, provenance, entry, DirectoryEntry(entry.root), ) print("Computed graph:", computed_graph) assert computed_graph == expected_graph # Add current revision so that provenance info is kept up to date for the # following ones. revision_add( provenance, archive, [entry], lower=lower, mindepth=mindepth, commit=not batch, ) diff --git a/swh/provenance/tests/test_origin_iterator.py b/swh/provenance/tests/test_origin_iterator.py index 9c590dd..2c4e3ac 100644 --- a/swh/provenance/tests/test_origin_iterator.py +++ b/swh/provenance/tests/test_origin_iterator.py @@ -1,38 +1,42 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.model.model import OriginVisitStatus -from swh.model.tests.swh_model_data import TEST_OBJECTS +import pytest + from swh.provenance.origin import CSVOriginIterator +from swh.provenance.tests.conftest import fill_storage, load_repo_data from swh.storage.algos.origin import ( iter_origin_visit_statuses, iter_origin_visits, iter_origins, ) -from swh.storage.postgresql.storage import Storage +from swh.storage.interface import StorageInterface -def test_origin_iterator(swh_storage_with_objects: Storage) -> None: +@pytest.mark.parametrize( + "repo", + ( + "cmdbts2", + "out-of-order", + ), +) +def test_origin_iterator(swh_storage: StorageInterface, repo: str) -> None: """Test CSVOriginIterator""" + data = load_repo_data(repo) + fill_storage(swh_storage, data) + origins_csv = [] - for origin in iter_origins(swh_storage_with_objects): - for visit in iter_origin_visits(swh_storage_with_objects, origin.url): + for origin in iter_origins(swh_storage): + for visit in iter_origin_visits(swh_storage, origin.url): if visit.visit is not None: for status in iter_origin_visit_statuses( - swh_storage_with_objects, origin.url, visit.visit + swh_storage, origin.url, visit.visit ): if status.snapshot is not None: origins_csv.append((status.origin, status.snapshot)) origins = list(CSVOriginIterator(origins_csv)) + assert origins - assert len(origins) == len( - list( - { - status.origin - for status in TEST_OBJECTS["origin_visit_status"] - if isinstance(status, OriginVisitStatus) and status.snapshot is not None - } - ) - ) + assert len(origins) == len(data["origin"]) diff --git a/swh/provenance/tests/test_origin_revision_layer.py b/swh/provenance/tests/test_origin_revision_layer.py index 9bb66d8..78c7634 100644 --- a/swh/provenance/tests/test_origin_revision_layer.py +++ b/swh/provenance/tests/test_origin_revision_layer.py @@ -1,192 +1,190 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # 
License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re from typing import Any, Dict, Iterable, Iterator, List, Set import pytest from typing_extensions import TypedDict from swh.model.hashutil import hash_to_bytes from swh.model.model import Sha1Git from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import EntityType, ProvenanceInterface, RelationType from swh.provenance.model import OriginEntry from swh.provenance.origin import origin_add from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data -from swh.storage.postgresql.storage import Storage class SynthRelation(TypedDict): src: Sha1Git dst: Sha1Git name: str class SynthOrigin(TypedDict): sha1: Sha1Git url: str snap: Sha1Git O_R: List[SynthRelation] R_R: List[SynthRelation] def synthetic_origin_revision_result(filename: str) -> Iterator[SynthOrigin]: """Generates dict representations of synthetic origin visits found in the synthetic file (from the data/ directory) given as argument to the generator. Each generated SynthOrigin (typed dict) has the following elements: "sha1": (Sha1Git) sha1 of the origin, "url": (str) url of the origin, "snap": (Sha1Git) sha1 of the visit's snapshot, "O_R": (list) new O-R relations added by this origin visit "R_R": (list) new R-R relations added by this origin visit Each relation above is a SynthRelation typed dict with: "src": (Sha1Git) sha1 of the source of the relation "dst": (Sha1Git) sha1 of the destination of the relation """ with open(get_datafile(filename), "r") as fobj: yield from _parse_synthetic_origin_revision_file(fobj) def _parse_synthetic_origin_revision_file(fobj: Iterable[str]) -> Iterator[SynthOrigin]: """Read a 'synthetic' file and generate a dict representation of the synthetic origin visit for each snapshot listed in the synthetic file. """ regs = [ "(?P<url>[^ ]+)?", "(?P<reltype>[^| ]*)", "(?P<revname>R[0-9]{2,4})?", "(?P<type>[ORS]) (?P<sha1>[0-9a-f]{40})", ] regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$") current_org: List[dict] = [] for m in (regex.match(line) for line in fobj): if m: d = m.groupdict() if d["url"]: if current_org: yield _mk_synth_org(current_org) current_org.clear() current_org.append(d) if current_org: yield _mk_synth_org(current_org) def _mk_synth_org(synth_org: List[Dict[str, str]]) -> SynthOrigin: assert synth_org[0]["type"] == "O" assert synth_org[1]["type"] == "S" org = SynthOrigin( sha1=hash_to_bytes(synth_org[0]["sha1"]), url=synth_org[0]["url"], snap=hash_to_bytes(synth_org[1]["sha1"]), O_R=[], R_R=[], ) for row in synth_org[2:]: if row["reltype"] == "O-R": assert row["type"] == "R" org["O_R"].append( SynthRelation( src=org["sha1"], dst=hash_to_bytes(row["sha1"]), name=row["revname"], ) ) elif row["reltype"] == "R-R": assert row["type"] == "R" org["R_R"].append( SynthRelation( src=org["O_R"][-1]["dst"], dst=hash_to_bytes(row["sha1"]), name=row["revname"], ) ) return org @pytest.mark.parametrize( "repo, visit", (("with-merges", "visits-01"),), ) def test_origin_revision_layer( provenance: ProvenanceInterface, - swh_storage: Storage, archive: ArchiveInterface, repo: str, visit: str, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) - fill_storage(swh_storage, data) + fill_storage(archive.storage, data) syntheticfile = get_datafile(f"origin-revision_{repo}_{visit}.txt") origins = [ {"url": status["origin"], "snap": status["snapshot"]} for status in data["origin_visit_status"] if status["snapshot"] is not None ] rows: Dict[str, Set[Any]] = { "origin": set(), "revision_in_origin": set(), "revision_before_revision": set(), "revision": set(), } for synth_org in synthetic_origin_revision_result(syntheticfile): for origin in ( org for org in origins if org["url"] == synth_org["url"] and org["snap"] == synth_org["snap"] ): entry = OriginEntry(url=origin["url"], snapshot=origin["snap"]) origin_add(provenance, archive, [entry]) # each "entry" in the synth file is one new origin visit rows["origin"].add(synth_org["sha1"]) assert rows["origin"] == provenance.storage.entity_get_all( EntityType.ORIGIN ), synth_org["url"] # check the url of the origin assert ( provenance.storage.origin_get([synth_org["sha1"]])[synth_org["sha1"]] == synth_org["url"] ), synth_org["snap"] # this origin visit might have added new revision objects rows["revision"] |= set(x["dst"] for x in synth_org["O_R"]) rows["revision"] |= set(x["dst"] for x in synth_org["R_R"]) assert rows["revision"] == provenance.storage.entity_get_all( EntityType.REVISION ), synth_org["snap"] # check for O-R (head) entries # these are added in the revision_in_origin relation rows["revision_in_origin"] |= set( (x["dst"], x["src"], None) for x in synth_org["O_R"] ) assert rows["revision_in_origin"] == { (rel.src, rel.dst, rel.path) for rel in provenance.storage.relation_get_all(RelationType.REV_IN_ORG) }, synth_org["snap"] # check for R-R entries # these are added in the revision_before_revision relation rows["revision_before_revision"] |= set( (x["dst"], x["src"], None) for x in synth_org["R_R"] ) assert rows["revision_before_revision"] == { (rel.src, rel.dst, rel.path) for rel in provenance.storage.relation_get_all( RelationType.REV_BEFORE_REV ) }, synth_org["snap"] diff --git a/swh/provenance/tests/test_provenance_db.py b/swh/provenance/tests/test_provenance_db.py index fd1c72a..2bb9cf4 100644 ---
a/swh/provenance/tests/test_provenance_db.py +++ b/swh/provenance/tests/test_provenance_db.py @@ -1,47 +1,17 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from datetime import datetime, timedelta, timezone - -from swh.model.model import OriginVisitStatus -from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.provenance.interface import ProvenanceInterface -from swh.provenance.model import OriginEntry -from swh.provenance.origin import origin_add from swh.provenance.postgresql.provenancedb import ProvenanceDB -from swh.provenance.storage.archive import ArchiveStorage -from swh.storage.postgresql.storage import Storage - - -# TODO: remove this function in favour of TimestampWithTimezone.to_datetime -# from swh.model.model -def ts2dt(ts: dict) -> datetime: - timestamp = datetime.fromtimestamp( - ts["timestamp"]["seconds"], timezone(timedelta(minutes=ts["offset"])) - ) - return timestamp.replace(microsecond=ts["timestamp"]["microseconds"]) - - -def test_provenance_origin_add( - provenance: ProvenanceInterface, swh_storage_with_objects: Storage -) -> None: - """Test the origin_add function""" - archive = ArchiveStorage(swh_storage_with_objects) - for status in TEST_OBJECTS["origin_visit_status"]: - assert isinstance(status, OriginVisitStatus) - if status.snapshot is not None: - entry = OriginEntry(url=status.origin, snapshot=status.snapshot) - origin_add(provenance, archive, [entry]) - # TODO: check some facts here def test_provenance_flavor(provenance: ProvenanceInterface) -> None: if isinstance(provenance.storage, ProvenanceDB): assert provenance.storage.flavor in ( "with-path", "without-path", "with-path-denormalized", "without-path-denormalized", ) diff --git a/swh/provenance/tests/test_revision_content_layer.py b/swh/provenance/tests/test_revision_content_layer.py index 77e36d3..2aa17d4 100644 --- a/swh/provenance/tests/test_revision_content_layer.py +++ b/swh/provenance/tests/test_revision_content_layer.py @@ -1,447 +1,447 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple import pytest from typing_extensions import TypedDict from swh.model.hashutil import hash_to_bytes from swh.model.model import Sha1Git from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import EntityType, ProvenanceInterface, RelationType from swh.provenance.model import RevisionEntry from swh.provenance.revision import revision_add -from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data -from swh.provenance.tests.test_provenance_db import ts2dt -from swh.storage.postgresql.storage import Storage +from swh.provenance.tests.conftest import ( + fill_storage, + get_datafile, + load_repo_data, + ts2dt, +) class SynthRelation(TypedDict): prefix: Optional[str] path: str src: Sha1Git dst: Sha1Git rel_ts: float class SynthRevision(TypedDict): sha1: Sha1Git date: float msg: str R_C: List[SynthRelation] R_D: List[SynthRelation] D_C: List[SynthRelation] def synthetic_revision_content_result(filename: str) -> Iterator[SynthRevision]: """Generates dict representations of synthetic 
revisions found in the synthetic file (from the data/ directory) given as argument to the generator. Each generated SynthRevision (typed dict) has the following elements: "sha1": (Sha1Git) sha1 of the revision, "date": (float) timestamp of the revision, "msg": (str) commit message of the revision, "R_C": (list) new R---C relations added by this revision "R_D": (list) new R-D relations added by this revision "D_C": (list) new D-C relations added by this revision Each relation above is a SynthRelation typed dict with: "path": (str) location "src": (Sha1Git) sha1 of the source of the relation "dst": (Sha1Git) sha1 of the destination of the relation "rel_ts": (float) timestamp of the target of the relation (relative to the timestamp of the revision) """ with open(get_datafile(filename), "r") as fobj: yield from _parse_synthetic_revision_content_file(fobj) def _parse_synthetic_revision_content_file( fobj: Iterable[str], ) -> Iterator[SynthRevision]: """Read a 'synthetic' file and generate a dict representation of the synthetic revision for each revision listed in the synthetic file. """ regs = [ "(?P<revname>R[0-9]{2,4})?", "(?P<reltype>[^| ]*)", "([+] )?(?P<path>[^| +]*?)[/]?", "(?P<type>[RDC]) (?P<sha1>[0-9a-f]{40})", "(?P<ts>-?[0-9]+(.[0-9]+)?)", ] regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$") current_rev: List[dict] = [] for m in (regex.match(line) for line in fobj): if m: d = m.groupdict() if d["revname"]: if current_rev: yield _mk_synth_rev(current_rev) current_rev.clear() current_rev.append(d) if current_rev: yield _mk_synth_rev(current_rev) def _mk_synth_rev(synth_rev: List[Dict[str, str]]) -> SynthRevision: assert synth_rev[0]["type"] == "R" rev = SynthRevision( sha1=hash_to_bytes(synth_rev[0]["sha1"]), date=float(synth_rev[0]["ts"]), msg=synth_rev[0]["revname"], R_C=[], R_D=[], D_C=[], ) current_path = None # path of the last R-D relation we parsed, used as prefix for next D-C # relations for row in synth_rev[1:]: if row["reltype"] == "R---C": assert row["type"] == "C" rev["R_C"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = None elif row["reltype"] == "R-D": assert row["type"] == "D" rev["R_D"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = row["path"] elif row["reltype"] == "D-C": assert row["type"] == "C" rev["D_C"].append( SynthRelation( prefix=current_path, path=row["path"], src=rev["R_D"][-1]["dst"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) return rev @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) def test_revision_content_result( provenance: ProvenanceInterface, - swh_storage: Storage, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) - fill_storage(swh_storage, data) + fill_storage(archive.storage, data) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) revisions = {rev["id"]: rev for rev in data["revision"]} rows: Dict[str, Set[Any]] = { "content": set(), "content_in_directory": set(), "content_in_revision": set(), "directory": set(), "directory_in_revision": set(), "location": set(), "revision": set(), } def maybe_path(path: str) -> Optional[bytes]: if provenance.storage.with_path():
return path.encode("utf-8") return None for synth_rev in synthetic_revision_content_result(syntheticfile): revision = revisions[synth_rev["sha1"]] entry = RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth) # each "entry" in the synth file is one new revision rows["revision"].add(synth_rev["sha1"]) assert rows["revision"] == provenance.storage.entity_get_all( EntityType.REVISION ), synth_rev["msg"] # check the timestamp of the revision rev_ts = synth_rev["date"] rev_data = provenance.storage.revision_get([synth_rev["sha1"]])[ synth_rev["sha1"] ] assert ( rev_data.date is not None and rev_ts == rev_data.date.timestamp() ), synth_rev["msg"] # this revision might have added new content objects rows["content"] |= set(x["dst"] for x in synth_rev["R_C"]) rows["content"] |= set(x["dst"] for x in synth_rev["D_C"]) assert rows["content"] == provenance.storage.entity_get_all( EntityType.CONTENT ), synth_rev["msg"] # check for R-C (direct) entries # these are added directly in the content_early_in_rev table rows["content_in_revision"] |= set( (x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["R_C"] ) assert rows["content_in_revision"] == { (rel.src, rel.dst, rel.path) for rel in provenance.storage.relation_get_all( RelationType.CNT_EARLY_IN_REV ) }, synth_rev["msg"] # check timestamps for rc in synth_rev["R_C"]: assert ( rev_ts + rc["rel_ts"] == provenance.storage.content_get([rc["dst"]])[rc["dst"]].timestamp() ), synth_rev["msg"] # check directories # each directory stored in the provenance index is an entry # in the "directory" table... rows["directory"] |= set(x["dst"] for x in synth_rev["R_D"]) assert rows["directory"] == provenance.storage.entity_get_all( EntityType.DIRECTORY ), synth_rev["msg"] # ... + a number of rows in the "directory_in_rev" table... # check for R-D entries rows["directory_in_revision"] |= set( (x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["R_D"] ) assert rows["directory_in_revision"] == { (rel.src, rel.dst, rel.path) for rel in provenance.storage.relation_get_all(RelationType.DIR_IN_REV) }, synth_rev["msg"] # check timestamps for rd in synth_rev["R_D"]: assert ( rev_ts + rd["rel_ts"] == provenance.storage.directory_get([rd["dst"]])[rd["dst"]].timestamp() ), synth_rev["msg"] # ... + a number of rows in the "content_in_dir" table # for content of the directory. 
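# note (added for clarity): each D-C row's source is the directory introduced by the preceding R-D relation (_mk_synth_rev sets src=rev["R_D"][-1]["dst"]), so these tuples pair each blob with its frontier directory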
# check for D-C entries rows["content_in_directory"] |= set( (x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["D_C"] ) assert rows["content_in_directory"] == { (rel.src, rel.dst, rel.path) for rel in provenance.storage.relation_get_all(RelationType.CNT_IN_DIR) }, synth_rev["msg"] # check timestamps for dc in synth_rev["D_C"]: assert ( rev_ts + dc["rel_ts"] == provenance.storage.content_get([dc["dst"]])[dc["dst"]].timestamp() ), synth_rev["msg"] if provenance.storage.with_path(): # check for location entries rows["location"] |= set(x["path"] for x in synth_rev["R_C"]) rows["location"] |= set(x["path"] for x in synth_rev["D_C"]) rows["location"] |= set(x["path"] for x in synth_rev["R_D"]) assert rows["location"] == provenance.storage.location_get(), synth_rev[ "msg" ] @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_provenance_heuristics_content_find_all( provenance: ProvenanceInterface, - swh_storage: Storage, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, batch: bool, ) -> None: # read data/README.md for more details on how these datasets are generated data = load_repo_data(repo) - fill_storage(swh_storage, data) + fill_storage(archive.storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] def maybe_path(path: str) -> str: if provenance.storage.with_path(): return path return "" if batch: revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) else: for revision in revisions: revision_add( provenance, archive, [revision], lower=lower, mindepth=mindepth ) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_occurrences: Dict[str, List[Tuple[str, float, Optional[str], str]]] = {} for synth_rev in synthetic_revision_content_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: expected_occurrences.setdefault(rc["dst"].hex(), []).append( (rev_id, rev_ts, None, maybe_path(rc["path"])) ) for dc in synth_rev["D_C"]: assert dc["prefix"] is not None # to please mypy expected_occurrences.setdefault(dc["dst"].hex(), []).append( (rev_id, rev_ts, None, maybe_path(dc["prefix"] + "/" + dc["path"])) ) for content_id, results in expected_occurrences.items(): expected = [(content_id, *result) for result in results] db_occurrences = [ ( occur.content.hex(), occur.revision.hex(), occur.date.timestamp(), occur.origin, occur.path.decode(), ) for occur in provenance.content_find_all(hash_to_bytes(content_id)) ] if provenance.storage.with_path(): # this is not true if the db stores no path, because the same content # that appears several times in a given revision may be reported # only once by content_find_all() assert len(db_occurrences) == len(expected) assert set(db_occurrences) == set(expected) @pytest.mark.parametrize( "repo, lower, mindepth", ( ("cmdbts2", True, 1), ("cmdbts2", False, 1), ("cmdbts2", True, 2), ("cmdbts2", False, 2), ("out-of-order", True, 1), ), ) @pytest.mark.parametrize("batch", (True, False)) def test_provenance_heuristics_content_find_first( provenance: ProvenanceInterface, - swh_storage: Storage, archive: ArchiveInterface, repo: str, lower: bool, mindepth: int, batch: bool, ) -> None: # read data/README.md for more details on how these
datasets are generated data = load_repo_data(repo) - fill_storage(swh_storage, data) + fill_storage(archive.storage, data) revisions = [ RevisionEntry( id=revision["id"], date=ts2dt(revision["date"]), root=revision["directory"], ) for revision in data["revision"] ] if batch: revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth) else: for revision in revisions: revision_add( provenance, archive, [revision], lower=lower, mindepth=mindepth ) syntheticfile = get_datafile( f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt" ) expected_first: Dict[str, Tuple[str, float, List[str]]] = {} # maps a blob_id to a tuple (rev_id, rev_ts, [path, ...]); the path element # is a list because a content can be added at several places in a single # revision, in which case the result of content_find_first() is one of # those paths, but we have no guarantee which one it will return. for synth_rev in synthetic_revision_content_result(syntheticfile): rev_id = synth_rev["sha1"].hex() rev_ts = synth_rev["date"] for rc in synth_rev["R_C"]: sha1 = rc["dst"].hex() if sha1 not in expected_first: assert rc["rel_ts"] == 0 expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) else: if rev_ts == expected_first[sha1][1]: expected_first[sha1][2].append(rc["path"]) elif rev_ts < expected_first[sha1][1]: expected_first[sha1] = (rev_id, rev_ts, [rc["path"]]) for dc in synth_rev["D_C"]: sha1 = dc["dst"].hex() assert sha1 in expected_first # nothing to do there, this content cannot be a "first seen file" for content_id, (rev_id, ts, paths) in expected_first.items(): occur = provenance.content_find_first(hash_to_bytes(content_id)) assert occur is not None assert occur.content.hex() == content_id assert occur.revision.hex() == rev_id assert occur.date.timestamp() == ts assert occur.origin is None if provenance.storage.with_path(): assert occur.path.decode() in paths diff --git a/swh/provenance/tests/test_revision_iterator.py b/swh/provenance/tests/test_revision_iterator.py index a07fb63..1255546 100644 --- a/swh/provenance/tests/test_revision_iterator.py +++ b/swh/provenance/tests/test_revision_iterator.py @@ -1,30 +1,31 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.provenance.revision import CSVRevisionIterator -from swh.provenance.tests.conftest import fill_storage, load_repo_data -from swh.provenance.tests.test_provenance_db import ts2dt -from swh.storage.postgresql.storage import Storage +from swh.provenance.tests.conftest import fill_storage, load_repo_data, ts2dt +from swh.storage.interface import StorageInterface @pytest.mark.parametrize( "repo", ( "cmdbts2", "out-of-order", ), ) -def test_archive_direct_revision_iterator(swh_storage: Storage, repo: str) -> None: +def test_revision_iterator(swh_storage: StorageInterface, repo: str) -> None: """Test CSVRevisionIterator""" data = load_repo_data(repo) fill_storage(swh_storage, data) + revisions_csv = [ (rev["id"], ts2dt(rev["date"]), rev["directory"]) for rev in data["revision"] ] revisions = list(CSVRevisionIterator(revisions_csv)) + assert revisions assert len(revisions) == len(data["revision"])
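For reference, a small usage sketch of the ts2dt helper that this patch moves into swh/provenance/tests/conftest.py (the input values below are made up for illustration; the "offset" field is in minutes):

from swh.provenance.tests.conftest import ts2dt

ts = {"timestamp": {"seconds": 1609459200, "microseconds": 42}, "offset": 60}
dt = ts2dt(ts)  # aware datetime in a fixed UTC+01:00 timezone
assert dt.isoformat() == "2021-01-01T01:00:00.000042+01:00"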