diff --git a/requirements-test.txt b/requirements-test.txt
index c530a6c..cdfc857 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,5 +1,4 @@
 pytest
 pytest-mongodb
 swh.loader.git >= 0.8
 swh.journal >= 0.8
-types-Werkzeug
diff --git a/requirements.txt b/requirements.txt
index 8f39169..199ddc7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,11 +1,10 @@
 # Add here external Python modules dependencies, one per line. Module names
 # should match https://pypi.python.org/pypi names. For the full spec or
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
 click
 iso8601
 methodtools
 pymongo
 PyYAML
 types-click
 types-PyYAML
-types-Werkzeug
diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
index 681e476..0c43d40 100644
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -1,107 +1,99 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from __future__ import annotations

 from typing import TYPE_CHECKING
 import warnings

 if TYPE_CHECKING:
     from .archive import ArchiveInterface
     from .interface import ProvenanceInterface, ProvenanceStorageInterface


 def get_archive(cls: str, **kwargs) -> ArchiveInterface:
     """Get an archive object of class ``cls`` with arguments ``args``.

     Args:
         cls: archive's class, either 'api' or 'direct'
         args: dictionary of arguments passed to the archive class constructor

     Returns:
         an instance of archive object (either using swh.storage API or direct
         queries to the archive's database)

     Raises:
         :cls:`ValueError` if passed an unknown archive class.
     """
     if cls == "api":
         from swh.storage import get_storage

         from .storage.archive import ArchiveStorage

         return ArchiveStorage(get_storage(**kwargs["storage"]))
     elif cls == "direct":
         from swh.core.db import BaseDb

         from .postgresql.archive import ArchivePostgreSQL

         return ArchivePostgreSQL(BaseDb.connect(**kwargs["db"]).conn)
     else:
         raise ValueError


 def get_provenance(**kwargs) -> ProvenanceInterface:
     """Get an provenance object with arguments ``args``.

     Args:
         args: dictionary of arguments to retrieve a swh.provenance.storage
             class (see :func:`get_provenance_storage` for details)

     Returns:
         an instance of provenance object
     """
     from .provenance import Provenance

     return Provenance(get_provenance_storage(**kwargs))


 def get_provenance_storage(cls: str, **kwargs) -> ProvenanceStorageInterface:
     """Get an archive object of class ``cls`` with arguments ``args``.

     Args:
         cls: storage's class, only 'local' is currently supported
         args: dictionary of arguments passed to the storage class constructor

     Returns:
         an instance of storage object

     Raises:
         :cls:`ValueError` if passed an unknown archive class.
     """
     if cls in ["local", "postgresql"]:
         from swh.core.db import BaseDb

         from .postgresql.provenance import ProvenanceStoragePostgreSql

         if cls == "local":
             warnings.warn(
                 '"local" class is deprecated for provenance storage, please '
                 'use "postgresql" class instead.',
                 DeprecationWarning,
             )

         conn = BaseDb.connect(**kwargs["db"]).conn
         raise_on_commit = kwargs.get("raise_on_commit", False)
         return ProvenanceStoragePostgreSql(conn, raise_on_commit)

     elif cls == "mongodb":
         from pymongo import MongoClient

         from .mongo.backend import ProvenanceStorageMongoDb

         dbname = kwargs["db"].pop("dbname")
         db = MongoClient(**kwargs["db"]).get_database(dbname)
         return ProvenanceStorageMongoDb(db)

-    elif cls == "remote":
-        from .api.client import RemoteProvenanceStorage
-
-        storage = RemoteProvenanceStorage(**kwargs)
-        assert isinstance(storage, ProvenanceStorageInterface)
-        return storage
-
-    else:
-        raise ValueError
+    raise ValueError
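With the "remote" branch gone, get_provenance_storage() only accepts "postgresql" (plus the deprecated "local" alias) and "mongodb"; anything else falls through to the bare ValueError. A minimal sketch of a call site after this change, not part of the patch itself; the connection parameters and the MongoDB variant are illustrative placeholders, not a tested setup.

from swh.provenance import get_provenance_storage

# PostgreSQL backend; "local" still works but emits a DeprecationWarning.
# Host/dbname/user below are illustrative placeholders.
storage = get_provenance_storage(
    cls="postgresql",
    db={"host": "localhost", "dbname": "provenance", "user": "postgres"},
)

# MongoDB backend (equally illustrative):
# storage = get_provenance_storage(cls="mongodb", db={"dbname": "provenance"})

# The removed "remote" class, like any unknown value, now raises ValueError.
try:
    get_provenance_storage(cls="remote", url="http://localhost:8080/%2f")
except ValueError:
    print('"remote" is no longer a supported provenance storage class')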
""" if cls in ["local", "postgresql"]: from swh.core.db import BaseDb from .postgresql.provenance import ProvenanceStoragePostgreSql if cls == "local": warnings.warn( '"local" class is deprecated for provenance storage, please ' 'use "postgresql" class instead.', DeprecationWarning, ) conn = BaseDb.connect(**kwargs["db"]).conn raise_on_commit = kwargs.get("raise_on_commit", False) return ProvenanceStoragePostgreSql(conn, raise_on_commit) elif cls == "mongodb": from pymongo import MongoClient from .mongo.backend import ProvenanceStorageMongoDb dbname = kwargs["db"].pop("dbname") db = MongoClient(**kwargs["db"]).get_database(dbname) return ProvenanceStorageMongoDb(db) - elif cls == "remote": - from .api.client import RemoteProvenanceStorage - - storage = RemoteProvenanceStorage(**kwargs) - assert isinstance(storage, ProvenanceStorageInterface) - return storage - - else: - raise ValueError + raise ValueError diff --git a/swh/provenance/api/client.py b/swh/provenance/api/client.py index c772c08..72761a5 100644 --- a/swh/provenance/api/client.py +++ b/swh/provenance/api/client.py @@ -1,17 +1,4 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information - -from swh.core.api import RPCClient - -from ..interface import ProvenanceStorageInterface -from .serializers import DECODERS, ENCODERS - - -class RemoteProvenanceStorage(RPCClient): - """Proxy to a remote provenance storage API""" - - backend_class = ProvenanceStorageInterface - extra_type_decoders = DECODERS - extra_type_encoders = ENCODERS diff --git a/swh/provenance/api/server.py b/swh/provenance/api/server.py index 60fb059..e030efd 100644 --- a/swh/provenance/api/server.py +++ b/swh/provenance/api/server.py @@ -1,148 +1,58 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import logging import os -from typing import Any, Dict, List, Optional - -from werkzeug.routing import Rule +from typing import Any, Dict, Optional from swh.core import config -from swh.core.api import JSONFormatter, MsgpackFormatter, RPCServerApp, negotiate -from swh.provenance import get_provenance_storage -from swh.provenance.interface import ProvenanceStorageInterface - -from .serializers import DECODERS, ENCODERS - -storage: Optional[ProvenanceStorageInterface] = None - - -def get_global_provenance_storage() -> ProvenanceStorageInterface: - global storage - if storage is None: - storage = get_provenance_storage(**app.config["provenance"]["storage"]) - return storage - - -class ProvenanceStorageServerApp(RPCServerApp): - extra_type_decoders = DECODERS - extra_type_encoders = ENCODERS - - -app = ProvenanceStorageServerApp( - __name__, - backend_class=ProvenanceStorageInterface, - backend_factory=get_global_provenance_storage, -) - - -def has_no_empty_params(rule: Rule) -> bool: - return len(rule.defaults or ()) >= len(rule.arguments or ()) - - -@app.route("/") -def index() -> str: - return """ -Software Heritage provenance storage RPC server - -

You have reached the -Software Heritage -provenance storage RPC server.
-See its -documentation -and API for more information

- -""" - - -@app.route("/site-map") -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def site_map() -> List[Dict[str, Any]]: - links = [] - for rule in app.url_map.iter_rules(): - if has_no_empty_params(rule) and hasattr( - ProvenanceStorageInterface, rule.endpoint - ): - links.append( - dict( - rule=rule.rule, - description=getattr( - ProvenanceStorageInterface, rule.endpoint - ).__doc__, - ) - ) - # links is now a list of url, endpoint tuples - return links def load_and_check_config( config_path: Optional[str], type: str = "local" ) -> Dict[str, Any]: """Check the minimal configuration is set to run the api or raise an error explanation. Args: config_path (str): Path to the configuration file to load type (str): configuration type. For 'local' type, more checks are done. Raises: Error if the setup is not as expected Returns: configuration as a dict """ if config_path is None: raise EnvironmentError("Configuration file must be defined") if not os.path.exists(config_path): raise FileNotFoundError(f"Configuration file {config_path} does not exist") cfg = config.read(config_path) pcfg: Optional[Dict[str, Any]] = cfg.get("provenance") if pcfg is None: raise KeyError("Missing 'provenance' configuration") scfg: Optional[Dict[str, Any]] = pcfg.get("storage") if scfg is None: raise KeyError("Missing 'provenance.storage' configuration") if type == "local": cls = scfg.get("cls") if cls != "postgresql": raise ValueError( "The provenance backend can only be started with a 'postgresql' " "configuration" ) db = scfg.get("db") if not db: raise KeyError("Invalid configuration; missing 'db' config entry") return cfg - - -api_cfg: Optional[Dict[str, Any]] = None - - -def make_app_from_configfile() -> ProvenanceStorageServerApp: - """Run the WSGI app from the webserver, loading the configuration from - a configuration file. - - SWH_CONFIG_FILENAME environment variable defines the - configuration path to load. 
- - """ - global api_cfg - if api_cfg is None: - config_path = os.environ.get("SWH_CONFIG_FILENAME") - api_cfg = load_and_check_config(config_path) - app.config.update(api_cfg) - handler = logging.StreamHandler() - app.logger.addHandler(handler) - return app diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py index cd2570d..a4f8905 100644 --- a/swh/provenance/cli.py +++ b/swh/provenance/cli.py @@ -1,241 +1,238 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control from datetime import datetime, timezone import os from typing import Any, Dict, Generator, Optional, Tuple import click import iso8601 import yaml from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.model import Sha1Git # All generic config code should reside in swh.core.config CONFIG_ENVVAR = "SWH_CONFIG_FILENAME" DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None) DEFAULT_CONFIG: Dict[str, Any] = { "provenance": { "archive": { # Storage API based Archive object # "cls": "api", # "storage": { # "cls": "remote", # "url": "http://uffizi.internal.softwareheritage.org:5002", # } # Direct access Archive object "cls": "direct", "db": { "host": "db.internal.softwareheritage.org", "dbname": "softwareheritage", "user": "guest", }, }, "storage": { # Local PostgreSQL Storage "cls": "postgresql", "db": { "host": "localhost", "user": "postgres", "password": "postgres", "dbname": "provenance", }, # Local MongoDB Storage # "cls": "mongodb", # "db": { # "dbname": "provenance", # }, - # Remote REST-API/PostgreSQL - # "cls": "remote", - # "url": "http://localhost:8080/%2f", }, } } CONFIG_FILE_HELP = f""" \b Configuration can be loaded from a yaml file given either as --config-file option or the {CONFIG_ENVVAR} environment variable. 
If no configuration file is specified, use the following default configuration:: \b {yaml.dump(DEFAULT_CONFIG)}""" PROVENANCE_HELP = f"""Software Heritage provenance index database tools {CONFIG_FILE_HELP} """ @swh_cli_group.group( name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP ) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=True, dir_okay=False, path_type=str), help="""YAML configuration file.""", ) @click.option( "-P", "--profile", default=None, type=click.Path(exists=False, dir_okay=False, path_type=str), help="""Enable profiling to specified file.""", ) @click.pass_context def cli(ctx: click.core.Context, config_file: Optional[str], profile: str) -> None: if ( config_file is None and DEFAULT_PATH is not None and config.config_exists(DEFAULT_PATH) ): config_file = DEFAULT_PATH if config_file is None: conf = DEFAULT_CONFIG else: # read_raw_config do not fail on ENOENT if not os.path.exists(config_file): raise FileNotFoundError(config_file) conf = yaml.safe_load(open(config_file, "rb")) ctx.ensure_object(dict) ctx.obj["config"] = conf if profile: import atexit import cProfile print("Profiling...") pr = cProfile.Profile() pr.enable() def exit() -> None: pr.disable() pr.dump_stats(profile) atexit.register(exit) @cli.command(name="iter-revisions") @click.argument("filename") @click.option("-a", "--track-all", default=True, type=bool) @click.option("-l", "--limit", type=int) @click.option("-m", "--min-depth", default=1, type=int) @click.option("-r", "--reuse", default=True, type=bool) @click.pass_context def iter_revisions( ctx: click.core.Context, filename: str, track_all: bool, limit: Optional[int], min_depth: int, reuse: bool, ) -> None: # TODO: add file size filtering """Process a provided list of revisions.""" from . import get_archive, get_provenance from .revision import CSVRevisionIterator, revision_add archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) revisions_provider = generate_revision_tuples(filename) revisions = CSVRevisionIterator(revisions_provider, limit=limit) for revision in revisions: revision_add( provenance, archive, [revision], trackall=track_all, lower=reuse, mindepth=min_depth, ) def generate_revision_tuples( filename: str, ) -> Generator[Tuple[Sha1Git, datetime, Sha1Git], None, None]: for line in open(filename, "r"): if line.strip(): revision, date, root = line.strip().split(",") yield ( hash_to_bytes(revision), iso8601.parse_date(date, default_timezone=timezone.utc), hash_to_bytes(root), ) @cli.command(name="iter-origins") @click.argument("filename") @click.option("-l", "--limit", type=int) @click.pass_context def iter_origins(ctx: click.core.Context, filename: str, limit: Optional[int]) -> None: """Process a provided list of origins.""" from . 
import get_archive, get_provenance from .origin import CSVOriginIterator, origin_add archive = get_archive(**ctx.obj["config"]["provenance"]["archive"]) provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) origins_provider = generate_origin_tuples(filename) origins = CSVOriginIterator(origins_provider, limit=limit) for origin in origins: origin_add(provenance, archive, [origin]) def generate_origin_tuples(filename: str) -> Generator[Tuple[str, bytes], None, None]: for line in open(filename, "r"): if line.strip(): url, snapshot = line.strip().split(",") yield (url, hash_to_bytes(snapshot)) @cli.command(name="find-first") @click.argument("swhid") @click.pass_context def find_first(ctx: click.core.Context, swhid: str) -> None: """Find first occurrence of the requested blob.""" from . import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) occur = provenance.content_find_first(hash_to_bytes(swhid)) if occur is not None: print( f"swh:1:cnt:{hash_to_hex(occur.content)}, " f"swh:1:rev:{hash_to_hex(occur.revision)}, " f"{occur.date}, " f"{occur.origin}, " f"{os.fsdecode(occur.path)}" ) else: print(f"Cannot find a content with the id {swhid}") @cli.command(name="find-all") @click.argument("swhid") @click.option("-l", "--limit", type=int) @click.pass_context def find_all(ctx: click.core.Context, swhid: str, limit: Optional[int]) -> None: """Find all occurrences of the requested blob.""" from . import get_provenance provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"]) for occur in provenance.content_find_all(hash_to_bytes(swhid), limit=limit): print( f"swh:1:cnt:{hash_to_hex(occur.content)}, " f"swh:1:rev:{hash_to_hex(occur.revision)}, " f"{occur.date}, " f"{occur.origin}, " f"{os.fsdecode(occur.path)}" ) diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py index 67d98c5..f30f34b 100644 --- a/swh/provenance/tests/conftest.py +++ b/swh/provenance/tests/conftest.py @@ -1,153 +1,128 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta, timezone from os import path -from typing import Any, Dict, Iterable, Iterator +from typing import Any, Dict, Iterable from _pytest.fixtures import SubRequest import msgpack import psycopg2.extensions import pymongo.database import pytest from pytest_postgresql.factories import postgresql from swh.journal.serializers import msgpack_ext_hook from swh.provenance import get_provenance, get_provenance_storage -from swh.provenance.api.client import RemoteProvenanceStorage -import swh.provenance.api.server as server from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface from swh.provenance.storage.archive import ArchiveStorage from swh.storage.interface import StorageInterface from swh.storage.replay import process_replay_objects @pytest.fixture( params=[ "with-path", "without-path", "with-path-denormalized", "without-path-denormalized", ] ) def provenance_postgresqldb( request: SubRequest, postgresql: psycopg2.extensions.connection, ) -> Dict[str, str]: """return a working and initialized provenance db""" from swh.core.cli.db import populate_database_for_package populate_database_for_package( "swh.provenance", postgresql.dsn, flavor=request.param ) return 
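The iter-revisions and iter-origins commands read plain CSV files, one entry per line, exactly as parsed by generate_revision_tuples() and generate_origin_tuples() above. A small sketch of the revision line format; the hashes are made-up placeholders.

from datetime import timezone

import iso8601

from swh.model.hashutil import hash_to_bytes

# One "revision,iso_date,root_directory" triple per line; hashes are placeholders.
line = (
    "c7c108084bc0bf3d81436bf980b46e98bd338453,"
    "2005-03-01T16:59:12+00:00,"
    "5fc61dd1305edee2d8d4c9798bcc86a45db71f0c"
)
revision, date, root = line.strip().split(",")
entry = (
    hash_to_bytes(revision),
    iso8601.parse_date(date, default_timezone=timezone.utc),
    hash_to_bytes(root),
)
print(entry[1])  # 2005-03-01 16:59:12+00:00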
diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py
index 67d98c5..f30f34b 100644
--- a/swh/provenance/tests/conftest.py
+++ b/swh/provenance/tests/conftest.py
@@ -1,153 +1,128 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from datetime import datetime, timedelta, timezone
 from os import path
-from typing import Any, Dict, Iterable, Iterator
+from typing import Any, Dict, Iterable

 from _pytest.fixtures import SubRequest
 import msgpack
 import psycopg2.extensions
 import pymongo.database
 import pytest
 from pytest_postgresql.factories import postgresql

 from swh.journal.serializers import msgpack_ext_hook
 from swh.provenance import get_provenance, get_provenance_storage
-from swh.provenance.api.client import RemoteProvenanceStorage
-import swh.provenance.api.server as server
 from swh.provenance.archive import ArchiveInterface
 from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface
 from swh.provenance.storage.archive import ArchiveStorage
 from swh.storage.interface import StorageInterface
 from swh.storage.replay import process_replay_objects


 @pytest.fixture(
     params=[
         "with-path",
         "without-path",
         "with-path-denormalized",
         "without-path-denormalized",
     ]
 )
 def provenance_postgresqldb(
     request: SubRequest,
     postgresql: psycopg2.extensions.connection,
 ) -> Dict[str, str]:
     """return a working and initialized provenance db"""
     from swh.core.cli.db import populate_database_for_package

     populate_database_for_package(
         "swh.provenance", postgresql.dsn, flavor=request.param
     )
     return postgresql.get_dsn_parameters()


-# the Flask app used as server in these tests
-@pytest.fixture
-def app(
-    provenance_postgresqldb: Dict[str, str]
-) -> Iterator[server.ProvenanceStorageServerApp]:
-    assert hasattr(server, "storage")
-    server.storage = get_provenance_storage(
-        cls="postgresql", db=provenance_postgresqldb
-    )
-    yield server.app
-
-
-# the RPCClient class used as client used in these tests
-@pytest.fixture
-def swh_rpc_client_class() -> type:
-    return RemoteProvenanceStorage
-
-
-@pytest.fixture(params=["mongodb", "postgresql", "remote"])
+@pytest.fixture(params=["mongodb", "postgresql"])
 def provenance_storage(
     request: SubRequest,
     provenance_postgresqldb: Dict[str, str],
     mongodb: pymongo.database.Database,
-    swh_rpc_client: RemoteProvenanceStorage,
 ) -> ProvenanceStorageInterface:
     """Return a working and initialized ProvenanceStorageInterface object"""

-    if request.param == "remote":
-        assert isinstance(swh_rpc_client, ProvenanceStorageInterface)
-        return swh_rpc_client
-
-    elif request.param == "mongodb":
+    if request.param == "mongodb":
         from swh.provenance.mongo.backend import ProvenanceStorageMongoDb

         return ProvenanceStorageMongoDb(mongodb)

     else:
         # in test sessions, we DO want to raise any exception occurring at commit time
         return get_provenance_storage(
             cls=request.param, db=provenance_postgresqldb, raise_on_commit=True
         )


 provenance_postgresql = postgresql("postgresql_proc", dbname="provenance_tests")


 @pytest.fixture
 def provenance(
     provenance_postgresql: psycopg2.extensions.connection,
 ) -> ProvenanceInterface:
     """Return a working and initialized ProvenanceInterface object"""
     from swh.core.cli.db import populate_database_for_package

     populate_database_for_package(
         "swh.provenance", provenance_postgresql.dsn, flavor="with-path"
     )
     # in test sessions, we DO want to raise any exception occurring at commit time
     return get_provenance(
         cls="postgresql",
         db=provenance_postgresql.get_dsn_parameters(),
         raise_on_commit=True,
     )


 @pytest.fixture
 def archive(swh_storage: StorageInterface) -> ArchiveInterface:
     """Return an ArchiveStorage-based ArchiveInterface object"""
     return ArchiveStorage(swh_storage)


 def get_datafile(fname: str) -> str:
     return path.join(path.dirname(__file__), "data", fname)


 def load_repo_data(repo: str) -> Dict[str, Any]:
     data: Dict[str, Any] = {}
     with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj:
         unpacker = msgpack.Unpacker(
             fobj,
             raw=False,
             ext_hook=msgpack_ext_hook,
             strict_map_key=False,
             timestamp=3,  # convert Timestamp in datetime objects (tz UTC)
         )
         for objtype, objd in unpacker:
             data.setdefault(objtype, []).append(objd)
     return data


 def filter_dict(d: Dict[Any, Any], keys: Iterable[Any]) -> Dict[Any, Any]:
     return {k: v for (k, v) in d.items() if k in keys}


 def fill_storage(storage: StorageInterface, data: Dict[str, Any]) -> None:
     process_replay_objects(data, storage=storage)


 # TODO: remove this function in favour of TimestampWithTimezone.to_datetime
 # from swh.model.model
 def ts2dt(ts: Dict[str, Any]) -> datetime:
     timestamp = datetime.fromtimestamp(
         ts["timestamp"]["seconds"], timezone(timedelta(minutes=ts["offset"]))
     )
     return timestamp.replace(microsecond=ts["timestamp"]["microseconds"])
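A hedged sketch of how a test consumes the slimmed-down provenance_storage fixture, which now parametrizes only over "mongodb" and "postgresql". The test name is illustrative; the isinstance check mirrors the assertion the removed "remote" branch used to perform.

from swh.provenance.interface import ProvenanceStorageInterface


def test_provenance_storage_interface(
    provenance_storage: ProvenanceStorageInterface,
) -> None:
    # Runs once per remaining backend; no Flask app or RPC-client fixture needed.
    assert isinstance(provenance_storage, ProvenanceStorageInterface)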