diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -19,9 +19,14 @@ def commit(self, data: Dict[str, Any], raise_on_commit: bool = False) -> bool: try: # First insert entities - self.insert_entity("content", data["content"]) - self.insert_entity("directory", data["directory"]) - self.insert_entity("revision", data["revision"]) + for entity in ("content", "directory", "revision"): + self.insert_entity( + entity, + { + sha1: data[entity]["data"][sha1] + for sha1 in data[entity]["added"] + }, + ) # Relations should come after ids for entities were resolved self.insert_relation( diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,10 +1,10 @@ from datetime import datetime import logging import os -from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple +from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple import psycopg2 -from typing_extensions import Protocol, runtime_checkable +from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry @@ -104,6 +104,37 @@ ...
+class Cache(TypedDict): + data: Dict[bytes, datetime] + added: Set[bytes] + + +class ProvenanceCache(TypedDict): + content: Cache + directory: Cache + revision: Cache + # below are insertion caches only + content_early_in_rev: Set[Tuple[bytes, bytes, bytes]] + content_in_dir: Set[Tuple[bytes, bytes, bytes]] + directory_in_rev: Set[Tuple[bytes, bytes, bytes]] + # these two are for the origin layer + revision_before_rev: List[Tuple[bytes, bytes]] + revision_in_org: List[Tuple[bytes, int]] + + +def new_cache() -> ProvenanceCache: + return ProvenanceCache( + content=Cache(data={}, added=set()), + directory=Cache(data={}, added=set()), + revision=Cache(data={}, added=set()), + content_early_in_rev=set(), + content_in_dir=set(), + directory_in_rev=set(), + revision_before_rev=[], + revision_in_org=[], + ) + + # TODO: maybe move this to a separate file class ProvenanceBackend: raise_on_commit: bool = False @@ -122,28 +153,14 @@ self.storage = ProvenanceWithoutPathDB(conn) - self.write_cache: Dict[str, Any] = {} - self.read_cache: Dict[str, Any] = {} - self.clear_caches() + self.cache: ProvenanceCache = new_cache() def clear_caches(self): - self.write_cache = { - "content": dict(), - "content_early_in_rev": set(), - "content_in_dir": set(), - "directory": dict(), - "directory_in_rev": set(), - "revision": dict(), - "revision_before_rev": list(), - "revision_in_org": list(), - } - self.read_cache = {"content": dict(), "directory": dict(), "revision": dict()} + self.cache = new_cache() def commit(self): - # TODO: for now we just forward the write_cache. This should be improved! - while not self.storage.commit( - self.write_cache, raise_on_commit=self.raise_on_commit - ): + # TODO: for now we just forward the cache. This should be improved! + while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit): logging.warning( - f"Unable to commit cached information {self.write_cache}. Retrying..." + f"Unable to commit cached information {self.cache}. Retrying..."
) @@ -152,14 +169,14 @@ def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ): - self.write_cache["content_in_dir"].add( + self.cache["content_in_dir"].add( (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ): - self.write_cache["content_early_in_rev"].add( + self.cache["content_early_in_rev"].add( (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) ) @@ -182,16 +199,13 @@ return self.get_dates("content", [blob.id for blob in blobs]) def content_set_early_date(self, blob: FileEntry, date: datetime): - self.write_cache["content"][blob.id] = date - # update read cache as well - self.read_cache["content"][blob.id] = date + self.cache["content"]["data"][blob.id] = date + self.cache["content"]["added"].add(blob.id) def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ): - self.write_cache["directory_in_rev"].add( - (directory.id, revision.id, normalize(path)) - ) + self.cache["directory_in_rev"].add((directory.id, revision.id, normalize(path))) def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry @@ -206,22 +220,17 @@ def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ): - self.write_cache["directory"][directory.id] = date - # update read cache as well - self.read_cache["directory"][directory.id] = date - - def get_dates(self, entity: str, ids: List[bytes]) -> Dict[bytes, datetime]: - dates = {} - pending = [] - for sha1 in ids: - # Check whether the date has been queried before - date = self.read_cache[entity].get(sha1, None) - if date is not None: - dates[sha1] = date - else: - pending.append(sha1) - dates.update(self.storage.get_dates(entity, pending)) - return dates + self.cache["directory"]["data"][directory.id] = date + self.cache["directory"]["added"].add(directory.id) + + def 
get_dates( + self, entity: Literal["content", "revision", "directory"], ids: List[bytes] + ) -> Dict[bytes, datetime]: + cache = self.cache[entity] + missing_ids = {sha1 for sha1 in ids if sha1 not in cache["data"]} + if missing_ids: + cache["data"].update(self.storage.get_dates(entity, list(missing_ids))) + return {sha1: cache["data"][sha1] for sha1 in ids if sha1 in cache["data"]} def origin_get_id(self, origin: OriginEntry) -> int: if origin.id is None: @@ -231,17 +240,18 @@ def revision_add(self, revision: RevisionEntry): # Add current revision to the compact DB - self.write_cache["revision"][revision.id] = revision.date - # update read cache as well - self.read_cache["revision"][revision.id] = revision.date + assert revision.date is not None + self.cache["revision"]["data"][revision.id] = revision.date + self.cache["revision"]["added"].add(revision.id) def revision_add_before_revision( self, relative: RevisionEntry, revision: RevisionEntry ): - self.write_cache["revision_before_rev"].append((revision.id, relative.id)) + self.cache["revision_before_rev"].append((revision.id, relative.id)) def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry): - self.write_cache["revision_in_org"].append((revision.id, origin.id)) + assert origin.id is not None + self.cache["revision_in_org"].append((revision.id, origin.id)) def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: return self.get_dates("revision", [revision.id]).get(revision.id, None)