Page Menu · Home · Software Heritage

D5829.diff
No One · Temporary

D5829.diff

diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py
--- a/swh/provenance/postgresql/provenancedb_base.py
+++ b/swh/provenance/postgresql/provenancedb_base.py
@@ -19,9 +19,15 @@
def commit(self, data: Dict[str, Any], raise_on_commit: bool = False) -> bool:
try:
# First insert entities
- self.insert_entity("content", data["content"])
- self.insert_entity("directory", data["directory"])
- self.insert_entity("revision", data["revision"])
+ for entity in ("content", "directory", "revision"):
+ # Write only newly set dates ("added"), not cached reads.
+ self.insert_entity(
+ entity,
+ {
+ sha1: data[entity]["data"][sha1]
+ for sha1 in data[entity]["added"]
+ },
+ )
# Relations should come after ids for entities were resolved
self.insert_relation(
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,10 +1,10 @@
from datetime import datetime
import logging
import os
-from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple
+from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple
import psycopg2
-from typing_extensions import Protocol, runtime_checkable
+from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable
from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry
@@ -104,6 +104,37 @@
...
+class Cache(TypedDict):
+ data: Dict[bytes, datetime]  # sha1 -> date, set locally or read from storage
+ added: Set[bytes]  # sha1s set locally; only these are inserted on commit
+
+
+class ProvenanceCache(TypedDict):
+ content: Cache
+ directory: Cache
+ revision: Cache
+ # below are insertion caches only
+ content_early_in_rev: Set[Tuple[bytes, bytes, bytes]]
+ content_in_dir: Set[Tuple[bytes, bytes, bytes]]
+ directory_in_rev: Set[Tuple[bytes, bytes, bytes]]
+ # these two are for the origin layer
+ revision_before_rev: List[Tuple[bytes, bytes]]
+ revision_in_org: List[Tuple[bytes, int]]
+
+
+def new_cache() -> ProvenanceCache:  # fresh empty cache (init / clear_caches)
+ return ProvenanceCache(
+ content=Cache(data={}, added=set()),
+ directory=Cache(data={}, added=set()),
+ revision=Cache(data={}, added=set()),
+ content_early_in_rev=set(),
+ content_in_dir=set(),
+ directory_in_rev=set(),
+ revision_before_rev=[],
+ revision_in_org=[],
+ )
+
+
# TODO: maybe move this to a separate file
class ProvenanceBackend:
raise_on_commit: bool = False
@@ -122,28 +153,14 @@
self.storage = ProvenanceWithoutPathDB(conn)
- self.write_cache: Dict[str, Any] = {}
- self.read_cache: Dict[str, Any] = {}
- self.clear_caches()
+ self.cache: ProvenanceCache = new_cache()
def clear_caches(self):
- self.write_cache = {
- "content": dict(),
- "content_early_in_rev": set(),
- "content_in_dir": set(),
- "directory": dict(),
- "directory_in_rev": set(),
- "revision": dict(),
- "revision_before_rev": list(),
- "revision_in_org": list(),
- }
- self.read_cache = {"content": dict(), "directory": dict(), "revision": dict()}
+ self.cache = new_cache()  # drop both cached reads and pending writes
def commit(self):
- # TODO: for now we just forward the write_cache. This should be improved!
+ # TODO: for now we just forward the cache. This should be improved!
- while not self.storage.commit(
- self.write_cache, raise_on_commit=self.raise_on_commit
- ):
+ while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit):
logging.warning(
- f"Unable to commit cached information {self.write_cache}. Retrying..."
+ f"Unable to commit cached information {self.cache}. Retrying..."
)
@@ -152,14 +169,14 @@
def content_add_to_directory(
self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
):
- self.write_cache["content_in_dir"].add(
+ self.cache["content_in_dir"].add(  # (blob id, dir id, normalized path)
(blob.id, directory.id, normalize(os.path.join(prefix, blob.name)))
)
def content_add_to_revision(
self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
):
- self.write_cache["content_early_in_rev"].add(
+ self.cache["content_early_in_rev"].add(  # (blob id, rev id, normalized path)
(blob.id, revision.id, normalize(os.path.join(prefix, blob.name)))
)
@@ -182,16 +199,13 @@
return self.get_dates("content", [blob.id for blob in blobs])
def content_set_early_date(self, blob: FileEntry, date: datetime):
- self.write_cache["content"][blob.id] = date
- # update read cache as well
- self.read_cache["content"][blob.id] = date
+ self.cache["content"]["data"][blob.id] = date  # read back by get_dates
+ self.cache["content"]["added"].add(blob.id)  # inserted on commit
def directory_add_to_revision(
self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
):
- self.write_cache["directory_in_rev"].add(
- (directory.id, revision.id, normalize(path))
- )
+ self.cache["directory_in_rev"].add((directory.id, revision.id, normalize(path)))
def directory_get_date_in_isochrone_frontier(
self, directory: DirectoryEntry
@@ -206,22 +220,17 @@
def directory_set_date_in_isochrone_frontier(
self, directory: DirectoryEntry, date: datetime
):
- self.write_cache["directory"][directory.id] = date
- # update read cache as well
- self.read_cache["directory"][directory.id] = date
-
- def get_dates(self, entity: str, ids: List[bytes]) -> Dict[bytes, datetime]:
- dates = {}
- pending = []
- for sha1 in ids:
- # Check whether the date has been queried before
- date = self.read_cache[entity].get(sha1, None)
- if date is not None:
- dates[sha1] = date
- else:
- pending.append(sha1)
- dates.update(self.storage.get_dates(entity, pending))
- return dates
+ self.cache["directory"]["data"][directory.id] = date
+ self.cache["directory"]["added"].add(directory.id)
+
+ def get_dates(
+ self, entity: Literal["content", "revision", "directory"], ids: List[bytes]
+ ) -> Dict[bytes, datetime]:
+ cache = self.cache[entity]  # ids are keys of cache["data"], not of cache
+ missing_ids = {sha1 for sha1 in ids if sha1 not in cache["data"]}
+ if missing_ids:  # hit storage only for ids not cached yet
+ cache["data"].update(self.storage.get_dates(entity, list(missing_ids)))
+ return {sha1: cache["data"][sha1] for sha1 in ids if sha1 in cache["data"]}
def origin_get_id(self, origin: OriginEntry) -> int:
if origin.id is None:
def origin_get_id(self, origin: OriginEntry) -> int:
if origin.id is None:
@@ -231,17 +240,18 @@
def revision_add(self, revision: RevisionEntry):
# Add current revision to the compact DB
- self.write_cache["revision"][revision.id] = revision.date
- # update read cache as well
- self.read_cache["revision"][revision.id] = revision.date
+ assert revision.date is not None
+ self.cache["revision"]["data"][revision.id] = revision.date
+ self.cache["revision"]["added"].add(revision.id)
def revision_add_before_revision(
self, relative: RevisionEntry, revision: RevisionEntry
):
- self.write_cache["revision_before_rev"].append((revision.id, relative.id))
+ self.cache["revision_before_rev"].append((revision.id, relative.id))
def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
- self.write_cache["revision_in_org"].append((revision.id, origin.id))
+ assert origin.id is not None
+ self.cache["revision_in_org"].append((revision.id, origin.id))
def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
return self.get_dates("revision", [revision.id]).get(revision.id, None)

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 12:26 PM (2 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3221941

Event Timeline