diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py --- a/swh/provenance/__init__.py +++ b/swh/provenance/__init__.py @@ -4,7 +4,7 @@ if TYPE_CHECKING: from .archive import ArchiveInterface - from .provenance import ProvenanceInterface, ProvenanceStorageInterface + from .interface import ProvenanceInterface, ProvenanceStorageInterface def get_archive(cls: str, **kwargs) -> ArchiveInterface: @@ -19,7 +19,7 @@ queries to the archive's database) Raises: - :cls:`ValueError` if passed an unknown archive class. + :cls:`ValueError` if passed an unknown archive class. """ if cls == "api": from swh.storage import get_storage @@ -47,9 +47,9 @@ Returns: an instance of provenance object """ - from .backend import ProvenanceBackend + from .provenance import Provenance - return ProvenanceBackend(get_provenance_storage(**kwargs)) + return Provenance(get_provenance_storage(**kwargs)) def get_provenance_storage(cls: str, **kwargs) -> ProvenanceStorageInterface: @@ -63,7 +63,7 @@ an instance of storage object Raises: - :cls:`ValueError` if passed an unknown archive class. + :cls:`ValueError` if passed an unknown archive class. 
""" if cls == "local": from swh.core.db import BaseDb diff --git a/swh/provenance/backend.py b/swh/provenance/backend.py deleted file mode 100644 --- a/swh/provenance/backend.py +++ /dev/null @@ -1,344 +0,0 @@ -from datetime import datetime -import logging -import os -from typing import Dict, Generator, Iterable, Optional, Set, Tuple - -from typing_extensions import Literal, TypedDict - -from swh.model.model import Sha1Git - -from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry -from .provenance import ( - ProvenanceResult, - ProvenanceStorageInterface, - RelationData, - RelationType, -) - - -class DatetimeCache(TypedDict): - data: Dict[Sha1Git, Optional[datetime]] - added: Set[Sha1Git] - - -class OriginCache(TypedDict): - data: Dict[Sha1Git, str] - added: Set[Sha1Git] - - -class RevisionCache(TypedDict): - data: Dict[Sha1Git, Sha1Git] - added: Set[Sha1Git] - - -class ProvenanceCache(TypedDict): - content: DatetimeCache - directory: DatetimeCache - revision: DatetimeCache - # below are insertion caches only - content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] - content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]] - directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] - # these two are for the origin layer - origin: OriginCache - revision_origin: RevisionCache - revision_before_revision: Dict[Sha1Git, Set[Sha1Git]] - revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]] - - -def new_cache() -> ProvenanceCache: - return ProvenanceCache( - content=DatetimeCache(data={}, added=set()), - directory=DatetimeCache(data={}, added=set()), - revision=DatetimeCache(data={}, added=set()), - content_in_revision=set(), - content_in_directory=set(), - directory_in_revision=set(), - origin=OriginCache(data={}, added=set()), - revision_origin=RevisionCache(data={}, added=set()), - revision_before_revision={}, - revision_in_origin=set(), - ) - - -# TODO: maybe move this to a separate file -class ProvenanceBackend: - def __init__(self, storage: 
ProvenanceStorageInterface) -> None: - self.storage = storage - self.cache = new_cache() - - def clear_caches(self) -> None: - self.cache = new_cache() - - def flush(self) -> None: - # Revision-content layer insertions ############################################ - - # For this layer, relations need to be inserted first so that, in case of - # failure, reprocessing the input does not generated an inconsistent database. - while not self.storage.relation_add( - RelationType.CNT_EARLY_IN_REV, - ( - RelationData(src=src, dst=dst, path=path) - for src, dst, path in self.cache["content_in_revision"] - ), - ): - logging.warning( - f"Unable to write {RelationType.CNT_EARLY_IN_REV} rows to the storage. " - f"Data: {self.cache['content_in_revision']}. Retrying..." - ) - - while not self.storage.relation_add( - RelationType.CNT_IN_DIR, - ( - RelationData(src=src, dst=dst, path=path) - for src, dst, path in self.cache["content_in_directory"] - ), - ): - logging.warning( - f"Unable to write {RelationType.CNT_IN_DIR} rows to the storage. " - f"Data: {self.cache['content_in_directory']}. Retrying..." - ) - - while not self.storage.relation_add( - RelationType.DIR_IN_REV, - ( - RelationData(src=src, dst=dst, path=path) - for src, dst, path in self.cache["directory_in_revision"] - ), - ): - logging.warning( - f"Unable to write {RelationType.DIR_IN_REV} rows to the storage. " - f"Data: {self.cache['directory_in_revision']}. Retrying..." - ) - - # After relations, dates for the entities can be safely set, acknowledging that - # these entities won't need to be reprocessed in case of failure. - dates = { - sha1: date - for sha1, date in self.cache["content"]["data"].items() - if sha1 in self.cache["content"]["added"] and date is not None - } - while not self.storage.content_set_date(dates): - logging.warning( - f"Unable to write content dates to the storage. " - f"Data: {dates}. Retrying..." 
- ) - - dates = { - sha1: date - for sha1, date in self.cache["directory"]["data"].items() - if sha1 in self.cache["directory"]["added"] and date is not None - } - while not self.storage.directory_set_date(dates): - logging.warning( - f"Unable to write directory dates to the storage. " - f"Data: {dates}. Retrying..." - ) - - dates = { - sha1: date - for sha1, date in self.cache["revision"]["data"].items() - if sha1 in self.cache["revision"]["added"] and date is not None - } - while not self.storage.revision_set_date(dates): - logging.warning( - f"Unable to write revision dates to the storage. " - f"Data: {dates}. Retrying..." - ) - - # Origin-revision layer insertions ############################################# - - # Origins urls should be inserted first so that internal ids' resolution works - # properly. - urls = { - sha1: date - for sha1, date in self.cache["origin"]["data"].items() - if sha1 in self.cache["origin"]["added"] - } - while not self.storage.origin_set_url(urls): - logging.warning( - f"Unable to write origins urls to the storage. " - f"Data: {urls}. Retrying..." - ) - - # Second, flat models for revisions' histories (ie. revision-before-revision). - data: Iterable[RelationData] = sum( - [ - [ - RelationData(src=prev, dst=next, path=None) - for next in self.cache["revision_before_revision"][prev] - ] - for prev in self.cache["revision_before_revision"] - ], - [], - ) - while not self.storage.relation_add(RelationType.REV_BEFORE_REV, data): - logging.warning( - f"Unable to write {RelationType.REV_BEFORE_REV} rows to the storage. " - f"Data: {data}. Retrying..." - ) - - # Heads (ie. revision-in-origin entries) should be inserted once flat models for - # their histories were already added. This is to guarantee consistent results if - # something needs to be reprocessed due to a failure: already inserted heads - # won't get reprocessed in such a case. 
- data = ( - RelationData(src=rev, dst=org, path=None) - for rev, org in self.cache["revision_in_origin"] - ) - while not self.storage.relation_add(RelationType.REV_IN_ORG, data): - logging.warning( - f"Unable to write {RelationType.REV_IN_ORG} rows to the storage. " - f"Data: {data}. Retrying..." - ) - - # Finally, preferred origins for the visited revisions are set (this step can be - # reordered if required). - origins = { - sha1: self.cache["revision_origin"]["data"][sha1] - for sha1 in self.cache["revision_origin"]["added"] - } - while not self.storage.revision_set_origin(origins): - logging.warning( - f"Unable to write preferred origins to the storage. " - f"Data: {origins}. Retrying..." - ) - - # clear local cache ############################################################ - self.clear_caches() - - def content_add_to_directory( - self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes - ) -> None: - self.cache["content_in_directory"].add( - (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) - ) - - def content_add_to_revision( - self, revision: RevisionEntry, blob: FileEntry, prefix: bytes - ) -> None: - self.cache["content_in_revision"].add( - (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) - ) - - def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: - return self.storage.content_find_first(id) - - def content_find_all( - self, id: Sha1Git, limit: Optional[int] = None - ) -> Generator[ProvenanceResult, None, None]: - yield from self.storage.content_find_all(id, limit=limit) - - def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: - return self.get_dates("content", [blob.id]).get(blob.id) - - def content_get_early_dates( - self, blobs: Iterable[FileEntry] - ) -> Dict[Sha1Git, datetime]: - return self.get_dates("content", [blob.id for blob in blobs]) - - def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: - self.cache["content"]["data"][blob.id] = date 
- self.cache["content"]["added"].add(blob.id) - - def directory_add_to_revision( - self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes - ) -> None: - self.cache["directory_in_revision"].add( - (directory.id, revision.id, normalize(path)) - ) - - def directory_get_date_in_isochrone_frontier( - self, directory: DirectoryEntry - ) -> Optional[datetime]: - return self.get_dates("directory", [directory.id]).get(directory.id) - - def directory_get_dates_in_isochrone_frontier( - self, dirs: Iterable[DirectoryEntry] - ) -> Dict[Sha1Git, datetime]: - return self.get_dates("directory", [directory.id for directory in dirs]) - - def directory_set_date_in_isochrone_frontier( - self, directory: DirectoryEntry, date: datetime - ) -> None: - self.cache["directory"]["data"][directory.id] = date - self.cache["directory"]["added"].add(directory.id) - - def get_dates( - self, - entity: Literal["content", "directory", "revision"], - ids: Iterable[Sha1Git], - ) -> Dict[Sha1Git, datetime]: - cache = self.cache[entity] - missing_ids = set(id for id in ids if id not in cache) - if missing_ids: - if entity == "revision": - updated = { - id: rev.date - for id, rev in self.storage.revision_get(missing_ids).items() - if rev.date is not None - } - else: - updated = getattr(self.storage, f"{entity}_get")(missing_ids) - cache["data"].update(updated) - dates: Dict[Sha1Git, datetime] = {} - for sha1 in ids: - date = cache["data"].get(sha1) - if date is not None: - dates[sha1] = date - return dates - - def origin_add(self, origin: OriginEntry) -> None: - self.cache["origin"]["data"][origin.id] = origin.url - self.cache["origin"]["added"].add(origin.id) - - def revision_add(self, revision: RevisionEntry) -> None: - self.cache["revision"]["data"][revision.id] = revision.date - self.cache["revision"]["added"].add(revision.id) - - def revision_add_before_revision( - self, head: RevisionEntry, revision: RevisionEntry - ) -> None: - 
self.cache["revision_before_revision"].setdefault(revision.id, set()).add( - head.id - ) - - def revision_add_to_origin( - self, origin: OriginEntry, revision: RevisionEntry - ) -> None: - self.cache["revision_in_origin"].add((revision.id, origin.id)) - - def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: - return self.get_dates("revision", [revision.id]).get(revision.id) - - def revision_get_preferred_origin( - self, revision: RevisionEntry - ) -> Optional[Sha1Git]: - cache = self.cache["revision_origin"]["data"] - if revision.id not in cache: - ret = self.storage.revision_get([revision.id]) - if revision.id in ret: - origin = ret[revision.id].origin - if origin is not None: - cache[revision.id] = origin - return cache.get(revision.id) - - def revision_in_history(self, revision: RevisionEntry) -> bool: - return revision.id in self.cache["revision_before_revision"] or bool( - self.storage.relation_get(RelationType.REV_BEFORE_REV, [revision.id]) - ) - - def revision_set_preferred_origin( - self, origin: OriginEntry, revision: RevisionEntry - ) -> None: - self.cache["revision_origin"]["data"][revision.id] = origin.id - self.cache["revision_origin"]["added"].add(revision.id) - - def revision_visited(self, revision: RevisionEntry) -> bool: - return revision.id in dict(self.cache["revision_in_origin"]) or bool( - self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id]) - ) - - -def normalize(path: bytes) -> bytes: - return path[2:] if path.startswith(bytes("." 
+ os.path.sep, "utf-8")) else path diff --git a/swh/provenance/graph.py b/swh/provenance/graph.py --- a/swh/provenance/graph.py +++ b/swh/provenance/graph.py @@ -8,8 +8,8 @@ from swh.model.model import Sha1Git from .archive import ArchiveInterface +from .interface import ProvenanceInterface from .model import DirectoryEntry, RevisionEntry -from .provenance import ProvenanceInterface UTCMIN = datetime.min.replace(tzinfo=timezone.utc) diff --git a/swh/provenance/provenance.py b/swh/provenance/interface.py copy from swh/provenance/provenance.py copy to swh/provenance/interface.py diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py --- a/swh/provenance/origin.py +++ b/swh/provenance/origin.py @@ -7,8 +7,8 @@ from .archive import ArchiveInterface from .graph import HistoryNode, build_history_graph +from .interface import ProvenanceInterface from .model import OriginEntry, RevisionEntry -from .provenance import ProvenanceInterface class CSVOriginIterator: diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -10,7 +10,7 @@ from swh.core.db import BaseDb from swh.model.model import Sha1Git -from ..provenance import ( +from ..interface import ( EntityType, ProvenanceResult, RelationData, diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py --- a/swh/provenance/postgresql/provenancedb_with_path.py +++ b/swh/provenance/postgresql/provenancedb_with_path.py @@ -2,7 +2,7 @@ from swh.model.model import Sha1Git -from ..provenance import ProvenanceResult, RelationType +from ..interface import ProvenanceResult, RelationType from .provenancedb_base import ProvenanceDBBase diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,313 +1,343 @@ from 
class DatetimeCache(TypedDict):
    """Per-entity date cache (used for contents, directories and revisions)."""

    # Date known for each sha1; the declared type also allows ``None``.
    data: Dict[Sha1Git, Optional[datetime]]
    # Ids whose date was set locally; only these are written back on flush.
    added: Set[Sha1Git]


class OriginCache(TypedDict):
    """Cache of origin urls keyed by origin sha1."""

    # Url for each origin sha1.
    data: Dict[Sha1Git, str]
    # Origin ids added locally; only these are written back on flush.
    added: Set[Sha1Git]


class RevisionCache(TypedDict):
    """Cache of preferred origins keyed by revision sha1."""

    # Preferred origin sha1 for each revision sha1.
    data: Dict[Sha1Git, Sha1Git]
    # Revision ids whose preferred origin was set locally.
    added: Set[Sha1Git]


class ProvenanceCache(TypedDict):
    """Whole in-memory state kept by the provenance object between flushes."""

    content: DatetimeCache
    directory: DatetimeCache
    revision: DatetimeCache
    # The three sets below are insertion-only caches of (src, dst, path) rows.
    content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]]
    content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]]
    directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]]
    # Origin layer: origin urls and preferred origin of each revision.
    origin: OriginCache
    revision_origin: RevisionCache
    # Maps a revision id to the ids of the heads it is an ancestor of.
    revision_before_revision: Dict[Sha1Git, Set[Sha1Git]]
    # (revision id, origin id) pairs.
    revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]]


def new_cache() -> ProvenanceCache:
    """Return a brand new, completely empty cache.

    Every container is created afresh on each call, so two caches never share
    mutable state.
    """
    empty: ProvenanceCache = {
        "content": {"data": {}, "added": set()},
        "directory": {"data": {}, "added": set()},
        "revision": {"data": {}, "added": set()},
        "content_in_revision": set(),
        "content_in_directory": set(),
        "directory_in_revision": set(),
        "origin": {"data": {}, "added": set()},
        "revision_origin": {"data": {}, "added": set()},
        "revision_before_revision": {},
        "revision_in_origin": set(),
    }
    return empty
class Provenance:
    """Provenance model front-end that buffers additions in a local cache.

    Additions are recorded in `self.cache` and only reach `self.storage` when
    `flush` is called. Dates and preferred origins are looked up in the cache
    first and fetched from the storage when they are not cached.
    """

    def __init__(self, storage: ProvenanceStorageInterface) -> None:
        self.storage = storage
        self.cache = new_cache()

    def clear_caches(self) -> None:
        """Replace the local cache with an empty one (pending additions are lost)."""
        self.cache = new_cache()

    def flush(self) -> None:
        """Flush internal cache to the underlying `storage`.

        Every write is retried until the storage reports success; the local cache
        is cleared once everything has been written.
        """
        cache = self.cache

        # Revision-content layer insertions ############################################

        # For this layer, relations need to be inserted first so that, in case of
        # failure, reprocessing the input does not generate an inconsistent database.
        self._relation_add_until_stored(
            RelationType.CNT_EARLY_IN_REV,
            self._rows_with_path(cache["content_in_revision"]),
            cache["content_in_revision"],
        )
        self._relation_add_until_stored(
            RelationType.CNT_IN_DIR,
            self._rows_with_path(cache["content_in_directory"]),
            cache["content_in_directory"],
        )
        self._relation_add_until_stored(
            RelationType.DIR_IN_REV,
            self._rows_with_path(cache["directory_in_revision"]),
            cache["directory_in_revision"],
        )

        # After relations, dates for the entities can be safely set, acknowledging that
        # these entities won't need to be reprocessed in case of failure.
        dates = self._added_dates("content")
        while not self.storage.content_set_date(dates):
            logging.warning(
                f"Unable to write content dates to the storage. "
                f"Data: {dates}. Retrying..."
            )

        dates = self._added_dates("directory")
        while not self.storage.directory_set_date(dates):
            logging.warning(
                f"Unable to write directory dates to the storage. "
                f"Data: {dates}. Retrying..."
            )

        dates = self._added_dates("revision")
        while not self.storage.revision_set_date(dates):
            logging.warning(
                f"Unable to write revision dates to the storage. "
                f"Data: {dates}. Retrying..."
            )

        # Origin-revision layer insertions #############################################

        # Origins urls should be inserted first so that internal ids' resolution works
        # properly.
        urls = {
            sha1: url
            for sha1, url in cache["origin"]["data"].items()
            if sha1 in cache["origin"]["added"]
        }
        while not self.storage.origin_set_url(urls):
            logging.warning(
                f"Unable to write origins urls to the storage. "
                f"Data: {urls}. Retrying..."
            )

        # Second, flat models for revisions' histories (ie. revision-before-revision).
        history_rows = [
            RelationData(src=ancestor, dst=head, path=None)
            for ancestor, heads in cache["revision_before_revision"].items()
            for head in heads
        ]
        self._relation_add_until_stored(
            RelationType.REV_BEFORE_REV,
            history_rows,
            cache["revision_before_revision"],
        )

        # Heads (ie. revision-in-origin entries) should be inserted once flat models for
        # their histories were already added. This is to guarantee consistent results if
        # something needs to be reprocessed due to a failure: already inserted heads
        # won't get reprocessed in such a case.
        # The rows are materialized as a list: a generator would be exhausted by the
        # first attempt and every retry would then silently write nothing.
        head_rows = [
            RelationData(src=rev, dst=org, path=None)
            for rev, org in cache["revision_in_origin"]
        ]
        self._relation_add_until_stored(
            RelationType.REV_IN_ORG, head_rows, cache["revision_in_origin"]
        )

        # Finally, preferred origins for the visited revisions are set (this step can be
        # reordered if required).
        origins = {
            sha1: cache["revision_origin"]["data"][sha1]
            for sha1 in cache["revision_origin"]["added"]
        }
        while not self.storage.revision_set_origin(origins):
            logging.warning(
                f"Unable to write preferred origins to the storage. "
                f"Data: {origins}. Retrying..."
            )

        # clear local cache ############################################################
        self.clear_caches()

    @staticmethod
    def _rows_with_path(
        triples: Iterable[Tuple[Sha1Git, Sha1Git, bytes]]
    ) -> Iterable[RelationData]:
        """Turn cached `(src, dst, path)` triples into a list of relation rows."""
        return [RelationData(src=src, dst=dst, path=path) for src, dst, path in triples]

    def _added_dates(
        self, entity: Literal["content", "directory", "revision"]
    ) -> Dict[Sha1Git, datetime]:
        """Collect the non-null dates of `entity` that were set locally."""
        entity_cache = self.cache[entity]
        return {
            sha1: date
            for sha1, date in entity_cache["data"].items()
            if sha1 in entity_cache["added"] and date is not None
        }

    def _relation_add_until_stored(
        self, relation: RelationType, rows: Iterable[RelationData], logged: object
    ) -> None:
        """Write `rows` into `relation`, retrying until the storage accepts them.

        `rows` must be re-iterable (e.g. a list) because every retry walks it
        again; `logged` is the cached data shown in the warning message.
        """
        while not self.storage.relation_add(relation, rows):
            logging.warning(
                f"Unable to write {relation} rows to the storage. "
                f"Data: {logged}. Retrying..."
            )

    def content_add_to_directory(
        self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
    ) -> None:
        """Associate `blob` with `directory` in the provenance model. `prefix` is the
        relative path from `directory` to `blob` (excluding `blob`'s name).
        """
        self.cache["content_in_directory"].add(
            (blob.id, directory.id, normalize(os.path.join(prefix, blob.name)))
        )

    def content_add_to_revision(
        self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
    ) -> None:
        """Associate `blob` with `revision` in the provenance model. `prefix` is the
        absolute path from `revision`'s root directory to `blob` (excluding `blob`'s
        name).
        """
        self.cache["content_in_revision"].add(
            (blob.id, revision.id, normalize(os.path.join(prefix, blob.name)))
        )

    def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
        """Retrieve the first occurrence of the blob identified by `id`."""
        return self.storage.content_find_first(id)

    def content_find_all(
        self, id: Sha1Git, limit: Optional[int] = None
    ) -> Generator[ProvenanceResult, None, None]:
        """Retrieve all the occurrences of the blob identified by `id`."""
        yield from self.storage.content_find_all(id, limit=limit)

    def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
        """Retrieve the earliest known date of `blob`."""
        return self.get_dates("content", [blob.id]).get(blob.id)

    def content_get_early_dates(
        self, blobs: Iterable[FileEntry]
    ) -> Dict[Sha1Git, datetime]:
        """Retrieve the earliest known date for each blob in `blobs`. If some blob has
        no associated date, it is not present in the resulting dictionary.
        """
        return self.get_dates("content", [blob.id for blob in blobs])

    def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
        """Associate `date` to `blob` as its earliest known date."""
        self.cache["content"]["data"][blob.id] = date
        self.cache["content"]["added"].add(blob.id)

    def directory_add_to_revision(
        self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
    ) -> None:
        """Associate `directory` with `revision` in the provenance model. `path` is the
        absolute path from `revision`'s root directory to `directory` (including
        `directory`'s name).
        """
        self.cache["directory_in_revision"].add(
            (directory.id, revision.id, normalize(path))
        )

    def directory_get_date_in_isochrone_frontier(
        self, directory: DirectoryEntry
    ) -> Optional[datetime]:
        """Retrieve the earliest known date of `directory` as an isochrone frontier in
        the provenance model.
        """
        return self.get_dates("directory", [directory.id]).get(directory.id)

    def directory_get_dates_in_isochrone_frontier(
        self, dirs: Iterable[DirectoryEntry]
    ) -> Dict[Sha1Git, datetime]:
        """Retrieve the earliest known date for each directory in `dirs` as isochrone
        frontiers in the provenance model. If some directory has no associated date,
        it is not present in the resulting dictionary.
        """
        return self.get_dates("directory", [directory.id for directory in dirs])

    def directory_set_date_in_isochrone_frontier(
        self, directory: DirectoryEntry, date: datetime
    ) -> None:
        """Associate `date` to `directory` as its earliest known date as an isochrone
        frontier in the provenance model.
        """
        self.cache["directory"]["data"][directory.id] = date
        self.cache["directory"]["added"].add(directory.id)

    def get_dates(
        self,
        entity: Literal["content", "directory", "revision"],
        ids: Iterable[Sha1Git],
    ) -> Dict[Sha1Git, datetime]:
        """Return the known date of each id in `ids` for the given `entity` kind.

        Dates already present in the local cache are used as they are; the storage
        is only queried for ids that have no cached date, and what it returns is
        cached. Ids without any date are absent from the result.
        """
        # `ids` is walked twice below, so materialize it: a one-shot iterable
        # would otherwise be exhausted before the result is built.
        wanted = list(ids)
        known = self.cache[entity]["data"]
        # Membership must be checked against the cached *data*: the cache wrapper
        # only has the keys "data" and "added", so testing against it reports every
        # id as missing and lets stored dates overwrite locally set ones.
        missing_ids = {sha1 for sha1 in wanted if known.get(sha1) is None}
        if missing_ids:
            if entity == "revision":
                updated = {
                    sha1: rev.date
                    for sha1, rev in self.storage.revision_get(missing_ids).items()
                    if rev.date is not None
                }
            else:
                updated = getattr(self.storage, f"{entity}_get")(missing_ids)
            known.update(updated)
        dates: Dict[Sha1Git, datetime] = {}
        for sha1 in wanted:
            date = known.get(sha1)
            if date is not None:
                dates[sha1] = date
        return dates

    def origin_add(self, origin: OriginEntry) -> None:
        """Add `origin` to the provenance model."""
        self.cache["origin"]["data"][origin.id] = origin.url
        self.cache["origin"]["added"].add(origin.id)

    def revision_add(self, revision: RevisionEntry) -> None:
        """Add `revision` to the provenance model. This implies storing `revision`'s
        date in the model, thus `revision.date` must be a valid date.
        """
        self.cache["revision"]["data"][revision.id] = revision.date
        self.cache["revision"]["added"].add(revision.id)

    def revision_add_before_revision(
        self, head: RevisionEntry, revision: RevisionEntry
    ) -> None:
        """Associate `revision` to `head` as an ancestor of the latter."""
        self.cache["revision_before_revision"].setdefault(revision.id, set()).add(
            head.id
        )

    def revision_add_to_origin(
        self, origin: OriginEntry, revision: RevisionEntry
    ) -> None:
        """Associate `revision` to `origin` as a head revision of the latter (ie. the
        target of an snapshot for `origin` in the archive)."""
        self.cache["revision_in_origin"].add((revision.id, origin.id))

    def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]:
        """Retrieve the date associated to `revision`."""
        return self.get_dates("revision", [revision.id]).get(revision.id)

    def revision_get_preferred_origin(
        self, revision: RevisionEntry
    ) -> Optional[Sha1Git]:
        """Retrieve the preferred origin associated to `revision`."""
        cache = self.cache["revision_origin"]["data"]
        if revision.id not in cache:
            ret = self.storage.revision_get([revision.id])
            if revision.id in ret:
                origin = ret[revision.id].origin
                if origin is not None:
                    cache[revision.id] = origin
        return cache.get(revision.id)

    def revision_in_history(self, revision: RevisionEntry) -> bool:
        """Check if `revision` is known to be an ancestor of some head revision in the
        provenance model.
        """
        return revision.id in self.cache["revision_before_revision"] or bool(
            self.storage.relation_get(RelationType.REV_BEFORE_REV, [revision.id])
        )

    def revision_set_preferred_origin(
        self, origin: OriginEntry, revision: RevisionEntry
    ) -> None:
        """Associate `origin` as the preferred origin for `revision`."""
        self.cache["revision_origin"]["data"][revision.id] = origin.id
        self.cache["revision_origin"]["added"].add(revision.id)

    def revision_visited(self, revision: RevisionEntry) -> bool:
        """Check if `revision` is known to be a head revision for some origin in the
        provenance model.
        """
        # Scan the cached (revision, origin) pairs directly instead of building a
        # temporary dict on every call.
        cached = any(
            rev == revision.id for rev, _ in self.cache["revision_in_origin"]
        )
        return cached or bool(
            self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id])
        )


def normalize(path: bytes) -> bytes:
    """Strip a leading "." + path separator from `path`, if it has one."""
    prefix = bytes("." + os.path.sep, "utf-8")
    return path[len(prefix) :] if path.startswith(prefix) else path
b/swh/provenance/tests/test_history_graph.py --- a/swh/provenance/tests/test_history_graph.py +++ b/swh/provenance/tests/test_history_graph.py @@ -11,9 +11,9 @@ from swh.model.hashutil import hash_to_bytes from swh.provenance.archive import ArchiveInterface from swh.provenance.graph import HistoryNode, build_history_graph +from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import OriginEntry, RevisionEntry from swh.provenance.origin import origin_add_revision -from swh.provenance.provenance import ProvenanceInterface from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data from swh.storage.postgresql.storage import Storage diff --git a/swh/provenance/tests/test_isochrone_graph.py b/swh/provenance/tests/test_isochrone_graph.py --- a/swh/provenance/tests/test_isochrone_graph.py +++ b/swh/provenance/tests/test_isochrone_graph.py @@ -13,8 +13,8 @@ from swh.model.hashutil import hash_to_bytes from swh.provenance.archive import ArchiveInterface from swh.provenance.graph import IsochroneNode, build_isochrone_graph +from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import DirectoryEntry, RevisionEntry -from swh.provenance.provenance import ProvenanceInterface from swh.provenance.revision import revision_add from swh.provenance.tests.conftest import fill_storage, get_datafile, load_repo_data from swh.provenance.tests.test_provenance_db import ts2dt diff --git a/swh/provenance/tests/test_provenance_db.py b/swh/provenance/tests/test_provenance_db.py --- a/swh/provenance/tests/test_provenance_db.py +++ b/swh/provenance/tests/test_provenance_db.py @@ -8,12 +8,12 @@ from swh.model.model import OriginVisitStatus from swh.model.tests.swh_model_data import TEST_OBJECTS +from swh.provenance.interface import ProvenanceInterface, ProvenanceStorageInterface from swh.provenance.model import OriginEntry from swh.provenance.origin import origin_add from swh.provenance.postgresql.provenancedb_base 
import ProvenanceDBBase from swh.provenance.postgresql.provenancedb_with_path import ProvenanceWithPathDB from swh.provenance.postgresql.provenancedb_without_path import ProvenanceWithoutPathDB -from swh.provenance.provenance import ProvenanceInterface, ProvenanceStorageInterface from swh.provenance.storage.archive import ArchiveStorage from swh.storage.postgresql.storage import Storage diff --git a/swh/provenance/tests/test_provenance_heuristics.py b/swh/provenance/tests/test_provenance_heuristics.py --- a/swh/provenance/tests/test_provenance_heuristics.py +++ b/swh/provenance/tests/test_provenance_heuristics.py @@ -9,9 +9,9 @@ from swh.model.hashutil import hash_to_bytes from swh.provenance.archive import ArchiveInterface +from swh.provenance.interface import EntityType, ProvenanceInterface, RelationType from swh.provenance.model import RevisionEntry from swh.provenance.postgresql.provenancedb_base import ProvenanceDBBase -from swh.provenance.provenance import EntityType, ProvenanceInterface, RelationType from swh.provenance.revision import revision_add from swh.provenance.tests.conftest import ( fill_storage,