diff --git a/requirements-test.txt b/requirements-test.txt
index c530a6c..cdfc857 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,5 +1,4 @@
pytest
pytest-mongodb
swh.loader.git >= 0.8
swh.journal >= 0.8
-types-Werkzeug
diff --git a/requirements.txt b/requirements.txt
index 8f39169..199ddc7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,11 +1,10 @@
# Add here external Python modules dependencies, one per line. Module names
# should match https://pypi.python.org/pypi names. For the full spec or
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
click
iso8601
methodtools
pymongo
PyYAML
types-click
types-PyYAML
-types-Werkzeug
diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
index 681e476..0c43d40 100644
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -1,107 +1,99 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
from typing import TYPE_CHECKING
import warnings
if TYPE_CHECKING:
from .archive import ArchiveInterface
from .interface import ProvenanceInterface, ProvenanceStorageInterface
def get_archive(cls: str, **kwargs) -> ArchiveInterface:
"""Get an archive object of class ``cls`` with arguments ``args``.
Args:
cls: archive's class, either 'api' or 'direct'
args: dictionary of arguments passed to the archive class constructor
Returns:
an instance of archive object (either using swh.storage API or direct
queries to the archive's database)
Raises:
:exc:`ValueError` if passed an unknown archive class.
"""
if cls == "api":
from swh.storage import get_storage
from .storage.archive import ArchiveStorage
return ArchiveStorage(get_storage(**kwargs["storage"]))
elif cls == "direct":
from swh.core.db import BaseDb
from .postgresql.archive import ArchivePostgreSQL
return ArchivePostgreSQL(BaseDb.connect(**kwargs["db"]).conn)
else:
raise ValueError
def get_provenance(**kwargs) -> ProvenanceInterface:
"""Get an provenance object with arguments ``args``.
Args:
args: dictionary of arguments to retrieve a swh.provenance.storage
class (see :func:`get_provenance_storage` for details)
Returns:
an instance of provenance object
"""
from .provenance import Provenance
return Provenance(get_provenance_storage(**kwargs))
def get_provenance_storage(cls: str, **kwargs) -> ProvenanceStorageInterface:
"""Get an archive object of class ``cls`` with arguments ``args``.
Args:
cls: storage's class, either 'postgresql' or 'mongodb' ('local' is
a deprecated alias for 'postgresql')
args: dictionary of arguments passed to the storage class constructor
Returns:
an instance of storage object
Raises:
:exc:`ValueError` if passed an unknown storage class.
"""
if cls in ["local", "postgresql"]:
from swh.core.db import BaseDb
from .postgresql.provenance import ProvenanceStoragePostgreSql
if cls == "local":
warnings.warn(
'"local" class is deprecated for provenance storage, please '
'use "postgresql" class instead.',
DeprecationWarning,
)
conn = BaseDb.connect(**kwargs["db"]).conn
raise_on_commit = kwargs.get("raise_on_commit", False)
return ProvenanceStoragePostgreSql(conn, raise_on_commit)
elif cls == "mongodb":
from pymongo import MongoClient
from .mongo.backend import ProvenanceStorageMongoDb
dbname = kwargs["db"].pop("dbname")
db = MongoClient(**kwargs["db"]).get_database(dbname)
return ProvenanceStorageMongoDb(db)
- elif cls == "remote":
- from .api.client import RemoteProvenanceStorage
-
- storage = RemoteProvenanceStorage(**kwargs)
- assert isinstance(storage, ProvenanceStorageInterface)
- return storage
-
- else:
- raise ValueError
+ raise ValueError
diff --git a/swh/provenance/api/client.py b/swh/provenance/api/client.py
index c772c08..72761a5 100644
--- a/swh/provenance/api/client.py
+++ b/swh/provenance/api/client.py
@@ -1,17 +1,4 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-
-from swh.core.api import RPCClient
-
-from ..interface import ProvenanceStorageInterface
-from .serializers import DECODERS, ENCODERS
-
-
-class RemoteProvenanceStorage(RPCClient):
- """Proxy to a remote provenance storage API"""
-
- backend_class = ProvenanceStorageInterface
- extra_type_decoders = DECODERS
- extra_type_encoders = ENCODERS
diff --git a/swh/provenance/api/server.py b/swh/provenance/api/server.py
index 60fb059..e030efd 100644
--- a/swh/provenance/api/server.py
+++ b/swh/provenance/api/server.py
@@ -1,148 +1,58 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import logging
import os
-from typing import Any, Dict, List, Optional
-
-from werkzeug.routing import Rule
+from typing import Any, Dict, Optional
from swh.core import config
-from swh.core.api import JSONFormatter, MsgpackFormatter, RPCServerApp, negotiate
-from swh.provenance import get_provenance_storage
-from swh.provenance.interface import ProvenanceStorageInterface
-
-from .serializers import DECODERS, ENCODERS
-
-storage: Optional[ProvenanceStorageInterface] = None
-
-
-def get_global_provenance_storage() -> ProvenanceStorageInterface:
- global storage
- if storage is None:
- storage = get_provenance_storage(**app.config["provenance"]["storage"])
- return storage
-
-
-class ProvenanceStorageServerApp(RPCServerApp):
- extra_type_decoders = DECODERS
- extra_type_encoders = ENCODERS
-
-
-app = ProvenanceStorageServerApp(
- __name__,
- backend_class=ProvenanceStorageInterface,
- backend_factory=get_global_provenance_storage,
-)
-
-
-def has_no_empty_params(rule: Rule) -> bool:
- return len(rule.defaults or ()) >= len(rule.arguments or ())
-
-
-@app.route("/")
-def index() -> str:
- return """
-
Software Heritage provenance storage RPC server
-
-You have reached the
-Software Heritage
-provenance storage RPC server.
-See its
-documentation
-and API for more information
-
-"""
-
-
-@app.route("/site-map")
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def site_map() -> List[Dict[str, Any]]:
- links = []
- for rule in app.url_map.iter_rules():
- if has_no_empty_params(rule) and hasattr(
- ProvenanceStorageInterface, rule.endpoint
- ):
- links.append(
- dict(
- rule=rule.rule,
- description=getattr(
- ProvenanceStorageInterface, rule.endpoint
- ).__doc__,
- )
- )
- # links is now a list of url, endpoint tuples
- return links
def load_and_check_config(
config_path: Optional[str], type: str = "local"
) -> Dict[str, Any]:
"""Check the minimal configuration is set to run the api or raise an
error explanation.
Args:
config_path (str): Path to the configuration file to load
type (str): configuration type. For 'local' type, more
checks are done.
Raises:
Error if the setup is not as expected
Returns:
configuration as a dict
"""
if config_path is None:
raise EnvironmentError("Configuration file must be defined")
if not os.path.exists(config_path):
raise FileNotFoundError(f"Configuration file {config_path} does not exist")
cfg = config.read(config_path)
pcfg: Optional[Dict[str, Any]] = cfg.get("provenance")
if pcfg is None:
raise KeyError("Missing 'provenance' configuration")
scfg: Optional[Dict[str, Any]] = pcfg.get("storage")
if scfg is None:
raise KeyError("Missing 'provenance.storage' configuration")
if type == "local":
cls = scfg.get("cls")
if cls != "postgresql":
raise ValueError(
"The provenance backend can only be started with a 'postgresql' "
"configuration"
)
db = scfg.get("db")
if not db:
raise KeyError("Invalid configuration; missing 'db' config entry")
return cfg
-
-
-api_cfg: Optional[Dict[str, Any]] = None
-
-
-def make_app_from_configfile() -> ProvenanceStorageServerApp:
- """Run the WSGI app from the webserver, loading the configuration from
- a configuration file.
-
- SWH_CONFIG_FILENAME environment variable defines the
- configuration path to load.
-
- """
- global api_cfg
- if api_cfg is None:
- config_path = os.environ.get("SWH_CONFIG_FILENAME")
- api_cfg = load_and_check_config(config_path)
- app.config.update(api_cfg)
- handler = logging.StreamHandler()
- app.logger.addHandler(handler)
- return app
diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
index cd2570d..a4f8905 100644
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -1,241 +1,238 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
from datetime import datetime, timezone
import os
from typing import Any, Dict, Generator, Optional, Tuple
import click
import iso8601
import yaml
from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.model import Sha1Git
# All generic config code should reside in swh.core.config
CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"
DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)
DEFAULT_CONFIG: Dict[str, Any] = {
"provenance": {
"archive": {
# Storage API based Archive object
# "cls": "api",
# "storage": {
# "cls": "remote",
# "url": "http://uffizi.internal.softwareheritage.org:5002",
# }
# Direct access Archive object
"cls": "direct",
"db": {
"host": "db.internal.softwareheritage.org",
"dbname": "softwareheritage",
"user": "guest",
},
},
"storage": {
# Local PostgreSQL Storage
"cls": "postgresql",
"db": {
"host": "localhost",
"user": "postgres",
"password": "postgres",
"dbname": "provenance",
},
# Local MongoDB Storage
# "cls": "mongodb",
# "db": {
# "dbname": "provenance",
# },
- # Remote REST-API/PostgreSQL
- # "cls": "remote",
- # "url": "http://localhost:8080/%2f",
},
}
}
CONFIG_FILE_HELP = f"""
\b Configuration can be loaded from a yaml file given either as --config-file
option or the {CONFIG_ENVVAR} environment variable. If no configuration file
is specified, use the following default configuration::
\b
{yaml.dump(DEFAULT_CONFIG)}"""
PROVENANCE_HELP = f"""Software Heritage provenance index database tools
{CONFIG_FILE_HELP}
"""
@swh_cli_group.group(
name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP
)
@click.option(
"-C",
"--config-file",
default=None,
type=click.Path(exists=True, dir_okay=False, path_type=str),
help="""YAML configuration file.""",
)
@click.option(
"-P",
"--profile",
default=None,
type=click.Path(exists=False, dir_okay=False, path_type=str),
help="""Enable profiling to specified file.""",
)
@click.pass_context
def cli(ctx: click.core.Context, config_file: Optional[str], profile: str) -> None:
if (
config_file is None
and DEFAULT_PATH is not None
and config.config_exists(DEFAULT_PATH)
):
config_file = DEFAULT_PATH
if config_file is None:
conf = DEFAULT_CONFIG
else:
# read_raw_config do not fail on ENOENT
if not os.path.exists(config_file):
raise FileNotFoundError(config_file)
conf = yaml.safe_load(open(config_file, "rb"))
ctx.ensure_object(dict)
ctx.obj["config"] = conf
if profile:
import atexit
import cProfile
print("Profiling...")
pr = cProfile.Profile()
pr.enable()
def exit() -> None:
pr.disable()
pr.dump_stats(profile)
atexit.register(exit)
@cli.command(name="iter-revisions")
@click.argument("filename")
@click.option("-a", "--track-all", default=True, type=bool)
@click.option("-l", "--limit", type=int)
@click.option("-m", "--min-depth", default=1, type=int)
@click.option("-r", "--reuse", default=True, type=bool)
@click.pass_context
def iter_revisions(
ctx: click.core.Context,
filename: str,
track_all: bool,
limit: Optional[int],
min_depth: int,
reuse: bool,
) -> None:
# TODO: add file size filtering
"""Process a provided list of revisions."""
from . import get_archive, get_provenance
from .revision import CSVRevisionIterator, revision_add
archive = get_archive(**ctx.obj["config"]["provenance"]["archive"])
provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"])
revisions_provider = generate_revision_tuples(filename)
revisions = CSVRevisionIterator(revisions_provider, limit=limit)
for revision in revisions:
revision_add(
provenance,
archive,
[revision],
trackall=track_all,
lower=reuse,
mindepth=min_depth,
)
def generate_revision_tuples(
filename: str,
) -> Generator[Tuple[Sha1Git, datetime, Sha1Git], None, None]:
for line in open(filename, "r"):
if line.strip():
revision, date, root = line.strip().split(",")
yield (
hash_to_bytes(revision),
iso8601.parse_date(date, default_timezone=timezone.utc),
hash_to_bytes(root),
)
@cli.command(name="iter-origins")
@click.argument("filename")
@click.option("-l", "--limit", type=int)
@click.pass_context
def iter_origins(ctx: click.core.Context, filename: str, limit: Optional[int]) -> None:
"""Process a provided list of origins."""
from . import get_archive, get_provenance
from .origin import CSVOriginIterator, origin_add
archive = get_archive(**ctx.obj["config"]["provenance"]["archive"])
provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"])
origins_provider = generate_origin_tuples(filename)
origins = CSVOriginIterator(origins_provider, limit=limit)
for origin in origins:
origin_add(provenance, archive, [origin])
def generate_origin_tuples(filename: str) -> Generator[Tuple[str, bytes], None, None]:
for line in open(filename, "r"):
if line.strip():
url, snapshot = line.strip().split(",")
yield (url, hash_to_bytes(snapshot))
@cli.command(name="find-first")
@click.argument("swhid")
@click.pass_context
def find_first(ctx: click.core.Context, swhid: str) -> None:
"""Find first occurrence of the requested blob."""
from . import get_provenance
provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"])
occur = provenance.content_find_first(hash_to_bytes(swhid))
if occur is not None:
print(
f"swh:1:cnt:{hash_to_hex(occur.content)}, "
f"swh:1:rev:{hash_to_hex(occur.revision)}, "
f"{occur.date}, "
f"{occur.origin}, "
f"{os.fsdecode(occur.path)}"
)
else:
print(f"Cannot find a content with the id {swhid}")
@cli.command(name="find-all")
@click.argument("swhid")
@click.option("-l", "--limit", type=int)
@click.pass_context
def find_all(ctx: click.core.Context, swhid: str, limit: Optional[int]) -> None:
"""Find all occurrences of the requested blob."""
from . import get_provenance
provenance = get_provenance(**ctx.obj["config"]["provenance"]["storage"])
for occur in provenance.content_find_all(hash_to_bytes(swhid), limit=limit):
print(
f"swh:1:cnt:{hash_to_hex(occur.content)}, "
f"swh:1:rev:{hash_to_hex(occur.revision)}, "
f"{occur.date}, "
f"{occur.origin}, "
f"{os.fsdecode(occur.path)}"
)
diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py
index 67d98c5..f30f34b 100644
--- a/swh/provenance/tests/conftest.py
+++ b/swh/provenance/tests/conftest.py
@@ -1,153 +1,128 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timedelta, timezone
from os import path
-from typing import Any, Dict, Iterable, Iterator
+from typing import Any, Dict, Iterable
from _pytest.fixtures import SubRequest
import msgpack
import psycopg2.extensions
import pymongo.database
import pytest
from pytest_postgresql.factories import postgresql
from swh.journal.serializers import msgpack_ext_hook
from swh.provenance import get_provenance, get_provenance_storage
-from swh.provenance.api.client import RemoteProvenanceStorage
-import swh.provenance.api.server as server
from swh.provenance.archive import ArchiveInterface
from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface
from swh.provenance.storage.archive import ArchiveStorage
from swh.storage.interface import StorageInterface
from swh.storage.replay import process_replay_objects
@pytest.fixture(
params=[
"with-path",
"without-path",
"with-path-denormalized",
"without-path-denormalized",
]
)
def provenance_postgresqldb(
request: SubRequest,
postgresql: psycopg2.extensions.connection,
) -> Dict[str, str]:
"""return a working and initialized provenance db"""
from swh.core.cli.db import populate_database_for_package
populate_database_for_package(
"swh.provenance", postgresql.dsn, flavor=request.param
)
return postgresql.get_dsn_parameters()
-# the Flask app used as server in these tests
-@pytest.fixture
-def app(
- provenance_postgresqldb: Dict[str, str]
-) -> Iterator[server.ProvenanceStorageServerApp]:
- assert hasattr(server, "storage")
- server.storage = get_provenance_storage(
- cls="postgresql", db=provenance_postgresqldb
- )
- yield server.app
-
-
-# the RPCClient class used as client used in these tests
-@pytest.fixture
-def swh_rpc_client_class() -> type:
- return RemoteProvenanceStorage
-
-
-@pytest.fixture(params=["mongodb", "postgresql", "remote"])
+@pytest.fixture(params=["mongodb", "postgresql"])
def provenance_storage(
request: SubRequest,
provenance_postgresqldb: Dict[str, str],
mongodb: pymongo.database.Database,
- swh_rpc_client: RemoteProvenanceStorage,
) -> ProvenanceStorageInterface:
"""Return a working and initialized ProvenanceStorageInterface object"""
- if request.param == "remote":
- assert isinstance(swh_rpc_client, ProvenanceStorageInterface)
- return swh_rpc_client
-
- elif request.param == "mongodb":
+ if request.param == "mongodb":
from swh.provenance.mongo.backend import ProvenanceStorageMongoDb
return ProvenanceStorageMongoDb(mongodb)
else:
# in test sessions, we DO want to raise any exception occurring at commit time
return get_provenance_storage(
cls=request.param, db=provenance_postgresqldb, raise_on_commit=True
)
provenance_postgresql = postgresql("postgresql_proc", dbname="provenance_tests")
@pytest.fixture
def provenance(
provenance_postgresql: psycopg2.extensions.connection,
) -> ProvenanceInterface:
"""Return a working and initialized ProvenanceInterface object"""
from swh.core.cli.db import populate_database_for_package
populate_database_for_package(
"swh.provenance", provenance_postgresql.dsn, flavor="with-path"
)
# in test sessions, we DO want to raise any exception occurring at commit time
return get_provenance(
cls="postgresql",
db=provenance_postgresql.get_dsn_parameters(),
raise_on_commit=True,
)
@pytest.fixture
def archive(swh_storage: StorageInterface) -> ArchiveInterface:
"""Return an ArchiveStorage-based ArchiveInterface object"""
return ArchiveStorage(swh_storage)
def get_datafile(fname: str) -> str:
return path.join(path.dirname(__file__), "data", fname)
def load_repo_data(repo: str) -> Dict[str, Any]:
data: Dict[str, Any] = {}
with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj:
unpacker = msgpack.Unpacker(
fobj,
raw=False,
ext_hook=msgpack_ext_hook,
strict_map_key=False,
timestamp=3, # convert Timestamp in datetime objects (tz UTC)
)
for objtype, objd in unpacker:
data.setdefault(objtype, []).append(objd)
return data
def filter_dict(d: Dict[Any, Any], keys: Iterable[Any]) -> Dict[Any, Any]:
return {k: v for (k, v) in d.items() if k in keys}
def fill_storage(storage: StorageInterface, data: Dict[str, Any]) -> None:
process_replay_objects(data, storage=storage)
# TODO: remove this function in favour of TimestampWithTimezone.to_datetime
# from swh.model.model
def ts2dt(ts: Dict[str, Any]) -> datetime:
timestamp = datetime.fromtimestamp(
ts["timestamp"]["seconds"], timezone(timedelta(minutes=ts["offset"]))
)
return timestamp.replace(microsecond=ts["timestamp"]["microseconds"])