diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
index 697c7cf..9f167e1 100644
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -1,47 +1,47 @@
 from typing import TYPE_CHECKING
 import warnings
 
 from .postgresql.db_utils import connect
 
 if TYPE_CHECKING:
     from swh.provenance.archive import ArchiveInterface
     from swh.provenance.provenance import ProvenanceInterface
 
 
 def get_archive(cls: str, **kwargs) -> "ArchiveInterface":
     if cls == "api":
         from swh.provenance.storage.archive import ArchiveStorage
         from swh.storage import get_storage
 
         return ArchiveStorage(get_storage(**kwargs["storage"]))
     elif cls == "direct":
         from swh.provenance.postgresql.archive import ArchivePostgreSQL
 
         return ArchivePostgreSQL(connect(kwargs["db"]))
     else:
         raise NotImplementedError
 
 
 def get_provenance(cls: str, **kwargs) -> "ProvenanceInterface":
     if cls == "local":
         conn = connect(kwargs["db"])
         if "with_path" in kwargs:
             warnings.warn(
                 "Usage of the 'with-path' config option is deprecated. "
                 "The db flavor is now used instead.",
                 DeprecationWarning,
             )
 
         with_path = kwargs.get("with_path")
-        from swh.provenance.provenance import ProvenanceBackend
+        from swh.provenance.backend import ProvenanceBackend
 
         prov = ProvenanceBackend(conn)
         if with_path is not None:
             flavor = "with-path" if with_path else "without-path"
             if prov.storage.flavor != flavor:
                 raise ValueError(
                     "The given flavor does not match the flavor stored in the backend."
                 )
         return prov
     else:
         raise NotImplementedError
diff --git a/swh/provenance/provenance.py b/swh/provenance/backend.py
similarity index 59%
copy from swh/provenance/provenance.py
copy to swh/provenance/backend.py
index 1731ec4..70b2906 100644
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/backend.py
@@ -1,365 +1,213 @@
 from datetime import datetime
 import logging
 import os
 from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple
 
-import psycopg2
-from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable
+import psycopg2  # TODO: remove this dependency
+from typing_extensions import Literal, TypedDict
 
 from swh.model.model import Sha1Git
 
 from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry
-
-
-class ProvenanceResult:
-    def __init__(
-        self,
-        content: Sha1Git,
-        revision: Sha1Git,
-        date: datetime,
-        origin: Optional[str],
-        path: bytes,
-    ) -> None:
-        self.content = content
-        self.revision = revision
-        self.date = date
-        self.origin = origin
-        self.path = path
-
-
-@runtime_checkable
-class ProvenanceInterface(Protocol):
-    raise_on_commit: bool = False
-
-    def flush(self) -> None:
-        """Flush internal cache to the underlying `storage`."""
-        ...
-
-    def content_add_to_directory(
-        self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
-    ) -> None:
-        """Associate `blob` with `directory` in the provenance model. `prefix` is the
-        relative path from `directory` to `blob` (excluding `blob`'s name).
-        """
-        ...
-
-    def content_add_to_revision(
-        self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
-    ) -> None:
-        """Associate `blob` with `revision` in the provenance model. `prefix` is the
-        absolute path from `revision`'s root directory to `blob` (excluding `blob`'s
-        name).
-        """
-        ...
-
-    def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
-        """Retrieve the first occurrence of the blob identified by `id`."""
-        ...
-
-    def content_find_all(
-        self, id: Sha1Git, limit: Optional[int] = None
-    ) -> Generator[ProvenanceResult, None, None]:
-        """Retrieve all the occurrences of the blob identified by `id`."""
-        ...
-
-    def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
-        """Retrieve the earliest known date of `blob`."""
-        ...
-
-    def content_get_early_dates(
-        self, blobs: Iterable[FileEntry]
-    ) -> Dict[Sha1Git, datetime]:
-        """Retrieve the earliest known date for each blob in `blobs`. If some blob has
-        no associated date, it is not present in the resulting dictionary.
-        """
-        ...
-
-    def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
-        """Associate `date` to `blob` as it's earliest known date."""
-        ...
-
-    def directory_add_to_revision(
-        self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
-    ) -> None:
-        """Associate `directory` with `revision` in the provenance model. `path` is the
-        absolute path from `revision`'s root directory to `directory` (including
-        `directory`'s name).
-        """
-        ...
-
-    def directory_get_date_in_isochrone_frontier(
-        self, directory: DirectoryEntry
-    ) -> Optional[datetime]:
-        """Retrieve the earliest known date of `directory` as an isochrone frontier in
-        the provenance model.
-        """
-        ...
-
-    def directory_get_dates_in_isochrone_frontier(
-        self, dirs: Iterable[DirectoryEntry]
-    ) -> Dict[Sha1Git, datetime]:
-        """Retrieve the earliest known date for each directory in `dirs` as isochrone
-        frontiers provenance model. If some directory has no associated date, it is not
-        present in the resulting dictionary.
-        """
-        ...
-
-    def directory_set_date_in_isochrone_frontier(
-        self, directory: DirectoryEntry, date: datetime
-    ) -> None:
-        """Associate `date` to `directory` as it's earliest known date as an isochrone
-        frontier in the provenance model.
-        """
-        ...
-
-    def origin_add(self, origin: OriginEntry) -> None:
-        """Add `origin` to the provenance model."""
-        ...
-
-    def revision_add(self, revision: RevisionEntry) -> None:
-        """Add `revision` to the provenance model. This implies storing `revision`'s
-        date in the model, thus `revision.date` must be a valid date.
-        """
-        ...
-
-    def revision_add_before_revision(
-        self, head: RevisionEntry, revision: RevisionEntry
-    ) -> None:
-        """Associate `revision` to `head` as an ancestor of the latter."""
-        ...
-
-    def revision_add_to_origin(
-        self, origin: OriginEntry, revision: RevisionEntry
-    ) -> None:
-        """Associate `revision` to `origin` as a head revision of the latter (ie. the
-        target of an snapshot for `origin` in the archive)."""
-        ...
-
-    def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]:
-        """Retrieve the date associated to `revision`."""
-        ...
-
-    def revision_get_preferred_origin(
-        self, revision: RevisionEntry
-    ) -> Optional[Sha1Git]:
-        """Retrieve the preferred origin associated to `revision`."""
-        ...
-
-    def revision_in_history(self, revision: RevisionEntry) -> bool:
-        """Check if `revision` is known to be an ancestor of some head revision in the
-        provenance model.
-        """
-        ...
-
-    def revision_set_preferred_origin(
-        self, origin: OriginEntry, revision: RevisionEntry
-    ) -> None:
-        """Associate `origin` as the preferred origin for `revision`."""
-        ...
-
-    def revision_visited(self, revision: RevisionEntry) -> bool:
-        """Check if `revision` is known to be a head revision for some origin in the
-        provenance model.
-        """
-        ...
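Review note: the block removed above is the `ProvenanceInterface` protocol, which after this diff lives only in `swh/provenance/provenance.py` (see the third hunk below), while `backend.py` keeps only the concrete `ProvenanceBackend`. Because the interface is a `runtime_checkable` `Protocol`, the backend satisfies it structurally, with no inheritance. A minimal sketch of that pattern, using toy names that are not part of this diff:

```python
from typing_extensions import Protocol, runtime_checkable


@runtime_checkable
class Flushable(Protocol):
    def flush(self) -> None:
        ...


class Backend:  # note: no inheritance from Flushable
    def flush(self) -> None:
        print("flushed")


# isinstance() checks structurally, but only for the *presence* of members,
# not their signatures -- adequate for sanity checks in tests.
assert isinstance(Backend(), Flushable)
```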
+from .provenance import ProvenanceResult
 
 
 class DatetimeCache(TypedDict):
     data: Dict[Sha1Git, Optional[datetime]]
     added: Set[Sha1Git]
 
 
 class OriginCache(TypedDict):
     data: Dict[Sha1Git, str]
     added: Set[Sha1Git]
 
 
 class RevisionCache(TypedDict):
     data: Dict[Sha1Git, Sha1Git]
     added: Set[Sha1Git]
 
 
 class ProvenanceCache(TypedDict):
     content: DatetimeCache
     directory: DatetimeCache
     revision: DatetimeCache
     # below are insertion caches only
     content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]]
     content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]]
     directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]]
     # these two are for the origin layer
     origin: OriginCache
     revision_origin: RevisionCache
     revision_before_revision: Dict[Sha1Git, Set[Sha1Git]]
     revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]]
 
 
 def new_cache() -> ProvenanceCache:
     return ProvenanceCache(
         content=DatetimeCache(data={}, added=set()),
         directory=DatetimeCache(data={}, added=set()),
         revision=DatetimeCache(data={}, added=set()),
         content_in_revision=set(),
         content_in_directory=set(),
         directory_in_revision=set(),
         origin=OriginCache(data={}, added=set()),
         revision_origin=RevisionCache(data={}, added=set()),
         revision_before_revision={},
         revision_in_origin=set(),
     )
 
 
 # TODO: maybe move this to a separate file
 class ProvenanceBackend:
     raise_on_commit: bool = False
 
     def __init__(self, conn: psycopg2.extensions.connection):
         from .postgresql.provenancedb_base import ProvenanceDBBase
 
         # TODO: this class should not know what the actual used DB is.
         self.storage: ProvenanceDBBase
         flavor = ProvenanceDBBase(conn).flavor
         if flavor == "with-path":
             from .postgresql.provenancedb_with_path import ProvenanceWithPathDB
 
             self.storage = ProvenanceWithPathDB(conn)
         else:
             from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB
 
             self.storage = ProvenanceWithoutPathDB(conn)
         self.cache: ProvenanceCache = new_cache()
 
     def clear_caches(self) -> None:
         self.cache = new_cache()
 
     def flush(self) -> None:
         # TODO: for now we just forward the cache. This should be improved!
         while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit):
             logging.warning(
                 f"Unable to commit cached information {self.cache}. Retrying..."
             )
         self.clear_caches()
 
     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ) -> None:
         self.cache["content_in_directory"].add(
             (blob.id, directory.id, normalize(os.path.join(prefix, blob.name)))
         )
 
     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
     ) -> None:
         self.cache["content_in_revision"].add(
             (blob.id, revision.id, normalize(os.path.join(prefix, blob.name)))
         )
 
     def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
         return self.storage.content_find_first(id)
 
     def content_find_all(
         self, id: Sha1Git, limit: Optional[int] = None
     ) -> Generator[ProvenanceResult, None, None]:
         yield from self.storage.content_find_all(id, limit=limit)
 
     def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
         return self.get_dates("content", [blob.id]).get(blob.id)
 
     def content_get_early_dates(
         self, blobs: Iterable[FileEntry]
     ) -> Dict[Sha1Git, datetime]:
         return self.get_dates("content", [blob.id for blob in blobs])
 
     def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
         self.cache["content"]["data"][blob.id] = date
         self.cache["content"]["added"].add(blob.id)
 
     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
     ) -> None:
         self.cache["directory_in_revision"].add(
             (directory.id, revision.id, normalize(path))
         )
 
     def directory_get_date_in_isochrone_frontier(
         self, directory: DirectoryEntry
     ) -> Optional[datetime]:
         return self.get_dates("directory", [directory.id]).get(directory.id)
 
     def directory_get_dates_in_isochrone_frontier(
         self, dirs: Iterable[DirectoryEntry]
     ) -> Dict[Sha1Git, datetime]:
         return self.get_dates("directory", [directory.id for directory in dirs])
 
     def directory_set_date_in_isochrone_frontier(
         self, directory: DirectoryEntry, date: datetime
     ) -> None:
         self.cache["directory"]["data"][directory.id] = date
         self.cache["directory"]["added"].add(directory.id)
 
     def get_dates(
         self, entity: Literal["content", "revision", "directory"], ids: List[Sha1Git]
     ) -> Dict[Sha1Git, datetime]:
         cache = self.cache[entity]
         missing_ids = set(id for id in ids if id not in cache)
         if missing_ids:
             cache["data"].update(self.storage.get_dates(entity, list(missing_ids)))
         dates: Dict[Sha1Git, datetime] = {}
         for sha1 in ids:
             date = cache["data"].get(sha1)
             if date is not None:
                 dates[sha1] = date
         return dates
 
     def origin_add(self, origin: OriginEntry) -> None:
         self.cache["origin"]["data"][origin.id] = origin.url
         self.cache["origin"]["added"].add(origin.id)
 
     def revision_add(self, revision: RevisionEntry) -> None:
         self.cache["revision"]["data"][revision.id] = revision.date
         self.cache["revision"]["added"].add(revision.id)
 
     def revision_add_before_revision(
         self, head: RevisionEntry, revision: RevisionEntry
     ) -> None:
         self.cache["revision_before_revision"].setdefault(revision.id, set()).add(
             head.id
         )
 
     def revision_add_to_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ) -> None:
         self.cache["revision_in_origin"].add((revision.id, origin.id))
 
     def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]:
         return self.get_dates("revision", [revision.id]).get(revision.id)
 
     def revision_get_preferred_origin(
         self, revision: RevisionEntry
     ) -> Optional[Sha1Git]:
         cache = self.cache["revision_origin"]
         if revision.id not in cache:
             origin = self.storage.revision_get_preferred_origin(revision.id)
             if origin is not None:
                 cache["data"][revision.id] = origin
         return cache["data"].get(revision.id)
 
     def revision_in_history(self, revision: RevisionEntry) -> bool:
         return revision.id in self.cache[
             "revision_before_revision"
         ] or self.storage.revision_in_history(revision.id)
 
     def revision_set_preferred_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ) -> None:
         self.cache["revision_origin"]["data"][revision.id] = origin.id
         self.cache["revision_origin"]["added"].add(revision.id)
 
     def revision_visited(self, revision: RevisionEntry) -> bool:
         return revision.id in dict(
             self.cache["revision_in_origin"]
         ) or self.storage.revision_visited(revision.id)
 
 
 def normalize(path: bytes) -> bytes:
     return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
index 1731ec4..de95e82 100644
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,365 +1,161 @@
 from datetime import datetime
-import logging
-import os
-from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple
+from typing import Dict, Generator, Iterable, Optional
 
-import psycopg2
-from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable
+from typing_extensions import Protocol, runtime_checkable
 
 from swh.model.model import Sha1Git
 
 from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry
 
 
 class ProvenanceResult:
     def __init__(
         self,
         content: Sha1Git,
         revision: Sha1Git,
         date: datetime,
         origin: Optional[str],
         path: bytes,
     ) -> None:
         self.content = content
         self.revision = revision
         self.date = date
         self.origin = origin
         self.path = path
 
 
 @runtime_checkable
 class ProvenanceInterface(Protocol):
     raise_on_commit: bool = False
 
     def flush(self) -> None:
         """Flush internal cache to the underlying `storage`."""
         ...
 
     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ) -> None:
         """Associate `blob` with `directory` in the provenance model. `prefix` is the
         relative path from `directory` to `blob` (excluding `blob`'s name).
         """
         ...
 
     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
     ) -> None:
         """Associate `blob` with `revision` in the provenance model. `prefix` is the
         absolute path from `revision`'s root directory to `blob` (excluding `blob`'s
         name).
         """
         ...
 
     def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
         """Retrieve the first occurrence of the blob identified by `id`."""
         ...
 
     def content_find_all(
         self, id: Sha1Git, limit: Optional[int] = None
     ) -> Generator[ProvenanceResult, None, None]:
         """Retrieve all the occurrences of the blob identified by `id`."""
         ...
 
     def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
         """Retrieve the earliest known date of `blob`."""
         ...
 
     def content_get_early_dates(
         self, blobs: Iterable[FileEntry]
     ) -> Dict[Sha1Git, datetime]:
         """Retrieve the earliest known date for each blob in `blobs`. If some blob has
         no associated date, it is not present in the resulting dictionary.
         """
         ...
 
     def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
         """Associate `date` to `blob` as its earliest known date."""
         ...
 
     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
     ) -> None:
         """Associate `directory` with `revision` in the provenance model. `path` is the
         absolute path from `revision`'s root directory to `directory` (including
         `directory`'s name).
         """
         ...
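Review note: the `prefix`/`path` arguments in this interface are byte paths; `ProvenanceBackend` joins them with `os.path.join` and strips any leading `./` via the `normalize()` helper kept in `backend.py` above, so equivalent spellings of a path are cached identically. A small usage sketch with invented example paths:

```python
import os


def normalize(path: bytes) -> bytes:
    # as in backend.py: drop a leading "./" before caching relation tuples
    return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path


# invented example paths (POSIX separator assumed)
assert normalize(os.path.join(b".", b"README")) == b"README"
assert normalize(b"src/main.c") == b"src/main.c"  # already normalized, untouched
```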
     def directory_get_date_in_isochrone_frontier(
         self, directory: DirectoryEntry
     ) -> Optional[datetime]:
         """Retrieve the earliest known date of `directory` as an isochrone frontier in
         the provenance model.
         """
         ...
 
     def directory_get_dates_in_isochrone_frontier(
         self, dirs: Iterable[DirectoryEntry]
     ) -> Dict[Sha1Git, datetime]:
         """Retrieve the earliest known date for each directory in `dirs` as isochrone
         frontiers in the provenance model. If some directory has no associated date,
         it is not present in the resulting dictionary.
         """
         ...
 
     def directory_set_date_in_isochrone_frontier(
         self, directory: DirectoryEntry, date: datetime
     ) -> None:
         """Associate `date` to `directory` as its earliest known date as an isochrone
         frontier in the provenance model.
         """
         ...
 
     def origin_add(self, origin: OriginEntry) -> None:
         """Add `origin` to the provenance model."""
         ...
 
     def revision_add(self, revision: RevisionEntry) -> None:
         """Add `revision` to the provenance model. This implies storing `revision`'s
         date in the model, thus `revision.date` must be a valid date.
         """
         ...
 
     def revision_add_before_revision(
         self, head: RevisionEntry, revision: RevisionEntry
     ) -> None:
         """Associate `revision` to `head` as an ancestor of the latter."""
         ...
 
     def revision_add_to_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ) -> None:
         """Associate `revision` to `origin` as a head revision of the latter (i.e. the
         target of a snapshot for `origin` in the archive)."""
         ...
 
     def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]:
         """Retrieve the date associated to `revision`."""
         ...
 
     def revision_get_preferred_origin(
         self, revision: RevisionEntry
     ) -> Optional[Sha1Git]:
         """Retrieve the preferred origin associated to `revision`."""
         ...
 
     def revision_in_history(self, revision: RevisionEntry) -> bool:
         """Check if `revision` is known to be an ancestor of some head revision in the
         provenance model.
         """
         ...
 
     def revision_set_preferred_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ) -> None:
         """Associate `origin` as the preferred origin for `revision`."""
         ...
 
     def revision_visited(self, revision: RevisionEntry) -> bool:
         """Check if `revision` is known to be a head revision for some origin in the
         provenance model.
         """
         ...
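Review note: on the implementation side (now in `backend.py`), each accessor above is backed by a write-back cache: the `data` dict holds every value known locally, and the `added` set records which keys still have to be flushed to storage. A minimal sketch of that pattern, with toy types instead of the module's real `Sha1Git`-keyed caches:

```python
from typing import Dict, Optional, Set

from typing_extensions import TypedDict


class DateCache(TypedDict):
    data: Dict[bytes, Optional[str]]  # values known locally; reads hit this first
    added: Set[bytes]  # keys still pending insertion into the underlying storage


cache: DateCache = {"data": {}, "added": set()}

# a write records the value and marks the key as pending
cache["data"][b"some-id"] = "2021-06-01"
cache["added"].add(b"some-id")

# a flush would persist exactly the pending keys, then clear the cache
pending = {key: cache["data"][key] for key in cache["added"]}
```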
-
-
-class DatetimeCache(TypedDict):
-    data: Dict[Sha1Git, Optional[datetime]]
-    added: Set[Sha1Git]
-
-
-class OriginCache(TypedDict):
-    data: Dict[Sha1Git, str]
-    added: Set[Sha1Git]
-
-
-class RevisionCache(TypedDict):
-    data: Dict[Sha1Git, Sha1Git]
-    added: Set[Sha1Git]
-
-
-class ProvenanceCache(TypedDict):
-    content: DatetimeCache
-    directory: DatetimeCache
-    revision: DatetimeCache
-    # below are insertion caches only
-    content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]]
-    content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]]
-    directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]]
-    # these two are for the origin layer
-    origin: OriginCache
-    revision_origin: RevisionCache
-    revision_before_revision: Dict[Sha1Git, Set[Sha1Git]]
-    revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]]
-
-
-def new_cache() -> ProvenanceCache:
-    return ProvenanceCache(
-        content=DatetimeCache(data={}, added=set()),
-        directory=DatetimeCache(data={}, added=set()),
-        revision=DatetimeCache(data={}, added=set()),
-        content_in_revision=set(),
-        content_in_directory=set(),
-        directory_in_revision=set(),
-        origin=OriginCache(data={}, added=set()),
-        revision_origin=RevisionCache(data={}, added=set()),
-        revision_before_revision={},
-        revision_in_origin=set(),
-    )
-
-
-# TODO: maybe move this to a separate file
-class ProvenanceBackend:
-    raise_on_commit: bool = False
-
-    def __init__(self, conn: psycopg2.extensions.connection):
-        from .postgresql.provenancedb_base import ProvenanceDBBase
-
-        # TODO: this class should not know what the actual used DB is.
-        self.storage: ProvenanceDBBase
-        flavor = ProvenanceDBBase(conn).flavor
-        if flavor == "with-path":
-            from .postgresql.provenancedb_with_path import ProvenanceWithPathDB
-
-            self.storage = ProvenanceWithPathDB(conn)
-        else:
-            from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB
-
-            self.storage = ProvenanceWithoutPathDB(conn)
-        self.cache: ProvenanceCache = new_cache()
-
-    def clear_caches(self) -> None:
-        self.cache = new_cache()
-
-    def flush(self) -> None:
-        # TODO: for now we just forward the cache. This should be improved!
-        while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit):
-            logging.warning(
-                f"Unable to commit cached information {self.cache}. Retrying..."
-            )
-        self.clear_caches()
-
-    def content_add_to_directory(
-        self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
-    ) -> None:
-        self.cache["content_in_directory"].add(
-            (blob.id, directory.id, normalize(os.path.join(prefix, blob.name)))
-        )
-
-    def content_add_to_revision(
-        self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
-    ) -> None:
-        self.cache["content_in_revision"].add(
-            (blob.id, revision.id, normalize(os.path.join(prefix, blob.name)))
-        )
-
-    def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
-        return self.storage.content_find_first(id)
-
-    def content_find_all(
-        self, id: Sha1Git, limit: Optional[int] = None
-    ) -> Generator[ProvenanceResult, None, None]:
-        yield from self.storage.content_find_all(id, limit=limit)
-
-    def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
-        return self.get_dates("content", [blob.id]).get(blob.id)
-
-    def content_get_early_dates(
-        self, blobs: Iterable[FileEntry]
-    ) -> Dict[Sha1Git, datetime]:
-        return self.get_dates("content", [blob.id for blob in blobs])
-
-    def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
-        self.cache["content"]["data"][blob.id] = date
-        self.cache["content"]["added"].add(blob.id)
-
-    def directory_add_to_revision(
-        self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
-    ) -> None:
-        self.cache["directory_in_revision"].add(
-            (directory.id, revision.id, normalize(path))
-        )
-
-    def directory_get_date_in_isochrone_frontier(
-        self, directory: DirectoryEntry
-    ) -> Optional[datetime]:
-        return self.get_dates("directory", [directory.id]).get(directory.id)
-
-    def directory_get_dates_in_isochrone_frontier(
-        self, dirs: Iterable[DirectoryEntry]
-    ) -> Dict[Sha1Git, datetime]:
-        return self.get_dates("directory", [directory.id for directory in dirs])
-
-    def directory_set_date_in_isochrone_frontier(
-        self, directory: DirectoryEntry, date: datetime
-    ) -> None:
-        self.cache["directory"]["data"][directory.id] = date
-        self.cache["directory"]["added"].add(directory.id)
-
-    def get_dates(
-        self, entity: Literal["content", "revision", "directory"], ids: List[Sha1Git]
-    ) -> Dict[Sha1Git, datetime]:
-        cache = self.cache[entity]
-        missing_ids = set(id for id in ids if id not in cache)
-        if missing_ids:
-            cache["data"].update(self.storage.get_dates(entity, list(missing_ids)))
-        dates: Dict[Sha1Git, datetime] = {}
-        for sha1 in ids:
-            date = cache["data"].get(sha1)
-            if date is not None:
-                dates[sha1] = date
-        return dates
-
-    def origin_add(self, origin: OriginEntry) -> None:
-        self.cache["origin"]["data"][origin.id] = origin.url
-        self.cache["origin"]["added"].add(origin.id)
-
-    def revision_add(self, revision: RevisionEntry) -> None:
-        self.cache["revision"]["data"][revision.id] = revision.date
-        self.cache["revision"]["added"].add(revision.id)
-
-    def revision_add_before_revision(
-        self, head: RevisionEntry, revision: RevisionEntry
-    ) -> None:
-        self.cache["revision_before_revision"].setdefault(revision.id, set()).add(
-            head.id
-        )
-
-    def revision_add_to_origin(
-        self, origin: OriginEntry, revision: RevisionEntry
-    ) -> None:
-        self.cache["revision_in_origin"].add((revision.id, origin.id))
-
-    def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]:
-        return self.get_dates("revision", [revision.id]).get(revision.id)
-
-    def revision_get_preferred_origin(
-        self, revision: RevisionEntry
-    ) -> Optional[Sha1Git]:
-        cache = self.cache["revision_origin"]
-        if revision.id not in cache:
-            origin = self.storage.revision_get_preferred_origin(revision.id)
-            if origin is not None:
-                cache["data"][revision.id] = origin
-        return cache["data"].get(revision.id)
-
-    def revision_in_history(self, revision: RevisionEntry) -> bool:
-        return revision.id in self.cache[
-            "revision_before_revision"
-        ] or self.storage.revision_in_history(revision.id)
-
-    def revision_set_preferred_origin(
-        self, origin: OriginEntry, revision: RevisionEntry
-    ) -> None:
-        self.cache["revision_origin"]["data"][revision.id] = origin.id
-        self.cache["revision_origin"]["added"].add(revision.id)
-
-    def revision_visited(self, revision: RevisionEntry) -> bool:
-        return revision.id in dict(
-            self.cache["revision_in_origin"]
-        ) or self.storage.revision_visited(revision.id)
-
-
-def normalize(path: bytes) -> bytes:
-    return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path
diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py
index ddde62f..3ce45fc 100644
--- a/swh/provenance/tests/conftest.py
+++ b/swh/provenance/tests/conftest.py
@@ -1,235 +1,235 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from os import path
 import re
 from typing import Iterable, Iterator, List, Optional
 
 import msgpack
 import pytest
 from typing_extensions import TypedDict
 
 from swh.core.db import BaseDb
 from swh.journal.serializers import msgpack_ext_hook
 from swh.model.hashutil import hash_to_bytes
 from swh.model.model import Sha1Git
 from swh.model.tests.swh_model_data import TEST_OBJECTS
 from swh.provenance.postgresql.archive import ArchivePostgreSQL
 from swh.provenance.storage.archive import ArchiveStorage
 from swh.storage.replay import process_replay_objects
 
 
 @pytest.fixture(params=["with-path", "without-path"])
 def provenance(request, postgresql):
     """return a working and initialized provenance db"""
     from swh.core.cli.db import populate_database_for_package
 
     flavor = request.param
     populate_database_for_package("swh.provenance", postgresql.dsn, flavor=flavor)
 
-    from swh.provenance.provenance import ProvenanceBackend
+    from swh.provenance.backend import ProvenanceBackend
 
     BaseDb.adapt_conn(postgresql)
 
     prov = ProvenanceBackend(postgresql)
     assert prov.storage.flavor == flavor
 
     # in test sessions, we DO want to raise any exception occurring at commit time
     prov.raise_on_commit = True
     return prov
 
 
 @pytest.fixture
 def swh_storage_with_objects(swh_storage):
     """return a Storage object (postgresql-based by default) with a few of each
     object type in it
 
     The inserted content comes from swh.model.tests.swh_model_data.
     """
     for obj_type in (
         "content",
         "skipped_content",
         "directory",
         "revision",
         "release",
         "snapshot",
         "origin",
         "origin_visit",
         "origin_visit_status",
     ):
         getattr(swh_storage, f"{obj_type}_add")(TEST_OBJECTS[obj_type])
     return swh_storage
 
 
 @pytest.fixture
 def archive_direct(swh_storage_with_objects):
     return ArchivePostgreSQL(swh_storage_with_objects.get_db().conn)
 
 
 @pytest.fixture
 def archive_api(swh_storage_with_objects):
     return ArchiveStorage(swh_storage_with_objects)
 
 
 @pytest.fixture(params=["archive", "db"])
 def archive(request, swh_storage_with_objects):
     """Return an ArchivePostgreSQL-based StorageInterface object"""
     # this is a workaround to prevent tests from hanging because of an unclosed
     # transaction.
     # TODO: refactor ArchivePostgreSQL to properly deal with
     # transactions and get rid of this fixture
     if request.param == "db":
         archive = ArchivePostgreSQL(conn=swh_storage_with_objects.get_db().conn)
         yield archive
         archive.conn.rollback()
     else:
         yield ArchiveStorage(swh_storage_with_objects)
 
 
 def get_datafile(fname):
     return path.join(path.dirname(__file__), "data", fname)
 
 
 def load_repo_data(repo):
     data = {}
     with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj:
         unpacker = msgpack.Unpacker(
             fobj,
             raw=False,
             ext_hook=msgpack_ext_hook,
             strict_map_key=False,
             timestamp=3,  # convert Timestamp into datetime objects (tz UTC)
         )
         for objtype, objd in unpacker:
             data.setdefault(objtype, []).append(objd)
     return data
 
 
 def filter_dict(d, keys):
     return {k: v for (k, v) in d.items() if k in keys}
 
 
 def fill_storage(storage, data):
     process_replay_objects(data, storage=storage)
 
 
 class SynthRelation(TypedDict):
     prefix: Optional[str]
     path: str
     src: Sha1Git
     dst: Sha1Git
     rel_ts: float
 
 
 class SynthRevision(TypedDict):
     sha1: Sha1Git
     date: float
     msg: str
     R_C: List[SynthRelation]
     R_D: List[SynthRelation]
     D_C: List[SynthRelation]
 
 
 def synthetic_result(filename: str) -> Iterator[SynthRevision]:
     """Generates dict representations of the synthetic revisions found in the
     synthetic file (from the data/ directory) given as argument to the generator.
 
     Each generated SynthRevision (a typed dict) has the following elements:
 
       "sha1": (Sha1Git) sha1 of the revision,
       "date": (float) timestamp of the revision,
       "msg": (str) commit message of the revision,
       "R_C": (list) new R---C relations added by this revision
       "R_D": (list) new R-D relations added by this revision
       "D_C": (list) new D-C relations added by this revision
 
     Each relation above is a SynthRelation typed dict with:
 
       "path": (str) location
       "src": (Sha1Git) sha1 of the source of the relation
       "dst": (Sha1Git) sha1 of the destination of the relation
       "rel_ts": (float) timestamp of the target of the relation (relative to the
                 timestamp of the revision)
     """
     with open(get_datafile(filename), "r") as fobj:
         yield from _parse_synthetic_file(fobj)
 
 
 def _parse_synthetic_file(fobj: Iterable[str]) -> Iterator[SynthRevision]:
     """Read a 'synthetic' file and generate a dict representation of the synthetic
     revision for each revision listed in the synthetic file.
""" regs = [ "(?PR[0-9]{2,4})?", "(?P[^| ]*)", "([+] )?(?P[^| +]*?)[/]?", "(?P[RDC]) (?P[0-9a-z]{40})", "(?P-?[0-9]+(.[0-9]+)?)", ] regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$") current_rev: List[dict] = [] for m in (regex.match(line) for line in fobj): if m: d = m.groupdict() if d["revname"]: if current_rev: yield _mk_synth_rev(current_rev) current_rev.clear() current_rev.append(d) if current_rev: yield _mk_synth_rev(current_rev) def _mk_synth_rev(synth_rev) -> SynthRevision: assert synth_rev[0]["type"] == "R" rev = SynthRevision( sha1=hash_to_bytes(synth_rev[0]["sha1"]), date=float(synth_rev[0]["ts"]), msg=synth_rev[0]["revname"], R_C=[], R_D=[], D_C=[], ) current_path = None # path of the last R-D relation we parsed, used a prefix for next D-C # relations for row in synth_rev[1:]: if row["reltype"] == "R---C": assert row["type"] == "C" rev["R_C"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = None elif row["reltype"] == "R-D": assert row["type"] == "D" rev["R_D"].append( SynthRelation( prefix=None, path=row["path"], src=rev["sha1"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) current_path = row["path"] elif row["reltype"] == "D-C": assert row["type"] == "C" rev["D_C"].append( SynthRelation( prefix=current_path, path=row["path"], src=rev["R_D"][-1]["dst"], dst=hash_to_bytes(row["sha1"]), rel_ts=float(row["ts"]), ) ) return rev