Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/provenance.py
- This file was copied to swh/provenance/interface.py.
# Copyright (C) 2021 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
from datetime import datetime | from datetime import datetime | ||||
import enum | import logging | ||||
from typing import Dict, Generator, Iterable, Optional, Set | import os | ||||
from typing import Dict, Generator, Iterable, Optional, Set, Tuple | |||||
from typing_extensions import Protocol, runtime_checkable | from typing_extensions import Literal, TypedDict | ||||
from swh.model.model import Sha1Git | from swh.model.model import Sha1Git | ||||
from .interface import ( | |||||
ProvenanceResult, | |||||
ProvenanceStorageInterface, | |||||
RelationData, | |||||
RelationType, | |||||
) | |||||
from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry | from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry | ||||
class EntityType(enum.Enum): | class DatetimeCache(TypedDict): | ||||
CONTENT = "content" | data: Dict[Sha1Git, Optional[datetime]] | ||||
DIRECTORY = "directory" | added: Set[Sha1Git] | ||||
REVISION = "revision" | |||||
ORIGIN = "origin" | |||||
class OriginCache(TypedDict): | |||||
data: Dict[Sha1Git, str] | |||||
class RelationType(enum.Enum): | added: Set[Sha1Git] | ||||
CNT_EARLY_IN_REV = "content_in_revision" | |||||
CNT_IN_DIR = "content_in_directory" | |||||
DIR_IN_REV = "directory_in_revision" | class RevisionCache(TypedDict): | ||||
REV_IN_ORG = "revision_in_origin" | data: Dict[Sha1Git, Sha1Git] | ||||
REV_BEFORE_REV = "revision_before_revision" | added: Set[Sha1Git] | ||||
class ProvenanceResult: | class ProvenanceCache(TypedDict): | ||||
def __init__( | content: DatetimeCache | ||||
self, | directory: DatetimeCache | ||||
content: Sha1Git, | revision: DatetimeCache | ||||
revision: Sha1Git, | # below are insertion caches only | ||||
date: datetime, | content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] | ||||
origin: Optional[str], | content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]] | ||||
path: bytes, | directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] | ||||
) -> None: | # these two are for the origin layer | ||||
self.content = content | origin: OriginCache | ||||
self.revision = revision | revision_origin: RevisionCache | ||||
self.date = date | revision_before_revision: Dict[Sha1Git, Set[Sha1Git]] | ||||
self.origin = origin | revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]] | ||||
self.path = path | |||||
def new_cache() -> ProvenanceCache: | |||||
class RevisionData: | return ProvenanceCache( | ||||
"""Object representing the data associated to a revision in the provenance model, | content=DatetimeCache(data={}, added=set()), | ||||
where `date` is the optional date of the revision (specifying it acknowledges that | directory=DatetimeCache(data={}, added=set()), | ||||
the revision was already processed by the revision-content algorithm); and `origin` | revision=DatetimeCache(data={}, added=set()), | ||||
identifies the preferred origin for the revision, if any. | content_in_revision=set(), | ||||
""" | content_in_directory=set(), | ||||
directory_in_revision=set(), | |||||
def __init__( | origin=OriginCache(data={}, added=set()), | ||||
self, | revision_origin=RevisionCache(data={}, added=set()), | ||||
date: Optional[datetime], | revision_before_revision={}, | ||||
origin: Optional[Sha1Git], | revision_in_origin=set(), | ||||
) -> None: | ) | ||||
self.date = date | |||||
self.origin = origin | |||||
class Provenance: | |||||
def __init__(self, storage: ProvenanceStorageInterface) -> None: | |||||
class RelationData: | self.storage = storage | ||||
"""Object representing a relation entry in the provenance model, where `src` and | self.cache = new_cache() | ||||
`dst` are the sha1 ids of the entities being related, and `path` is optional | |||||
depending on the relation being represented. | |||||
""" | |||||
def __init__( | |||||
self, | |||||
src: Sha1Git, | |||||
dst: Sha1Git, | |||||
path: Optional[bytes], | |||||
) -> None: | |||||
self.src = src | |||||
self.dst = dst | |||||
self.path = path | |||||
@runtime_checkable | def clear_caches(self) -> None: | ||||
class ProvenanceStorageInterface(Protocol): | self.cache = new_cache() | ||||
raise_on_commit: bool = False | |||||
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: | |||||
"""Retrieve the first occurrence of the blob identified by `id`.""" | |||||
... | |||||
def content_find_all( | def flush(self) -> None: | ||||
self, id: Sha1Git, limit: Optional[int] = None | # Revision-content layer insertions ############################################ | ||||
) -> Generator[ProvenanceResult, None, None]: | |||||
"""Retrieve all the occurrences of the blob identified by `id`.""" | |||||
... | |||||
def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: | # For this layer, relations need to be inserted first so that, in case of | ||||
"""Associate dates to blobs identified by sha1 ids, as paired in `dates`. Return | # failure, reprocessing the input does not generate an inconsistent database.
a boolean stating whether the information was successfully stored. | while not self.storage.relation_add( | ||||
""" | RelationType.CNT_EARLY_IN_REV, | ||||
... | ( | ||||
RelationData(src=src, dst=dst, path=path) | |||||
def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | for src, dst, path in self.cache["content_in_revision"] | ||||
"""Retrieve the associated date for each blob sha1 in `ids`. If some blob has | ), | ||||
no associated date, it is not present in the resulting dictionary. | ): | ||||
""" | logging.warning( | ||||
... | f"Unable to write {RelationType.CNT_EARLY_IN_REV} rows to the storage. " | ||||
f"Data: {self.cache['content_in_revision']}. Retrying..." | |||||
def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: | ) | ||||
"""Associate dates to directories identified by sha1 ids, as paired in | |||||
`dates`. Return a boolean stating whether the information was successfully | while not self.storage.relation_add( | ||||
stored. | RelationType.CNT_IN_DIR, | ||||
""" | ( | ||||
... | RelationData(src=src, dst=dst, path=path) | ||||
for src, dst, path in self.cache["content_in_directory"] | |||||
def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | ), | ||||
"""Retrieve the associated date for each directory sha1 in `ids`. If some | ): | ||||
directory has no associated date, it is not present in the resulting dictionary. | logging.warning( | ||||
""" | f"Unable to write {RelationType.CNT_IN_DIR} rows to the storage. " | ||||
... | f"Data: {self.cache['content_in_directory']}. Retrying..." | ||||
) | |||||
def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: | |||||
"""Retrieve all sha1 ids for entities of type `entity` present in the provenance | while not self.storage.relation_add( | ||||
model. | RelationType.DIR_IN_REV, | ||||
""" | ( | ||||
... | RelationData(src=src, dst=dst, path=path) | ||||
for src, dst, path in self.cache["directory_in_revision"] | |||||
def location_get(self) -> Set[bytes]: | ), | ||||
"""Retrieve all paths present in the provenance model.""" | ): | ||||
... | logging.warning( | ||||
f"Unable to write {RelationType.DIR_IN_REV} rows to the storage. " | |||||
def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: | f"Data: {self.cache['directory_in_revision']}. Retrying..." | ||||
"""Associate urls to origins identified by sha1 ids, as paired in `urls`. Return | ) | ||||
a boolean stating whether the information was successfully stored. | |||||
""" | # After relations, dates for the entities can be safely set, acknowledging that | ||||
... | # these entities won't need to be reprocessed in case of failure. | ||||
dates = { | |||||
def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: | sha1: date | ||||
"""Retrieve the associated url for each origin sha1 in `ids`. If some origin has | for sha1, date in self.cache["content"]["data"].items() | ||||
no associated url, it is not present in the resulting dictionary. | if sha1 in self.cache["content"]["added"] and date is not None | ||||
""" | } | ||||
... | while not self.storage.content_set_date(dates): | ||||
logging.warning( | |||||
def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: | f"Unable to write content dates to the storage. " | ||||
"""Associate dates to revisions identified by sha1 ids, as paired in `dates`. | f"Data: {dates}. Retrying..." | ||||
Return a boolean stating whether the information was successfully stored. | ) | ||||
""" | |||||
... | dates = { | ||||
sha1: date | |||||
def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: | for sha1, date in self.cache["directory"]["data"].items() | ||||
"""Associate origins to revisions identified by sha1 ids, as paired in | if sha1 in self.cache["directory"]["added"] and date is not None | ||||
`origins` (revision ids are keys and origin ids, values). Return a boolean | } | ||||
stating whether the information was successfully stored. | while not self.storage.directory_set_date(dates): | ||||
""" | logging.warning( | ||||
... | f"Unable to write directory dates to the storage. " | ||||
f"Data: {dates}. Retrying..." | |||||
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: | ) | ||||
"""Retrieve the associated date and origin for each revision sha1 in `ids`. If | |||||
some revision has no associated date nor origin, it is not present in the | dates = { | ||||
resulting dictionary. | sha1: date | ||||
""" | for sha1, date in self.cache["revision"]["data"].items() | ||||
... | if sha1 in self.cache["revision"]["added"] and date is not None | ||||
} | |||||
def relation_add( | while not self.storage.revision_set_date(dates): | ||||
self, relation: RelationType, data: Iterable[RelationData] | logging.warning( | ||||
) -> bool: | f"Unable to write revision dates to the storage. " | ||||
"""Add entries in the selected `relation`.""" | f"Data: {dates}. Retrying..." | ||||
... | ) | ||||
def relation_get( | # Origin-revision layer insertions ############################################# | ||||
self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False | |||||
) -> Set[RelationData]: | # Origins urls should be inserted first so that internal ids' resolution works | ||||
"""Retrieve all entries in the selected `relation` whose source entities are | # properly. | ||||
identified by some sha1 id in `ids`. If `reverse` is set, destination entities | urls = { | ||||
are matched instead. | sha1: date | ||||
""" | for sha1, date in self.cache["origin"]["data"].items() | ||||
... | if sha1 in self.cache["origin"]["added"] | ||||
} | |||||
def relation_get_all(self, relation: RelationType) -> Set[RelationData]: | while not self.storage.origin_set_url(urls): | ||||
"""Retrieve all entries in the selected `relation` that are present in the | logging.warning( | ||||
provenance model. | f"Unable to write origins urls to the storage. " | ||||
""" | f"Data: {urls}. Retrying..." | ||||
... | ) | ||||
# Second, flat models for revisions' histories (ie. revision-before-revision). | |||||
@runtime_checkable | data: Iterable[RelationData] = sum( | ||||
class ProvenanceInterface(Protocol): | [ | ||||
storage: ProvenanceStorageInterface | [ | ||||
RelationData(src=prev, dst=next, path=None) | |||||
for next in self.cache["revision_before_revision"][prev] | |||||
] | |||||
for prev in self.cache["revision_before_revision"] | |||||
], | |||||
[], | |||||
) | |||||
while not self.storage.relation_add(RelationType.REV_BEFORE_REV, data): | |||||
logging.warning( | |||||
f"Unable to write {RelationType.REV_BEFORE_REV} rows to the storage. " | |||||
f"Data: {data}. Retrying..." | |||||
) | |||||
# Heads (ie. revision-in-origin entries) should be inserted once flat models for | |||||
# their histories were already added. This is to guarantee consistent results if | |||||
# something needs to be reprocessed due to a failure: already inserted heads | |||||
# won't get reprocessed in such a case. | |||||
data = ( | |||||
RelationData(src=rev, dst=org, path=None) | |||||
for rev, org in self.cache["revision_in_origin"] | |||||
) | |||||
while not self.storage.relation_add(RelationType.REV_IN_ORG, data): | |||||
logging.warning( | |||||
f"Unable to write {RelationType.REV_IN_ORG} rows to the storage. " | |||||
f"Data: {data}. Retrying..." | |||||
) | |||||
# Finally, preferred origins for the visited revisions are set (this step can be | |||||
# reordered if required). | |||||
origins = { | |||||
sha1: self.cache["revision_origin"]["data"][sha1] | |||||
for sha1 in self.cache["revision_origin"]["added"] | |||||
} | |||||
while not self.storage.revision_set_origin(origins): | |||||
logging.warning( | |||||
f"Unable to write preferred origins to the storage. " | |||||
f"Data: {origins}. Retrying..." | |||||
) | |||||
def flush(self) -> None: | # clear local cache ############################################################ | ||||
"""Flush internal cache to the underlying `storage`.""" | self.clear_caches() | ||||
... | |||||
def content_add_to_directory( | def content_add_to_directory( | ||||
self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes | self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes | ||||
) -> None: | ) -> None: | ||||
"""Associate `blob` with `directory` in the provenance model. `prefix` is the | self.cache["content_in_directory"].add( | ||||
relative path from `directory` to `blob` (excluding `blob`'s name). | (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) | ||||
""" | ) | ||||
... | |||||
def content_add_to_revision( | def content_add_to_revision( | ||||
self, revision: RevisionEntry, blob: FileEntry, prefix: bytes | self, revision: RevisionEntry, blob: FileEntry, prefix: bytes | ||||
) -> None: | ) -> None: | ||||
"""Associate `blob` with `revision` in the provenance model. `prefix` is the | self.cache["content_in_revision"].add( | ||||
absolute path from `revision`'s root directory to `blob` (excluding `blob`'s | (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) | ||||
name). | ) | ||||
""" | |||||
... | |||||
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: | def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: | ||||
"""Retrieve the first occurrence of the blob identified by `id`.""" | return self.storage.content_find_first(id) | ||||
... | |||||
def content_find_all( | def content_find_all( | ||||
self, id: Sha1Git, limit: Optional[int] = None | self, id: Sha1Git, limit: Optional[int] = None | ||||
) -> Generator[ProvenanceResult, None, None]: | ) -> Generator[ProvenanceResult, None, None]: | ||||
"""Retrieve all the occurrences of the blob identified by `id`.""" | yield from self.storage.content_find_all(id, limit=limit) | ||||
... | |||||
def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: | def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: | ||||
"""Retrieve the earliest known date of `blob`.""" | return self.get_dates("content", [blob.id]).get(blob.id) | ||||
... | |||||
def content_get_early_dates( | def content_get_early_dates( | ||||
self, blobs: Iterable[FileEntry] | self, blobs: Iterable[FileEntry] | ||||
) -> Dict[Sha1Git, datetime]: | ) -> Dict[Sha1Git, datetime]: | ||||
"""Retrieve the earliest known date for each blob in `blobs`. If some blob has | return self.get_dates("content", [blob.id for blob in blobs]) | ||||
no associated date, it is not present in the resulting dictionary. | |||||
""" | |||||
... | |||||
def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: | def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: | ||||
"""Associate `date` to `blob` as its earliest known date.""" | self.cache["content"]["data"][blob.id] = date | ||||
... | self.cache["content"]["added"].add(blob.id) | ||||
def directory_add_to_revision( | def directory_add_to_revision( | ||||
self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes | self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes | ||||
) -> None: | ) -> None: | ||||
"""Associate `directory` with `revision` in the provenance model. `path` is the | self.cache["directory_in_revision"].add( | ||||
absolute path from `revision`'s root directory to `directory` (including | (directory.id, revision.id, normalize(path)) | ||||
`directory`'s name). | ) | ||||
""" | |||||
... | |||||
def directory_get_date_in_isochrone_frontier( | def directory_get_date_in_isochrone_frontier( | ||||
self, directory: DirectoryEntry | self, directory: DirectoryEntry | ||||
) -> Optional[datetime]: | ) -> Optional[datetime]: | ||||
"""Retrieve the earliest known date of `directory` as an isochrone frontier in | return self.get_dates("directory", [directory.id]).get(directory.id) | ||||
the provenance model. | |||||
""" | |||||
... | |||||
def directory_get_dates_in_isochrone_frontier( | def directory_get_dates_in_isochrone_frontier( | ||||
self, dirs: Iterable[DirectoryEntry] | self, dirs: Iterable[DirectoryEntry] | ||||
) -> Dict[Sha1Git, datetime]: | ) -> Dict[Sha1Git, datetime]: | ||||
"""Retrieve the earliest known date for each directory in `dirs` as isochrone | return self.get_dates("directory", [directory.id for directory in dirs]) | ||||
frontiers in the provenance model. If some directory has no associated date, it is not | |||||
present in the resulting dictionary. | |||||
""" | |||||
... | |||||
def directory_set_date_in_isochrone_frontier( | def directory_set_date_in_isochrone_frontier( | ||||
self, directory: DirectoryEntry, date: datetime | self, directory: DirectoryEntry, date: datetime | ||||
) -> None: | ) -> None: | ||||
"""Associate `date` to `directory` as its earliest known date as an isochrone | self.cache["directory"]["data"][directory.id] = date | ||||
frontier in the provenance model. | self.cache["directory"]["added"].add(directory.id) | ||||
""" | |||||
def get_dates(
    self,
    entity: Literal["content", "directory", "revision"],
    ids: Iterable[Sha1Git],
) -> Dict[Sha1Git, datetime]:
    """Return the known dates for `ids`, looking in the local cache first.

    Ids that are absent from ``self.cache[entity]["data"]`` are fetched from
    ``self.storage`` (``revision_get`` for revisions, ``<entity>_get``
    otherwise) and the fetched dates are stored in the cache. Ids already
    present in the cache are never re-fetched, so dates set locally and not
    yet flushed are not overwritten by the values held in the storage.

    Args:
        entity: which cache/storage getter to use.
        ids: sha1 ids to look up; any iterable, including one-shot ones.

    Returns:
        A dict mapping each requested id to its date. Ids with no date
        (unknown, or cached as ``None``) are left out.
    """
    # `ids` is walked twice below; materialize it so generators work too.
    sha1s = list(ids)
    known = self.cache[entity]["data"]
    # Membership has to be tested against the "data" mapping: the cache entry
    # itself only has the keys "data" and "added", so testing ids against it
    # would flag every id as missing and hit the storage on every call.
    missing_ids = {sha1 for sha1 in sha1s if sha1 not in known}
    if missing_ids:
        if entity == "revision":
            # Revisions come back as objects; keep only those carrying a date.
            updated = {
                sha1: rev.date
                for sha1, rev in self.storage.revision_get(missing_ids).items()
                if rev.date is not None
            }
        else:
            updated = getattr(self.storage, f"{entity}_get")(missing_ids)
        known.update(updated)
    dates: Dict[Sha1Git, datetime] = {}
    for sha1 in sha1s:
        date = known.get(sha1)
        if date is not None:
            dates[sha1] = date
    return dates
def origin_add(self, origin: OriginEntry) -> None: | def origin_add(self, origin: OriginEntry) -> None: | ||||
"""Add `origin` to the provenance model.""" | self.cache["origin"]["data"][origin.id] = origin.url | ||||
... | self.cache["origin"]["added"].add(origin.id) | ||||
def revision_add(self, revision: RevisionEntry) -> None: | def revision_add(self, revision: RevisionEntry) -> None: | ||||
"""Add `revision` to the provenance model. This implies storing `revision`'s | self.cache["revision"]["data"][revision.id] = revision.date | ||||
date in the model, thus `revision.date` must be a valid date. | self.cache["revision"]["added"].add(revision.id) | ||||
""" | |||||
... | |||||
def revision_add_before_revision( | def revision_add_before_revision( | ||||
self, head: RevisionEntry, revision: RevisionEntry | self, head: RevisionEntry, revision: RevisionEntry | ||||
) -> None: | ) -> None: | ||||
"""Associate `revision` to `head` as an ancestor of the latter.""" | self.cache["revision_before_revision"].setdefault(revision.id, set()).add( | ||||
... | head.id | ||||
) | |||||
def revision_add_to_origin( | def revision_add_to_origin( | ||||
self, origin: OriginEntry, revision: RevisionEntry | self, origin: OriginEntry, revision: RevisionEntry | ||||
) -> None: | ) -> None: | ||||
"""Associate `revision` to `origin` as a head revision of the latter (ie. the | self.cache["revision_in_origin"].add((revision.id, origin.id)) | ||||
target of a snapshot for `origin` in the archive).""" | |||||
... | |||||
def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: | def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: | ||||
"""Retrieve the date associated to `revision`.""" | return self.get_dates("revision", [revision.id]).get(revision.id) | ||||
... | |||||
def revision_get_preferred_origin( | def revision_get_preferred_origin( | ||||
self, revision: RevisionEntry | self, revision: RevisionEntry | ||||
) -> Optional[Sha1Git]: | ) -> Optional[Sha1Git]: | ||||
"""Retrieve the preferred origin associated to `revision`.""" | cache = self.cache["revision_origin"]["data"] | ||||
... | if revision.id not in cache: | ||||
ret = self.storage.revision_get([revision.id]) | |||||
if revision.id in ret: | |||||
origin = ret[revision.id].origin | |||||
if origin is not None: | |||||
cache[revision.id] = origin | |||||
return cache.get(revision.id) | |||||
def revision_in_history(self, revision: RevisionEntry) -> bool: | def revision_in_history(self, revision: RevisionEntry) -> bool: | ||||
"""Check if `revision` is known to be an ancestor of some head revision in the | return revision.id in self.cache["revision_before_revision"] or bool( | ||||
provenance model. | self.storage.relation_get(RelationType.REV_BEFORE_REV, [revision.id]) | ||||
""" | ) | ||||
... | |||||
def revision_set_preferred_origin( | def revision_set_preferred_origin( | ||||
self, origin: OriginEntry, revision: RevisionEntry | self, origin: OriginEntry, revision: RevisionEntry | ||||
) -> None: | ) -> None: | ||||
"""Associate `origin` as the preferred origin for `revision`.""" | self.cache["revision_origin"]["data"][revision.id] = origin.id | ||||
... | self.cache["revision_origin"]["added"].add(revision.id) | ||||
def revision_visited(self, revision: RevisionEntry) -> bool: | def revision_visited(self, revision: RevisionEntry) -> bool: | ||||
"""Check if `revision` is known to be a head revision for some origin in the | return revision.id in dict(self.cache["revision_in_origin"]) or bool( | ||||
provenance model. | self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id]) | ||||
""" | ) | ||||
... | |||||
def normalize(path: bytes) -> bytes:
    """Drop one leading current-directory prefix from `path`, if present.

    The prefix is "." followed by ``os.path.sep``, encoded as UTF-8. Only a
    single leading occurrence is removed; any other path is returned as is.
    """
    current_dir_prefix = ("." + os.path.sep).encode("utf-8")
    if path.startswith(current_dir_prefix):
        return path[2:]
    return path