diff --git a/docs/index.rst b/docs/index.rst
index 5877643..c568d33 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -1,146 +1,146 @@
.. _swh-provenance:

Software Heritage Provenance
============================

A provenance index database based on the Software Heritage Archive. This is an
implementation of the paper `Software Provenance Tracking at the Scale of
Public Source Code`_ published in `Empirical Software Engineering`_.

This provenance index database is a tool to help answer the question "where
does this source code artifact come from?", which the main Software Heritage
Archive cannot easily solve.

Quick Start
-----------

Database creation
~~~~~~~~~~~~~~~~~

Create a provenance index database (in this example we use pifpaf_ to easily
set up a test PostgreSQL database; adapt the example below to your PostgreSQL
setup):

.. code-block:: shell

   eval $(pifpaf run postgresql)
   swh db create -d provdb provenance
   swh db init-admin -d provdb provenance
   swh db init -d provdb provenance

The provenance index DB has 2 feature flags, so there are 4 possible flavors.
The feature flags are:

- `with-path` / `without-path`: whether the provenance index database stores
  file paths,
- `normalized` / `denormalized`: whether or not the main relation tables are
  normalized (see below).

So the possible flavors are:

- `with-path`
- `without-path`
- `with-path-denormalized`
- `without-path-denormalized`

Filling the provenance index database
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This step requires access to the Software Heritage Archive to retrieve the
actual data. It currently also needs an input CSV file of revisions and
origins to insert in the provenance database.

Examples of such files are available in the `provenance public dataset`_.

.. _`provenance public dataset`: https://annex.softwareheritage.org/public/dataset/provenance

.. code-block:: shell

   wget https://annex.softwareheritage.org/public/dataset/provenance/sample_10k.csv.bz2
   bunzip2 sample_10k.csv.bz2

You need a configuration file, like:

.. code-block:: yaml

   # config.yaml
   provenance:
     storage:
       cls: postgresql
       db:
         host: /tmp/tmpifn2ov_j
         port: 9824
         dbname: provdb
     archive:
       cls: api
       storage:
         cls: remote
         url: http://storage:5002/

Note that you need access to the internal API of a :ref:`swh-storage` instance
(here the machine named `storage`) for this.

Then you can feed the provenance index database using:

.. code-block:: shell

   swh provenance -C config.yaml iter-revisions sample_10k.csv

This may take a while to complete.

Querying the provenance index database
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Using the same config file, you may look for the first known occurrence of a
file content:

.. code-block:: shell

   swh provenance -C config.yaml find-first 8a54694c92c944fcb06d73c17743ac72444a5b72
   swh:1:cnt:8a54694c92c944fcb06d73c17743ac72444a5b72, swh:1:rev:6193fae0668d082d90207f6c9f33d6e8c98dd04a, 2008-10-06 18:32:23+00:00, None, lua/effects/bloodstream/init.lua

Or all the known occurrences:

.. code-block:: shell

   swh provenance -C config.yaml find-all 8a54694c92c944fcb06d73c17743ac72444a5b72
   swh:1:cnt:8a54694c92c944fcb06d73c17743ac72444a5b72, swh:1:rev:6193fae0668d082d90207f6c9f33d6e8c98dd04a, 2008-10-06 18:32:23+00:00, None, lua/effects/bloodstream/init.lua
   swh:1:cnt:8a54694c92c944fcb06d73c17743ac72444a5b72, swh:1:rev:f0a5078eed8808323b93ed09cddb003dbe2a85e4, 2008-10-06 18:32:23+00:00, None, trunk/lua/effects/bloodstream/init.lua
   [...]

(De)normalized database
-----------------------

For some relation tables (like ``content_in_revision``, which stores, for each
content object, in which revisions it has been found), the default data schema
is to store one row for each relation. For a big database, this can have a
significant cost in terms of storage.
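To make the storage trade-off concrete, here is a toy sketch in plain Python (hypothetical identifiers; dicts and lists stand in for table rows, this is not the actual schema):

```python
# Toy model of the content_in_revision relation (hypothetical data).
# Normalized flavor: one row per (content, revision) pair.
normalized = [
    ("cnt1", "rev1"),
    ("cnt1", "rev2"),
    ("cnt1", "rev3"),
    ("cnt2", "rev1"),
]

# Denormalized flavor: one row per content, with the revisions
# aggregated into an array destination column.
denormalized = {}
for content, revision in normalized:
    denormalized.setdefault(content, []).append(revision)

# 4 rows in the normalized layout vs 2 rows in the denormalized one.
print(len(normalized), len(denormalized))
```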
So it is possible to store these relations using an array as the destination
column (the ``revision`` column in the case of the ``content_in_revision``
table). This can drastically reduce the database storage size, possibly at the
price of a slight performance hit.

Warning: the denormalized version of the database is still under test and
validation. Do not use it for serious work.

.. _`Empirical Software Engineering`: http://link.springer.com/journal/10664
.. _`Software Provenance Tracking at the Scale of Public Source Code`: http://dx.doi.org/10.1007/s10664-020-09828-5
.. _pifpaf: https://github.com/jd/pifpaf

.. toctree::
   :maxdepth: 2
   :caption: Contents:

Indices and tables
==================

* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
index bdde5f5..9dee9f7 100644
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -1,90 +1,98 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from __future__ import annotations

from typing import TYPE_CHECKING
+import warnings

if TYPE_CHECKING:
    from .archive import ArchiveInterface
    from .interface import ProvenanceInterface, ProvenanceStorageInterface


def get_archive(cls: str, **kwargs) -> ArchiveInterface:
    """Get an archive object of class ``cls`` with arguments ``kwargs``.

    Args:
        cls: archive's class, either 'api' or 'direct'
        kwargs: dictionary of arguments passed to the archive class constructor

    Returns:
        an instance of archive object (either using the swh.storage API or
        direct queries to the archive's database)

    Raises:
        :exc:`ValueError` if passed an unknown archive class.
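    Example (sketch; assumes a reachable swh-storage RPC server at the given
    URL):

        archive = get_archive(
            cls="api",
            storage={"cls": "remote", "url": "http://storage:5002/"},
        )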
""" if cls == "api": from swh.storage import get_storage from .storage.archive import ArchiveStorage return ArchiveStorage(get_storage(**kwargs["storage"])) elif cls == "direct": from swh.core.db import BaseDb from .postgresql.archive import ArchivePostgreSQL return ArchivePostgreSQL(BaseDb.connect(**kwargs["db"]).conn) else: raise ValueError def get_provenance(**kwargs) -> ProvenanceInterface: """Get an provenance object with arguments ``args``. Args: args: dictionary of arguments to retrieve a swh.provenance.storage class (see :func:`get_provenance_storage` for details) Returns: an instance of provenance object """ from .provenance import Provenance return Provenance(get_provenance_storage(**kwargs)) def get_provenance_storage(cls: str, **kwargs) -> ProvenanceStorageInterface: """Get an archive object of class ``cls`` with arguments ``args``. Args: cls: storage's class, only 'local' is currently supported args: dictionary of arguments passed to the storage class constructor Returns: an instance of storage object Raises: :cls:`ValueError` if passed an unknown archive class. 
""" - if cls == "local": + if cls in ["local", "postgresql"]: from swh.core.db import BaseDb - from .postgresql.provenancedb import ProvenanceDB + from .postgresql.provenance import ProvenanceStoragePostgreSql + + if cls == "local": + warnings.warn( + '"local" class is deprecated for provenance storage, please ' + 'use "postgresql" class instead.', + DeprecationWarning, + ) conn = BaseDb.connect(**kwargs["db"]).conn raise_on_commit = kwargs.get("raise_on_commit", False) - return ProvenanceDB(conn, raise_on_commit) + return ProvenanceStoragePostgreSql(conn, raise_on_commit) elif cls == "remote": from .api.client import RemoteProvenanceStorage storage = RemoteProvenanceStorage(**kwargs) assert isinstance(storage, ProvenanceStorageInterface) return storage else: raise ValueError diff --git a/swh/provenance/api/server.py b/swh/provenance/api/server.py index 814b760..60fb059 100644 --- a/swh/provenance/api/server.py +++ b/swh/provenance/api/server.py @@ -1,148 +1,148 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os from typing import Any, Dict, List, Optional from werkzeug.routing import Rule from swh.core import config from swh.core.api import JSONFormatter, MsgpackFormatter, RPCServerApp, negotiate from swh.provenance import get_provenance_storage from swh.provenance.interface import ProvenanceStorageInterface from .serializers import DECODERS, ENCODERS storage: Optional[ProvenanceStorageInterface] = None def get_global_provenance_storage() -> ProvenanceStorageInterface: global storage if storage is None: storage = get_provenance_storage(**app.config["provenance"]["storage"]) return storage class ProvenanceStorageServerApp(RPCServerApp): extra_type_decoders = DECODERS extra_type_encoders = ENCODERS app = ProvenanceStorageServerApp( __name__, 
    backend_class=ProvenanceStorageInterface,
    backend_factory=get_global_provenance_storage,
)


def has_no_empty_params(rule: Rule) -> bool:
    return len(rule.defaults or ()) >= len(rule.arguments or ())


@app.route("/")
def index() -> str:
    return """
Software Heritage provenance storage RPC server

You have reached the Software Heritage provenance storage RPC server.
See its documentation and API for more information.
""" @app.route("/site-map") @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def site_map() -> List[Dict[str, Any]]: links = [] for rule in app.url_map.iter_rules(): if has_no_empty_params(rule) and hasattr( ProvenanceStorageInterface, rule.endpoint ): links.append( dict( rule=rule.rule, description=getattr( ProvenanceStorageInterface, rule.endpoint ).__doc__, ) ) # links is now a list of url, endpoint tuples return links def load_and_check_config( config_path: Optional[str], type: str = "local" ) -> Dict[str, Any]: """Check the minimal configuration is set to run the api or raise an error explanation. Args: config_path (str): Path to the configuration file to load type (str): configuration type. For 'local' type, more checks are done. Raises: Error if the setup is not as expected Returns: configuration as a dict """ if config_path is None: raise EnvironmentError("Configuration file must be defined") if not os.path.exists(config_path): raise FileNotFoundError(f"Configuration file {config_path} does not exist") cfg = config.read(config_path) pcfg: Optional[Dict[str, Any]] = cfg.get("provenance") if pcfg is None: raise KeyError("Missing 'provenance' configuration") scfg: Optional[Dict[str, Any]] = pcfg.get("storage") if scfg is None: raise KeyError("Missing 'provenance.storage' configuration") if type == "local": cls = scfg.get("cls") - if cls != "local": + if cls != "postgresql": raise ValueError( - "The provenance backend can only be started with a 'local' " + "The provenance backend can only be started with a 'postgresql' " "configuration" ) db = scfg.get("db") if not db: raise KeyError("Invalid configuration; missing 'db' config entry") return cfg api_cfg: Optional[Dict[str, Any]] = None def make_app_from_configfile() -> ProvenanceStorageServerApp: """Run the WSGI app from the webserver, loading the configuration from a configuration file. SWH_CONFIG_FILENAME environment variable defines the configuration path to load. 
""" global api_cfg if api_cfg is None: config_path = os.environ.get("SWH_CONFIG_FILENAME") api_cfg = load_and_check_config(config_path) app.config.update(api_cfg) handler = logging.StreamHandler() app.logger.addHandler(handler) return app diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index bc6fb6a..5734129 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,225 +1,225 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control from datetime import datetime, timezone import os from typing import Any, Dict, Generator, Optional, Tuple import click import iso8601 import yaml from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.model import Sha1Git # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILENAME" DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None) DEFAULT_CONFIG: Dict[str, Any] = { "provenance": { "archive": { # "cls": "api", # "storage": { # "cls": "remote", # "url": "http://uffizi.internal.softwareheritage.org:5002", # } "cls": "direct", "db": { "host": "db.internal.softwareheritage.org", "dbname": "softwareheritage", "user": "guest", }, }, "storage": { - "cls": "local", + "cls": "postgresql", "db": {"host": "localhost", "dbname": "provenance"}, }, } } CONFIG_FILE_HELP = f""" \b Configuration can be loaded from a yaml file given either as --config-file option or the {CONFIG_ENVVAR} environment variable. 
If no configuration file is specified, use the following default configuration:: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage provenance index database tools {CONFIG_FILE_HELP} """ @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=True, dir_okay=False, path_type=str), help="""YAML configuration file.""", ) @click.option( "-P", "--profile", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""Enable profiling to specified file.""", ) @click.pass_context def cli(ctx: click.core.Context, config_file: Optional[str], profile: str) -> None: if ( config_file is None and DEFAULT_PATH is not None and config.config_exists(DEFAULT_PATH) ): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not os.path.exists(config_file): raise FileNotFoundError(config_file) conf = yaml.safe_load(open(config_file, "rb")) ctx.ensure_object(dict) ctx.obj["config"] = conf if profile: import atexit import cProfile print("Profiling...") pr = cProfile.Profile() pr.enable() def exit() -> None: pr.disable() pr.dump_stats(profile) atexit.register(exit) @cli.command(name="iter-revisions") @click.argument("filename") @click.option("-a", "--track-all", default=True, type=bool) @click.option("-l", "--limit", type=int) @click.option("-m", "--min-depth", default=1, type=int) @click.option("-r", "--reuse", default=True, type=bool) @click.pass_context def iter_revisions( ctx: click.core.Context, filename: str, track_all: bool, limit: Optional[int], min_depth: int, reuse: bool, ) -> None: # TODO: add file size filtering """Process a provided list of revisions.""" from . 
import get_archive, get_provenance from .revision import CSVRevisionIterator, revision_add archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) revisions_provider = generate_revision_tuples(filename) revisions = CSVRevisionIterator(revisions_provider, limit=limit) for revision in revisions: revision_add( provenance, archive, [revision], trackall=track_all, lower=reuse, mindepth=min_depth, ) def generate_revision_tuples( filename: str, ) -> Generator[Tuple[Sha1Git, datetime, Sha1Git], None, None]: for line in open(filename, "r"): if line.strip(): revision, date, root = line.strip().split(",") yield ( hash_to_bytes(revision), iso8601.parse_date(date, default_timezone=timezone.utc), hash_to_bytes(root), ) @cli.command(name="iter-origins") @click.argument("filename") @click.option("-l", "--limit", type=int) @click.pass_context def iter_origins(ctx: click.core.Context, filename: str, limit: Optional[int]) -> None: """Process a provided list of origins.""" from . import get_archive, get_provenance from .origin import CSVOriginIterator, origin_add archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) origins_provider = generate_origin_tuples(filename) origins = CSVOriginIterator(origins_provider, limit=limit) for origin in origins: origin_add(provenance, archive, [origin]) def generate_origin_tuples(filename: str) -> Generator[Tuple[str, bytes], None, None]: for line in open(filename, "r"): if line.strip(): url, snapshot = line.strip().split(",") yield (url, hash_to_bytes(snapshot)) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx: click.core.Context, swhid: str) -> None: """Find first occurrence of the requested blob.""" from . 
import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) occur = provenance.content_find_first(hash_to_bytes(swhid)) if occur is not None: print( f"swh:1:cnt:{hash_to_hex(occur.content)}, " f"swh:1:rev:{hash_to_hex(occur.revision)}, " f"{occur.date}, " f"{occur.origin}, " f"{os.fsdecode(occur.path)}" ) else: print(f"Cannot find a content with the id {swhid}") @cli.command(name="find-all") @click.argument("swhid") @click.option("-l", "--limit", type=int) @click.pass_context def find_all(ctx: click.core.Context, swhid: str, limit: Optional[int]) -> None: """Find all occurrences of the requested blob.""" from . import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) for occur in provenance.content_find_all(hash_to_bytes(swhid), limit=limit): print( f"swh:1:cnt:{hash_to_hex(occur.content)}, " f"swh:1:rev:{hash_to_hex(occur.revision)}, " f"{occur.date}, " f"{occur.origin}, " f"{os.fsdecode(occur.path)}" ) diff --git a/swh/provenance/postgresql/provenancedb.py b/swh/provenance/postgresql/provenance.py similarity index 96% rename from swh/provenance/postgresql/provenancedb.py rename to swh/provenance/postgresql/provenance.py index 0f17f0b..1f788ff 100644 --- a/swh/provenance/postgresql/provenancedb.py +++ b/swh/provenance/postgresql/provenance.py @@ -1,370 +1,374 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime import itertools import logging from typing import Dict, Generator, Iterable, Optional, Set, Tuple import psycopg2.extensions import psycopg2.extras from typing_extensions import Literal from swh.core.db import BaseDb from swh.model.model import Sha1Git from ..interface import ( EntityType, ProvenanceResult, RelationData, RelationType, RevisionData, ) -class 
ProvenanceDB: +class ProvenanceStoragePostgreSql: def __init__( self, conn: psycopg2.extensions.connection, raise_on_commit: bool = False ) -> None: BaseDb.adapt_conn(conn) conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) conn.set_session(autocommit=True) self.conn = conn self.cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) # XXX: not sure this is the best place to do it! sql = "SET timezone TO 'UTC'" self.cursor.execute(sql) self._flavor: Optional[str] = None self.raise_on_commit = raise_on_commit @property def flavor(self) -> str: if self._flavor is None: sql = "SELECT swh_get_dbflavor() AS flavor" self.cursor.execute(sql) self._flavor = self.cursor.fetchone()["flavor"] assert self._flavor is not None return self._flavor def with_path(self) -> bool: return "with-path" in self.flavor @property def denormalized(self) -> bool: return "denormalized" in self.flavor def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("content", dates) def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self._entity_get_date("content", ids) def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("directory", dates) def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: return self._entity_get_date("directory", ids) def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: sql = f"SELECT sha1 FROM {entity.value}" self.cursor.execute(sql) return {row["sha1"] for row in self.cursor.fetchall()} def location_get(self) -> Set[bytes]: sql = "SELECT encode(location.path::bytea, 'escape') AS path FROM location" self.cursor.execute(sql) return {row["path"] for row in self.cursor.fetchall()} def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: try: if urls: sql = """ LOCK TABLE ONLY origin; INSERT INTO origin(sha1, url) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, 
sql, urls.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: urls: Dict[Sha1Git, str] = {} sha1s = tuple(ids) if sha1s: + # TODO: consider splitting this query in several ones if sha1s is too big! values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, url FROM origin WHERE sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) urls.update( (row["sha1"], row["url"].decode()) for row in self.cursor.fetchall() ) return urls def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: return self._entity_set_date("revision", dates) def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: sql = "SELECT * FROM swh_provenance_content_find_first(%s)" self.cursor.execute(sql, (id,)) row = self.cursor.fetchone() return ProvenanceResult(**row) if row is not None else None def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: sql = "SELECT * FROM swh_provenance_content_find_all(%s, %s)" self.cursor.execute(sql, (id, limit)) yield from (ProvenanceResult(**row) for row in self.cursor.fetchall()) def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: try: if origins: sql = """ LOCK TABLE ONLY revision; INSERT INTO revision(sha1, origin) (SELECT V.rev AS sha1, O.id AS origin FROM (VALUES %s) AS V(rev, org) JOIN origin AS O ON (O.sha1=V.org)) ON CONFLICT (sha1) DO UPDATE SET origin=EXCLUDED.origin """ psycopg2.extras.execute_values(self.cursor, sql, origins.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: result: Dict[Sha1Git, 
RevisionData] = {} sha1s = tuple(ids) if sha1s: + # TODO: consider splitting this query in several ones if sha1s is too big! values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, date, origin FROM revision WHERE sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) result.update( (row["sha1"], RevisionData(date=row["date"], origin=row["origin"])) for row in self.cursor.fetchall() ) return result def relation_add( self, relation: RelationType, data: Iterable[RelationData] ) -> bool: try: rows = tuple((rel.src, rel.dst, rel.path) for rel in data) if rows: table = relation.value src, *_, dst = table.split("_") if src != "origin": # Origin entries should be inserted previously as they require extra # non-null information srcs = tuple(set((sha1,) for (sha1, _, _) in rows)) sql = f""" LOCK TABLE ONLY {src}; INSERT INTO {src}(sha1) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, srcs) if dst != "origin": # Origin entries should be inserted previously as they require extra # non-null information dsts = tuple(set((sha1,) for (_, sha1, _) in rows)) sql = f""" LOCK TABLE ONLY {dst}; INSERT INTO {dst}(sha1) VALUES %s ON CONFLICT DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, dsts) joins = [ f"INNER JOIN {src} AS S ON (S.sha1=V.src)", f"INNER JOIN {dst} AS D ON (D.sha1=V.dst)", ] nope = (RelationType.REV_BEFORE_REV, RelationType.REV_IN_ORG) selected = ["S.id"] if self.denormalized and relation not in nope: selected.append("ARRAY_AGG(D.id)") else: selected.append("D.id") if self._relation_uses_location_table(relation): locations = tuple(set((path,) for (_, _, path) in rows)) sql = """ LOCK TABLE ONLY location; INSERT INTO location(path) VALUES %s ON CONFLICT (path) DO NOTHING """ psycopg2.extras.execute_values(self.cursor, sql, locations) joins.append("INNER JOIN location AS L ON (L.path=V.path)") if self.denormalized: selected.append("ARRAY_AGG(L.id)") else: selected.append("L.id") sql_l = [ 
f"INSERT INTO {table}", f" SELECT {', '.join(selected)}", " FROM (VALUES %s) AS V(src, dst, path)", *joins, ] if self.denormalized and relation not in nope: sql_l.append("GROUP BY S.id") sql_l.append( f"""ON CONFLICT ({src}) DO UPDATE SET {dst}=ARRAY( - SELECT UNNEST({table}.{dst} || excluded.{dst})), - location=ARRAY( - SELECT UNNEST({relation.value}.location || excluded.location)) - """ + SELECT UNNEST({table}.{dst} || EXCLUDED.{dst}) + ), location=ARRAY( + SELECT UNNEST({relation.value}.location || EXCLUDED.location) + ) + """ ) else: sql_l.append("ON CONFLICT DO NOTHING") sql = "\n".join(sql_l) psycopg2.extras.execute_values(self.cursor, sql, rows) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def relation_get( self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False ) -> Set[RelationData]: return self._relation_get(relation, ids, reverse) def relation_get_all(self, relation: RelationType) -> Set[RelationData]: return self._relation_get(relation, None) def _entity_get_date( self, entity: Literal["content", "directory", "revision"], ids: Iterable[Sha1Git], ) -> Dict[Sha1Git, datetime]: dates: Dict[Sha1Git, datetime] = {} sha1s = tuple(ids) if sha1s: + # TODO: consider splitting this query in several ones if sha1s is too big! 
values = ", ".join(itertools.repeat("%s", len(sha1s))) sql = f""" SELECT sha1, date FROM {entity} WHERE sha1 IN ({values}) """ self.cursor.execute(sql, sha1s) dates.update((row["sha1"], row["date"]) for row in self.cursor.fetchall()) return dates def _entity_set_date( self, entity: Literal["content", "directory", "revision"], data: Dict[Sha1Git, datetime], ) -> bool: try: if data: sql = f""" LOCK TABLE ONLY {entity}; INSERT INTO {entity}(sha1, date) VALUES %s ON CONFLICT (sha1) DO UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) """ psycopg2.extras.execute_values(self.cursor, sql, data.items()) return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") if self.raise_on_commit: raise return False def _relation_get( self, relation: RelationType, ids: Optional[Iterable[Sha1Git]], reverse: bool = False, ) -> Set[RelationData]: result: Set[RelationData] = set() sha1s: Optional[Tuple[Tuple[Sha1Git, ...]]] if ids is not None: sha1s = (tuple(ids),) where = f"WHERE {'S' if not reverse else 'D'}.sha1 IN %s" else: sha1s = None where = "" aggreg_dst = self.denormalized and relation in ( RelationType.CNT_EARLY_IN_REV, RelationType.CNT_IN_DIR, RelationType.DIR_IN_REV, ) if sha1s is None or sha1s[0]: table = relation.value src, *_, dst = table.split("_") # TODO: improve this! 
if src == "revision" and dst == "revision": src_field = "prev" dst_field = "next" else: src_field = src dst_field = dst if aggreg_dst: revloc = f"UNNEST(R.{dst_field}) AS dst" if self._relation_uses_location_table(relation): revloc += ", UNNEST(R.location) AS path" else: revloc = f"R.{dst_field} AS dst" if self._relation_uses_location_table(relation): revloc += ", R.location AS path" inner_sql = f""" SELECT S.sha1 AS src, {revloc} FROM {table} AS R INNER JOIN {src} AS S ON (S.id=R.{src_field}) """ if where != "" and not reverse: inner_sql += where if self._relation_uses_location_table(relation): loc = "L.path AS path" else: loc = "NULL AS path" sql = f""" SELECT CL.src, D.sha1 AS dst, {loc} FROM ({inner_sql}) AS CL INNER JOIN {dst} AS D ON (D.id=CL.dst) """ if self._relation_uses_location_table(relation): sql += "INNER JOIN location AS L ON (L.id=CL.path)" if where != "" and reverse: sql += where self.cursor.execute(sql, sha1s) result.update(RelationData(**row) for row in self.cursor.fetchall()) return result def _relation_uses_location_table(self, relation: RelationType) -> bool: if self.with_path(): src = relation.value.split("_")[0] return src in ("content", "directory") return False diff --git a/swh/provenance/sql/40-funcs.sql b/swh/provenance/sql/40-funcs.sql index 8e1636a..d6247f1 100644 --- a/swh/provenance/sql/40-funcs.sql +++ b/swh/provenance/sql/40-funcs.sql @@ -1,338 +1,338 @@ select position('denormalized' in swh_get_dbflavor()::text) = 0 as dbflavor_norm \gset select position('with-path' in swh_get_dbflavor()::text) != 0 as dbflavor_with_path \gset create type relation_row as (src sha1_git, dst sha1_git, loc unix_path); \if :dbflavor_norm \if :dbflavor_with_path -- -- with path and normalized -- create or replace function swh_provenance_content_find_first(content_id sha1_git) returns table ( content sha1_git, revision sha1_git, date timestamptz, origin unix_path, path unix_path ) language sql stable as $$ select C.sha1 as content, R.sha1 as revision, 
             R.date as date,
             O.url as origin,
             L.path as path
      from content as C
      inner join content_in_revision as CR on (CR.content = C.id)
-     inner join location as L on (CR.location = L.id)
-     inner join revision as R on (CR.revision = R.id)
-     left join origin as O on (R.origin=O.id)
-     where C.sha1=content_id
+     inner join location as L on (L.id = CR.location)
+     inner join revision as R on (R.id = CR.revision)
+     left join origin as O on (O.id = R.origin)
+     where C.sha1 = content_id
      order by date, revision, origin, path asc
      limit 1
 $$;
 
 create or replace function swh_provenance_content_find_all(content_id sha1_git, early_cut int)
     returns table (
         content sha1_git,
         revision sha1_git,
         date timestamptz,
         origin unix_path,
         path unix_path
     )
     language sql
     stable
 as $$
     (select C.sha1 as content,
-            r.sha1 as revision,
-            r.date as date,
+            R.sha1 as revision,
+            R.date as date,
             O.url as origin,
-            l.path as path
-     from content as c
-     inner join content_in_revision as cr on (cr.content = c.id)
-     inner join location as l on (cr.location = l.id)
-     inner join revision as r on (cr.revision = r.id)
-     left join origin AS O on (R.origin=O.id)
-     where c.sha1=content_id)
+            L.path as path
+     from content as C
+     inner join content_in_revision as CR on (CR.content = C.id)
+     inner join location as L on (L.id = CR.location)
+     inner join revision as R on (R.id = CR.revision)
+     left join origin as O on (O.id = R.origin)
+     where C.sha1 = content_id)
     union
-    (select c.sha1 as content,
-            r.sha1 as revision,
-            r.date as date,
+    (select C.sha1 as content,
+            R.sha1 as revision,
+            R.date as date,
             O.url as origin,
-            case dirloc.path
-            when '' then cntloc.path
-            when '.' then cntloc.path
-            else (dirloc.path || '/' || cntloc.path)::unix_path
+            case DL.path
+            when '' then CL.path
+            when '.' then CL.path
+            else (DL.path || '/' || CL.path)::unix_path
             end as path
-     from content as c
-     inner join content_in_directory as cd on (c.id = cd.content)
-     inner join directory_in_revision as dr on (cd.directory = dr.directory)
-     inner join revision as r on (dr.revision = r.id)
-     inner join location as cntloc on (cd.location = cntloc.id)
-     inner join location as dirloc on (dr.location = dirloc.id)
-     left join origin as O on (R.origin=O.id)
-     where C.sha1=content_id)
+     from content as C
+     inner join content_in_directory as CD on (CD.content = C.id)
+     inner join directory_in_revision as DR on (DR.directory = CD.directory)
+     inner join revision as R on (R.id = DR.revision)
+     inner join location as CL on (CL.id = CD.location)
+     inner join location as DL on (DL.id = DR.location)
+     left join origin as O on (O.id = R.origin)
+     where C.sha1 = content_id)
     order by date, revision, origin, path
     limit early_cut
 $$;
 
 \else
 --
 -- without path and normalized
 --
 create or replace function swh_provenance_content_find_first(content_id sha1_git)
     returns table (
         content sha1_git,
         revision sha1_git,
         date timestamptz,
         origin unix_path,
         path unix_path
     )
     language sql
     stable
 as $$
     select C.sha1 as content,
            R.sha1 as revision,
            R.date as date,
            O.url as origin,
            '\x'::unix_path as path
     from content as C
     inner join content_in_revision as CR on (CR.content = C.id)
-    inner join revision as R on (CR.revision = R.id)
-    left join origin as O on (R.origin=O.id)
-    where C.sha1=content_id
+    inner join revision as R on (R.id = CR.revision)
+    left join origin as O on (O.id = R.origin)
+    where C.sha1 = content_id
     order by date, revision, origin asc
     limit 1
 $$;
 
 create or replace function swh_provenance_content_find_all(content_id sha1_git, early_cut int)
     returns table (
         content sha1_git,
         revision sha1_git,
         date timestamptz,
         origin unix_path,
         path unix_path
     )
     language sql
     stable
 as $$
     (select C.sha1 as content,
-            r.sha1 as revision,
-            r.date as date,
+            R.sha1 as revision,
+            R.date as date,
             O.url as origin,
             '\x'::unix_path as path
-     from content as c
-     inner join content_in_revision as cr on (cr.content = c.id)
-     inner join revision as r on (cr.revision = r.id)
-     left join origin as O on (R.origin=O.id)
-     where c.sha1=content_id)
+     from content as C
+     inner join content_in_revision as CR on (CR.content = C.id)
+     inner join revision as R on (R.id = CR.revision)
+     left join origin as O on (O.id = R.origin)
+     where C.sha1 = content_id)
     union
-    (select c.sha1 as content,
-            r.sha1 as revision,
-            r.date as date,
+    (select C.sha1 as content,
+            R.sha1 as revision,
+            R.date as date,
             O.url as origin,
             '\x'::unix_path as path
-     from content as c
-     inner join content_in_directory as cd on (c.id = cd.content)
-     inner join directory_in_revision as dr on (cd.directory = dr.directory)
-     inner join revision as r on (dr.revision = r.id)
-     left join origin as O on (R.origin=O.id)
-     where C.sha1=content_id)
+     from content as C
+     inner join content_in_directory as CD on (CD.content = C.id)
+     inner join directory_in_revision as DR on (DR.directory = CD.directory)
+     inner join revision as R on (R.id = DR.revision)
+     left join origin as O on (O.id = R.origin)
+     where C.sha1 = content_id)
     order by date, revision, origin, path
     limit early_cut
 $$;
 \endif -- :dbflavor_with_path
 \else -- :dbflavor_norm
 \if :dbflavor_with_path
 --
 -- with path and denormalized
 --
 create or replace function swh_provenance_content_find_first(content_id sha1_git)
     returns table (
         content sha1_git,
         revision sha1_git,
         date timestamptz,
         origin unix_path,
         path unix_path
     )
     language sql
     stable
 as $$
-    select C_L.sha1 as content,
+    select CL.sha1 as content,
            R.sha1 as revision,
            R.date as date,
            O.url as origin,
            L.path as path
     from (
      select C.sha1 as sha1,
-            unnest(revision) as revision,
-            unnest(location) as location
-     from content_in_revision as C_R
-     inner join content as C on (C.id=C_R.content)
-     where C.sha1=content_id
-     ) as C_L
-    inner join revision as R on (R.id=C_L.revision)
-    inner join location as L on (L.id=C_L.location)
-    left join origin as O on (R.origin=O.id)
+            unnest(CR.revision) as revision,
+            unnest(CR.location) as location
+     from content_in_revision as CR
+     inner join content as C on (C.id = CR.content)
+     where C.sha1 = content_id
+     ) as CL
+    inner join revision as R on (R.id = CL.revision)
+    inner join location as L on (L.id = CL.location)
+    left join origin as O on (O.id = R.origin)
     order by date, revision, origin, path asc
     limit 1
 $$;
 
 create or replace function swh_provenance_content_find_all(content_id sha1_git, early_cut int)
     returns table (
         content sha1_git,
         revision sha1_git,
         date timestamptz,
         origin unix_path,
         path unix_path
     )
     language sql
     stable
 as $$
-    (with cnt as (
-     select c.sha1 as sha1,
-            unnest(c_r.revision) as revision,
-            unnest(c_r.location) as location
-     from content_in_revision as c_r
-     inner join content as c on (c.id = c_r.content)
-     where c.sha1 = content_id
+    (with cntrev as (
+     select C.sha1 as sha1,
+            unnest(CR.revision) as revision,
+            unnest(CR.location) as location
+     from content_in_revision as CR
+     inner join content as C on (C.id = CR.content)
+     where C.sha1 = content_id
      )
-    select cnt.sha1 as content,
-           r.sha1 as revision,
-           r.date as date,
+    select CR.sha1 as content,
+           R.sha1 as revision,
+           R.date as date,
            O.url as origin,
-           l.path as path
-    from cnt
-    inner join revision as r on (r.id = cnt.revision)
-    inner join location as l on (l.id = cnt.location)
-    left join origin as O on (R.origin=O.id)
+           L.path as path
+    from cntrev as CR
+    inner join revision as R on (R.id = CR.revision)
+    inner join location as L on (L.id = CR.location)
+    left join origin as O on (O.id = R.origin)
     )
     union
-    (with cnt as (
-     select c.sha1 as content_sha1,
-            unnest(cd.directory) as directory,
-            unnest(cd.location) as location
-     from content as c
-     inner join content_in_directory as cd on (cd.content = c.id)
-     where c.sha1 = content_id
+    (with cntdir as (
+     select C.sha1 as sha1,
+            unnest(CD.directory) as directory,
+            unnest(CD.location) as location
+     from content as C
+     inner join content_in_directory as CD on (CD.content = C.id)
+     where C.sha1 = content_id
      ),
-    cntdir as (
-     select cnt.content_sha1 as content_sha1,
-            cntloc.path as file_path,
-            unnest(dr.revision) as revision,
-            unnest(dr.location) as prefix_location
-     from cnt
-     inner join directory_in_revision as dr on (dr.directory = cnt.directory)
-     inner join location as cntloc on (cntloc.id = cnt.location)
+    cntrev as (
+     select CD.sha1 as sha1,
+            L.path as path,
+            unnest(DR.revision) as revision,
+            unnest(DR.location) as prefix
+     from cntdir as CD
+     inner join directory_in_revision as DR on (DR.directory = CD.directory)
+     inner join location as L on (L.id = CD.location)
      )
-    select cntdir.content_sha1 as content,
-           r.sha1 as revision,
-           r.date as date,
+    select CR.sha1 as content,
+           R.sha1 as revision,
+           R.date as date,
            O.url as origin,
-           case dirloc.path
-           when '' then cntdir.file_path
-           when '.' then cntdir.file_path
-           else (dirloc.path || '/' || cntdir.file_path)::unix_path
+           case DL.path
+           when '' then CR.path
+           when '.' then CR.path
+           else (DL.path || '/' || CR.path)::unix_path
            end as path
-    from cntdir
-    inner join location as dirloc on (cntdir.prefix_location = dirloc.id)
-    inner join revision as r on (cntdir.revision = r.id)
-    left join origin as O on (R.origin=O.id)
+    from cntrev as CR
+    inner join revision as R on (R.id = CR.revision)
+    inner join location as DL on (DL.id = CR.prefix)
+    left join origin as O on (O.id = R.origin)
     )
     order by date, revision, origin, path
     limit early_cut
 $$;
 
 \else
 --
 -- without path and denormalized
 --
 create or replace function swh_provenance_content_find_first(content_id sha1_git)
     returns table (
         content sha1_git,
         revision sha1_git,
         date timestamptz,
         origin unix_path,
         path unix_path
     )
     language sql
     stable
 as $$
-    select C_L.sha1 as content,
+    select CL.sha1 as content,
            R.sha1 as revision,
            R.date as date,
            O.url as origin,
            '\x'::unix_path as path
     from (
      select C.sha1,
             unnest(revision) as revision
-     from content_in_revision as C_R
-     inner join content as C on (C.id=C_R.content)
+     from content_in_revision as CR
+     inner join content as C on (C.id = CR.content)
      where C.sha1=content_id
-     ) as C_L
-    inner join revision as R on (R.id=C_L.revision)
-    left join origin as O on (R.origin=O.id)
+     ) as CL
+    inner join revision as R on (R.id = CL.revision)
+    left join origin as O on (O.id = R.origin)
     order by date, revision, origin, path asc
     limit 1
 $$;
 
 create or replace function swh_provenance_content_find_all(content_id sha1_git, early_cut int)
     returns table (
         content sha1_git,
         revision sha1_git,
         date timestamptz,
         origin unix_path,
         path unix_path
     )
     language sql
     stable
 as $$
-    (with cnt as (
-     select c.sha1 as sha1,
-            unnest(c_r.revision) as revision
-     from content_in_revision as c_r
-     inner join content as c on (c.id = c_r.content)
-     where c.sha1 = content_id
+    (with cntrev as (
+     select C.sha1 as sha1,
+            unnest(CR.revision) as revision
+     from content_in_revision as CR
+     inner join content as C on (C.id = CR.content)
+     where C.sha1 = content_id
      )
-    select cnt.sha1 as content,
-           r.sha1 as revision,
-           r.date as date,
+    select CR.sha1 as content,
+           R.sha1 as revision,
+           R.date as date,
            O.url as origin,
            '\x'::unix_path as path
-    from cnt
-    inner join revision as r on (r.id = cnt.revision)
-    left join origin as O on (r.origin=O.id)
+    from cntrev as CR
+    inner join revision as R on (R.id = CR.revision)
+    left join origin as O on (O.id = R.origin)
     )
     union
-    (with cnt as (
-     select c.sha1 as content_sha1,
-            unnest(cd.directory) as directory
-     from content as c
-     inner join content_in_directory as cd on (cd.content = c.id)
-     where c.sha1 = content_id
+    (with cntdir as (
+     select C.sha1 as sha1,
+            unnest(CD.directory) as directory
+     from content as C
+     inner join content_in_directory as CD on (CD.content = C.id)
+     where C.sha1 = content_id
      ),
-    cntdir as (
-     select cnt.content_sha1 as content_sha1,
-            unnest(dr.revision) as revision
-     from cnt
-     inner join directory_in_revision as dr on (dr.directory = cnt.directory)
+    cntrev as (
+     select CD.sha1 as sha1,
+            unnest(DR.revision) as revision
+     from cntdir as CD
+     inner join directory_in_revision as DR on (DR.directory = CD.directory)
      )
-    select cntdir.content_sha1 as content,
-           r.sha1 as revision,
-           r.date as date,
+    select CR.sha1 as content,
+           R.sha1 as revision,
+           R.date as date,
            O.url as origin,
            '\x'::unix_path as path
-    from cntdir
-    inner join revision as r on (cntdir.revision = r.id)
-    left join origin as O on (r.origin=O.id)
+    from cntrev as CR
+    inner join revision as R on (R.id = CR.revision)
+    left join origin as O on (O.id = R.origin)
     )
     order by date, revision, origin, path
     limit early_cut
 $$;
 \endif -- :dbflavor_with_path
 \endif -- :dbflavor_norm
diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py
index 4a8bced..25a6b37 100644
--- a/swh/provenance/tests/conftest.py
+++ b/swh/provenance/tests/conftest.py
@@ -1,140 +1,142 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from datetime import datetime, timedelta, timezone
 from os import path
 from typing import Any, Dict, Iterable, Iterator
 
 from _pytest.fixtures import SubRequest
 import msgpack
 import psycopg2.extensions
 import pytest
 from pytest_postgresql.factories import postgresql
 
 from swh.journal.serializers import msgpack_ext_hook
 from swh.provenance import get_provenance, get_provenance_storage
 from swh.provenance.api.client import RemoteProvenanceStorage
 import swh.provenance.api.server as server
 from swh.provenance.archive import ArchiveInterface
 from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface
 from swh.provenance.storage.archive import ArchiveStorage
 from swh.storage.interface import StorageInterface
 from swh.storage.replay import process_replay_objects
 
 
 @pytest.fixture(
     params=[
         "with-path",
         "without-path",
         "with-path-denormalized",
         "without-path-denormalized",
     ]
 )
 def populated_db(
     request: SubRequest,
     postgresql: psycopg2.extensions.connection,
 ) -> Dict[str, str]:
     """return a working and initialized provenance db"""
     from swh.core.cli.db import populate_database_for_package
 
     populate_database_for_package(
         "swh.provenance", postgresql.dsn, flavor=request.param
     )
     return postgresql.get_dsn_parameters()
 
 
 # the Flask app used as server in these tests
 @pytest.fixture
 def app(populated_db: Dict[str, str]) -> Iterator[server.ProvenanceStorageServerApp]:
     assert hasattr(server, "storage")
-    server.storage = get_provenance_storage(cls="local", db=populated_db)
+    server.storage = get_provenance_storage(cls="postgresql", db=populated_db)
     yield server.app
 
 
 # the RPCClient class used as client used in these tests
 @pytest.fixture
 def swh_rpc_client_class() -> type:
     return RemoteProvenanceStorage
 
 
-@pytest.fixture(params=["local", "remote"])
+@pytest.fixture(params=["postgresql", "remote"])
 def provenance_storage(
     request: SubRequest,
     populated_db: Dict[str, str],
     swh_rpc_client: RemoteProvenanceStorage,
 ) -> ProvenanceStorageInterface:
     """Return a working and initialized ProvenanceStorageInterface object"""
     if request.param == "remote":
         assert isinstance(swh_rpc_client, ProvenanceStorageInterface)
         return swh_rpc_client
     else:
         # in test sessions, we DO want to raise any exception occurring at commit time
         return get_provenance_storage(
             cls=request.param, db=populated_db, raise_on_commit=True
         )
 
 
 provenance_postgresql = postgresql("postgresql_proc", dbname="provenance_tests")
 
 
 @pytest.fixture
 def provenance(
     provenance_postgresql: psycopg2.extensions.connection,
 ) -> ProvenanceInterface:
     """Return a working and initialized ProvenanceInterface object"""
     from swh.core.cli.db import populate_database_for_package
 
     populate_database_for_package(
         "swh.provenance", provenance_postgresql.dsn, flavor="with-path"
     )
     # in test sessions, we DO want to raise any exception occurring at commit time
     return get_provenance(
-        cls="local", db=provenance_postgresql.get_dsn_parameters(), raise_on_commit=True
+        cls="postgresql",
+        db=provenance_postgresql.get_dsn_parameters(),
+        raise_on_commit=True,
     )
 
 
 @pytest.fixture
 def archive(swh_storage: StorageInterface) -> ArchiveInterface:
     """Return an ArchiveStorage-based ArchiveInterface object"""
     return ArchiveStorage(swh_storage)
 
 
 def get_datafile(fname: str) -> str:
     return path.join(path.dirname(__file__), "data", fname)
 
 
 def load_repo_data(repo: str) -> Dict[str, Any]:
     data: Dict[str, Any] = {}
     with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj:
         unpacker = msgpack.Unpacker(
             fobj,
             raw=False,
             ext_hook=msgpack_ext_hook,
             strict_map_key=False,
             timestamp=3,  # convert Timestamp in datetime objects (tz UTC)
         )
         for objtype, objd in unpacker:
             data.setdefault(objtype, []).append(objd)
     return data
 
 
 def filter_dict(d: Dict[Any, Any], keys: Iterable[Any]) -> Dict[Any, Any]:
     return {k: v for (k, v) in d.items() if k in keys}
 
 
 def fill_storage(storage: StorageInterface, data: Dict[str, Any]) -> None:
     process_replay_objects(data, storage=storage)
 
 
 # TODO: remove this function in favour of TimestampWithTimezone.to_datetime
 # from swh.model.model
 def ts2dt(ts: Dict[str, Any]) -> datetime:
     timestamp = datetime.fromtimestamp(
         ts["timestamp"]["seconds"], timezone(timedelta(minutes=ts["offset"]))
     )
     return timestamp.replace(microsecond=ts["timestamp"]["microseconds"])
diff --git a/swh/provenance/tests/test_conftest.py b/swh/provenance/tests/test_conftest.py
index d1fe3de..c615f24 100644
--- a/swh/provenance/tests/test_conftest.py
+++ b/swh/provenance/tests/test_conftest.py
@@ -1,26 +1,27 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from swh.provenance.interface import ProvenanceInterface
 from swh.provenance.tests.conftest import fill_storage, load_repo_data
 from swh.storage.interface import StorageInterface
 
 
 def test_provenance_fixture(provenance: ProvenanceInterface) -> None:
-    """Check the 'provenance' fixture produce a working ProvenanceDB object"""
+    """Check the 'provenance' fixture produce a working
+    ProvenanceStoragePostgreSql object"""
     assert provenance
     provenance.flush()  # should be a noop
 
 
 def test_fill_storage(swh_storage: StorageInterface) -> None:
     """Check the 'fill_storage' test utility produces a working Storage
     object with at least some Content, Revision and Directory in it"""
     data = load_repo_data("cmdbts2")
     fill_storage(swh_storage, data)
     assert swh_storage
     assert swh_storage.content_get_random()
     assert swh_storage.directory_get_random()
     assert swh_storage.revision_get_random()
diff --git a/swh/provenance/tests/test_provenance_db.py b/swh/provenance/tests/test_provenance_db.py
index 2bb9cf4..212cf72 100644
--- a/swh/provenance/tests/test_provenance_db.py
+++ b/swh/provenance/tests/test_provenance_db.py
@@ -1,17 +1,17 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from swh.provenance.interface import ProvenanceInterface
-from swh.provenance.postgresql.provenancedb import ProvenanceDB
+from swh.provenance.postgresql.provenance import ProvenanceStoragePostgreSql
 
 
 def test_provenance_flavor(provenance: ProvenanceInterface) -> None:
-    if isinstance(provenance.storage, ProvenanceDB):
+    if isinstance(provenance.storage, ProvenanceStoragePostgreSql):
         assert provenance.storage.flavor in (
             "with-path",
             "without-path",
             "with-path-denormalized",
             "without-path-denormalized",
         )
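The recurring change across the test fixtures above is the backend rename from `cls="local"` to `cls="postgresql"`. As an illustration of the kind of `cls`-based dispatch such a factory performs, here is a minimal stdlib-only sketch; every name in it (the class, the registry, the keyword arguments) is hypothetical and does not reflect the real `swh.provenance` implementation:

```python
# Illustrative sketch only: a minimal cls-based factory in the style of
# get_provenance_storage. All names here are hypothetical, not the real
# swh.provenance API.
from typing import Any, Dict


class ProvenanceStoragePostgreSql:
    """Stand-in for a PostgreSQL-backed provenance storage class."""

    def __init__(self, raise_on_commit: bool = False, **db: Any) -> None:
        self.raise_on_commit = raise_on_commit
        self.db: Dict[str, Any] = db


# registry mapping the `cls` string to a backend class; in this sketch only
# the new "postgresql" name is registered, so the retired "local" name fails
BACKENDS = {
    "postgresql": ProvenanceStoragePostgreSql,
}


def get_provenance_storage(cls: str, **kwargs: Any) -> Any:
    """Instantiate the backend registered under `cls`."""
    try:
        backend = BACKENDS[cls]
    except KeyError:
        # stale call sites passing an unregistered name fail loudly
        # instead of silently picking a default backend
        raise ValueError(f"unknown provenance storage class: {cls}") from None
    return backend(**kwargs)


storage = get_provenance_storage(cls="postgresql", dbname="provdb", raise_on_commit=True)
```

Under this scheme the rename is a pure registry-key change, which is why the patch only has to touch the strings passed at each fixture's call site.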