Page MenuHomeSoftware Heritage

D5947.id21370.diff
No OneTemporary

D5947.id21370.diff

diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -1,47 +1,46 @@
from typing import TYPE_CHECKING
-import warnings
from .postgresql.db_utils import connect
if TYPE_CHECKING:
- from swh.provenance.archive import ArchiveInterface
- from swh.provenance.provenance import ProvenanceInterface
+ from .archive import ArchiveInterface
+ from .provenance import ProvenanceInterface, ProvenanceStorageInterface
def get_archive(cls: str, **kwargs) -> "ArchiveInterface":
if cls == "api":
- from swh.provenance.storage.archive import ArchiveStorage
from swh.storage import get_storage
+ from .storage.archive import ArchiveStorage
+
return ArchiveStorage(get_storage(**kwargs["storage"]))
elif cls == "direct":
- from swh.provenance.postgresql.archive import ArchivePostgreSQL
+ from .postgresql.archive import ArchivePostgreSQL
return ArchivePostgreSQL(connect(kwargs["db"]))
else:
raise NotImplementedError
-def get_provenance(cls: str, **kwargs) -> "ProvenanceInterface":
+def get_provenance(**kwargs) -> "ProvenanceInterface":
+ from .backend import ProvenanceBackend
+
+ return ProvenanceBackend(get_provenance_storage(**kwargs))
+
+
+def get_provenance_storage(cls: str, **kwargs) -> "ProvenanceStorageInterface":
if cls == "local":
+ from .postgresql.provenancedb_base import ProvenanceDBBase
+
conn = connect(kwargs["db"])
- if "with_path" in kwargs:
- warnings.warn(
- "Usage of the 'with-path' config option is deprecated. "
- "The db flavor is now used instead.",
- DeprecationWarning,
- )
-
- with_path = kwargs.get("with_path")
- from swh.provenance.backend import ProvenanceBackend
-
- prov = ProvenanceBackend(conn)
- if with_path is not None:
- flavor = "with-path" if with_path else "without-path"
- if prov.storage.flavor != flavor:
- raise ValueError(
- "The given flavor does not match the flavor stored in the backend."
- )
- return prov
+ flavor = ProvenanceDBBase(conn).flavor
+ if flavor == "with-path":
+ from .postgresql.provenancedb_with_path import ProvenanceWithPathDB
+
+ return ProvenanceWithPathDB(conn)
+ else:
+ from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB
+
+ return ProvenanceWithoutPathDB(conn)
else:
raise NotImplementedError
diff --git a/swh/provenance/backend.py b/swh/provenance/backend.py
--- a/swh/provenance/backend.py
+++ b/swh/provenance/backend.py
@@ -1,15 +1,14 @@
from datetime import datetime
import logging
import os
-from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple
+from typing import Dict, Generator, Iterable, Optional, Set, Tuple
-import psycopg2 # TODO: remove this dependency
from typing_extensions import Literal, TypedDict
from swh.model.model import Sha1Git
from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry
-from .provenance import ProvenanceResult
+from .provenance import ProvenanceResult, ProvenanceStorageInterface, RelationType
class DatetimeCache(TypedDict):
@@ -58,33 +57,168 @@
class ProvenanceBackend:
- raise_on_commit: bool = False
+ def __init__(self, storage: ProvenanceStorageInterface):
+ self.storage = storage
+ self.cache = new_cache()
- def __init__(self, conn: psycopg2.extensions.connection):
- from .postgresql.provenancedb_base import ProvenanceDBBase
+ def clear_caches(self) -> None:
+ self.cache = new_cache()
- # TODO: this class should not know what the actual used DB is.
- self.storage: ProvenanceDBBase
- flavor = ProvenanceDBBase(conn).flavor
- if flavor == "with-path":
- from .postgresql.provenancedb_with_path import ProvenanceWithPathDB
+ def flush(self) -> None:
+ # Revision-content layer insertions ############################################
- self.storage = ProvenanceWithPathDB(conn)
- else:
- from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB
+ # For this layer, relations need to be inserted first so that, in case of
+        # failure, reprocessing the input does not generate an inconsistent database.
+ while not self.storage.relation_add(
+ RelationType.CNT_EARLY_IN_REV, self.cache["content_in_revision"]
+ ):
+ logging.warning(
+ f"Unable to write {RelationType.CNT_EARLY_IN_REV} rows to the storage. "
+ f"Data: {self.cache['content_in_revision']}. Retrying..."
+ )
+ print(
+ "content_in_revision",
+ self.storage.relation_get(
+ RelationType.CNT_EARLY_IN_REV,
+ (src for src, _, _ in self.cache["content_in_revision"]),
+ ),
+ )
- self.storage = ProvenanceWithoutPathDB(conn)
- self.cache: ProvenanceCache = new_cache()
+ while not self.storage.relation_add(
+ RelationType.CNT_IN_DIR, self.cache["content_in_directory"]
+ ):
+ logging.warning(
+ f"Unable to write {RelationType.CNT_IN_DIR} rows to the storage. "
+ f"Data: {self.cache['content_in_directory']}. Retrying..."
+ )
+ print(
+ "content_in_directory",
+ self.storage.relation_get(
+ RelationType.CNT_IN_DIR,
+ (src for src, _, _ in self.cache["content_in_directory"]),
+ ),
+ )
- def clear_caches(self) -> None:
- self.cache = new_cache()
+ while not self.storage.relation_add(
+ RelationType.DIR_IN_REV, self.cache["directory_in_revision"]
+ ):
+ logging.warning(
+ f"Unable to write {RelationType.DIR_IN_REV} rows to the storage. "
+ f"Data: {self.cache['directory_in_revision']}. Retrying..."
+ )
+ print(
+ "directory_in_revision",
+ self.storage.relation_get(
+ RelationType.DIR_IN_REV,
+ (src for src, _, _ in self.cache["directory_in_revision"]),
+ ),
+ )
- def flush(self) -> None:
- # TODO: for now we just forward the cache. This should be improved!
- while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit):
+ # After relations, dates for the entities can be safely set, acknowledging that
+ # these entities won't need to be reprocessed in case of failure.
+ dates = {
+ sha1: date
+ for sha1, date in self.cache["content"]["data"].items()
+ if sha1 in self.cache["content"]["added"] and date is not None
+ }
+ while not self.storage.content_set_date(dates):
+ logging.warning(
+ f"Unable to write content dates to the storage. "
+ f"Data: {dates}. Retrying..."
+ )
+
+ dates = {
+ sha1: date
+ for sha1, date in self.cache["directory"]["data"].items()
+ if sha1 in self.cache["directory"]["added"] and date is not None
+ }
+ while not self.storage.directory_set_date(dates):
+ logging.warning(
+ f"Unable to write directory dates to the storage. "
+ f"Data: {dates}. Retrying..."
+ )
+
+ dates = {
+ sha1: date
+ for sha1, date in self.cache["revision"]["data"].items()
+ if sha1 in self.cache["revision"]["added"] and date is not None
+ }
+ while not self.storage.revision_set_date(dates):
+ logging.warning(
+ f"Unable to write revision dates to the storage. "
+ f"Data: {dates}. Retrying..."
+ )
+
+ # Origin-revision layer insertions #############################################
+
+        # Origin urls should be inserted first so that internal ids' resolution works
+ # properly.
+ urls = {
+ sha1: date
+ for sha1, date in self.cache["origin"]["data"].items()
+ if sha1 in self.cache["origin"]["added"]
+ }
+ while not self.storage.origin_set_url(urls):
+ logging.warning(
+ f"Unable to write origins urls to the storage. "
+ f"Data: {urls}. Retrying..."
+ )
+
+ # Second, flat models for revisions' histories (ie. revision-before-revision).
+ rbr_data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]] = sum(
+ [
+ [
+ (prev, next, None)
+ for next in self.cache["revision_before_revision"][prev]
+ ]
+ for prev in self.cache["revision_before_revision"]
+ ],
+ [],
+ )
+ while not self.storage.relation_add(RelationType.REV_BEFORE_REV, rbr_data):
logging.warning(
- f"Unable to commit cached information {self.cache}. Retrying..."
+ f"Unable to write {RelationType.REV_BEFORE_REV} rows to the storage. "
+ f"Data: {rbr_data}. Retrying..."
)
+ print(
+ "revision_before_revision",
+ self.storage.relation_get(
+ RelationType.REV_BEFORE_REV,
+ self.cache["revision_before_revision"],
+ ),
+ )
+
+ # Heads (ie. revision-in-origin entries) should be inserted once flat models for
+ # their histories were already added. This is to guarantee consistent results if
+ # something needs to be reprocessed due to a failure: already inserted heads
+ # won't get reprocessed in such a case.
+ rio_data = [(rev, org, None) for rev, org in self.cache["revision_in_origin"]]
+ while not self.storage.relation_add(RelationType.REV_IN_ORG, rio_data):
+ logging.warning(
+ f"Unable to write {RelationType.REV_IN_ORG} rows to the storage. "
+ f"Data: {rio_data}. Retrying..."
+ )
+ print(
+ "revision_in_origin",
+ self.storage.relation_get(
+ RelationType.REV_IN_ORG,
+ (src for src, _ in self.cache["revision_in_origin"]),
+ ),
+ )
+
+ # Finally, preferred origins for the visited revisions are set (this step can be
+ # reordered if required).
+ origins = {
+ sha1: self.cache["revision_origin"]["data"][sha1]
+ for sha1 in self.cache["revision_origin"]["added"]
+ }
+ while not self.storage.revision_set_origin(origins):
+ logging.warning(
+ f"Unable to write preferred origins to the storage. "
+ f"Data: {origins}. Retrying..."
+ )
+
+ # clear local cache ############################################################
self.clear_caches()
def content_add_to_directory(
@@ -145,12 +279,22 @@
self.cache["directory"]["added"].add(directory.id)
def get_dates(
- self, entity: Literal["content", "revision", "directory"], ids: List[Sha1Git]
+ self,
+ entity: Literal["content", "directory", "revision"],
+ ids: Iterable[Sha1Git],
) -> Dict[Sha1Git, datetime]:
cache = self.cache[entity]
missing_ids = set(id for id in ids if id not in cache)
if missing_ids:
- cache["data"].update(self.storage.get_dates(entity, list(missing_ids)))
+ if entity == "revision":
+ updated = {
+ id: date
+ for id, (date, _) in self.storage.revision_get(missing_ids).items()
+ if date is not None
+ }
+ else:
+ updated = getattr(self.storage, f"{entity}_get")(missing_ids)
+ cache["data"].update(updated)
return {
sha1: date
for sha1, date in cache["data"].items()
@@ -183,17 +327,19 @@
def revision_get_preferred_origin(
self, revision: RevisionEntry
) -> Optional[Sha1Git]:
- cache = self.cache["revision_origin"]
+ cache = self.cache["revision_origin"]["data"]
if revision.id not in cache:
- origin = self.storage.revision_get_preferred_origin(revision.id)
- if origin is not None:
- cache["data"][revision.id] = origin
- return cache["data"].get(revision.id)
+ ret = self.storage.revision_get([revision.id])
+ if revision.id in ret:
+ origin = ret[revision.id][1] # TODO: make this not a tuple
+ if origin is not None:
+ cache[revision.id] = origin
+ return cache.get(revision.id)
def revision_in_history(self, revision: RevisionEntry) -> bool:
- return revision.id in self.cache[
- "revision_before_revision"
- ] or self.storage.revision_in_history(revision.id)
+ return revision.id in self.cache["revision_before_revision"] or bool(
+ self.storage.relation_get(RelationType.REV_BEFORE_REV, [revision.id])
+ )
def revision_set_preferred_origin(
self, origin: OriginEntry, revision: RevisionEntry
@@ -202,9 +348,9 @@
self.cache["revision_origin"]["added"].add(revision.id)
def revision_visited(self, revision: RevisionEntry) -> bool:
- return revision.id in dict(
- self.cache["revision_in_origin"]
- ) or self.storage.revision_visited(revision.id)
+ return revision.id in dict(self.cache["revision_in_origin"]) or bool(
+ self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id])
+ )
def normalize(path: bytes) -> bytes:
diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py
--- a/swh/provenance/postgresql/provenancedb_base.py
+++ b/swh/provenance/postgresql/provenancedb_base.py
@@ -1,30 +1,35 @@
from datetime import datetime
import itertools
import logging
-from typing import Any, Dict, Generator, List, Mapping, Optional, Set, Tuple
+from typing import Dict, Generator, Iterable, Optional, Set, Tuple
import psycopg2
import psycopg2.extras
+from typing_extensions import Literal
from swh.model.model import Sha1Git
-from ..provenance import ProvenanceResult
+from ..provenance import ProvenanceResult, RelationType
class ProvenanceDBBase:
+ raise_on_commit: bool = False
+
def __init__(self, conn: psycopg2.extensions.connection):
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
conn.set_session(autocommit=True)
self.conn = conn
self.cursor = self.conn.cursor()
# XXX: not sure this is the best place to do it!
- self.cursor.execute("SET timezone TO 'UTC'")
+ sql = "SET timezone TO 'UTC'"
+ self.cursor.execute(sql)
self._flavor: Optional[str] = None
@property
def flavor(self) -> str:
if self._flavor is None:
- self.cursor.execute("select swh_get_dbflavor()")
+ sql = "SELECT swh_get_dbflavor()"
+ self.cursor.execute(sql)
self._flavor = self.cursor.fetchone()[0]
assert self._flavor is not None
return self._flavor
@@ -33,231 +38,246 @@
def with_path(self) -> bool:
return self.flavor == "with-path"
- def commit(self, data: Mapping[str, Any], raise_on_commit: bool = False) -> bool:
- try:
- # First insert entities
- for entity in ("content", "directory", "revision"):
- self.insert_entity(
- entity,
- {
- sha1: data[entity]["data"][sha1]
- for sha1 in data[entity]["added"]
- },
- )
- data[entity]["data"].clear()
- data[entity]["added"].clear()
-
- # Relations should come after ids for entities were resolved
- for relation in (
- "content_in_revision",
- "content_in_directory",
- "directory_in_revision",
- ):
- self.insert_relation(relation, data[relation])
-
- # Insert origins
- self.insert_origin(
- {
- sha1: data["origin"]["data"][sha1]
- for sha1 in data["origin"]["added"]
- },
- )
- data["origin"]["data"].clear()
- data["origin"]["added"].clear()
-
- # Insert relations from the origin-revision layer
- self.insert_revision_history(data["revision_before_revision"])
- self.insert_origin_head(data["revision_in_origin"])
-
- # Update preferred origins
- self.update_preferred_origin(
- {
- sha1: data["revision_origin"]["data"][sha1]
- for sha1 in data["revision_origin"]["added"]
- }
- )
- data["revision_origin"]["data"].clear()
- data["revision_origin"]["added"].clear()
+ def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
+ ...
- return True
+ def content_find_all(
+ self, id: Sha1Git, limit: Optional[int] = None
+ ) -> Generator[ProvenanceResult, None, None]:
+ ...
+
+ def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
+ return self._entity_set_date("content", dates)
+
+ def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+ return self._entity_get_date("content", ids)
+
+ def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
+ return self._entity_set_date("directory", dates)
+ def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+ return self._entity_get_date("directory", ids)
+
+ def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool:
+ try:
+ if urls:
+ sql = """
+ LOCK TABLE ONLY origin;
+ INSERT INTO origin(sha1, url) VALUES %s
+ ON CONFLICT DO NOTHING
+ """
+ psycopg2.extras.execute_values(self.cursor, sql, urls.items())
+ return True
except: # noqa: E722
# Unexpected error occurred, rollback all changes and log message
logging.exception("Unexpected error")
- if raise_on_commit:
+ if self.raise_on_commit:
raise
-
return False
- def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
- ...
+ def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
+ urls = {}
+ sha1s = tuple(ids)
+ if sha1s:
+ values = ", ".join(itertools.repeat("%s", len(sha1s)))
+ sql = f"""
+ SELECT sha1, url
+ FROM origin
+ WHERE sha1 IN ({values})
+ """
+ self.cursor.execute(sql, sha1s)
+ urls.update(self.cursor.fetchall())
+ return urls
- def content_find_all(
- self, id: Sha1Git, limit: Optional[int] = None
- ) -> Generator[ProvenanceResult, None, None]:
- ...
+ def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
+ return self._entity_set_date("revision", dates)
- def get_dates(self, entity: str, ids: List[Sha1Git]) -> Dict[Sha1Git, datetime]:
- dates = {}
- if ids:
- values = ", ".join(itertools.repeat("%s", len(ids)))
- self.cursor.execute(
- f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""",
- tuple(ids),
- )
- dates.update(self.cursor.fetchall())
- return dates
+ def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool:
+ try:
+ if origins:
+ sql = """
+ LOCK TABLE ONLY revision;
+ INSERT INTO revision(sha1, origin)
+ (SELECT V.rev AS sha1, O.id AS origin
+ FROM (VALUES %s) AS V(rev, org)
+ JOIN origin AS O ON (O.sha1=V.org))
+ ON CONFLICT (sha1) DO
+ UPDATE SET origin=EXCLUDED.origin
+ """
+ psycopg2.extras.execute_values(self.cursor, sql, origins.items())
+ return True
+ except: # noqa: E722
+ # Unexpected error occurred, rollback all changes and log message
+ logging.exception("Unexpected error")
+ if self.raise_on_commit:
+ raise
+ return False
- def insert_entity(self, entity: str, data: Dict[Sha1Git, datetime]):
- if data:
- psycopg2.extras.execute_values(
- self.cursor,
- f"""
- LOCK TABLE ONLY {entity};
- INSERT INTO {entity}(sha1, date) VALUES %s
- ON CONFLICT (sha1) DO
- UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date)
- """,
- data.items(),
- )
- # XXX: not sure if Python takes a reference or a copy.
- # This might be useless!
- data.clear()
-
- def insert_origin(self, data: Dict[Sha1Git, str]):
- if data:
- psycopg2.extras.execute_values(
- self.cursor,
- """
- LOCK TABLE ONLY origin;
- INSERT INTO origin(sha1, url) VALUES %s
- ON CONFLICT DO NOTHING
- """,
- data.items(),
- )
- # XXX: not sure if Python takes a reference or a copy.
- # This might be useless!
- data.clear()
-
- def insert_origin_head(self, data: Set[Tuple[Sha1Git, Sha1Git]]):
- if data:
- # Insert revisions first, to ensure "foreign keys" exist
- # Origins are assumed to be already inserted (they require knowing the url)
- psycopg2.extras.execute_values(
- self.cursor,
+ def revision_get(
+ self, ids: Iterable[Sha1Git]
+ ) -> Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]]:
+ result: Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]] = {}
+ sha1s = tuple(ids)
+ if sha1s:
+ values = ", ".join(itertools.repeat("%s", len(sha1s)))
+ sql = f"""
+ SELECT sha1, date, origin
+ FROM revision
+ WHERE sha1 IN ({values})
"""
- LOCK TABLE ONLY revision;
- INSERT INTO revision(sha1) VALUES %s
- ON CONFLICT DO NOTHING
- """,
- set((rev,) for rev, _ in data),
+ self.cursor.execute(sql, sha1s)
+ result.update(
+ (rev, (date, org)) for rev, date, org in self.cursor.fetchall()
)
+ return result
- psycopg2.extras.execute_values(
- self.cursor,
- # XXX: not clear how conflicts are handled here!
- """
- LOCK TABLE ONLY revision_in_origin;
- INSERT INTO revision_in_origin
- SELECT R.id, O.id
- FROM (VALUES %s) AS V(rev, org)
- INNER JOIN revision AS R on (R.sha1=V.rev)
- INNER JOIN origin AS O on (O.sha1=V.org)
- ON CONFLICT DO NOTHING
- """,
- data,
- )
- data.clear()
+ def relation_add(
+ self,
+ relation: RelationType,
+ data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]],
+ ) -> bool:
+ try:
+ if data:
+ table = relation.value
+ src, *_, dst = table.split("_")
- def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]):
- ...
+ if src != "origin":
+                    # Origin entries must have been inserted beforehand, since they
+                    # require extra non-null information (the url)
+ srcs = tuple(set((sha1,) for (sha1, _, _) in data))
+ sql = f"""
+ LOCK TABLE ONLY {src};
+ INSERT INTO {src}(sha1) VALUES %s
+ ON CONFLICT DO NOTHING
+ """
+ psycopg2.extras.execute_values(self.cursor, sql, srcs)
+ if dst != "origin":
+                    # Origin entries must have been inserted beforehand, since they
+                    # require extra non-null information (the url)
+ dsts = tuple(set((sha1,) for (_, sha1, _) in data))
+ sql = f"""
+ LOCK TABLE ONLY {dst};
+ INSERT INTO {dst}(sha1) VALUES %s
+ ON CONFLICT DO NOTHING
+ """
+ psycopg2.extras.execute_values(self.cursor, sql, dsts)
+ joins = [
+ f"INNER JOIN {src} AS S ON (S.sha1=V.src)",
+ f"INNER JOIN {dst} AS D ON (D.sha1=V.dst)",
+ ]
+ selected = ["S.id", "D.id"]
- def insert_revision_history(self, data: Dict[Sha1Git, Set[Sha1Git]]):
- if data:
- # print(f"Inserting histories: {data}")
- # Insert revisions first, to ensure "foreign keys" exist
- revisions = set(data)
- for rev in data:
- revisions.update(data[rev])
- psycopg2.extras.execute_values(
- self.cursor,
- """
- LOCK TABLE ONLY revision;
- INSERT INTO revision(sha1) VALUES %s
- ON CONFLICT DO NOTHING
- """,
- ((rev,) for rev in revisions),
- )
+ if self._relation_uses_location_table(relation):
+ locations = tuple(set((path,) for (_, _, path) in data))
+ sql = """
+ LOCK TABLE ONLY location;
+ INSERT INTO location(path) VALUES %s
+ ON CONFLICT (path) DO NOTHING
+ """
+ psycopg2.extras.execute_values(self.cursor, sql, locations)
+
+ joins.append("INNER JOIN location AS L ON (L.path=V.path)")
+ selected.append("L.id")
+
+ sql = f"""
+ INSERT INTO {table}
+ (SELECT {", ".join(selected)}
+ FROM (VALUES %s) AS V(src, dst, path)
+ {'''
+ '''.join(joins)})
+ ON CONFLICT DO NOTHING
+ """
+ psycopg2.extras.execute_values(self.cursor, sql, data)
+ return True
+ except: # noqa: E722
+ # Unexpected error occurred, rollback all changes and log message
+ logging.exception("Unexpected error")
+ if self.raise_on_commit:
+ raise
+ return False
+
+ def relation_get(
+ self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False
+ ) -> Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]]:
+ result: Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]] = set()
+ sha1s = tuple(ids)
+ if sha1s:
+ table = relation.value
+ src, *_, dst = table.split("_")
- values = [[(prev, next) for next in data[prev]] for prev in data]
- psycopg2.extras.execute_values(
- self.cursor,
- # XXX: not clear how conflicts are handled here!
+ # TODO: improve this!
+ if src == "revision" and dst == "revision":
+ src_field = "prev"
+ dst_field = "next"
+ else:
+ src_field = src
+ dst_field = dst
+
+ joins = [
+ f"INNER JOIN {src} AS S ON (S.id=R.{src_field})",
+ f"INNER JOIN {dst} AS D ON (D.id=R.{dst_field})",
+ ]
+ selected = ["S.sha1", "D.sha1"]
+ selector = "S.sha1" if not reverse else "D.sha1"
+
+ if self._relation_uses_location_table(relation):
+ joins.append("INNER JOIN location AS L ON (L.id=R.location)")
+ selected.append("L.path")
+ else:
+ selected.append("NULL")
+
+ sql = f"""
+ SELECT {", ".join(selected)}
+ FROM {table} AS R
+ {" ".join(joins)}
+ WHERE {selector} IN %s
"""
- LOCK TABLE ONLY revision_before_revision;
- INSERT INTO revision_before_revision
- SELECT P.id, N.id
- FROM (VALUES %s) AS V(prev, next)
- INNER JOIN revision AS P on (P.sha1=V.prev)
- INNER JOIN revision AS N on (N.sha1=V.next)
- ON CONFLICT DO NOTHING
- """,
- sum(values, []),
+ self.cursor.execute(sql, (sha1s,))
+ result.update(
+ (src_sha1, dst_sha1, path)
+ for src_sha1, dst_sha1, path in self.cursor.fetchall()
)
- data.clear()
-
- def revision_get_preferred_origin(self, revision: Sha1Git) -> Optional[Sha1Git]:
- self.cursor.execute(
- """
- SELECT O.sha1
- FROM revision AS R
- JOIN origin as O
- ON R.origin=O.id
- WHERE R.sha1=%s""",
- (revision,),
- )
- row = self.cursor.fetchone()
- return row[0] if row is not None else None
-
- def revision_in_history(self, revision: Sha1Git) -> bool:
- self.cursor.execute(
- """
- SELECT 1
- FROM revision_before_revision
- JOIN revision
- ON revision.id=revision_before_revision.prev
- WHERE revision.sha1=%s
- """,
- (revision,),
- )
- return self.cursor.fetchone() is not None
-
- def revision_visited(self, revision: Sha1Git) -> bool:
- self.cursor.execute(
- """
- SELECT 1
- FROM revision_in_origin
- JOIN revision
- ON revision.id=revision_in_origin.revision
- WHERE revision.sha1=%s
- """,
- (revision,),
- )
- return self.cursor.fetchone() is not None
-
- def update_preferred_origin(self, data: Dict[Sha1Git, Sha1Git]):
- if data:
- # XXX: this is assuming the revision already exists in the db! It should
- # be improved by allowing null dates in the revision table.
- psycopg2.extras.execute_values(
- self.cursor,
+ return result
+
+ def _entity_get_date(
+ self,
+ entity: Literal["content", "directory", "revision"],
+ ids: Iterable[Sha1Git],
+ ) -> Dict[Sha1Git, datetime]:
+ dates = {}
+ sha1s = tuple(ids)
+ if sha1s:
+ values = ", ".join(itertools.repeat("%s", len(sha1s)))
+ sql = f"""
+ SELECT sha1, date
+ FROM {entity}
+ WHERE sha1 IN ({values})
"""
- UPDATE revision R
- SET origin=O.id
- FROM (VALUES %s) AS V(rev, org)
- INNER JOIN origin AS O on (O.sha1=V.org)
- WHERE R.sha1=V.rev
- """,
- data.items(),
- )
- data.clear()
+ self.cursor.execute(sql, sha1s)
+ dates.update(self.cursor.fetchall())
+ return dates
+
+ def _entity_set_date(
+ self,
+ entity: Literal["content", "directory", "revision"],
+ data: Dict[Sha1Git, datetime],
+ ) -> bool:
+ try:
+ if data:
+ sql = f"""
+ LOCK TABLE ONLY {entity};
+ INSERT INTO {entity}(sha1, date) VALUES %s
+ ON CONFLICT (sha1) DO
+ UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date)
+ """
+ psycopg2.extras.execute_values(self.cursor, sql, data.items())
+ return True
+ except: # noqa: E722
+ # Unexpected error occurred, rollback all changes and log message
+ logging.exception("Unexpected error")
+ if self.raise_on_commit:
+ raise
+ return False
+
+ def _relation_uses_location_table(self, relation: RelationType) -> bool:
+ ...
diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py
--- a/swh/provenance/postgresql/provenancedb_with_path.py
+++ b/swh/provenance/postgresql/provenancedb_with_path.py
@@ -1,18 +1,14 @@
-from typing import Generator, Optional, Set, Tuple
-
-import psycopg2
-import psycopg2.extras
+from typing import Generator, Optional
from swh.model.model import Sha1Git
-from ..provenance import ProvenanceResult
+from ..provenance import ProvenanceResult, RelationType
from .provenancedb_base import ProvenanceDBBase
class ProvenanceWithPathDB(ProvenanceDBBase):
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
- self.cursor.execute(
- """
+ sql = """
SELECT C.sha1 AS blob,
R.sha1 AS rev,
R.date AS date,
@@ -25,9 +21,8 @@
LEFT JOIN origin as O ON (R.origin=O.id)
WHERE C.sha1=%s
ORDER BY date, rev, url, path ASC LIMIT 1
- """,
- (id,),
- )
+ """
+ self.cursor.execute(sql, (id,))
row = self.cursor.fetchone()
if row:
return ProvenanceResult(
@@ -40,8 +35,7 @@
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
early_cut = f"LIMIT {limit}" if limit is not None else ""
- self.cursor.execute(
- f"""
+ sql = f"""
(SELECT C.sha1 AS blob,
R.sha1 AS rev,
R.date AS date,
@@ -72,49 +66,13 @@
LEFT JOIN origin AS O ON (R.origin=O.id)
WHERE C.sha1=%s)
ORDER BY date, rev, url, path {early_cut}
- """,
- (id, id),
- )
+ """
+ self.cursor.execute(sql, (id, id))
for row in self.cursor.fetchall():
yield ProvenanceResult(
content=row[0], revision=row[1], date=row[2], origin=row[3], path=row[4]
)
- def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]):
- """Insert entries in `relation` from `data`
-
- Also insert missing location entries in the 'location' table.
- """
- if data:
- assert relation in (
- "content_in_revision",
- "content_in_directory",
- "directory_in_revision",
- )
- src, dst = relation.split("_in_")
-
- # insert missing locations
- locations = tuple(set((loc,) for (_, _, loc) in data))
- psycopg2.extras.execute_values(
- self.cursor,
- """
- LOCK TABLE ONLY location;
- INSERT INTO location(path) VALUES %s
- ON CONFLICT (path) DO NOTHING
- """,
- locations,
- )
- psycopg2.extras.execute_values(
- self.cursor,
- f"""
- LOCK TABLE ONLY {relation};
- INSERT INTO {relation}
- SELECT {src}.id, {dst}.id, location.id
- FROM (VALUES %s) AS V(src, dst, path)
- INNER JOIN {src} on ({src}.sha1=V.src)
- INNER JOIN {dst} on ({dst}.sha1=V.dst)
- INNER JOIN location on (location.path=V.path)
- """,
- data,
- )
- data.clear()
+ def _relation_uses_location_table(self, relation: RelationType) -> bool:
+ src, *_ = relation.value.split("_")
+ return src in ("content", "directory")
diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py
--- a/swh/provenance/postgresql/provenancedb_without_path.py
+++ b/swh/provenance/postgresql/provenancedb_without_path.py
@@ -1,18 +1,14 @@
-from typing import Generator, Optional, Set, Tuple
-
-import psycopg2
-import psycopg2.extras
+from typing import Generator, Optional
from swh.model.model import Sha1Git
-from ..provenance import ProvenanceResult
+from ..provenance import ProvenanceResult, RelationType
from .provenancedb_base import ProvenanceDBBase
class ProvenanceWithoutPathDB(ProvenanceDBBase):
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
- self.cursor.execute(
- """
+ sql = """
SELECT C.sha1 AS blob,
R.sha1 AS rev,
R.date AS date,
@@ -24,9 +20,8 @@
LEFT JOIN origin as O ON (R.origin=O.id)
WHERE C.sha1=%s
ORDER BY date, rev, url ASC LIMIT 1
- """,
- (id,),
- )
+ """
+ self.cursor.execute(sql, (id,))
row = self.cursor.fetchone()
if row:
return ProvenanceResult(
@@ -39,8 +34,7 @@
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
early_cut = f"LIMIT {limit}" if limit is not None else ""
- self.cursor.execute(
- f"""
+ sql = f"""
(SELECT C.sha1 AS blob,
R.sha1 AS rev,
R.date AS date,
@@ -64,33 +58,12 @@
LEFT JOIN origin as O ON (R.origin=O.id)
WHERE C.sha1=%s)
ORDER BY date, rev, url {early_cut}
- """,
- (id, id),
- )
+ """
+ self.cursor.execute(sql, (id, id))
for row in self.cursor.fetchall():
yield ProvenanceResult(
content=row[0], revision=row[1], date=row[2], origin=row[3], path=row[4]
)
- def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]):
- if data:
- assert relation in (
- "content_in_revision",
- "content_in_directory",
- "directory_in_revision",
- )
- src, dst = relation.split("_in_")
-
- psycopg2.extras.execute_values(
- self.cursor,
- f"""
- LOCK TABLE ONLY {relation};
- INSERT INTO {relation}
- SELECT {src}.id, {dst}.id
- FROM (VALUES %s) AS V(src, dst)
- INNER JOIN {src} on ({src}.sha1=V.src)
- INNER JOIN {dst} on ({dst}.sha1=V.dst)
- """,
- data,
- )
- data.clear()
+ def _relation_uses_location_table(self, relation: RelationType) -> bool:
+ return False
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,5 +1,6 @@
from datetime import datetime
-from typing import Dict, Generator, Iterable, Optional
+import enum
+from typing import Dict, Generator, Iterable, Optional, Set, Tuple
from typing_extensions import Protocol, TypedDict, runtime_checkable
@@ -18,7 +19,7 @@
@runtime_checkable
class ProvenanceInterface(Protocol):
- raise_on_commit: bool = False
+ storage: "ProvenanceStorageInterface"
def flush(self) -> None:
"""Flush internal cache to the underlying `storage`."""
@@ -151,3 +152,105 @@
provenance model.
"""
...
+
+
+class RelationType(enum.Enum):
+ CNT_EARLY_IN_REV = "content_in_revision"
+ CNT_IN_DIR = "content_in_directory"
+ DIR_IN_REV = "directory_in_revision"
+ REV_IN_ORG = "revision_in_origin"
+ REV_BEFORE_REV = "revision_before_revision"
+
+
+@runtime_checkable
+class ProvenanceStorageInterface(Protocol):
+ raise_on_commit: bool = False
+
+ def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
+ """Retrieve the first occurrence of the blob identified by `id`."""
+ ...
+
+ def content_find_all(
+ self, id: Sha1Git, limit: Optional[int] = None
+ ) -> Generator[ProvenanceResult, None, None]:
+ """Retrieve all the occurrences of the blob identified by `id`."""
+ ...
+
+ def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
+ """Associate dates to blobs identified by sha1 ids, as paired in `dates`. Return
+ a boolean stating whether the information was successfully stored.
+ """
+ ...
+
+ def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+ """Retrieve the associated date for each blob sha1 in `ids`. If some blob has
+ no associated date, it is not present in the resulting dictionary.
+ """
+ ...
+
+ def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
+ """Associate dates to directories identified by sha1 ids, as paired in
+ `dates`. Return a boolean stating whether the information was successfully
+ stored.
+ """
+ ...
+
+ def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+ """Retrieve the associated date for each directory sha1 in `ids`. If some
+ directory has no associated date, it is not present in the resulting dictionary.
+ """
+ ...
+
+ def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool:
+ """Associate urls to origins identified by sha1 ids, as paired in `urls`. Return
+ a boolean stating whether the information was successfully stored.
+ """
+ ...
+
+ def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
+ """Retrieve the associated url for each origin sha1 in `ids`. If some origin has
+ no associated date, it is not present in the resulting dictionary.
+ no associated url, it is not present in the resulting dictionary.
+ ...
+
+ def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
+ """Associate dates to revisions identified by sha1 ids, as paired in `dates`.
+ Return a boolean stating whether the information was successfully stored.
+ """
+ ...
+
+ def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool:
+ """Associate origins to revisions identified by sha1 ids, as paired in
+ `origins` (revision ids are keys and origin ids, values). Return a boolean
+ stating whether the information was successfully stored.
+ """
+ ...
+
+ def revision_get(
+ self, ids: Iterable[Sha1Git]
+ ) -> Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]]:
+ """Retrieve the associated date and origin for each revision sha1 in `ids`. If
+ some revision has no associated date nor origin, it is not present in the
+ resulting dictionary.
+ """
+ ...
+
+ def relation_add(
+ self,
+ relation: RelationType,
+ data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]],
+ ) -> bool:
+ """Add entries in the selected `relation`. Each tuple in `data` is of the form
+ (`src`, `dst`, `path`), where `src` and `dst` are the sha1 ids of the entities
+ being related, and `path` is optional depending on the selected `relation`.
+ """
+ ...
+
+ def relation_get(
+ self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False
+ ) -> Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]]:
+ """Retrieve all tuples in the selected `relation` whose source entities are
+ identified by some sha1 id in `ids`. If `reverse` is set, destination entities
+ are matched instead.
+ """
+ ...
diff --git a/swh/provenance/sql/30-schema.sql b/swh/provenance/sql/30-schema.sql
--- a/swh/provenance/sql/30-schema.sql
+++ b/swh/provenance/sql/30-schema.sql
@@ -27,7 +27,7 @@
(
id bigserial primary key, -- internal identifier of the content blob
sha1 sha1_git unique not null, -- intrinsic identifier of the content blob
- date timestamptz not null -- timestamp of the revision where the blob appears early
+ date timestamptz -- timestamp of the revision where the blob appears early
);
comment on column content.id is 'Content internal identifier';
comment on column content.sha1 is 'Content intrinsic identifier';
@@ -37,7 +37,7 @@
(
id bigserial primary key, -- internal identifier of the directory appearing in an isochrone inner frontier
sha1 sha1_git unique not null, -- intrinsic identifier of the directory
- date timestamptz not null -- max timestamp among those of the directory children's
+ date timestamptz -- max timestamp among those of the directory's children
);
comment on column directory.id is 'Directory internal identifier';
comment on column directory.sha1 is 'Directory intrinsic identifier';
@@ -77,9 +77,9 @@
-- relation tables
create table content_in_revision
(
- content bigint not null, -- internal identifier of the content blob
- revision bigint not null, -- internal identifier of the revision where the blob appears for the first time
- location bigint -- location of the content relative to the revision root directory
+ content bigint not null, -- internal identifier of the content blob
+ revision bigint not null, -- internal identifier of the revision where the blob appears for the first time
+ location bigint -- location of the content relative to the revision root directory
-- foreign key (blob) references content (id),
-- foreign key (rev) references revision (id),
-- foreign key (loc) references location (id)
@@ -90,9 +90,9 @@
create table content_in_directory
(
- content bigint not null, -- internal identifier of the content blob
- directory bigint not null, -- internal identifier of the directory containing the blob
- location bigint -- location of the content relative to its parent directory in the isochrone frontier
+ content bigint not null, -- internal identifier of the content blob
+ directory bigint not null, -- internal identifier of the directory containing the blob
+ location bigint -- location of the content relative to its parent directory in the isochrone frontier
-- foreign key (blob) references content (id),
-- foreign key (dir) references directory (id),
-- foreign key (loc) references location (id)
@@ -103,9 +103,9 @@
create table directory_in_revision
(
- directory bigint not null, -- internal identifier of the directory appearing in the revision
- revision bigint not null, -- internal identifier of the revision containing the directory
- location bigint -- location of the directory relative to the revision root directory
+ directory bigint not null, -- internal identifier of the directory appearing in the revision
+ revision bigint not null, -- internal identifier of the revision containing the directory
+ location bigint -- location of the directory relative to the revision root directory
-- foreign key (dir) references directory (id),
-- foreign key (rev) references revision (id),
-- foreign key (loc) references location (id)
@@ -116,8 +116,8 @@
create table revision_in_origin
(
- revision bigint not null, -- internal identifier of the revision poined by the origin
- origin bigint not null -- internal identifier of the origin that points to the revision
+ revision bigint not null, -- internal identifier of the revision pointed by the origin
+ origin bigint not null -- internal identifier of the origin that points to the revision
-- foreign key (rev) references revision (id),
-- foreign key (org) references origin (id)
);
diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py
--- a/swh/provenance/tests/conftest.py
+++ b/swh/provenance/tests/conftest.py
@@ -16,6 +16,7 @@
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Sha1Git
from swh.model.tests.swh_model_data import TEST_OBJECTS
+from swh.provenance import get_provenance
from swh.provenance.postgresql.archive import ArchivePostgreSQL
from swh.provenance.storage.archive import ArchiveStorage
from swh.storage.replay import process_replay_objects
@@ -29,13 +30,14 @@
flavor = request.param
populate_database_for_package("swh.provenance", postgresql.dsn, flavor=flavor)
- from swh.provenance.backend import ProvenanceBackend
-
BaseDb.adapt_conn(postgresql)
- prov = ProvenanceBackend(postgresql)
+
+ args = dict(tuple(item.split("=")) for item in postgresql.dsn.split())
+ args.pop("options")
+ prov = get_provenance(cls="local", db=args)
assert prov.storage.flavor == flavor
# in test sessions, we DO want to raise any exception occurring at commit time
- prov.raise_on_commit = True
+ prov.storage.raise_on_commit = True
return prov

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 12:22 PM (2 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3225213

Event Timeline