diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
index 1e560d2..fc926c4 100644
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -1,89 +1,97 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .archive import ArchiveInterface
from .interface import ProvenanceInterface, ProvenanceStorageInterface
def get_archive(cls: str, **kwargs) -> ArchiveInterface:
"""Get an archive object of class ``cls`` with arguments ``args``.
Args:
cls: archive's class, either 'api' or 'direct'
args: dictionary of arguments passed to the archive class constructor
Returns:
an instance of archive object (either using swh.storage API or direct
queries to the archive's database)
Raises:
:cls:`ValueError` if passed an unknown archive class.
"""
if cls == "api":
from swh.storage import get_storage
from .storage.archive import ArchiveStorage
return ArchiveStorage(get_storage(**kwargs["storage"]))
elif cls == "direct":
from swh.core.db import BaseDb
from .postgresql.archive import ArchivePostgreSQL
return ArchivePostgreSQL(BaseDb.connect(**kwargs["db"]).conn)
else:
raise ValueError
def get_provenance(**kwargs) -> ProvenanceInterface:
"""Get an provenance object with arguments ``args``.
Args:
args: dictionary of arguments to retrieve a swh.provenance.storage
class (see :func:`get_provenance_storage` for details)
Returns:
an instance of provenance object
"""
from .provenance import Provenance
return Provenance(get_provenance_storage(**kwargs))
def get_provenance_storage(cls: str, **kwargs) -> ProvenanceStorageInterface:
"""Get an archive object of class ``cls`` with arguments ``args``.
Args:
cls: storage's class, only 'local' is currently supported
args: dictionary of arguments passed to the storage class constructor
Returns:
an instance of storage object
Raises:
:cls:`ValueError` if passed an unknown archive class.
"""
if cls == "local":
from swh.core.db import BaseDb
from .postgresql.provenancedb_base import ProvenanceDBBase
conn = BaseDb.connect(**kwargs["db"]).conn
raise_on_commit = kwargs.get("raise_on_commit", False)
if ProvenanceDBBase(conn, raise_on_commit).flavor == "with-path":
from .postgresql.provenancedb_with_path import ProvenanceWithPathDB
return ProvenanceWithPathDB(conn, raise_on_commit)
else:
from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB
return ProvenanceWithoutPathDB(conn, raise_on_commit)
+
+ elif cls == "remote":
+ from .api.client import RemoteProvenanceStorage
+
+ storage = RemoteProvenanceStorage(**kwargs)
+ assert isinstance(storage, ProvenanceStorageInterface)
+ return storage
+
else:
raise ValueError
diff --git a/swh/provenance/api/__init__.py b/swh/provenance/api/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/swh/provenance/api/client.py b/swh/provenance/api/client.py
new file mode 100644
index 0000000..c772c08
--- /dev/null
+++ b/swh/provenance/api/client.py
@@ -0,0 +1,17 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.core.api import RPCClient
+
+from ..interface import ProvenanceStorageInterface
+from .serializers import DECODERS, ENCODERS
+
+
+class RemoteProvenanceStorage(RPCClient):
+ """Proxy to a remote provenance storage API"""
+
+ backend_class = ProvenanceStorageInterface
+ extra_type_decoders = DECODERS
+ extra_type_encoders = ENCODERS
diff --git a/swh/provenance/api/serializers.py b/swh/provenance/api/serializers.py
new file mode 100644
index 0000000..ef3d83d
--- /dev/null
+++ b/swh/provenance/api/serializers.py
@@ -0,0 +1,48 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from dataclasses import asdict
+from typing import Callable, Dict, List, Tuple
+
+from .. import interface
+
+
+def _encode_dataclass(obj):
+ return {
+ **asdict(obj),
+ "__type__": type(obj).__name__,
+ }
+
+
+def _decode_dataclass(d):
+ return getattr(interface, d.pop("__type__"))(**d)
+
+
+def _encode_enum(obj):
+ return {
+ "value": obj.value,
+ "__type__": type(obj).__name__,
+ }
+
+
+def _decode_enum(d):
+ return getattr(interface, d.pop("__type__"))(d["value"])
+
+
+ENCODERS: List[Tuple[type, str, Callable]] = [
+ (interface.ProvenanceResult, "dataclass", _encode_dataclass),
+ (interface.RelationData, "dataclass", _encode_dataclass),
+ (interface.RevisionData, "dataclass", _encode_dataclass),
+ (interface.EntityType, "enum", _encode_enum),
+ (interface.RelationType, "enum", _encode_enum),
+ (set, "set", list),
+]
+
+
+DECODERS: Dict[str, Callable] = {
+ "dataclass": _decode_dataclass,
+ "enum": _decode_enum,
+ "set": set,
+}
diff --git a/swh/provenance/api/server.py b/swh/provenance/api/server.py
new file mode 100644
index 0000000..40a67c7
--- /dev/null
+++ b/swh/provenance/api/server.py
@@ -0,0 +1,143 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+import os
+
+from swh.core import config
+from swh.core.api import JSONFormatter, MsgpackFormatter, RPCServerApp, negotiate
+from swh.provenance import get_provenance_storage
+from swh.provenance.interface import ProvenanceStorageInterface
+
+from .serializers import DECODERS, ENCODERS
+
+storage = None
+
+
+def get_global_provenance_storage():
+ global storage
+ if not storage:
+ storage = get_provenance_storage(**app.config["provenance"]["storage"])
+ return storage
+
+
+class ProvenanceStorageServerApp(RPCServerApp):
+ extra_type_decoders = DECODERS
+ extra_type_encoders = ENCODERS
+
+
+app = ProvenanceStorageServerApp(
+ __name__,
+ backend_class=ProvenanceStorageInterface,
+ backend_factory=get_global_provenance_storage,
+)
+
+
+def has_no_empty_params(rule):
+ return len(rule.defaults or ()) >= len(rule.arguments or ())
+
+
+@app.route("/")
+def index():
+    return """<html>
+<head><title> Software Heritage provenance storage RPC server </title>
+</head>
+<body>
+<p>You have reached the
+<a href="https://www.softwareheritage.org/">Software Heritage</a>
+provenance storage RPC server.<br>
+See its
+<a href="https://docs.softwareheritage.org/devel/swh-provenance/">documentation
+and API</a> for more information</p>
+</body>
+</html>"""
+
+
+@app.route("/site-map")
+@negotiate(MsgpackFormatter)
+@negotiate(JSONFormatter)
+def site_map():
+ links = []
+ for rule in app.url_map.iter_rules():
+ if has_no_empty_params(rule) and hasattr(
+ ProvenanceStorageInterface, rule.endpoint
+ ):
+ links.append(
+ dict(
+ rule=rule.rule,
+ description=getattr(
+ ProvenanceStorageInterface, rule.endpoint
+ ).__doc__,
+ )
+ )
+ # links is now a list of url, endpoint tuples
+ return links
+
+
+def load_and_check_config(config_path, type="local"):
+ """Check the minimal configuration is set to run the api or raise an
+ error explanation.
+
+ Args:
+ config_path (str): Path to the configuration file to load
+ type (str): configuration type. For 'local' type, more
+ checks are done.
+
+ Raises:
+ Error if the setup is not as expected
+
+ Returns:
+ configuration as a dict
+
+ """
+ if not config_path:
+ raise EnvironmentError("Configuration file must be defined")
+
+ if not os.path.exists(config_path):
+ raise FileNotFoundError(f"Configuration file {config_path} does not exist")
+
+ cfg = config.read(config_path)
+
+ pcfg = cfg.get("provenance")
+ if not pcfg:
+ raise KeyError("Missing 'provenance' configuration")
+
+ scfg = pcfg.get("storage")
+ if not scfg:
+ raise KeyError("Missing 'provenance.storage' configuration")
+
+ if type == "local":
+ cls = scfg.get("cls")
+ if cls != "local":
+ raise ValueError(
+ "The provenance backend can only be started with a 'local' "
+ "configuration"
+ )
+
+ db = scfg.get("db")
+ if not db:
+ raise KeyError("Invalid configuration; missing 'db' config entry")
+
+ return cfg
+
+
+api_cfg = None
+
+
+def make_app_from_configfile():
+ """Run the WSGI app from the webserver, loading the configuration from
+ a configuration file.
+
+ SWH_CONFIG_FILENAME environment variable defines the
+ configuration path to load.
+
+ """
+ global api_cfg
+ if not api_cfg:
+ config_path = os.environ.get("SWH_CONFIG_FILENAME")
+ api_cfg = load_and_check_config(config_path)
+ app.config.update(api_cfg)
+ handler = logging.StreamHandler()
+ app.logger.addHandler(handler)
+ return app
diff --git a/swh/provenance/interface.py b/swh/provenance/interface.py
index 419a4a0..e1e20b6 100644
--- a/swh/provenance/interface.py
+++ b/swh/provenance/interface.py
@@ -1,304 +1,322 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import dataclass
from datetime import datetime
import enum
from typing import Dict, Generator, Iterable, Optional, Set
from typing_extensions import Protocol, runtime_checkable
+from swh.core.api import remote_api_endpoint
from swh.model.model import Sha1Git
from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry
class EntityType(enum.Enum):
CONTENT = "content"
DIRECTORY = "directory"
REVISION = "revision"
ORIGIN = "origin"
class RelationType(enum.Enum):
CNT_EARLY_IN_REV = "content_in_revision"
CNT_IN_DIR = "content_in_directory"
DIR_IN_REV = "directory_in_revision"
REV_IN_ORG = "revision_in_origin"
REV_BEFORE_REV = "revision_before_revision"
@dataclass(eq=True, frozen=True)
class ProvenanceResult:
content: Sha1Git
revision: Sha1Git
date: datetime
origin: Optional[str]
path: bytes
@dataclass(eq=True, frozen=True)
class RevisionData:
"""Object representing the data associated to a revision in the provenance model,
where `date` is the optional date of the revision (specifying it acknowledges that
the revision was already processed by the revision-content algorithm); and `origin`
identifies the preferred origin for the revision, if any.
"""
date: Optional[datetime]
origin: Optional[Sha1Git]
@dataclass(eq=True, frozen=True)
class RelationData:
"""Object representing a relation entry in the provenance model, where `src` and
`dst` are the sha1 ids of the entities being related, and `path` is optional
depending on the relation being represented.
"""
src: Sha1Git
dst: Sha1Git
path: Optional[bytes]
@runtime_checkable
class ProvenanceStorageInterface(Protocol):
+ @remote_api_endpoint("content_find_first")
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
"""Retrieve the first occurrence of the blob identified by `id`."""
...
+ @remote_api_endpoint("content_find_all")
def content_find_all(
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
"""Retrieve all the occurrences of the blob identified by `id`."""
...
+ @remote_api_endpoint("content_set_date")
def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
"""Associate dates to blobs identified by sha1 ids, as paired in `dates`. Return
a boolean stating whether the information was successfully stored.
"""
...
+ @remote_api_endpoint("content_get")
def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
"""Retrieve the associated date for each blob sha1 in `ids`. If some blob has
no associated date, it is not present in the resulting dictionary.
"""
...
+ @remote_api_endpoint("directory_set_date")
def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
"""Associate dates to directories identified by sha1 ids, as paired in
`dates`. Return a boolean stating whether the information was successfully
stored.
"""
...
+ @remote_api_endpoint("directory_get")
def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
"""Retrieve the associated date for each directory sha1 in `ids`. If some
directory has no associated date, it is not present in the resulting dictionary.
"""
...
+ @remote_api_endpoint("entity_get_all")
def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
"""Retrieve all sha1 ids for entities of type `entity` present in the provenance
model.
"""
...
+ @remote_api_endpoint("location_get")
def location_get(self) -> Set[bytes]:
"""Retrieve all paths present in the provenance model."""
...
+ @remote_api_endpoint("origin_set_url")
def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool:
"""Associate urls to origins identified by sha1 ids, as paired in `urls`. Return
a boolean stating whether the information was successfully stored.
"""
...
+ @remote_api_endpoint("origin_get")
def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
"""Retrieve the associated url for each origin sha1 in `ids`. If some origin has
no associated date, it is not present in the resulting dictionary.
"""
...
+ @remote_api_endpoint("revision_set_date")
def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
"""Associate dates to revisions identified by sha1 ids, as paired in `dates`.
Return a boolean stating whether the information was successfully stored.
"""
...
+ @remote_api_endpoint("revision_set_origin")
def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool:
"""Associate origins to revisions identified by sha1 ids, as paired in
`origins` (revision ids are keys and origin ids, values). Return a boolean
stating whether the information was successfully stored.
"""
...
+ @remote_api_endpoint("revision_get")
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
"""Retrieve the associated date and origin for each revision sha1 in `ids`. If
some revision has no associated date nor origin, it is not present in the
resulting dictionary.
"""
...
+ @remote_api_endpoint("relation_add")
def relation_add(
self, relation: RelationType, data: Iterable[RelationData]
) -> bool:
"""Add entries in the selected `relation`."""
...
+ @remote_api_endpoint("relation_get")
def relation_get(
self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False
) -> Set[RelationData]:
"""Retrieve all entries in the selected `relation` whose source entities are
identified by some sha1 id in `ids`. If `reverse` is set, destination entities
are matched instead.
"""
...
+ @remote_api_endpoint("relation_get_all")
def relation_get_all(self, relation: RelationType) -> Set[RelationData]:
"""Retrieve all entries in the selected `relation` that are present in the
provenance model.
"""
...
+ @remote_api_endpoint("with_path")
def with_path(self) -> bool:
...
@runtime_checkable
class ProvenanceInterface(Protocol):
storage: ProvenanceStorageInterface
def flush(self) -> None:
"""Flush internal cache to the underlying `storage`."""
...
def content_add_to_directory(
self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
) -> None:
"""Associate `blob` with `directory` in the provenance model. `prefix` is the
relative path from `directory` to `blob` (excluding `blob`'s name).
"""
...
def content_add_to_revision(
self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
) -> None:
"""Associate `blob` with `revision` in the provenance model. `prefix` is the
absolute path from `revision`'s root directory to `blob` (excluding `blob`'s
name).
"""
...
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
"""Retrieve the first occurrence of the blob identified by `id`."""
...
def content_find_all(
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
"""Retrieve all the occurrences of the blob identified by `id`."""
...
def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
"""Retrieve the earliest known date of `blob`."""
...
def content_get_early_dates(
self, blobs: Iterable[FileEntry]
) -> Dict[Sha1Git, datetime]:
"""Retrieve the earliest known date for each blob in `blobs`. If some blob has
no associated date, it is not present in the resulting dictionary.
"""
...
def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
"""Associate `date` to `blob` as it's earliest known date."""
...
def directory_add_to_revision(
self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
) -> None:
"""Associate `directory` with `revision` in the provenance model. `path` is the
absolute path from `revision`'s root directory to `directory` (including
`directory`'s name).
"""
...
def directory_get_date_in_isochrone_frontier(
self, directory: DirectoryEntry
) -> Optional[datetime]:
"""Retrieve the earliest known date of `directory` as an isochrone frontier in
the provenance model.
"""
...
def directory_get_dates_in_isochrone_frontier(
self, dirs: Iterable[DirectoryEntry]
) -> Dict[Sha1Git, datetime]:
"""Retrieve the earliest known date for each directory in `dirs` as isochrone
frontiers provenance model. If some directory has no associated date, it is not
present in the resulting dictionary.
"""
...
def directory_set_date_in_isochrone_frontier(
self, directory: DirectoryEntry, date: datetime
) -> None:
"""Associate `date` to `directory` as it's earliest known date as an isochrone
frontier in the provenance model.
"""
...
def origin_add(self, origin: OriginEntry) -> None:
"""Add `origin` to the provenance model."""
...
def revision_add(self, revision: RevisionEntry) -> None:
"""Add `revision` to the provenance model. This implies storing `revision`'s
date in the model, thus `revision.date` must be a valid date.
"""
...
def revision_add_before_revision(
self, head: RevisionEntry, revision: RevisionEntry
) -> None:
"""Associate `revision` to `head` as an ancestor of the latter."""
...
def revision_add_to_origin(
self, origin: OriginEntry, revision: RevisionEntry
) -> None:
"""Associate `revision` to `origin` as a head revision of the latter (ie. the
target of an snapshot for `origin` in the archive)."""
...
def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]:
"""Retrieve the date associated to `revision`."""
...
def revision_get_preferred_origin(
self, revision: RevisionEntry
) -> Optional[Sha1Git]:
"""Retrieve the preferred origin associated to `revision`."""
...
def revision_in_history(self, revision: RevisionEntry) -> bool:
"""Check if `revision` is known to be an ancestor of some head revision in the
provenance model.
"""
...
def revision_set_preferred_origin(
self, origin: OriginEntry, revision: RevisionEntry
) -> None:
"""Associate `origin` as the preferred origin for `revision`."""
...
def revision_visited(self, revision: RevisionEntry) -> bool:
"""Check if `revision` is known to be a head revision for some origin in the
provenance model.
"""
...
diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py
index 8b7e154..b07f481 100644
--- a/swh/provenance/tests/conftest.py
+++ b/swh/provenance/tests/conftest.py
@@ -1,248 +1,273 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from os import path
import re
from typing import Any, Dict, Iterable, Iterator, List, Optional
import msgpack
import psycopg2
import pytest
from typing_extensions import TypedDict
-from swh.core.db import BaseDb
from swh.journal.serializers import msgpack_ext_hook
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Sha1Git
from swh.model.tests.swh_model_data import TEST_OBJECTS
-from swh.provenance import get_provenance
+from swh.provenance import get_provenance, get_provenance_storage
+from swh.provenance.api.client import RemoteProvenanceStorage
+import swh.provenance.api.server as server
from swh.provenance.archive import ArchiveInterface
-from swh.provenance.interface import ProvenanceInterface
+from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface
from swh.provenance.postgresql.archive import ArchivePostgreSQL
-from swh.provenance.postgresql.provenancedb_base import ProvenanceDBBase
from swh.provenance.storage.archive import ArchiveStorage
from swh.storage.postgresql.storage import Storage
from swh.storage.replay import process_replay_objects
@pytest.fixture(params=["with-path", "without-path"])
-def provenance(
+def populated_db(
request, # TODO: add proper type annotation
postgresql: psycopg2.extensions.connection,
-) -> ProvenanceInterface:
- """return a working and initialized provenance db"""
+) -> Dict[str, str]:
from swh.core.cli.db import populate_database_for_package
- flavor = request.param
+ flavor = "with-path" if request.param == "client-server" else request.param
populate_database_for_package("swh.provenance", postgresql.dsn, flavor=flavor)
-
- BaseDb.adapt_conn(postgresql)
-
- args: Dict[str, str] = {
+ return {
item.split("=")[0]: item.split("=")[1]
for item in postgresql.dsn.split()
if item.split("=")[0] != "options"
}
- # in test sessions, we DO want to raise any exception occurring at commit time
- prov = get_provenance(cls="local", db=args, raise_on_commit=True)
- assert isinstance(prov.storage, ProvenanceDBBase)
- assert prov.storage.flavor == flavor
- return prov
+
+
+# the Flask app used as server in these tests
+@pytest.fixture
+def app(populated_db: Dict[str, str]):
+ assert hasattr(server, "storage")
+ server.storage = get_provenance_storage(cls="local", db=populated_db)
+ yield server.app
+
+
+# the RPCClient class used as client used in these tests
+@pytest.fixture
+def swh_rpc_client_class():
+ return RemoteProvenanceStorage
+
+
+@pytest.fixture(params=["local", "remote"])
+def provenance(
+ request, # TODO: add proper type annotation
+ populated_db: Dict[str, str],
+ swh_rpc_client: RemoteProvenanceStorage,
+) -> ProvenanceInterface:
+ """return a working and initialized provenance db"""
+
+ if request.param == "remote":
+ from swh.provenance.provenance import Provenance
+
+ assert isinstance(swh_rpc_client, ProvenanceStorageInterface)
+ return Provenance(swh_rpc_client)
+
+ else:
+ # in test sessions, we DO want to raise any exception occurring at commit time
+ prov = get_provenance(cls=request.param, db=populated_db, raise_on_commit=True)
+ return prov
@pytest.fixture
def swh_storage_with_objects(swh_storage: Storage) -> Storage:
"""return a Storage object (postgresql-based by default) with a few of each
object type in it
The inserted content comes from swh.model.tests.swh_model_data.
"""
for obj_type in (
"content",
"skipped_content",
"directory",
"revision",
"release",
"snapshot",
"origin",
"origin_visit",
"origin_visit_status",
):
getattr(swh_storage, f"{obj_type}_add")(TEST_OBJECTS[obj_type])
return swh_storage
@pytest.fixture
def archive_direct(swh_storage_with_objects: Storage) -> ArchiveInterface:
return ArchivePostgreSQL(swh_storage_with_objects.get_db().conn)
@pytest.fixture
def archive_api(swh_storage_with_objects: Storage) -> ArchiveInterface:
return ArchiveStorage(swh_storage_with_objects)
@pytest.fixture(params=["archive", "db"])
def archive(request, swh_storage_with_objects: Storage) -> Iterator[ArchiveInterface]:
"""Return a ArchivePostgreSQL based StorageInterface object"""
# this is a workaround to prevent tests from hanging because of an unclosed
# transaction.
# TODO: refactor the ArchivePostgreSQL to properly deal with
# transactions and get rid of this fixture
if request.param == "db":
archive = ArchivePostgreSQL(conn=swh_storage_with_objects.get_db().conn)
yield archive
archive.conn.rollback()
else:
yield ArchiveStorage(swh_storage_with_objects)
def get_datafile(fname: str) -> str:
return path.join(path.dirname(__file__), "data", fname)
def load_repo_data(repo: str) -> Dict[str, Any]:
data: Dict[str, Any] = {}
with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj:
unpacker = msgpack.Unpacker(
fobj,
raw=False,
ext_hook=msgpack_ext_hook,
strict_map_key=False,
timestamp=3, # convert Timestamp in datetime objects (tz UTC)
)
for objtype, objd in unpacker:
data.setdefault(objtype, []).append(objd)
return data
def filter_dict(d: Dict[Any, Any], keys: Iterable[Any]) -> Dict[Any, Any]:
return {k: v for (k, v) in d.items() if k in keys}
def fill_storage(storage: Storage, data: Dict[str, Any]) -> None:
process_replay_objects(data, storage=storage)
class SynthRelation(TypedDict):
prefix: Optional[str]
path: str
src: Sha1Git
dst: Sha1Git
rel_ts: float
class SynthRevision(TypedDict):
sha1: Sha1Git
date: float
msg: str
R_C: List[SynthRelation]
R_D: List[SynthRelation]
D_C: List[SynthRelation]
def synthetic_result(filename: str) -> Iterator[SynthRevision]:
"""Generates dict representations of synthetic revisions found in the synthetic
file (from the data/ directory) given as argument of the generator.
Generated SynthRevision (typed dict) with the following elements:
"sha1": (Sha1Git) sha1 of the revision,
"date": (float) timestamp of the revision,
"msg": (str) commit message of the revision,
"R_C": (list) new R---C relations added by this revision
"R_D": (list) new R-D relations added by this revision
"D_C": (list) new D-C relations added by this revision
Each relation above is a SynthRelation typed dict with:
"path": (str) location
"src": (Sha1Git) sha1 of the source of the relation
"dst": (Sha1Git) sha1 of the destination of the relation
"rel_ts": (float) timestamp of the target of the relation
(related to the timestamp of the revision)
"""
with open(get_datafile(filename), "r") as fobj:
yield from _parse_synthetic_file(fobj)
def _parse_synthetic_file(fobj: Iterable[str]) -> Iterator[SynthRevision]:
"""Read a 'synthetic' file and generate a dict representation of the synthetic
revision for each revision listed in the synthetic file.
"""
regs = [
"(?P<revname>R[0-9]{2,4})?",
"(?P<reltype>[^| ]*)",
"([+] )?(?P<path>[^| +]*?)[/]?",
"(?P<type>[RDC]) (?P<sha1>[0-9a-z]{40})",
"(?P<ts>-?[0-9]+(.[0-9]+)?)",
]
regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$")
current_rev: List[dict] = []
for m in (regex.match(line) for line in fobj):
if m:
d = m.groupdict()
if d["revname"]:
if current_rev:
yield _mk_synth_rev(current_rev)
current_rev.clear()
current_rev.append(d)
if current_rev:
yield _mk_synth_rev(current_rev)
def _mk_synth_rev(synth_rev: List[Dict[str, str]]) -> SynthRevision:
assert synth_rev[0]["type"] == "R"
rev = SynthRevision(
sha1=hash_to_bytes(synth_rev[0]["sha1"]),
date=float(synth_rev[0]["ts"]),
msg=synth_rev[0]["revname"],
R_C=[],
R_D=[],
D_C=[],
)
current_path = None
# path of the last R-D relation we parsed, used a prefix for next D-C
# relations
for row in synth_rev[1:]:
if row["reltype"] == "R---C":
assert row["type"] == "C"
rev["R_C"].append(
SynthRelation(
prefix=None,
path=row["path"],
src=rev["sha1"],
dst=hash_to_bytes(row["sha1"]),
rel_ts=float(row["ts"]),
)
)
current_path = None
elif row["reltype"] == "R-D":
assert row["type"] == "D"
rev["R_D"].append(
SynthRelation(
prefix=None,
path=row["path"],
src=rev["sha1"],
dst=hash_to_bytes(row["sha1"]),
rel_ts=float(row["ts"]),
)
)
current_path = row["path"]
elif row["reltype"] == "D-C":
assert row["type"] == "C"
rev["D_C"].append(
SynthRelation(
prefix=current_path,
path=row["path"],
src=rev["R_D"][-1]["dst"],
dst=hash_to_bytes(row["sha1"]),
rel_ts=float(row["ts"]),
)
)
return rev
diff --git a/swh/provenance/tests/test_provenance_db.py b/swh/provenance/tests/test_provenance_db.py
index 3e84768..4b684a0 100644
--- a/swh/provenance/tests/test_provenance_db.py
+++ b/swh/provenance/tests/test_provenance_db.py
@@ -1,51 +1,51 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timedelta, timezone
from typing import Type
from swh.model.model import OriginVisitStatus
from swh.model.tests.swh_model_data import TEST_OBJECTS
from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface
from swh.provenance.model import OriginEntry
from swh.provenance.origin import origin_add
from swh.provenance.postgresql.provenancedb_base import ProvenanceDBBase
from swh.provenance.postgresql.provenancedb_with_path import ProvenanceWithPathDB
from swh.provenance.postgresql.provenancedb_without_path import ProvenanceWithoutPathDB
from swh.provenance.storage.archive import ArchiveStorage
from swh.storage.postgresql.storage import Storage
# TODO: remove this function in favour of TimestampWithTimezone.to_datetime
# from swh.model.model
def ts2dt(ts: dict) -> datetime:
timestamp = datetime.fromtimestamp(
ts["timestamp"]["seconds"], timezone(timedelta(minutes=ts["offset"]))
)
return timestamp.replace(microsecond=ts["timestamp"]["microseconds"])
def test_provenance_origin_add(
provenance: ProvenanceInterface, swh_storage_with_objects: Storage
) -> None:
"""Test the origin_add function"""
archive = ArchiveStorage(swh_storage_with_objects)
for status in TEST_OBJECTS["origin_visit_status"]:
assert isinstance(status, OriginVisitStatus)
if status.snapshot is not None:
entry = OriginEntry(url=status.origin, snapshot=status.snapshot)
origin_add(provenance, archive, [entry])
# TODO: check some facts here
def test_provenance_flavor(provenance: ProvenanceInterface) -> None:
- assert isinstance(provenance.storage, ProvenanceDBBase)
- assert provenance.storage.flavor in ("with-path", "without-path")
- backend_class: Type[ProvenanceStorageInterface]
- if provenance.storage.flavor == "with-path":
- backend_class = ProvenanceWithPathDB
- else:
- backend_class = ProvenanceWithoutPathDB
- assert isinstance(provenance.storage, backend_class)
+ if isinstance(provenance.storage, ProvenanceDBBase):
+ assert provenance.storage.flavor in ("with-path", "without-path")
+ backend_class: Type[ProvenanceStorageInterface]
+ if provenance.storage.flavor == "with-path":
+ backend_class = ProvenanceWithPathDB
+ else:
+ backend_class = ProvenanceWithoutPathDB
+ assert isinstance(provenance.storage, backend_class)