Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/backend.py
- This file was copied from swh/provenance/provenance.py.
from datetime import datetime | from datetime import datetime | ||||
import logging | import logging | ||||
import os | import os | ||||
from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple | from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple | ||||
import psycopg2 | import psycopg2 # TODO: remove this dependency | ||||
from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable | from typing_extensions import Literal, TypedDict | ||||
from swh.model.model import Sha1Git | from swh.model.model import Sha1Git | ||||
from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry | from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry | ||||
from .provenance import ProvenanceResult | |||||
# XXX: this protocol doesn't make much sense now that flavours have been delegated to | |||||
# another class, lower in the callstack. | |||||
@runtime_checkable | |||||
class ProvenanceInterface(Protocol): | |||||
raise_on_commit: bool = False | |||||
def commit(self): | |||||
"""Commit currently ongoing transactions in the backend DB""" | |||||
... | |||||
def content_add_to_directory( | |||||
self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes | |||||
) -> None: | |||||
... | |||||
def content_add_to_revision( | |||||
self, revision: RevisionEntry, blob: FileEntry, prefix: bytes | |||||
) -> None: | |||||
... | |||||
def content_find_first( | |||||
self, id: Sha1Git | |||||
) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: | |||||
... | |||||
def content_find_all( | |||||
self, id: Sha1Git, limit: Optional[int] = None | |||||
) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: | |||||
... | |||||
def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: | |||||
... | |||||
def content_get_early_dates( | |||||
self, blobs: Iterable[FileEntry] | |||||
) -> Dict[Sha1Git, datetime]: | |||||
... | |||||
def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: | |||||
... | |||||
def directory_add_to_revision( | |||||
self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes | |||||
) -> None: | |||||
... | |||||
def directory_get_date_in_isochrone_frontier( | |||||
self, directory: DirectoryEntry | |||||
) -> Optional[datetime]: | |||||
... | |||||
def directory_get_dates_in_isochrone_frontier( | |||||
self, dirs: Iterable[DirectoryEntry] | |||||
) -> Dict[Sha1Git, datetime]: | |||||
... | |||||
def directory_set_date_in_isochrone_frontier( | |||||
self, directory: DirectoryEntry, date: datetime | |||||
) -> None: | |||||
... | |||||
def origin_add(self, origin: OriginEntry) -> None: | |||||
... | |||||
def revision_add(self, revision: RevisionEntry) -> None: | |||||
... | |||||
def revision_add_before_revision( | |||||
self, relative: RevisionEntry, revision: RevisionEntry | |||||
) -> None: | |||||
... | |||||
def revision_add_to_origin( | |||||
self, origin: OriginEntry, revision: RevisionEntry | |||||
) -> None: | |||||
... | |||||
def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: | |||||
... | |||||
def revision_get_preferred_origin( | |||||
self, revision: RevisionEntry | |||||
) -> Optional[Sha1Git]: | |||||
... | |||||
def revision_in_history(self, revision: RevisionEntry) -> bool: | |||||
... | |||||
def revision_set_preferred_origin( | |||||
self, origin: OriginEntry, revision: RevisionEntry | |||||
) -> None: | |||||
... | |||||
def revision_visited(self, revision: RevisionEntry) -> bool: | |||||
... | |||||
class DatetimeCache(TypedDict): | class DatetimeCache(TypedDict): | ||||
data: Dict[Sha1Git, Optional[datetime]] | data: Dict[Sha1Git, Optional[datetime]] | ||||
added: Set[Sha1Git] | added: Set[Sha1Git] | ||||
class OriginCache(TypedDict): | class OriginCache(TypedDict): | ||||
Show All 16 Lines | class ProvenanceCache(TypedDict): | ||||
directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] | directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] | ||||
# these two are for the origin layer | # these two are for the origin layer | ||||
origin: OriginCache | origin: OriginCache | ||||
revision_origin: RevisionCache | revision_origin: RevisionCache | ||||
revision_before_revision: Dict[Sha1Git, Set[Sha1Git]] | revision_before_revision: Dict[Sha1Git, Set[Sha1Git]] | ||||
revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]] | revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]] | ||||
def new_cache(): | def new_cache() -> ProvenanceCache: | ||||
return ProvenanceCache( | return ProvenanceCache( | ||||
content=DatetimeCache(data={}, added=set()), | content=DatetimeCache(data={}, added=set()), | ||||
directory=DatetimeCache(data={}, added=set()), | directory=DatetimeCache(data={}, added=set()), | ||||
revision=DatetimeCache(data={}, added=set()), | revision=DatetimeCache(data={}, added=set()), | ||||
content_in_revision=set(), | content_in_revision=set(), | ||||
content_in_directory=set(), | content_in_directory=set(), | ||||
directory_in_revision=set(), | directory_in_revision=set(), | ||||
origin=OriginCache(data={}, added=set()), | origin=OriginCache(data={}, added=set()), | ||||
Show All 18 Lines | def __init__(self, conn: psycopg2.extensions.connection): | ||||
self.storage = ProvenanceWithPathDB(conn) | self.storage = ProvenanceWithPathDB(conn) | ||||
else: | else: | ||||
from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB | from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB | ||||
self.storage = ProvenanceWithoutPathDB(conn) | self.storage = ProvenanceWithoutPathDB(conn) | ||||
self.cache: ProvenanceCache = new_cache() | self.cache: ProvenanceCache = new_cache() | ||||
def clear_caches(self): | def clear_caches(self) -> None: | ||||
self.cache = new_cache() | self.cache = new_cache() | ||||
def commit(self): | def flush(self) -> None: | ||||
# TODO: for now we just forward the cache. This should be improved! | # TODO: for now we just forward the cache. This should be improved! | ||||
while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit): | while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit): | ||||
logging.warning( | logging.warning( | ||||
f"Unable to commit cached information {self.write_cache}. Retrying..." | f"Unable to commit cached information {self.cache}. Retrying..." | ||||
) | ) | ||||
self.clear_caches() | self.clear_caches() | ||||
def content_add_to_directory( | def content_add_to_directory( | ||||
self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes | self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes | ||||
): | ) -> None: | ||||
self.cache["content_in_directory"].add( | self.cache["content_in_directory"].add( | ||||
(blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) | (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) | ||||
) | ) | ||||
def content_add_to_revision( | def content_add_to_revision( | ||||
self, revision: RevisionEntry, blob: FileEntry, prefix: bytes | self, revision: RevisionEntry, blob: FileEntry, prefix: bytes | ||||
): | ) -> None: | ||||
self.cache["content_in_revision"].add( | self.cache["content_in_revision"].add( | ||||
(blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) | (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) | ||||
) | ) | ||||
def content_find_first( | def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: | ||||
self, id: Sha1Git | |||||
) -> Optional[Tuple[Sha1Git, Sha1Git, datetime, bytes]]: | |||||
return self.storage.content_find_first(id) | return self.storage.content_find_first(id) | ||||
def content_find_all( | def content_find_all( | ||||
self, id: Sha1Git, limit: Optional[int] = None | self, id: Sha1Git, limit: Optional[int] = None | ||||
) -> Generator[Tuple[Sha1Git, Sha1Git, datetime, bytes], None, None]: | ) -> Generator[ProvenanceResult, None, None]: | ||||
yield from self.storage.content_find_all(id, limit=limit) | yield from self.storage.content_find_all(id, limit=limit) | ||||
def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: | def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: | ||||
return self.get_dates("content", [blob.id]).get(blob.id) | return self.get_dates("content", [blob.id]).get(blob.id) | ||||
def content_get_early_dates( | def content_get_early_dates( | ||||
self, blobs: Iterable[FileEntry] | self, blobs: Iterable[FileEntry] | ||||
) -> Dict[Sha1Git, datetime]: | ) -> Dict[Sha1Git, datetime]: | ||||
return self.get_dates("content", [blob.id for blob in blobs]) | return self.get_dates("content", [blob.id for blob in blobs]) | ||||
def content_set_early_date(self, blob: FileEntry, date: datetime): | def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: | ||||
self.cache["content"]["data"][blob.id] = date | self.cache["content"]["data"][blob.id] = date | ||||
self.cache["content"]["added"].add(blob.id) | self.cache["content"]["added"].add(blob.id) | ||||
def directory_add_to_revision( | def directory_add_to_revision( | ||||
self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes | self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes | ||||
): | ) -> None: | ||||
self.cache["directory_in_revision"].add( | self.cache["directory_in_revision"].add( | ||||
(directory.id, revision.id, normalize(path)) | (directory.id, revision.id, normalize(path)) | ||||
) | ) | ||||
def directory_get_date_in_isochrone_frontier( | def directory_get_date_in_isochrone_frontier( | ||||
self, directory: DirectoryEntry | self, directory: DirectoryEntry | ||||
) -> Optional[datetime]: | ) -> Optional[datetime]: | ||||
return self.get_dates("directory", [directory.id]).get(directory.id) | return self.get_dates("directory", [directory.id]).get(directory.id) | ||||
def directory_get_dates_in_isochrone_frontier( | def directory_get_dates_in_isochrone_frontier( | ||||
self, dirs: Iterable[DirectoryEntry] | self, dirs: Iterable[DirectoryEntry] | ||||
) -> Dict[Sha1Git, datetime]: | ) -> Dict[Sha1Git, datetime]: | ||||
return self.get_dates("directory", [directory.id for directory in dirs]) | return self.get_dates("directory", [directory.id for directory in dirs]) | ||||
def directory_set_date_in_isochrone_frontier( | def directory_set_date_in_isochrone_frontier( | ||||
self, directory: DirectoryEntry, date: datetime | self, directory: DirectoryEntry, date: datetime | ||||
): | ) -> None: | ||||
self.cache["directory"]["data"][directory.id] = date | self.cache["directory"]["data"][directory.id] = date | ||||
self.cache["directory"]["added"].add(directory.id) | self.cache["directory"]["added"].add(directory.id) | ||||
def get_dates( | def get_dates( | ||||
self, entity: Literal["content", "revision", "directory"], ids: List[Sha1Git] | self, entity: Literal["content", "revision", "directory"], ids: List[Sha1Git] | ||||
) -> Dict[Sha1Git, datetime]: | ) -> Dict[Sha1Git, datetime]: | ||||
cache = self.cache[entity] | cache = self.cache[entity] | ||||
missing_ids = set(id for id in ids if id not in cache) | missing_ids = set(id for id in ids if id not in cache) | ||||
if missing_ids: | if missing_ids: | ||||
cache["data"].update(self.storage.get_dates(entity, list(missing_ids))) | cache["data"].update(self.storage.get_dates(entity, list(missing_ids))) | ||||
dates: Dict[Sha1Git, datetime] = {} | dates: Dict[Sha1Git, datetime] = {} | ||||
for sha1 in ids: | for sha1 in ids: | ||||
date = cache["data"].get(sha1) | date = cache["data"].get(sha1) | ||||
if date is not None: | if date is not None: | ||||
dates[sha1] = date | dates[sha1] = date | ||||
return dates | return dates | ||||
def origin_add(self, origin: OriginEntry) -> None: | def origin_add(self, origin: OriginEntry) -> None: | ||||
self.cache["origin"]["data"][origin.id] = origin.url | self.cache["origin"]["data"][origin.id] = origin.url | ||||
self.cache["origin"]["added"].add(origin.id) | self.cache["origin"]["added"].add(origin.id) | ||||
def revision_add(self, revision: RevisionEntry): | def revision_add(self, revision: RevisionEntry) -> None: | ||||
self.cache["revision"]["data"][revision.id] = revision.date | self.cache["revision"]["data"][revision.id] = revision.date | ||||
self.cache["revision"]["added"].add(revision.id) | self.cache["revision"]["added"].add(revision.id) | ||||
def revision_add_before_revision( | def revision_add_before_revision( | ||||
self, relative: RevisionEntry, revision: RevisionEntry | self, head: RevisionEntry, revision: RevisionEntry | ||||
): | ) -> None: | ||||
self.cache["revision_before_revision"].setdefault(revision.id, set()).add( | self.cache["revision_before_revision"].setdefault(revision.id, set()).add( | ||||
relative.id | head.id | ||||
) | ) | ||||
def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): | def revision_add_to_origin( | ||||
self, origin: OriginEntry, revision: RevisionEntry | |||||
) -> None: | |||||
self.cache["revision_in_origin"].add((revision.id, origin.id)) | self.cache["revision_in_origin"].add((revision.id, origin.id)) | ||||
def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: | def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: | ||||
return self.get_dates("revision", [revision.id]).get(revision.id) | return self.get_dates("revision", [revision.id]).get(revision.id) | ||||
def revision_get_preferred_origin( | def revision_get_preferred_origin( | ||||
self, revision: RevisionEntry | self, revision: RevisionEntry | ||||
) -> Optional[Sha1Git]: | ) -> Optional[Sha1Git]: | ||||
cache = self.cache["revision_origin"] | cache = self.cache["revision_origin"] | ||||
if revision.id not in cache: | if revision.id not in cache: | ||||
origin = self.storage.revision_get_preferred_origin(revision.id) | origin = self.storage.revision_get_preferred_origin(revision.id) | ||||
if origin is not None: | if origin is not None: | ||||
cache["data"][revision.id] = origin | cache["data"][revision.id] = origin | ||||
return cache["data"].get(revision.id) | return cache["data"].get(revision.id) | ||||
def revision_in_history(self, revision: RevisionEntry) -> bool: | def revision_in_history(self, revision: RevisionEntry) -> bool: | ||||
return revision.id in self.cache[ | return revision.id in self.cache[ | ||||
"revision_before_revision" | "revision_before_revision" | ||||
] or self.storage.revision_in_history(revision.id) | ] or self.storage.revision_in_history(revision.id) | ||||
def revision_set_preferred_origin( | def revision_set_preferred_origin( | ||||
self, origin: OriginEntry, revision: RevisionEntry | self, origin: OriginEntry, revision: RevisionEntry | ||||
): | ) -> None: | ||||
self.cache["revision_origin"]["data"][revision.id] = origin.id | self.cache["revision_origin"]["data"][revision.id] = origin.id | ||||
self.cache["revision_origin"]["added"].add(revision.id) | self.cache["revision_origin"]["added"].add(revision.id) | ||||
def revision_visited(self, revision: RevisionEntry) -> bool: | def revision_visited(self, revision: RevisionEntry) -> bool: | ||||
return revision.id in dict( | return revision.id in dict( | ||||
self.cache["revision_in_origin"] | self.cache["revision_in_origin"] | ||||
) or self.storage.revision_visited(revision.id) | ) or self.storage.revision_visited(revision.id) | ||||
def normalize(path: bytes) -> bytes: | def normalize(path: bytes) -> bytes: | ||||
return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path | return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path |