diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
index da52aa0..697c7cf 100644
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -1,32 +1,47 @@
 from typing import TYPE_CHECKING
+import warnings

 from .postgresql.db_utils import connect

 if TYPE_CHECKING:
     from swh.provenance.archive import ArchiveInterface
     from swh.provenance.provenance import ProvenanceInterface


 def get_archive(cls: str, **kwargs) -> "ArchiveInterface":
     if cls == "api":
         from swh.provenance.storage.archive import ArchiveStorage
         from swh.storage import get_storage

         return ArchiveStorage(get_storage(**kwargs["storage"]))
     elif cls == "direct":
         from swh.provenance.postgresql.archive import ArchivePostgreSQL

         return ArchivePostgreSQL(connect(kwargs["db"]))
     else:
         raise NotImplementedError


 def get_provenance(cls: str, **kwargs) -> "ProvenanceInterface":
     if cls == "local":
         conn = connect(kwargs["db"])
-        with_path = kwargs.get("with_path", True)
+        if "with_path" in kwargs:
+            warnings.warn(
+                "Usage of the 'with-path' config option is deprecated. "
+                "The db flavor is now used instead.",
+                DeprecationWarning,
+            )
+
+        with_path = kwargs.get("with_path")
         from swh.provenance.provenance import ProvenanceBackend

-        return ProvenanceBackend(conn, with_path=with_path)
+        prov = ProvenanceBackend(conn)
+        if with_path is not None:
+            flavor = "with-path" if with_path else "without-path"
+            if prov.storage.flavor != flavor:
+                raise ValueError(
+                    "The given flavor does not match the flavor stored in the backend."
+                )
+        return prov
     else:
         raise NotImplementedError
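Note: with this change the legacy `with_path` option no longer selects the backend class; it only triggers a deprecation warning plus a consistency check against the flavor stored in the database. A minimal sketch of the new call path, assuming a local provenance DB reachable through a hypothetical connection dict:

    import warnings

    from swh.provenance import get_provenance

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        # 'db' is a hypothetical connection dict, as expected by connect()
        prov = get_provenance("local", db={"dbname": "provenance"}, with_path=True)

    # the deprecated option now only warns...
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)
    # ...and cross-checks the flavor stored in the DB (ValueError on mismatch)
    assert prov.storage.flavor == "with-path"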
# if data["revision_before_revision"]: # psycopg2.extras.execute_values( # self.cursor, # """ # LOCK TABLE ONLY revision_before_revision; # INSERT INTO revision_before_revision VALUES %s # ON CONFLICT DO NOTHING # """, # data["revision_before_revision"], # ) # data["revision_before_revision"].clear() # # if data["revision_in_origin"]: # psycopg2.extras.execute_values( # self.cursor, # """ # LOCK TABLE ONLY revision_in_origin; # INSERT INTO revision_in_origin VALUES %s # ON CONFLICT DO NOTHING # """, # data["revision_in_origin"], # ) # data["revision_in_origin"].clear() return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if raise_on_commit: raise return False def get_dates(self, entity: str, ids: List[bytes]) -> Dict[bytes, datetime]: dates = {} if ids: values = ", ".join(itertools.repeat("%s", len(ids))) self.cursor.execute( f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""", tuple(ids), ) dates.update(self.cursor.fetchall()) return dates def insert_entity(self, entity: str, data: Dict[bytes, datetime]): if data: psycopg2.extras.execute_values( self.cursor, f""" LOCK TABLE ONLY {entity}; INSERT INTO {entity}(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) """, data.items(), ) # XXX: not sure if Python takes a reference or a copy. # This might be useless! data.clear() def insert_relation(self, relation: str, data: Set[Tuple[bytes, bytes, bytes]]): ... def content_find_first( self, blob: bytes ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]: ... def content_find_all( self, blob: bytes, limit: Optional[int] = None ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]: ... def origin_get_id(self, url: str) -> int: # Insert origin in the DB and return the assigned id self.cursor.execute( """ LOCK TABLE ONLY origin; INSERT INTO origin(url) VALUES (%s) ON CONFLICT DO NOTHING RETURNING id """, (url,), ) return self.cursor.fetchone()[0] def revision_get_preferred_origin(self, revision: bytes) -> int: self.cursor.execute( """SELECT COALESCE(origin, 0) FROM revision WHERE sha1=%s""", (revision,) ) row = self.cursor.fetchone() # None means revision is not in database; # 0 means revision has no preferred origin return row[0] if row is not None and row[0] != 0 else None def revision_in_history(self, revision: bytes) -> bool: self.cursor.execute( """ SELECT 1 FROM revision_before_revision JOIN revision ON revision.id=revision_before_revision.prev WHERE revision.sha1=%s """, (revision,), ) return self.cursor.fetchone() is not None def revision_set_preferred_origin(self, origin: int, revision: bytes): self.cursor.execute( """UPDATE revision SET origin=%s WHERE sha1=%s""", (origin, revision) ) def revision_visited(self, revision: bytes) -> bool: self.cursor.execute( """ SELECT 1 FROM revision_in_origin JOIN revision ON revision.id=revision_in_origin.revision WHERE revision.sha1=%s """, (revision,), ) return self.cursor.fetchone() is not None diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py index 9adf67d..16ab6ec 100644 --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,282 +1,282 @@ from datetime import datetime import logging import os from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple import psycopg2 from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry # XXX: this protocol doesn't 
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
index 9adf67d..16ab6ec 100644
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,282 +1,282 @@
 from datetime import datetime
 import logging
 import os
 from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple

 import psycopg2
 from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable

 from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry


 # XXX: this protocol doesn't make much sense now that flavours have been delegated to
 # another class, lower in the callstack.
 @runtime_checkable
 class ProvenanceInterface(Protocol):
     raise_on_commit: bool = False

     def commit(self):
         """Commit currently ongoing transactions in the backend DB"""
         ...

     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ) -> None:
         ...

     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
     ) -> None:
         ...

     def content_find_first(
         self, blob: bytes
     ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
         ...

     def content_find_all(
         self, blob: bytes, limit: Optional[int] = None
     ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
         ...

     def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
         ...

     def content_get_early_dates(
         self, blobs: Iterable[FileEntry]
     ) -> Dict[bytes, datetime]:
         ...

     def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
         ...

     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
     ) -> None:
         ...

     def directory_get_date_in_isochrone_frontier(
         self, directory: DirectoryEntry
     ) -> Optional[datetime]:
         ...

     def directory_get_dates_in_isochrone_frontier(
         self, dirs: Iterable[DirectoryEntry]
     ) -> Dict[bytes, datetime]:
         ...

     def directory_set_date_in_isochrone_frontier(
         self, directory: DirectoryEntry, date: datetime
     ) -> None:
         ...

     def origin_get_id(self, origin: OriginEntry) -> int:
         ...

     def revision_add(self, revision: RevisionEntry) -> None:
         ...

     def revision_add_before_revision(
         self, relative: RevisionEntry, revision: RevisionEntry
     ) -> None:
         ...

     def revision_add_to_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ) -> None:
         ...

     def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
         ...

     def revision_get_preferred_origin(self, revision: RevisionEntry) -> int:
         ...

     def revision_in_history(self, revision: RevisionEntry) -> bool:
         ...

     def revision_set_preferred_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ) -> None:
         ...

     def revision_visited(self, revision: RevisionEntry) -> bool:
         ...


 class Cache(TypedDict):
     data: Dict[bytes, datetime]
     added: Set[bytes]


 class ProvenanceCache(TypedDict):
     content: Cache
     directory: Cache
     revision: Cache
     # below are insertion caches only
     content_in_revision: Set[Tuple[bytes, bytes, bytes]]
     content_in_directory: Set[Tuple[bytes, bytes, bytes]]
     directory_in_revision: Set[Tuple[bytes, bytes, bytes]]
     # these two are for the origin layer
     revision_before_revision: List[Tuple[bytes, bytes]]
     revision_in_origin: List[Tuple[bytes, int]]


 def new_cache():
     return ProvenanceCache(
         content=Cache(data={}, added=set()),
         directory=Cache(data={}, added=set()),
         revision=Cache(data={}, added=set()),
         content_in_revision=set(),
         content_in_directory=set(),
         directory_in_revision=set(),
         revision_before_revision=[],
         revision_in_origin=[],
     )


 # TODO: maybe move this to a separate file
 class ProvenanceBackend:
     raise_on_commit: bool = False

-    def __init__(self, conn: psycopg2.extensions.connection, with_path: bool = True):
+    def __init__(self, conn: psycopg2.extensions.connection):
         from .postgresql.provenancedb_base import ProvenanceDBBase

         # TODO: this class should not know what the actual used DB is.
         self.storage: ProvenanceDBBase
-        if with_path:
+        flavor = ProvenanceDBBase(conn).flavor
+        if flavor == "with-path":
             from .postgresql.provenancedb_with_path import ProvenanceWithPathDB

             self.storage = ProvenanceWithPathDB(conn)
         else:
             from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB

             self.storage = ProvenanceWithoutPathDB(conn)
-
         self.cache: ProvenanceCache = new_cache()

     def clear_caches(self):
         self.cache = new_cache()

     def commit(self):
         # TODO: for now we just forward the cache. This should be improved!
         while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit):
             logging.warning(
                 f"Unable to commit cached information {self.cache}. Retrying..."
             )
         self.clear_caches()

     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ):
         self.cache["content_in_directory"].add(
             (blob.id, directory.id, normalize(os.path.join(prefix, blob.name)))
         )

     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
     ):
         self.cache["content_in_revision"].add(
             (blob.id, revision.id, normalize(os.path.join(prefix, blob.name)))
         )

     def content_find_first(
         self, blob: bytes
     ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
         return self.storage.content_find_first(blob)

     def content_find_all(
         self, blob: bytes, limit: Optional[int] = None
     ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
         yield from self.storage.content_find_all(blob, limit=limit)

     def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
         return self.get_dates("content", [blob.id]).get(blob.id, None)

     def content_get_early_dates(
         self, blobs: Iterable[FileEntry]
     ) -> Dict[bytes, datetime]:
         return self.get_dates("content", [blob.id for blob in blobs])

     def content_set_early_date(self, blob: FileEntry, date: datetime):
         self.cache["content"]["data"][blob.id] = date
         self.cache["content"]["added"].add(blob.id)

     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
     ):
         self.cache["directory_in_revision"].add(
             (directory.id, revision.id, normalize(path))
         )

     def directory_get_date_in_isochrone_frontier(
         self, directory: DirectoryEntry
     ) -> Optional[datetime]:
         return self.get_dates("directory", [directory.id]).get(directory.id, None)

     def directory_get_dates_in_isochrone_frontier(
         self, dirs: Iterable[DirectoryEntry]
     ) -> Dict[bytes, datetime]:
         return self.get_dates("directory", [directory.id for directory in dirs])

     def directory_set_date_in_isochrone_frontier(
         self, directory: DirectoryEntry, date: datetime
     ):
         self.cache["directory"]["data"][directory.id] = date
         self.cache["directory"]["added"].add(directory.id)

     def get_dates(
         self, entity: Literal["content", "revision", "directory"], ids: List[bytes]
     ) -> Dict[bytes, datetime]:
         cache = self.cache[entity]
         missing_ids = set(id for id in ids if id not in cache["data"])
         if missing_ids:
             cache["data"].update(self.storage.get_dates(entity, list(missing_ids)))
         return {sha1: cache["data"][sha1] for sha1 in ids if sha1 in cache["data"]}

     def origin_get_id(self, origin: OriginEntry) -> int:
         if origin.id is None:
             return self.storage.origin_get_id(origin.url)
         else:
             return origin.id

     def revision_add(self, revision: RevisionEntry):
         # Add current revision to the compact DB
         assert revision.date is not None
         self.cache["revision"]["data"][revision.id] = revision.date
         self.cache["revision"]["added"].add(revision.id)

     def revision_add_before_revision(
         self, relative: RevisionEntry, revision: RevisionEntry
     ):
         self.cache["revision_before_revision"].append((revision.id, relative.id))

     def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
         assert origin.id is not None
         self.cache["revision_in_origin"].append((revision.id, origin.id))

     def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
         return self.get_dates("revision", [revision.id]).get(revision.id, None)

     def revision_get_preferred_origin(self, revision: RevisionEntry) -> int:
         # TODO: adapt this method to consider cached values
         return self.storage.revision_get_preferred_origin(revision.id)

     def revision_in_history(self, revision: RevisionEntry) -> bool:
         # TODO: adapt this method to consider cached values
         return self.storage.revision_in_history(revision.id)

     def revision_set_preferred_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ):
         assert origin.id is not None
         # TODO: adapt this method to consider cached values
         self.storage.revision_set_preferred_origin(origin.id, revision.id)

     def revision_visited(self, revision: RevisionEntry) -> bool:
         # TODO: adapt this method to consider cached values
         return self.storage.revision_visited(revision.id)


 def normalize(path: bytes) -> bytes:
     return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path
diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py
index 62c49cd..521dd84 100644
--- a/swh/provenance/tests/conftest.py
+++ b/swh/provenance/tests/conftest.py
@@ -1,252 +1,243 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-import glob
 from os import path
 import re
 from typing import Iterable, Iterator, List, Optional

 import pytest
 from typing_extensions import TypedDict

 from swh.core.api.serializers import msgpack_loads
 from swh.core.db import BaseDb
-from swh.core.db.pytest_plugin import postgresql_fact
-from swh.core.utils import numfile_sortkey as sortkey
 from swh.model.model import Content, Directory, DirectoryEntry, Revision
 from swh.model.tests.swh_model_data import TEST_OBJECTS
-import swh.provenance
 from swh.provenance.postgresql.archive import ArchivePostgreSQL
 from swh.provenance.storage.archive import ArchiveStorage

-SQL_DIR = path.join(path.dirname(swh.provenance.__file__), "sql")
-SQL_FILES = [
-    sqlfile
-    for sqlfile in sorted(glob.glob(path.join(SQL_DIR, "*.sql")), key=sortkey)
-    if "-without-path-" not in sqlfile
-]
-provenance_db = postgresql_fact(
-    "postgresql_proc", dbname="provenance", dump_files=SQL_FILES
-)

+@pytest.fixture(params=["with-path", "without-path"])
+def provenance(request, postgresql):
+    """return a working and initialized provenance db"""
+    from swh.core.cli.db import populate_database_for_package
+
+    flavor = request.param
+    populate_database_for_package("swh.provenance", postgresql.dsn, flavor=flavor)

-@pytest.fixture
-def provenance(provenance_db):
-    """return a working and initialized provenance db"""
     from swh.provenance.provenance import ProvenanceBackend

-    BaseDb.adapt_conn(provenance_db)
-    prov = ProvenanceBackend(provenance_db)
+    BaseDb.adapt_conn(postgresql)
+    prov = ProvenanceBackend(postgresql)
+    assert prov.storage.flavor == flavor
     # in test sessions, we DO want to raise any exception occurring at commit time
     prov.raise_on_commit = True
     return prov


 @pytest.fixture
 def swh_storage_with_objects(swh_storage):
     """return a Storage object (postgresql-based by default) with a few of each
     object type in it

     The inserted content comes from swh.model.tests.swh_model_data.
     """
     for obj_type in (
         "content",
         "skipped_content",
         "directory",
         "revision",
         "release",
         "snapshot",
         "origin",
         "origin_visit",
         "origin_visit_status",
     ):
         getattr(swh_storage, f"{obj_type}_add")(TEST_OBJECTS[obj_type])
     return swh_storage


 @pytest.fixture
 def archive_direct(swh_storage_with_objects):
     return ArchivePostgreSQL(swh_storage_with_objects.get_db().conn)


 @pytest.fixture
 def archive_api(swh_storage_with_objects):
     return ArchiveStorage(swh_storage_with_objects)


 @pytest.fixture(params=["archive", "db"])
 def archive(request, swh_storage_with_objects):
     """Return an ArchivePostgreSQL-based StorageInterface object"""
     # this is a workaround to prevent tests from hanging because of an unclosed
     # transaction.
     # TODO: refactor the ArchivePostgreSQL to properly deal with
     # transactions and get rid of this fixture
     if request.param == "db":
         archive = ArchivePostgreSQL(conn=swh_storage_with_objects.get_db().conn)
         yield archive
         archive.conn.rollback()
     else:
         yield ArchiveStorage(swh_storage_with_objects)


 def get_datafile(fname):
     return path.join(path.dirname(__file__), "data", fname)


 def load_repo_data(repo):
     data = {"revision": [], "directory": [], "content": []}
     with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj:
         for etype, value in msgpack_loads(fobj.read()):
             data[etype].append(value)
     return data


 def filter_dict(d, keys):
     return {k: v for (k, v) in d.items() if k in keys}


 def fill_storage(storage, data):
     storage.content_add_metadata(
         Content.from_dict(content) for content in data["content"]
     )
     storage.directory_add(
         [
             Directory(
                 entries=tuple(
                     [
                         DirectoryEntry.from_dict(
                             filter_dict(entry, ("name", "type", "target", "perms"))
                         )
                         for entry in dir["entries"]
                     ]
                 )
             )
             for dir in data["directory"]
         ]
     )
     storage.revision_add(Revision.from_dict(revision) for revision in data["revision"])


 class SynthRelation(TypedDict):
     prefix: Optional[str]
     path: str
     src: bytes
     dst: bytes
     rel_ts: float


 class SynthRevision(TypedDict):
     sha1: bytes
     date: float
     msg: str
     R_C: List[SynthRelation]
     R_D: List[SynthRelation]
     D_C: List[SynthRelation]


 def synthetic_result(filename: str) -> Iterator[SynthRevision]:
     """Generates dict representations of synthetic revisions found in the synthetic
     file (from the data/ directory) given as argument of the generator.

     Generated SynthRevision (typed dict) with the following elements:

       "sha1": (bytes) sha1 of the revision,
       "date": (float) timestamp of the revision,
       "msg": (str) commit message of the revision,
       "R_C": (list) new R---C relations added by this revision
       "R_D": (list) new R-D relations added by this revision
       "D_C": (list) new D-C relations added by this revision

     Each relation above is a SynthRelation typed dict with:

       "path": (str) location
       "src": (bytes) sha1 of the source of the relation
       "dst": (bytes) sha1 of the destination of the relation
       "rel_ts": (float) timestamp of the target of the relation (related to the
                 timestamp of the revision)

     """
     with open(get_datafile(filename), "r") as fobj:
         yield from _parse_synthetic_file(fobj)


 def _parse_synthetic_file(fobj: Iterable[str]) -> Iterator[SynthRevision]:
     """Read a 'synthetic' file and generate a dict representation of the synthetic
     revision for each revision listed in the synthetic file.
     """
     regs = [
         "(?P<revname>R[0-9]{2,4})?",
         "(?P<reltype>[^| ]*)",
         "([+] )?(?P<path>[^| +]*?)[/]?",
         "(?P<type>[RDC]) (?P<sha1>[0-9a-z]{40})",
         "(?P<ts>-?[0-9]+(.[0-9]+)?)",
     ]
     regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$")
     current_rev: List[dict] = []
     for m in (regex.match(line) for line in fobj):
         if m:
             d = m.groupdict()
             if d["revname"]:
                 if current_rev:
                     yield _mk_synth_rev(current_rev)
                 current_rev.clear()
             current_rev.append(d)
     if current_rev:
         yield _mk_synth_rev(current_rev)


 def _mk_synth_rev(synth_rev) -> SynthRevision:
     assert synth_rev[0]["type"] == "R"
     rev = SynthRevision(
         sha1=bytes.fromhex(synth_rev[0]["sha1"]),
         date=float(synth_rev[0]["ts"]),
         msg=synth_rev[0]["revname"],
         R_C=[],
         R_D=[],
         D_C=[],
     )
     current_path = None
     # path of the last R-D relation we parsed, used as prefix for next D-C
     # relations
     for row in synth_rev[1:]:
         if row["reltype"] == "R---C":
             assert row["type"] == "C"
             rev["R_C"].append(
                 SynthRelation(
                     prefix=None,
                     path=row["path"],
                     src=rev["sha1"],
                     dst=bytes.fromhex(row["sha1"]),
                     rel_ts=float(row["ts"]),
                 )
             )
             current_path = None
         elif row["reltype"] == "R-D":
             assert row["type"] == "D"
             rev["R_D"].append(
                 SynthRelation(
                     prefix=None,
                     path=row["path"],
                     src=rev["sha1"],
                     dst=bytes.fromhex(row["sha1"]),
                     rel_ts=float(row["ts"]),
                 )
             )
             current_path = row["path"]
         elif row["reltype"] == "D-C":
             assert row["type"] == "C"
             rev["D_C"].append(
                 SynthRelation(
                     prefix=current_path,
                     path=row["path"],
                     src=rev["R_D"][-1]["dst"],
                     dst=bytes.fromhex(row["sha1"]),
                     rel_ts=float(row["ts"]),
                 )
             )
     return rev
diff --git a/swh/provenance/tests/test_cli.py b/swh/provenance/tests/test_cli.py
index 0a94547..744fbed 100644
--- a/swh/provenance/tests/test_cli.py
+++ b/swh/provenance/tests/test_cli.py
@@ -1,97 +1,97 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from click.testing import CliRunner
 import psycopg2
 import pytest

 from swh.core.cli import swh as swhmain
 import swh.core.cli.db  # noqa ; ensure cli is loaded
 import swh.provenance.cli  # noqa ; ensure cli is loaded


 def test_cli_swh_db_help():
     # swhmain.add_command(provenance_cli)
     result = CliRunner().invoke(swhmain, ["provenance", "-h"])
     assert result.exit_code == 0
     assert "Commands:" in result.output
     commands = result.output.split("Commands:")[1]
     for command in (
         "find-all",
         "find-first",
         "iter-origins",
         "iter-revisions",
     ):
         assert f" {command} " in commands


 TABLES = {
     "dbflavor",
     "dbversion",
     "content",
     "content_in_revision",
     "content_in_directory",
     "directory",
     "directory_in_revision",
     "origin",
     "revision",
     "revision_before_revision",
     "revision_in_origin",
 }


 @pytest.mark.parametrize(
     "flavor, dbtables", (("with-path", TABLES | {"location"}), ("without-path", TABLES))
 )
 def test_cli_db_create_and_init_db_with_flavor(
     monkeypatch, postgresql, flavor, dbtables
 ):
     """Test that 'swh db init provenance' works with flavors

     for both with-path and without-path flavors"""
     dbname = f"{flavor}-db"

     # DB creation using 'swh db create'
     db_params = postgresql.get_dsn_parameters()
     monkeypatch.setenv("PGHOST", db_params["host"])
     monkeypatch.setenv("PGUSER", db_params["user"])
     monkeypatch.setenv("PGPORT", db_params["port"])
     result = CliRunner().invoke(swhmain, ["db", "create", "-d", dbname, "provenance"])
     assert result.exit_code == 0, result.output

     # DB init using 'swh db init'
     result = CliRunner().invoke(
         swhmain, ["db", "init", "-d", dbname, "--flavor", flavor, "provenance"]
     )
     assert result.exit_code == 0, result.output
     assert f"(flavor {flavor})" in result.output

     db_params["dbname"] = dbname
     cnx = psycopg2.connect(**db_params)
     # check the DB looks OK (check for db_flavor and expected tables)
     with cnx.cursor() as cur:
         cur.execute("select swh_get_dbflavor()")
         assert cur.fetchone() == (flavor,)

         cur.execute(
             "select table_name from information_schema.tables "
             "where table_schema = 'public' "
             f"and table_catalog = '{dbname}'"
         )
         tables = set(x for (x,) in cur.fetchall())
         assert tables == dbtables


-def test_cli_init_db_default_flavor(provenance_db):
+def test_cli_init_db_default_flavor(postgresql):
     "Test that 'swh db init provenance' defaults to a with-path flavored DB"
-    dbname = provenance_db.dsn
+    dbname = postgresql.dsn
     result = CliRunner().invoke(swhmain, ["db", "init", "-d", dbname, "provenance"])
     assert result.exit_code == 0, result.output
-    with provenance_db.cursor() as cur:
+    with postgresql.cursor() as cur:
         cur.execute("select swh_get_dbflavor()")
         assert cur.fetchone() == ("with-path",)
diff --git a/swh/provenance/tests/test_provenance_db.py b/swh/provenance/tests/test_provenance_db.py
index cf9b22a..db568a7 100644
--- a/swh/provenance/tests/test_provenance_db.py
+++ b/swh/provenance/tests/test_provenance_db.py
@@ -1,31 +1,42 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime

 from swh.model.tests.swh_model_data import TEST_OBJECTS
 from swh.provenance.model import OriginEntry
 from swh.provenance.origin import origin_add
+from swh.provenance.postgresql.provenancedb_with_path import ProvenanceWithPathDB
+from swh.provenance.postgresql.provenancedb_without_path import ProvenanceWithoutPathDB
 from swh.provenance.storage.archive import ArchiveStorage


 def ts2dt(ts: dict) -> datetime.datetime:
     timestamp = datetime.datetime.fromtimestamp(
         ts["timestamp"]["seconds"],
         datetime.timezone(datetime.timedelta(minutes=ts["offset"])),
     )
     return timestamp.replace(microsecond=ts["timestamp"]["microseconds"])


 def test_provenance_origin_add(provenance, swh_storage_with_objects):
     """Test the origin_add function"""
     archive = ArchiveStorage(swh_storage_with_objects)
     for status in TEST_OBJECTS["origin_visit_status"]:
         if status.snapshot is not None:
             entry = OriginEntry(
                 url=status.origin, date=status.date, snapshot=status.snapshot
             )
             origin_add(provenance, archive, [entry])
     # TODO: check some facts here
+
+
+def test_provenance_flavor(provenance):
+    assert provenance.storage.flavor in ("with-path", "without-path")
+    if provenance.storage.flavor == "with-path":
+        backend_class = ProvenanceWithPathDB
+    else:
+        backend_class = ProvenanceWithoutPathDB
+    assert isinstance(provenance.storage, backend_class)
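Note: `ts2dt` converts the archive's timestamp dicts (seconds, microseconds, and a UTC offset in minutes) into aware datetimes. A quick worked example with made-up values:

    from swh.provenance.tests.test_provenance_db import ts2dt

    ts = {"timestamp": {"seconds": 1609459200, "microseconds": 250000}, "offset": 60}
    assert ts2dt(ts).isoformat() == "2021-01-01T01:00:00.250000+01:00"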
diff --git a/swh/provenance/tests/test_provenance_heuristics.py b/swh/provenance/tests/test_provenance_heuristics.py
index 6843c78..d8f47c0 100644
--- a/swh/provenance/tests/test_provenance_heuristics.py
+++ b/swh/provenance/tests/test_provenance_heuristics.py
@@ -1,311 +1,346 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from typing import Dict, List, Tuple

 import pytest

 from swh.provenance.model import RevisionEntry
 from swh.provenance.revision import revision_add
 from swh.provenance.tests.conftest import (
     fill_storage,
     get_datafile,
     load_repo_data,
     synthetic_result,
 )
 from swh.provenance.tests.test_provenance_db import ts2dt


 def sha1s(cur, table):
     """return the 'sha1' column from the DB 'table' (as hex)

     'cur' is a cursor to the provenance index DB.
     """
     cur.execute(f"SELECT sha1 FROM {table}")
     return set(sha1.hex() for (sha1,) in cur.fetchall())


 def locations(cur):
     """return the 'path' column from the DB location table

     'cur' is a cursor to the provenance index DB.
     """
     cur.execute("SELECT encode(location.path::bytea, 'escape') FROM location")
     return set(x for (x,) in cur.fetchall())


 def relations(cur, src, dst):
     """return the triplets ('sha1', 'sha1', 'path') from the DB

     for the relation between 'src' table and 'dst' table
     (i.e. for C-R, C-D and D-R relations).

     'cur' is a cursor to the provenance index DB.
     """
     relation = f"{src}_in_{dst}"
+    cur.execute("select swh_get_dbflavor()")
+    with_path = cur.fetchone()[0] == "with-path"
+
     # note that the columns have the same name as the relations they refer to,
     # so we can write things like "rel.{dst}=src.id" in the query below
-    cur.execute(
-        f"SELECT encode(src.sha1::bytea, 'hex'),"
-        f"       encode(dst.sha1::bytea, 'hex'),"
-        f"       encode(location.path::bytea, 'escape') "
-        f"FROM {relation} as rel, "
-        f"     {src} as src, {dst} as dst, location "
-        f"WHERE rel.{src}=src.id "
-        f"  AND rel.{dst}=dst.id "
-        f"  AND rel.location=location.id"
-    )
+    if with_path:
+        cur.execute(
+            f"""
+            SELECT encode(src.sha1::bytea, 'hex'),
+                   encode(dst.sha1::bytea, 'hex'),
+                   encode(location.path::bytea, 'escape')
+            FROM {relation} as relation
+            INNER JOIN {src} AS src ON (relation.{src} = src.id)
+            INNER JOIN {dst} AS dst ON (relation.{dst} = dst.id)
+            INNER JOIN location ON (relation.location = location.id)
+            """
+        )
+    else:
+        cur.execute(
+            f"""
+            SELECT encode(src.sha1::bytea, 'hex'),
+                   encode(dst.sha1::bytea, 'hex'),
+                   ''
+            FROM {relation} as relation
+            INNER JOIN {src} AS src ON (src.id = relation.{src})
+            INNER JOIN {dst} AS dst ON (dst.id = relation.{dst})
+            """
+        )
     return set(cur.fetchall())


 def get_timestamp(cur, table, sha1):
     """return the date for the 'sha1' from the DB 'table' (as hex)

     'cur' is a cursor to the provenance index DB.
     """
     if isinstance(sha1, str):
         sha1 = bytes.fromhex(sha1)
     cur.execute(f"SELECT date FROM {table} WHERE sha1=%s", (sha1,))
     return [date.timestamp() for (date,) in cur.fetchall()]


 @pytest.mark.parametrize(
     "repo, lower, mindepth",
     (
         ("cmdbts2", True, 1),
         ("cmdbts2", False, 1),
         ("cmdbts2", True, 2),
         ("cmdbts2", False, 2),
         ("out-of-order", True, 1),
     ),
 )
 def test_provenance_heuristics(provenance, swh_storage, archive, repo, lower, mindepth):
     # read data/README.md for more details on how these datasets are generated
     data = load_repo_data(repo)
     fill_storage(swh_storage, data)
     syntheticfile = get_datafile(
         f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt"
     )

     revisions = {rev["id"]: rev for rev in data["revision"]}

     rows = {
         "content": set(),
         "content_in_directory": set(),
         "content_in_revision": set(),
         "directory": set(),
         "directory_in_revision": set(),
         "location": set(),
         "revision": set(),
     }
     cursor = provenance.storage.cursor

+    def maybe_path(path: str) -> str:
+        if provenance.storage.with_path:
+            return path
+        return ""
+
     for synth_rev in synthetic_result(syntheticfile):
         revision = revisions[synth_rev["sha1"]]
         entry = RevisionEntry(
             id=revision["id"],
             date=ts2dt(revision["date"]),
             root=revision["directory"],
         )
         revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth)

         # each "entry" in the synth file is one new revision
         rows["revision"].add(synth_rev["sha1"].hex())
         assert rows["revision"] == sha1s(cursor, "revision"), synth_rev["msg"]
         # check the timestamp of the revision
         rev_ts = synth_rev["date"]
         assert get_timestamp(cursor, "revision", synth_rev["sha1"].hex()) == [
             rev_ts
         ], synth_rev["msg"]

         # this revision might have added new content objects
         rows["content"] |= set(x["dst"].hex() for x in synth_rev["R_C"])
         rows["content"] |= set(x["dst"].hex() for x in synth_rev["D_C"])
         assert rows["content"] == sha1s(cursor, "content"), synth_rev["msg"]

         # check for R-C (direct) entries
         # these are added directly in the content_early_in_rev table
         rows["content_in_revision"] |= set(
-            (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["R_C"]
+            (x["dst"].hex(), x["src"].hex(), maybe_path(x["path"]))
+            for x in synth_rev["R_C"]
         )
         assert rows["content_in_revision"] == relations(
             cursor, "content", "revision"
         ), synth_rev["msg"]
         # check timestamps
         for rc in synth_rev["R_C"]:
             assert get_timestamp(cursor, "content", rc["dst"]) == [
                 rev_ts + rc["rel_ts"]
             ], synth_rev["msg"]

         # check directories
         # each directory stored in the provenance index is an entry
         # in the "directory" table...
         rows["directory"] |= set(x["dst"].hex() for x in synth_rev["R_D"])
         assert rows["directory"] == sha1s(cursor, "directory"), synth_rev["msg"]

         # ... + a number of rows in the "directory_in_rev" table...
         # check for R-D entries
         rows["directory_in_revision"] |= set(
-            (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["R_D"]
+            (x["dst"].hex(), x["src"].hex(), maybe_path(x["path"]))
+            for x in synth_rev["R_D"]
         )
         assert rows["directory_in_revision"] == relations(
             cursor, "directory", "revision"
         ), synth_rev["msg"]
         # check timestamps
         for rd in synth_rev["R_D"]:
             assert get_timestamp(cursor, "directory", rd["dst"]) == [
                 rev_ts + rd["rel_ts"]
             ], synth_rev["msg"]

         # ... + a number of rows in the "content_in_dir" table
         #     for content of the directory.
         # check for D-C entries
         rows["content_in_directory"] |= set(
-            (x["dst"].hex(), x["src"].hex(), x["path"]) for x in synth_rev["D_C"]
+            (x["dst"].hex(), x["src"].hex(), maybe_path(x["path"]))
+            for x in synth_rev["D_C"]
         )
         assert rows["content_in_directory"] == relations(
             cursor, "content", "directory"
         ), synth_rev["msg"]
         # check timestamps
         for dc in synth_rev["D_C"]:
             assert get_timestamp(cursor, "content", dc["dst"]) == [
                 rev_ts + dc["rel_ts"]
             ], synth_rev["msg"]

-        # check for location entries
-        rows["location"] |= set(x["path"] for x in synth_rev["R_C"])
-        rows["location"] |= set(x["path"] for x in synth_rev["D_C"])
-        rows["location"] |= set(x["path"] for x in synth_rev["R_D"])
-        assert rows["location"] == locations(cursor), synth_rev["msg"]
+        if provenance.storage.with_path:
+            # check for location entries
+            rows["location"] |= set(x["path"] for x in synth_rev["R_C"])
+            rows["location"] |= set(x["path"] for x in synth_rev["D_C"])
+            rows["location"] |= set(x["path"] for x in synth_rev["R_D"])
+            assert rows["location"] == locations(cursor), synth_rev["msg"]


 @pytest.mark.parametrize(
     "repo, lower, mindepth",
     (
         ("cmdbts2", True, 1),
         ("cmdbts2", False, 1),
         ("cmdbts2", True, 2),
         ("cmdbts2", False, 2),
         ("out-of-order", True, 1),
     ),
 )
 def test_provenance_heuristics_content_find_all(
     provenance, swh_storage, archive, repo, lower, mindepth
 ):
     # read data/README.md for more details on how these datasets are generated
     data = load_repo_data(repo)
     fill_storage(swh_storage, data)
     revisions = [
         RevisionEntry(
             id=revision["id"],
             date=ts2dt(revision["date"]),
             root=revision["directory"],
         )
         for revision in data["revision"]
     ]

+    def maybe_path(path: str) -> str:
+        if provenance.storage.with_path:
+            return path
+        return ""
+
     # XXX adding all revisions at once should be working just fine, but it does not...
     # revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth)
     # ...so add revisions one at a time for now
     for revision in revisions:
         revision_add(provenance, archive, [revision], lower=lower, mindepth=mindepth)

     syntheticfile = get_datafile(
         f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt"
     )
     expected_occurrences = {}
     for synth_rev in synthetic_result(syntheticfile):
         rev_id = synth_rev["sha1"].hex()
         rev_ts = synth_rev["date"]

         for rc in synth_rev["R_C"]:
             expected_occurrences.setdefault(rc["dst"].hex(), []).append(
-                (rev_id, rev_ts, rc["path"])
+                (rev_id, rev_ts, maybe_path(rc["path"]))
             )
         for dc in synth_rev["D_C"]:
             assert dc["prefix"] is not None  # to please mypy
             expected_occurrences.setdefault(dc["dst"].hex(), []).append(
-                (rev_id, rev_ts, dc["prefix"] + "/" + dc["path"])
+                (rev_id, rev_ts, maybe_path(dc["prefix"] + "/" + dc["path"]))
             )

     for content_id, results in expected_occurrences.items():
         expected = [(content_id, *result) for result in results]
         db_occurrences = [
             (blob.hex(), rev.hex(), date.timestamp(), path.decode())
             for blob, rev, date, path in provenance.content_find_all(
                 bytes.fromhex(content_id)
             )
         ]
-        assert len(db_occurrences) == len(expected)
+        if provenance.storage.with_path:
+            # this is not true if the db stores no path, because a same content
+            # that appears several times in a given revision may be reported
+            # only once by content_find_all()
+            assert len(db_occurrences) == len(expected)
         assert set(db_occurrences) == set(expected)


 @pytest.mark.parametrize(
     "repo, lower, mindepth",
     (
         ("cmdbts2", True, 1),
         ("cmdbts2", False, 1),
         ("cmdbts2", True, 2),
         ("cmdbts2", False, 2),
         ("out-of-order", True, 1),
     ),
 )
 def test_provenance_heuristics_content_find_first(
     provenance, swh_storage, archive, repo, lower, mindepth
 ):
     # read data/README.md for more details on how these datasets are generated
     data = load_repo_data(repo)
     fill_storage(swh_storage, data)
     revisions = [
         RevisionEntry(
             id=revision["id"],
             date=ts2dt(revision["date"]),
             root=revision["directory"],
         )
         for revision in data["revision"]
     ]

     # XXX adding all revisions at once should be working just fine, but it does not...
     # revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth)
     # ...so add revisions one at a time for now
     for revision in revisions:
         revision_add(provenance, archive, [revision], lower=lower, mindepth=mindepth)

     syntheticfile = get_datafile(
         f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt"
     )
     expected_first: Dict[str, Tuple[str, float, List[str]]] = {}
     # dict of tuples (blob_id, rev_id, [path, ...]) the third element for path
     # is a list because a content can be added at several places in a single
     # revision, in which case the result of content_find_first() is one of
     # those path, but we have no guarantee which one it will return.
     for synth_rev in synthetic_result(syntheticfile):
         rev_id = synth_rev["sha1"].hex()
         rev_ts = synth_rev["date"]

         for rc in synth_rev["R_C"]:
             sha1 = rc["dst"].hex()
             if sha1 not in expected_first:
                 assert rc["rel_ts"] == 0
                 expected_first[sha1] = (rev_id, rev_ts, [rc["path"]])
             else:
                 if rev_ts == expected_first[sha1][1]:
                     expected_first[sha1][2].append(rc["path"])
                 elif rev_ts < expected_first[sha1][1]:
                     expected_first[sha1] = (rev_id, rev_ts, [rc["path"]])

         for dc in synth_rev["D_C"]:
             sha1 = dc["dst"].hex()
             assert sha1 in expected_first
             # nothing to do there, this content cannot be a "first seen file"

     for content_id, (rev_id, ts, paths) in expected_first.items():
         (r_sha1, r_rev_id, r_ts, r_path) = provenance.content_find_first(
             bytes.fromhex(content_id)
         )
         assert r_sha1.hex() == content_id
         assert r_rev_id.hex() == rev_id
         assert r_ts.timestamp() == ts
-        assert r_path.decode() in paths
+        if provenance.storage.with_path:
+            assert r_path.decode() in paths
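Note: the flavor-dependent expectations the tests above encode boil down to the last tuple element returned by the lookup API. A sketch, assuming an initialized `provenance` backend and a made-up content sha1:

    blob = bytes.fromhex("0123456789012345678901234567890123456789")

    first = provenance.content_find_first(blob)
    if first is not None:
        sha1, rev, date, path = first
        # with-path flavor: `path` is the stored location (bytes);
        # without-path flavor: `path` carries no location information
        # (the tests above only check it under the with-path flavor)
        print(sha1.hex(), rev.hex(), date.isoformat(), path.decode())

    for sha1, rev, date, path in provenance.content_find_all(blob, limit=10):
        print(sha1.hex(), rev.hex(), date.isoformat(), path.decode())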