diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py index 1e560d2..fc926c4 100644 --- a/swh/provenance/__init__.py +++ b/swh/provenance/__init__.py @@ -1,89 +1,97 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from typing import TYPE_CHECKING if TYPE_CHECKING: from .archive import ArchiveInterface from .interface import ProvenanceInterface, ProvenanceStorageInterface def get_archive(cls: str, **kwargs) -> ArchiveInterface: """Get an archive object of class ``cls`` with arguments ``args``. Args: cls: archive's class, either 'api' or 'direct' args: dictionary of arguments passed to the archive class constructor Returns: an instance of archive object (either using swh.storage API or direct queries to the archive's database) Raises: :cls:`ValueError` if passed an unknown archive class. """ if cls == "api": from swh.storage import get_storage from .storage.archive import ArchiveStorage return ArchiveStorage(get_storage(**kwargs["storage"])) elif cls == "direct": from swh.core.db import BaseDb from .postgresql.archive import ArchivePostgreSQL return ArchivePostgreSQL(BaseDb.connect(**kwargs["db"]).conn) else: raise ValueError def get_provenance(**kwargs) -> ProvenanceInterface: """Get an provenance object with arguments ``args``. Args: args: dictionary of arguments to retrieve a swh.provenance.storage class (see :func:`get_provenance_storage` for details) Returns: an instance of provenance object """ from .provenance import Provenance return Provenance(get_provenance_storage(**kwargs)) def get_provenance_storage(cls: str, **kwargs) -> ProvenanceStorageInterface: """Get an archive object of class ``cls`` with arguments ``args``. Args: cls: storage's class, only 'local' is currently supported args: dictionary of arguments passed to the storage class constructor Returns: an instance of storage object Raises: :cls:`ValueError` if passed an unknown archive class. """ if cls == "local": from swh.core.db import BaseDb from .postgresql.provenancedb_base import ProvenanceDBBase conn = BaseDb.connect(**kwargs["db"]).conn raise_on_commit = kwargs.get("raise_on_commit", False) if ProvenanceDBBase(conn, raise_on_commit).flavor == "with-path": from .postgresql.provenancedb_with_path import ProvenanceWithPathDB return ProvenanceWithPathDB(conn, raise_on_commit) else: from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB return ProvenanceWithoutPathDB(conn, raise_on_commit) + + elif cls == "remote": + from .api.client import RemoteProvenanceStorage + + storage = RemoteProvenanceStorage(**kwargs) + assert isinstance(storage, ProvenanceStorageInterface) + return storage + else: raise ValueError diff --git a/swh/provenance/api/__init__.py b/swh/provenance/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/swh/provenance/api/client.py b/swh/provenance/api/client.py new file mode 100644 index 0000000..c772c08 --- /dev/null +++ b/swh/provenance/api/client.py @@ -0,0 +1,17 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from swh.core.api import RPCClient + +from ..interface import ProvenanceStorageInterface +from .serializers import DECODERS, ENCODERS + + +class RemoteProvenanceStorage(RPCClient): + """Proxy to a remote provenance storage API""" + + backend_class = ProvenanceStorageInterface + extra_type_decoders = DECODERS + extra_type_encoders = ENCODERS diff --git a/swh/provenance/api/serializers.py b/swh/provenance/api/serializers.py new file mode 100644 index 0000000..ef3d83d --- /dev/null +++ b/swh/provenance/api/serializers.py @@ -0,0 +1,48 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from dataclasses import asdict +from typing import Callable, Dict, List, Tuple + +from .. import interface + + +def _encode_dataclass(obj): + return { + **asdict(obj), + "__type__": type(obj).__name__, + } + + +def _decode_dataclass(d): + return getattr(interface, d.pop("__type__"))(**d) + + +def _encode_enum(obj): + return { + "value": obj.value, + "__type__": type(obj).__name__, + } + + +def _decode_enum(d): + return getattr(interface, d.pop("__type__"))(d["value"]) + + +ENCODERS: List[Tuple[type, str, Callable]] = [ + (interface.ProvenanceResult, "dataclass", _encode_dataclass), + (interface.RelationData, "dataclass", _encode_dataclass), + (interface.RevisionData, "dataclass", _encode_dataclass), + (interface.EntityType, "enum", _encode_enum), + (interface.RelationType, "enum", _encode_enum), + (set, "set", list), +] + + +DECODERS: Dict[str, Callable] = { + "dataclass": _decode_dataclass, + "enum": _decode_enum, + "set": set, +} diff --git a/swh/provenance/api/server.py b/swh/provenance/api/server.py new file mode 100644 index 0000000..40a67c7 --- /dev/null +++ b/swh/provenance/api/server.py @@ -0,0 +1,143 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +import os + +from swh.core import config +from swh.core.api import JSONFormatter, MsgpackFormatter, RPCServerApp, negotiate +from swh.provenance import get_provenance_storage +from swh.provenance.interface import ProvenanceStorageInterface + +from .serializers import DECODERS, ENCODERS + +storage = None + + +def get_global_provenance_storage(): + global storage + if not storage: + storage = get_provenance_storage(**app.config["provenance"]["storage"]) + return storage + + +class ProvenanceStorageServerApp(RPCServerApp): + extra_type_decoders = DECODERS + extra_type_encoders = ENCODERS + + +app = ProvenanceStorageServerApp( + __name__, + backend_class=ProvenanceStorageInterface, + backend_factory=get_global_provenance_storage, +) + + +def has_no_empty_params(rule): + return len(rule.defaults or ()) >= len(rule.arguments or ()) + + +@app.route("/") +def index(): + return """ +Software Heritage provenance storage RPC server + +

You have reached the +Software Heritage +provenance storage RPC server.
+See its +documentation +and API for more information

+ +""" + + +@app.route("/site-map") +@negotiate(MsgpackFormatter) +@negotiate(JSONFormatter) +def site_map(): + links = [] + for rule in app.url_map.iter_rules(): + if has_no_empty_params(rule) and hasattr( + ProvenanceStorageInterface, rule.endpoint + ): + links.append( + dict( + rule=rule.rule, + description=getattr( + ProvenanceStorageInterface, rule.endpoint + ).__doc__, + ) + ) + # links is now a list of url, endpoint tuples + return links + + +def load_and_check_config(config_path, type="local"): + """Check the minimal configuration is set to run the api or raise an + error explanation. + + Args: + config_path (str): Path to the configuration file to load + type (str): configuration type. For 'local' type, more + checks are done. + + Raises: + Error if the setup is not as expected + + Returns: + configuration as a dict + + """ + if not config_path: + raise EnvironmentError("Configuration file must be defined") + + if not os.path.exists(config_path): + raise FileNotFoundError(f"Configuration file {config_path} does not exist") + + cfg = config.read(config_path) + + pcfg = cfg.get("provenance") + if not pcfg: + raise KeyError("Missing 'provenance' configuration") + + scfg = pcfg.get("storage") + if not scfg: + raise KeyError("Missing 'provenance.storage' configuration") + + if type == "local": + cls = scfg.get("cls") + if cls != "local": + raise ValueError( + "The provenance backend can only be started with a 'local' " + "configuration" + ) + + db = scfg.get("db") + if not db: + raise KeyError("Invalid configuration; missing 'db' config entry") + + return cfg + + +api_cfg = None + + +def make_app_from_configfile(): + """Run the WSGI app from the webserver, loading the configuration from + a configuration file. + + SWH_CONFIG_FILENAME environment variable defines the + configuration path to load. + + """ + global api_cfg + if not api_cfg: + config_path = os.environ.get("SWH_CONFIG_FILENAME") + api_cfg = load_and_check_config(config_path) + app.config.update(api_cfg) + handler = logging.StreamHandler() + app.logger.addHandler(handler) + return app diff --git a/swh/provenance/interface.py b/swh/provenance/interface.py index 419a4a0..e1e20b6 100644 --- a/swh/provenance/interface.py +++ b/swh/provenance/interface.py @@ -1,304 +1,322 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from dataclasses import dataclass from datetime import datetime import enum from typing import Dict, Generator, Iterable, Optional, Set from typing_extensions import Protocol, runtime_checkable +from swh.core.api import remote_api_endpoint from swh.model.model import Sha1Git from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry class EntityType(enum.Enum): CONTENT = "content" DIRECTORY = "directory" REVISION = "revision" ORIGIN = "origin" class RelationType(enum.Enum): CNT_EARLY_IN_REV = "content_in_revision" CNT_IN_DIR = "content_in_directory" DIR_IN_REV = "directory_in_revision" REV_IN_ORG = "revision_in_origin" REV_BEFORE_REV = "revision_before_revision" @dataclass(eq=True, frozen=True) class ProvenanceResult: content: Sha1Git revision: Sha1Git date: datetime origin: Optional[str] path: bytes @dataclass(eq=True, frozen=True) class RevisionData: """Object representing the data associated to a revision in the provenance model, where `date` is the optional date of the revision (specifying it acknowledges that the revision was already processed by the revision-content algorithm); and `origin` identifies the preferred origin for the revision, if any. """ date: Optional[datetime] origin: Optional[Sha1Git] @dataclass(eq=True, frozen=True) class RelationData: """Object representing a relation entry in the provenance model, where `src` and `dst` are the sha1 ids of the entities being related, and `path` is optional depending on the relation being represented. """ src: Sha1Git dst: Sha1Git path: Optional[bytes] @runtime_checkable class ProvenanceStorageInterface(Protocol): + @remote_api_endpoint("content_find_first") def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: """Retrieve the first occurrence of the blob identified by `id`.""" ... + @remote_api_endpoint("content_find_all") def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: """Retrieve all the occurrences of the blob identified by `id`.""" ... + @remote_api_endpoint("content_set_date") def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: """Associate dates to blobs identified by sha1 ids, as paired in `dates`. Return a boolean stating whether the information was successfully stored. """ ... + @remote_api_endpoint("content_get") def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: """Retrieve the associated date for each blob sha1 in `ids`. If some blob has no associated date, it is not present in the resulting dictionary. """ ... + @remote_api_endpoint("directory_set_date") def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: """Associate dates to directories identified by sha1 ids, as paired in `dates`. Return a boolean stating whether the information was successfully stored. """ ... + @remote_api_endpoint("directory_get") def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: """Retrieve the associated date for each directory sha1 in `ids`. If some directory has no associated date, it is not present in the resulting dictionary. """ ... + @remote_api_endpoint("entity_get_all") def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: """Retrieve all sha1 ids for entities of type `entity` present in the provenance model. """ ... + @remote_api_endpoint("location_get") def location_get(self) -> Set[bytes]: """Retrieve all paths present in the provenance model.""" ... + @remote_api_endpoint("origin_set_url") def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: """Associate urls to origins identified by sha1 ids, as paired in `urls`. Return a boolean stating whether the information was successfully stored. """ ... + @remote_api_endpoint("origin_get") def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: """Retrieve the associated url for each origin sha1 in `ids`. If some origin has no associated date, it is not present in the resulting dictionary. """ ... + @remote_api_endpoint("revision_set_date") def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: """Associate dates to revisions identified by sha1 ids, as paired in `dates`. Return a boolean stating whether the information was successfully stored. """ ... + @remote_api_endpoint("revision_set_origin") def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: """Associate origins to revisions identified by sha1 ids, as paired in `origins` (revision ids are keys and origin ids, values). Return a boolean stating whether the information was successfully stored. """ ... + @remote_api_endpoint("revision_get") def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: """Retrieve the associated date and origin for each revision sha1 in `ids`. If some revision has no associated date nor origin, it is not present in the resulting dictionary. """ ... + @remote_api_endpoint("relation_add") def relation_add( self, relation: RelationType, data: Iterable[RelationData] ) -> bool: """Add entries in the selected `relation`.""" ... + @remote_api_endpoint("relation_get") def relation_get( self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False ) -> Set[RelationData]: """Retrieve all entries in the selected `relation` whose source entities are identified by some sha1 id in `ids`. If `reverse` is set, destination entities are matched instead. """ ... + @remote_api_endpoint("relation_get_all") def relation_get_all(self, relation: RelationType) -> Set[RelationData]: """Retrieve all entries in the selected `relation` that are present in the provenance model. """ ... + @remote_api_endpoint("with_path") def with_path(self) -> bool: ... @runtime_checkable class ProvenanceInterface(Protocol): storage: ProvenanceStorageInterface def flush(self) -> None: """Flush internal cache to the underlying `storage`.""" ... def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: """Associate `blob` with `directory` in the provenance model. `prefix` is the relative path from `directory` to `blob` (excluding `blob`'s name). """ ... def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: """Associate `blob` with `revision` in the provenance model. `prefix` is the absolute path from `revision`'s root directory to `blob` (excluding `blob`'s name). """ ... def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: """Retrieve the first occurrence of the blob identified by `id`.""" ... def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: """Retrieve all the occurrences of the blob identified by `id`.""" ... def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: """Retrieve the earliest known date of `blob`.""" ... def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[Sha1Git, datetime]: """Retrieve the earliest known date for each blob in `blobs`. If some blob has no associated date, it is not present in the resulting dictionary. """ ... def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: """Associate `date` to `blob` as it's earliest known date.""" ... def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: """Associate `directory` with `revision` in the provenance model. `path` is the absolute path from `revision`'s root directory to `directory` (including `directory`'s name). """ ... def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: """Retrieve the earliest known date of `directory` as an isochrone frontier in the provenance model. """ ... def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[Sha1Git, datetime]: """Retrieve the earliest known date for each directory in `dirs` as isochrone frontiers provenance model. If some directory has no associated date, it is not present in the resulting dictionary. """ ... def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: """Associate `date` to `directory` as it's earliest known date as an isochrone frontier in the provenance model. """ ... def origin_add(self, origin: OriginEntry) -> None: """Add `origin` to the provenance model.""" ... def revision_add(self, revision: RevisionEntry) -> None: """Add `revision` to the provenance model. This implies storing `revision`'s date in the model, thus `revision.date` must be a valid date. """ ... def revision_add_before_revision( self, head: RevisionEntry, revision: RevisionEntry ) -> None: """Associate `revision` to `head` as an ancestor of the latter.""" ... def revision_add_to_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: """Associate `revision` to `origin` as a head revision of the latter (ie. the target of an snapshot for `origin` in the archive).""" ... def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: """Retrieve the date associated to `revision`.""" ... def revision_get_preferred_origin( self, revision: RevisionEntry ) -> Optional[Sha1Git]: """Retrieve the preferred origin associated to `revision`.""" ... def revision_in_history(self, revision: RevisionEntry) -> bool: """Check if `revision` is known to be an ancestor of some head revision in the provenance model. """ ... def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry ) -> None: """Associate `origin` as the preferred origin for `revision`.""" ... def revision_visited(self, revision: RevisionEntry) -> bool: """Check if `revision` is known to be a head revision for some origin in the provenance model. """ ... diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py index 8b7e154..b07f481 100644 --- a/swh/provenance/tests/conftest.py +++ b/swh/provenance/tests/conftest.py @@ -1,248 +1,273 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from os import path import re from typing import Any, Dict, Iterable, Iterator, List, Optional import msgpack import psycopg2 import pytest from typing_extensions import TypedDict -from swh.core.db import BaseDb from swh.journal.serializers import msgpack_ext_hook from swh.model.hashutil import hash_to_bytes from swh.model.model import Sha1Git from swh.model.tests.swh_model_data import TEST_OBJECTS -from swh.provenance import get_provenance +from swh.provenance import get_provenance, get_provenance_storage +from swh.provenance.api.client import RemoteProvenanceStorage +import swh.provenance.api.server as server from swh.provenance.archive import ArchiveInterface -from swh.provenance.interface import ProvenanceInterface +from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface from swh.provenance.postgresql.archive import ArchivePostgreSQL -from swh.provenance.postgresql.provenancedb_base import ProvenanceDBBase from swh.provenance.storage.archive import ArchiveStorage from swh.storage.postgresql.storage import Storage from swh.storage.replay import process_replay_objects @pytest.fixture(params=["with-path", "without-path"]) -def provenance( +def populated_db( request, # TODO: add proper type annotation postgresql: psycopg2.extensions.connection, -) -> ProvenanceInterface: - """return a working and initialized provenance db""" +) -> Dict[str, str]: from swh.core.cli.db import populate_database_for_package - flavor = request.param + flavor = "with-path" if request.param == "client-server" else request.param populate_database_for_package("swh.provenance", postgresql.dsn, flavor=flavor) - - BaseDb.adapt_conn(postgresql) - - args: Dict[str, str] = { + return { item.split("=")[0]: item.split("=")[1] for item in postgresql.dsn.split() if item.split("=")[0] != "options" } - # in test sessions, we DO want to raise any exception occurring at commit time - prov = get_provenance(cls="local", db=args, raise_on_commit=True) - assert isinstance(prov.storage, ProvenanceDBBase) - assert prov.storage.flavor == flavor - return prov + + +# the Flask app used as server in these tests +@pytest.fixture +def app(populated_db: Dict[str, str]): + assert hasattr(server, "storage") + server.storage = get_provenance_storage(cls="local", db=populated_db) + yield server.app + + +# the RPCClient class used as client used in these tests +@pytest.fixture +def swh_rpc_client_class(): + return RemoteProvenanceStorage + + +@pytest.fixture(params=["local", "remote"]) +def provenance( + request, # TODO: add proper type annotation + populated_db: Dict[str, str], + swh_rpc_client: RemoteProvenanceStorage, +) -> ProvenanceInterface: + """return a working and initialized provenance db""" + + if request.param == "remote": + from swh.provenance.provenance import Provenance + + assert isinstance(swh_rpc_client, ProvenanceStorageInterface) + return Provenance(swh_rpc_client) + + else: + # in test sessions, we DO want to raise any exception occurring at commit time + prov = get_provenance(cls=request.param, db=populated_db, raise_on_commit=True) + return prov @pytest.fixture def swh_storage_with_objects(swh_storage: Storage) -> Storage: """return a Storage object (postgresql-based by default) with a few of each object type in it The inserted content comes from swh.model.tests.swh_model_data. """ for obj_type in ( "content", "skipped_content", "directory", "revision", "release", "snapshot", "origin", "origin_visit", "origin_visit_status", ): getattr(swh_storage, f"{obj_type}_add")(TEST_OBJECTS[obj_type]) return swh_storage @pytest.fixture def archive_direct(swh_storage_with_objects: Storage) -> ArchiveInterface: return ArchivePostgreSQL(swh_storage_with_objects.get_db().conn) @pytest.fixture def archive_api(swh_storage_with_objects: Storage) -> ArchiveInterface: return ArchiveStorage(swh_storage_with_objects) @pytest.fixture(params=["archive", "db"]) def archive(request, swh_storage_with_objects: Storage) -> Iterator[ArchiveInterface]: """Return a ArchivePostgreSQL based StorageInterface object""" # this is a workaround to prevent tests from hanging because of an unclosed # transaction. # TODO: refactor the ArchivePostgreSQL to properly deal with # transactions and get rid of this fixture if request.param == "db": archive = ArchivePostgreSQL(conn=swh_storage_with_objects.get_db().conn) yield archive archive.conn.rollback() else: yield ArchiveStorage(swh_storage_with_objects) def get_datafile(fname: str) -> str: return path.join(path.dirname(__file__), "data", fname) def load_repo_data(repo: str) -> Dict[str, Any]: data: Dict[str, Any] = {} with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj: unpacker = msgpack.Unpacker( fobj, raw=False, ext_hook=msgpack_ext_hook, strict_map_key=False, timestamp=3, # convert Timestamp in datetime objects (tz UTC) ) for objtype, objd in unpacker: data.setdefault(objtype, []).append(objd) return data def filter_dict(d: Dict[Any, Any], keys: Iterable[Any]) -> Dict[Any, Any]: return {k: v for (k, v) in d.items() if k in keys} def fill_storage(storage: Storage, data: Dict[str, Any]) -> None: process_replay_objects(data, storage=storage) class SynthRelation(TypedDict): prefix: Optional[str] path: str src: Sha1Git dst: Sha1Git rel_ts: float class SynthRevision(TypedDict): sha1: Sha1Git date: float msg: str R_C: List[SynthRelation] R_D: List[SynthRelation] D_C: List[SynthRelation] def synthetic_result(filename: str) -> Iterator[SynthRevision]: """Generates dict representations of synthetic revisions found in the synthetic file (from the data/ directory) given as argument of the generator. Generated SynthRevision (typed dict) with the following elements: "sha1": (Sha1Git) sha1 of the revision, "date": (float) timestamp of the revision, "msg": (str) commit message of the revision, "R_C": (list) new R---C relations added by this revision "R_D": (list) new R-D relations added by this revision "D_C": (list) new D-C relations added by this revision Each relation above is a SynthRelation typed dict with: "path": (str) location "src": (Sha1Git) sha1 of the source of the relation "dst": (Sha1Git) sha1 of the destination of the relation "rel_ts": (float) timestamp of the target of the relation (related to the timestamp of the revision) """ with open(get_datafile(filename), "r") as fobj: yield from _parse_synthetic_file(fobj) def _parse_synthetic_file(fobj: Iterable[str]) -> Iterator[SynthRevision]: """Read a 'synthetic' file and generate a dict representation of the synthetic revision for each revision listed in the synthetic file. """ regs = [ "(?PR[0-9]{2,4})?", "(?P[^| ]*)", "([+] )?(?P[^| +]*?)[/]?", "(?P[RDC]) (?P[0-9a-z]{40})", "(?P-?[0-9]+(.[0-9]+)?)", ] regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$") current_rev: List[dict] = [] for m in (regex.match(line) for line in fobj): if m: d = m.groupdict() if d["revname"]: if current_rev: yield _mk_synth_rev(current_rev) current_rev.clear() current_rev.append(d) if current_rev: yield _mk_synth_rev(current_rev) def _mk_synth_rev(synth_rev: List[Dict[str, str]]) -> SynthRevision: assert synth_rev[0]["type"] == "R" rev = SynthRevision( sha1=hash_to_bytes(synth_rev[0]["sha1"]), date=float(synth_rev[0]["ts"]), msg=synth_rev[0]["revname"], R_C=[], R_D=[], D_C=[], ) current_path = None # path of the last R-D relation we parsed, used a prefix for next D-C # relations for row in synth_rev[1:]: if row["reltype"] == "R---C": assert row["type"] == "C" rev["R_C"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = None elif row["reltype"] == "R-D": assert row["type"] == "D" rev["R_D"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = row["path"] elif row["reltype"] == "D-C": assert row["type"] == "C" rev["D_C"].append( SynthRelation( prefix=current_path, path=row["path"], src=rev["R_D"][-1]["dst"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) return rev diff --git a/swh/provenance/tests/test_provenance_db.py b/swh/provenance/tests/test_provenance_db.py index 3e84768..4b684a0 100644 --- a/swh/provenance/tests/test_provenance_db.py +++ b/swh/provenance/tests/test_provenance_db.py @@ -1,51 +1,51 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta, timezone from typing import Type from swh.model.model import OriginVisitStatus from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface from swh.provenance.model import OriginEntry from swh.provenance.origin import origin_add from swh.provenance.postgresql.provenancedb_base import ProvenanceDBBase from swh.provenance.postgresql.provenancedb_with_path import ProvenanceWithPathDB from swh.provenance.postgresql.provenancedb_without_path import ProvenanceWithoutPathDB from swh.provenance.storage.archive import ArchiveStorage from swh.storage.postgresql.storage import Storage # TODO: remove this function in favour of TimestampWithTimezone.to_datetime # from swh.model.model def ts2dt(ts: dict) -> datetime: timestamp = datetime.fromtimestamp( ts["timestamp"]["seconds"], timezone(timedelta(minutes=ts["offset"])) ) return timestamp.replace(microsecond=ts["timestamp"]["microseconds"]) def test_provenance_origin_add( provenance: ProvenanceInterface, swh_storage_with_objects: Storage ) -> None: """Test the origin_add function""" archive = ArchiveStorage(swh_storage_with_objects) for status in TEST_OBJECTS["origin_visit_status"]: assert isinstance(status, OriginVisitStatus) if status.snapshot is not None: entry = OriginEntry(url=status.origin, snapshot=status.snapshot) origin_add(provenance, archive, [entry]) # TODO: check some facts here def test_provenance_flavor(provenance: ProvenanceInterface) -> None: - assert isinstance(provenance.storage, ProvenanceDBBase) - assert provenance.storage.flavor in ("with-path", "without-path") - backend_class: Type[ProvenanceStorageInterface] - if provenance.storage.flavor == "with-path": - backend_class = ProvenanceWithPathDB - else: - backend_class = ProvenanceWithoutPathDB - assert isinstance(provenance.storage, backend_class) + if isinstance(provenance.storage, ProvenanceDBBase): + assert provenance.storage.flavor in ("with-path", "without-path") + backend_class: Type[ProvenanceStorageInterface] + if provenance.storage.flavor == "with-path": + backend_class = ProvenanceWithPathDB + else: + backend_class = ProvenanceWithoutPathDB + assert isinstance(provenance.storage, backend_class)