Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9341923
D5947.id21370.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
49 KB
Subscribers
None
D5947.id21370.diff
View Options
diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -1,47 +1,46 @@
from typing import TYPE_CHECKING
-import warnings
from .postgresql.db_utils import connect
if TYPE_CHECKING:
- from swh.provenance.archive import ArchiveInterface
- from swh.provenance.provenance import ProvenanceInterface
+ from .archive import ArchiveInterface
+ from .provenance import ProvenanceInterface, ProvenanceStorageInterface
def get_archive(cls: str, **kwargs) -> "ArchiveInterface":
if cls == "api":
- from swh.provenance.storage.archive import ArchiveStorage
from swh.storage import get_storage
+ from .storage.archive import ArchiveStorage
+
return ArchiveStorage(get_storage(**kwargs["storage"]))
elif cls == "direct":
- from swh.provenance.postgresql.archive import ArchivePostgreSQL
+ from .postgresql.archive import ArchivePostgreSQL
return ArchivePostgreSQL(connect(kwargs["db"]))
else:
raise NotImplementedError
-def get_provenance(cls: str, **kwargs) -> "ProvenanceInterface":
+def get_provenance(**kwargs) -> "ProvenanceInterface":
+ from .backend import ProvenanceBackend
+
+ return ProvenanceBackend(get_provenance_storage(**kwargs))
+
+
+def get_provenance_storage(cls: str, **kwargs) -> "ProvenanceStorageInterface":
if cls == "local":
+ from .postgresql.provenancedb_base import ProvenanceDBBase
+
conn = connect(kwargs["db"])
- if "with_path" in kwargs:
- warnings.warn(
- "Usage of the 'with-path' config option is deprecated. "
- "The db flavor is now used instead.",
- DeprecationWarning,
- )
-
- with_path = kwargs.get("with_path")
- from swh.provenance.backend import ProvenanceBackend
-
- prov = ProvenanceBackend(conn)
- if with_path is not None:
- flavor = "with-path" if with_path else "without-path"
- if prov.storage.flavor != flavor:
- raise ValueError(
- "The given flavor does not match the flavor stored in the backend."
- )
- return prov
+ flavor = ProvenanceDBBase(conn).flavor
+ if flavor == "with-path":
+ from .postgresql.provenancedb_with_path import ProvenanceWithPathDB
+
+ return ProvenanceWithPathDB(conn)
+ else:
+ from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB
+
+ return ProvenanceWithoutPathDB(conn)
else:
raise NotImplementedError
diff --git a/swh/provenance/backend.py b/swh/provenance/backend.py
--- a/swh/provenance/backend.py
+++ b/swh/provenance/backend.py
@@ -1,15 +1,14 @@
from datetime import datetime
import logging
import os
-from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple
+from typing import Dict, Generator, Iterable, Optional, Set, Tuple
-import psycopg2 # TODO: remove this dependency
from typing_extensions import Literal, TypedDict
from swh.model.model import Sha1Git
from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry
-from .provenance import ProvenanceResult
+from .provenance import ProvenanceResult, ProvenanceStorageInterface, RelationType
class DatetimeCache(TypedDict):
@@ -58,33 +57,168 @@
class ProvenanceBackend:
- raise_on_commit: bool = False
+ def __init__(self, storage: ProvenanceStorageInterface):
+ self.storage = storage
+ self.cache = new_cache()
- def __init__(self, conn: psycopg2.extensions.connection):
- from .postgresql.provenancedb_base import ProvenanceDBBase
+ def clear_caches(self) -> None:
+ self.cache = new_cache()
- # TODO: this class should not know what the actual used DB is.
- self.storage: ProvenanceDBBase
- flavor = ProvenanceDBBase(conn).flavor
- if flavor == "with-path":
- from .postgresql.provenancedb_with_path import ProvenanceWithPathDB
+ def flush(self) -> None:
+ # Revision-content layer insertions ############################################
- self.storage = ProvenanceWithPathDB(conn)
- else:
- from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB
+ # For this layer, relations need to be inserted first so that, in case of
+ # failure, reprocessing the input does not generate an inconsistent database.
+ while not self.storage.relation_add(
+ RelationType.CNT_EARLY_IN_REV, self.cache["content_in_revision"]
+ ):
+ logging.warning(
+ f"Unable to write {RelationType.CNT_EARLY_IN_REV} rows to the storage. "
+ f"Data: {self.cache['content_in_revision']}. Retrying..."
+ )
+ print(
+ "content_in_revision",
+ self.storage.relation_get(
+ RelationType.CNT_EARLY_IN_REV,
+ (src for src, _, _ in self.cache["content_in_revision"]),
+ ),
+ )
- self.storage = ProvenanceWithoutPathDB(conn)
- self.cache: ProvenanceCache = new_cache()
+ while not self.storage.relation_add(
+ RelationType.CNT_IN_DIR, self.cache["content_in_directory"]
+ ):
+ logging.warning(
+ f"Unable to write {RelationType.CNT_IN_DIR} rows to the storage. "
+ f"Data: {self.cache['content_in_directory']}. Retrying..."
+ )
+ print(
+ "content_in_directory",
+ self.storage.relation_get(
+ RelationType.CNT_IN_DIR,
+ (src for src, _, _ in self.cache["content_in_directory"]),
+ ),
+ )
- def clear_caches(self) -> None:
- self.cache = new_cache()
+ while not self.storage.relation_add(
+ RelationType.DIR_IN_REV, self.cache["directory_in_revision"]
+ ):
+ logging.warning(
+ f"Unable to write {RelationType.DIR_IN_REV} rows to the storage. "
+ f"Data: {self.cache['directory_in_revision']}. Retrying..."
+ )
+ print(
+ "directory_in_revision",
+ self.storage.relation_get(
+ RelationType.DIR_IN_REV,
+ (src for src, _, _ in self.cache["directory_in_revision"]),
+ ),
+ )
- def flush(self) -> None:
- # TODO: for now we just forward the cache. This should be improved!
- while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit):
+ # After relations, dates for the entities can be safely set, acknowledging that
+ # these entities won't need to be reprocessed in case of failure.
+ dates = {
+ sha1: date
+ for sha1, date in self.cache["content"]["data"].items()
+ if sha1 in self.cache["content"]["added"] and date is not None
+ }
+ while not self.storage.content_set_date(dates):
+ logging.warning(
+ f"Unable to write content dates to the storage. "
+ f"Data: {dates}. Retrying..."
+ )
+
+ dates = {
+ sha1: date
+ for sha1, date in self.cache["directory"]["data"].items()
+ if sha1 in self.cache["directory"]["added"] and date is not None
+ }
+ while not self.storage.directory_set_date(dates):
+ logging.warning(
+ f"Unable to write directory dates to the storage. "
+ f"Data: {dates}. Retrying..."
+ )
+
+ dates = {
+ sha1: date
+ for sha1, date in self.cache["revision"]["data"].items()
+ if sha1 in self.cache["revision"]["added"] and date is not None
+ }
+ while not self.storage.revision_set_date(dates):
+ logging.warning(
+ f"Unable to write revision dates to the storage. "
+ f"Data: {dates}. Retrying..."
+ )
+
+ # Origin-revision layer insertions #############################################
+
+ # Origin urls should be inserted first so that internal ids' resolution works
+ # properly.
+ urls = {
+ sha1: date
+ for sha1, date in self.cache["origin"]["data"].items()
+ if sha1 in self.cache["origin"]["added"]
+ }
+ while not self.storage.origin_set_url(urls):
+ logging.warning(
+ f"Unable to write origins urls to the storage. "
+ f"Data: {urls}. Retrying..."
+ )
+
+ # Second, flat models for revisions' histories (i.e. revision-before-revision).
+ rbr_data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]] = sum(
+ [
+ [
+ (prev, next, None)
+ for next in self.cache["revision_before_revision"][prev]
+ ]
+ for prev in self.cache["revision_before_revision"]
+ ],
+ [],
+ )
+ while not self.storage.relation_add(RelationType.REV_BEFORE_REV, rbr_data):
logging.warning(
- f"Unable to commit cached information {self.cache}. Retrying..."
+ f"Unable to write {RelationType.REV_BEFORE_REV} rows to the storage. "
+ f"Data: {rbr_data}. Retrying..."
)
+ print(
+ "revision_before_revision",
+ self.storage.relation_get(
+ RelationType.REV_BEFORE_REV,
+ self.cache["revision_before_revision"],
+ ),
+ )
+
+ # Heads (i.e. revision-in-origin entries) should be inserted once flat models for
+ # their histories have already been added. This is to guarantee consistent results if
+ # something needs to be reprocessed due to a failure: already inserted heads
+ # won't get reprocessed in such a case.
+ rio_data = [(rev, org, None) for rev, org in self.cache["revision_in_origin"]]
+ while not self.storage.relation_add(RelationType.REV_IN_ORG, rio_data):
+ logging.warning(
+ f"Unable to write {RelationType.REV_IN_ORG} rows to the storage. "
+ f"Data: {rio_data}. Retrying..."
+ )
+ print(
+ "revision_in_origin",
+ self.storage.relation_get(
+ RelationType.REV_IN_ORG,
+ (src for src, _ in self.cache["revision_in_origin"]),
+ ),
+ )
+
+ # Finally, preferred origins for the visited revisions are set (this step can be
+ # reordered if required).
+ origins = {
+ sha1: self.cache["revision_origin"]["data"][sha1]
+ for sha1 in self.cache["revision_origin"]["added"]
+ }
+ while not self.storage.revision_set_origin(origins):
+ logging.warning(
+ f"Unable to write preferred origins to the storage. "
+ f"Data: {origins}. Retrying..."
+ )
+
+ # clear local cache ############################################################
self.clear_caches()
def content_add_to_directory(
@@ -145,12 +279,22 @@
self.cache["directory"]["added"].add(directory.id)
def get_dates(
- self, entity: Literal["content", "revision", "directory"], ids: List[Sha1Git]
+ self,
+ entity: Literal["content", "directory", "revision"],
+ ids: Iterable[Sha1Git],
) -> Dict[Sha1Git, datetime]:
cache = self.cache[entity]
missing_ids = set(id for id in ids if id not in cache)
if missing_ids:
- cache["data"].update(self.storage.get_dates(entity, list(missing_ids)))
+ if entity == "revision":
+ updated = {
+ id: date
+ for id, (date, _) in self.storage.revision_get(missing_ids).items()
+ if date is not None
+ }
+ else:
+ updated = getattr(self.storage, f"{entity}_get")(missing_ids)
+ cache["data"].update(updated)
return {
sha1: date
for sha1, date in cache["data"].items()
@@ -183,17 +327,19 @@
def revision_get_preferred_origin(
self, revision: RevisionEntry
) -> Optional[Sha1Git]:
- cache = self.cache["revision_origin"]
+ cache = self.cache["revision_origin"]["data"]
if revision.id not in cache:
- origin = self.storage.revision_get_preferred_origin(revision.id)
- if origin is not None:
- cache["data"][revision.id] = origin
- return cache["data"].get(revision.id)
+ ret = self.storage.revision_get([revision.id])
+ if revision.id in ret:
+ origin = ret[revision.id][1] # TODO: make this not a tuple
+ if origin is not None:
+ cache[revision.id] = origin
+ return cache.get(revision.id)
def revision_in_history(self, revision: RevisionEntry) -> bool:
- return revision.id in self.cache[
- "revision_before_revision"
- ] or self.storage.revision_in_history(revision.id)
+ return revision.id in self.cache["revision_before_revision"] or bool(
+ self.storage.relation_get(RelationType.REV_BEFORE_REV, [revision.id])
+ )
def revision_set_preferred_origin(
self, origin: OriginEntry, revision: RevisionEntry
@@ -202,9 +348,9 @@
self.cache["revision_origin"]["added"].add(revision.id)
def revision_visited(self, revision: RevisionEntry) -> bool:
- return revision.id in dict(
- self.cache["revision_in_origin"]
- ) or self.storage.revision_visited(revision.id)
+ return revision.id in dict(self.cache["revision_in_origin"]) or bool(
+ self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id])
+ )
def normalize(path: bytes) -> bytes:
diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py
--- a/swh/provenance/postgresql/provenancedb_base.py
+++ b/swh/provenance/postgresql/provenancedb_base.py
@@ -1,30 +1,35 @@
from datetime import datetime
import itertools
import logging
-from typing import Any, Dict, Generator, List, Mapping, Optional, Set, Tuple
+from typing import Dict, Generator, Iterable, Optional, Set, Tuple
import psycopg2
import psycopg2.extras
+from typing_extensions import Literal
from swh.model.model import Sha1Git
-from ..provenance import ProvenanceResult
+from ..provenance import ProvenanceResult, RelationType
class ProvenanceDBBase:
+ raise_on_commit: bool = False
+
def __init__(self, conn: psycopg2.extensions.connection):
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
conn.set_session(autocommit=True)
self.conn = conn
self.cursor = self.conn.cursor()
# XXX: not sure this is the best place to do it!
- self.cursor.execute("SET timezone TO 'UTC'")
+ sql = "SET timezone TO 'UTC'"
+ self.cursor.execute(sql)
self._flavor: Optional[str] = None
@property
def flavor(self) -> str:
if self._flavor is None:
- self.cursor.execute("select swh_get_dbflavor()")
+ sql = "SELECT swh_get_dbflavor()"
+ self.cursor.execute(sql)
self._flavor = self.cursor.fetchone()[0]
assert self._flavor is not None
return self._flavor
@@ -33,231 +38,246 @@
def with_path(self) -> bool:
return self.flavor == "with-path"
- def commit(self, data: Mapping[str, Any], raise_on_commit: bool = False) -> bool:
- try:
- # First insert entities
- for entity in ("content", "directory", "revision"):
- self.insert_entity(
- entity,
- {
- sha1: data[entity]["data"][sha1]
- for sha1 in data[entity]["added"]
- },
- )
- data[entity]["data"].clear()
- data[entity]["added"].clear()
-
- # Relations should come after ids for entities were resolved
- for relation in (
- "content_in_revision",
- "content_in_directory",
- "directory_in_revision",
- ):
- self.insert_relation(relation, data[relation])
-
- # Insert origins
- self.insert_origin(
- {
- sha1: data["origin"]["data"][sha1]
- for sha1 in data["origin"]["added"]
- },
- )
- data["origin"]["data"].clear()
- data["origin"]["added"].clear()
-
- # Insert relations from the origin-revision layer
- self.insert_revision_history(data["revision_before_revision"])
- self.insert_origin_head(data["revision_in_origin"])
-
- # Update preferred origins
- self.update_preferred_origin(
- {
- sha1: data["revision_origin"]["data"][sha1]
- for sha1 in data["revision_origin"]["added"]
- }
- )
- data["revision_origin"]["data"].clear()
- data["revision_origin"]["added"].clear()
+ def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
+ ...
- return True
+ def content_find_all(
+ self, id: Sha1Git, limit: Optional[int] = None
+ ) -> Generator[ProvenanceResult, None, None]:
+ ...
+
+ def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
+ return self._entity_set_date("content", dates)
+
+ def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+ return self._entity_get_date("content", ids)
+
+ def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
+ return self._entity_set_date("directory", dates)
+ def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+ return self._entity_get_date("directory", ids)
+
+ def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool:
+ try:
+ if urls:
+ sql = """
+ LOCK TABLE ONLY origin;
+ INSERT INTO origin(sha1, url) VALUES %s
+ ON CONFLICT DO NOTHING
+ """
+ psycopg2.extras.execute_values(self.cursor, sql, urls.items())
+ return True
except: # noqa: E722
# Unexpected error occurred, rollback all changes and log message
logging.exception("Unexpected error")
- if raise_on_commit:
+ if self.raise_on_commit:
raise
-
return False
- def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
- ...
+ def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
+ urls = {}
+ sha1s = tuple(ids)
+ if sha1s:
+ values = ", ".join(itertools.repeat("%s", len(sha1s)))
+ sql = f"""
+ SELECT sha1, url
+ FROM origin
+ WHERE sha1 IN ({values})
+ """
+ self.cursor.execute(sql, sha1s)
+ urls.update(self.cursor.fetchall())
+ return urls
- def content_find_all(
- self, id: Sha1Git, limit: Optional[int] = None
- ) -> Generator[ProvenanceResult, None, None]:
- ...
+ def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
+ return self._entity_set_date("revision", dates)
- def get_dates(self, entity: str, ids: List[Sha1Git]) -> Dict[Sha1Git, datetime]:
- dates = {}
- if ids:
- values = ", ".join(itertools.repeat("%s", len(ids)))
- self.cursor.execute(
- f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""",
- tuple(ids),
- )
- dates.update(self.cursor.fetchall())
- return dates
+ def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool:
+ try:
+ if origins:
+ sql = """
+ LOCK TABLE ONLY revision;
+ INSERT INTO revision(sha1, origin)
+ (SELECT V.rev AS sha1, O.id AS origin
+ FROM (VALUES %s) AS V(rev, org)
+ JOIN origin AS O ON (O.sha1=V.org))
+ ON CONFLICT (sha1) DO
+ UPDATE SET origin=EXCLUDED.origin
+ """
+ psycopg2.extras.execute_values(self.cursor, sql, origins.items())
+ return True
+ except: # noqa: E722
+ # Unexpected error occurred, rollback all changes and log message
+ logging.exception("Unexpected error")
+ if self.raise_on_commit:
+ raise
+ return False
- def insert_entity(self, entity: str, data: Dict[Sha1Git, datetime]):
- if data:
- psycopg2.extras.execute_values(
- self.cursor,
- f"""
- LOCK TABLE ONLY {entity};
- INSERT INTO {entity}(sha1, date) VALUES %s
- ON CONFLICT (sha1) DO
- UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date)
- """,
- data.items(),
- )
- # XXX: not sure if Python takes a reference or a copy.
- # This might be useless!
- data.clear()
-
- def insert_origin(self, data: Dict[Sha1Git, str]):
- if data:
- psycopg2.extras.execute_values(
- self.cursor,
- """
- LOCK TABLE ONLY origin;
- INSERT INTO origin(sha1, url) VALUES %s
- ON CONFLICT DO NOTHING
- """,
- data.items(),
- )
- # XXX: not sure if Python takes a reference or a copy.
- # This might be useless!
- data.clear()
-
- def insert_origin_head(self, data: Set[Tuple[Sha1Git, Sha1Git]]):
- if data:
- # Insert revisions first, to ensure "foreign keys" exist
- # Origins are assumed to be already inserted (they require knowing the url)
- psycopg2.extras.execute_values(
- self.cursor,
+ def revision_get(
+ self, ids: Iterable[Sha1Git]
+ ) -> Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]]:
+ result: Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]] = {}
+ sha1s = tuple(ids)
+ if sha1s:
+ values = ", ".join(itertools.repeat("%s", len(sha1s)))
+ sql = f"""
+ SELECT sha1, date, origin
+ FROM revision
+ WHERE sha1 IN ({values})
"""
- LOCK TABLE ONLY revision;
- INSERT INTO revision(sha1) VALUES %s
- ON CONFLICT DO NOTHING
- """,
- set((rev,) for rev, _ in data),
+ self.cursor.execute(sql, sha1s)
+ result.update(
+ (rev, (date, org)) for rev, date, org in self.cursor.fetchall()
)
+ return result
- psycopg2.extras.execute_values(
- self.cursor,
- # XXX: not clear how conflicts are handled here!
- """
- LOCK TABLE ONLY revision_in_origin;
- INSERT INTO revision_in_origin
- SELECT R.id, O.id
- FROM (VALUES %s) AS V(rev, org)
- INNER JOIN revision AS R on (R.sha1=V.rev)
- INNER JOIN origin AS O on (O.sha1=V.org)
- ON CONFLICT DO NOTHING
- """,
- data,
- )
- data.clear()
+ def relation_add(
+ self,
+ relation: RelationType,
+ data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]],
+ ) -> bool:
+ try:
+ if data:
+ table = relation.value
+ src, *_, dst = table.split("_")
- def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]):
- ...
+ if src != "origin":
+ # Origin entries should be inserted beforehand as they require extra
+ # non-null information
+ srcs = tuple(set((sha1,) for (sha1, _, _) in data))
+ sql = f"""
+ LOCK TABLE ONLY {src};
+ INSERT INTO {src}(sha1) VALUES %s
+ ON CONFLICT DO NOTHING
+ """
+ psycopg2.extras.execute_values(self.cursor, sql, srcs)
+ if dst != "origin":
+ # Origin entries should be inserted beforehand as they require extra
+ # non-null information
+ dsts = tuple(set((sha1,) for (_, sha1, _) in data))
+ sql = f"""
+ LOCK TABLE ONLY {dst};
+ INSERT INTO {dst}(sha1) VALUES %s
+ ON CONFLICT DO NOTHING
+ """
+ psycopg2.extras.execute_values(self.cursor, sql, dsts)
+ joins = [
+ f"INNER JOIN {src} AS S ON (S.sha1=V.src)",
+ f"INNER JOIN {dst} AS D ON (D.sha1=V.dst)",
+ ]
+ selected = ["S.id", "D.id"]
- def insert_revision_history(self, data: Dict[Sha1Git, Set[Sha1Git]]):
- if data:
- # print(f"Inserting histories: {data}")
- # Insert revisions first, to ensure "foreign keys" exist
- revisions = set(data)
- for rev in data:
- revisions.update(data[rev])
- psycopg2.extras.execute_values(
- self.cursor,
- """
- LOCK TABLE ONLY revision;
- INSERT INTO revision(sha1) VALUES %s
- ON CONFLICT DO NOTHING
- """,
- ((rev,) for rev in revisions),
- )
+ if self._relation_uses_location_table(relation):
+ locations = tuple(set((path,) for (_, _, path) in data))
+ sql = """
+ LOCK TABLE ONLY location;
+ INSERT INTO location(path) VALUES %s
+ ON CONFLICT (path) DO NOTHING
+ """
+ psycopg2.extras.execute_values(self.cursor, sql, locations)
+
+ joins.append("INNER JOIN location AS L ON (L.path=V.path)")
+ selected.append("L.id")
+
+ sql = f"""
+ INSERT INTO {table}
+ (SELECT {", ".join(selected)}
+ FROM (VALUES %s) AS V(src, dst, path)
+ {'''
+ '''.join(joins)})
+ ON CONFLICT DO NOTHING
+ """
+ psycopg2.extras.execute_values(self.cursor, sql, data)
+ return True
+ except: # noqa: E722
+ # Unexpected error occurred, rollback all changes and log message
+ logging.exception("Unexpected error")
+ if self.raise_on_commit:
+ raise
+ return False
+
+ def relation_get(
+ self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False
+ ) -> Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]]:
+ result: Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]] = set()
+ sha1s = tuple(ids)
+ if sha1s:
+ table = relation.value
+ src, *_, dst = table.split("_")
- values = [[(prev, next) for next in data[prev]] for prev in data]
- psycopg2.extras.execute_values(
- self.cursor,
- # XXX: not clear how conflicts are handled here!
+ # TODO: improve this!
+ if src == "revision" and dst == "revision":
+ src_field = "prev"
+ dst_field = "next"
+ else:
+ src_field = src
+ dst_field = dst
+
+ joins = [
+ f"INNER JOIN {src} AS S ON (S.id=R.{src_field})",
+ f"INNER JOIN {dst} AS D ON (D.id=R.{dst_field})",
+ ]
+ selected = ["S.sha1", "D.sha1"]
+ selector = "S.sha1" if not reverse else "D.sha1"
+
+ if self._relation_uses_location_table(relation):
+ joins.append("INNER JOIN location AS L ON (L.id=R.location)")
+ selected.append("L.path")
+ else:
+ selected.append("NULL")
+
+ sql = f"""
+ SELECT {", ".join(selected)}
+ FROM {table} AS R
+ {" ".join(joins)}
+ WHERE {selector} IN %s
"""
- LOCK TABLE ONLY revision_before_revision;
- INSERT INTO revision_before_revision
- SELECT P.id, N.id
- FROM (VALUES %s) AS V(prev, next)
- INNER JOIN revision AS P on (P.sha1=V.prev)
- INNER JOIN revision AS N on (N.sha1=V.next)
- ON CONFLICT DO NOTHING
- """,
- sum(values, []),
+ self.cursor.execute(sql, (sha1s,))
+ result.update(
+ (src_sha1, dst_sha1, path)
+ for src_sha1, dst_sha1, path in self.cursor.fetchall()
)
- data.clear()
-
- def revision_get_preferred_origin(self, revision: Sha1Git) -> Optional[Sha1Git]:
- self.cursor.execute(
- """
- SELECT O.sha1
- FROM revision AS R
- JOIN origin as O
- ON R.origin=O.id
- WHERE R.sha1=%s""",
- (revision,),
- )
- row = self.cursor.fetchone()
- return row[0] if row is not None else None
-
- def revision_in_history(self, revision: Sha1Git) -> bool:
- self.cursor.execute(
- """
- SELECT 1
- FROM revision_before_revision
- JOIN revision
- ON revision.id=revision_before_revision.prev
- WHERE revision.sha1=%s
- """,
- (revision,),
- )
- return self.cursor.fetchone() is not None
-
- def revision_visited(self, revision: Sha1Git) -> bool:
- self.cursor.execute(
- """
- SELECT 1
- FROM revision_in_origin
- JOIN revision
- ON revision.id=revision_in_origin.revision
- WHERE revision.sha1=%s
- """,
- (revision,),
- )
- return self.cursor.fetchone() is not None
-
- def update_preferred_origin(self, data: Dict[Sha1Git, Sha1Git]):
- if data:
- # XXX: this is assuming the revision already exists in the db! It should
- # be improved by allowing null dates in the revision table.
- psycopg2.extras.execute_values(
- self.cursor,
+ return result
+
+ def _entity_get_date(
+ self,
+ entity: Literal["content", "directory", "revision"],
+ ids: Iterable[Sha1Git],
+ ) -> Dict[Sha1Git, datetime]:
+ dates = {}
+ sha1s = tuple(ids)
+ if sha1s:
+ values = ", ".join(itertools.repeat("%s", len(sha1s)))
+ sql = f"""
+ SELECT sha1, date
+ FROM {entity}
+ WHERE sha1 IN ({values})
"""
- UPDATE revision R
- SET origin=O.id
- FROM (VALUES %s) AS V(rev, org)
- INNER JOIN origin AS O on (O.sha1=V.org)
- WHERE R.sha1=V.rev
- """,
- data.items(),
- )
- data.clear()
+ self.cursor.execute(sql, sha1s)
+ dates.update(self.cursor.fetchall())
+ return dates
+
+ def _entity_set_date(
+ self,
+ entity: Literal["content", "directory", "revision"],
+ data: Dict[Sha1Git, datetime],
+ ) -> bool:
+ try:
+ if data:
+ sql = f"""
+ LOCK TABLE ONLY {entity};
+ INSERT INTO {entity}(sha1, date) VALUES %s
+ ON CONFLICT (sha1) DO
+ UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date)
+ """
+ psycopg2.extras.execute_values(self.cursor, sql, data.items())
+ return True
+ except: # noqa: E722
+ # Unexpected error occurred, rollback all changes and log message
+ logging.exception("Unexpected error")
+ if self.raise_on_commit:
+ raise
+ return False
+
+ def _relation_uses_location_table(self, relation: RelationType) -> bool:
+ ...
diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py
--- a/swh/provenance/postgresql/provenancedb_with_path.py
+++ b/swh/provenance/postgresql/provenancedb_with_path.py
@@ -1,18 +1,14 @@
-from typing import Generator, Optional, Set, Tuple
-
-import psycopg2
-import psycopg2.extras
+from typing import Generator, Optional
from swh.model.model import Sha1Git
-from ..provenance import ProvenanceResult
+from ..provenance import ProvenanceResult, RelationType
from .provenancedb_base import ProvenanceDBBase
class ProvenanceWithPathDB(ProvenanceDBBase):
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
- self.cursor.execute(
- """
+ sql = """
SELECT C.sha1 AS blob,
R.sha1 AS rev,
R.date AS date,
@@ -25,9 +21,8 @@
LEFT JOIN origin as O ON (R.origin=O.id)
WHERE C.sha1=%s
ORDER BY date, rev, url, path ASC LIMIT 1
- """,
- (id,),
- )
+ """
+ self.cursor.execute(sql, (id,))
row = self.cursor.fetchone()
if row:
return ProvenanceResult(
@@ -40,8 +35,7 @@
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
early_cut = f"LIMIT {limit}" if limit is not None else ""
- self.cursor.execute(
- f"""
+ sql = f"""
(SELECT C.sha1 AS blob,
R.sha1 AS rev,
R.date AS date,
@@ -72,49 +66,13 @@
LEFT JOIN origin AS O ON (R.origin=O.id)
WHERE C.sha1=%s)
ORDER BY date, rev, url, path {early_cut}
- """,
- (id, id),
- )
+ """
+ self.cursor.execute(sql, (id, id))
for row in self.cursor.fetchall():
yield ProvenanceResult(
content=row[0], revision=row[1], date=row[2], origin=row[3], path=row[4]
)
- def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]):
- """Insert entries in `relation` from `data`
-
- Also insert missing location entries in the 'location' table.
- """
- if data:
- assert relation in (
- "content_in_revision",
- "content_in_directory",
- "directory_in_revision",
- )
- src, dst = relation.split("_in_")
-
- # insert missing locations
- locations = tuple(set((loc,) for (_, _, loc) in data))
- psycopg2.extras.execute_values(
- self.cursor,
- """
- LOCK TABLE ONLY location;
- INSERT INTO location(path) VALUES %s
- ON CONFLICT (path) DO NOTHING
- """,
- locations,
- )
- psycopg2.extras.execute_values(
- self.cursor,
- f"""
- LOCK TABLE ONLY {relation};
- INSERT INTO {relation}
- SELECT {src}.id, {dst}.id, location.id
- FROM (VALUES %s) AS V(src, dst, path)
- INNER JOIN {src} on ({src}.sha1=V.src)
- INNER JOIN {dst} on ({dst}.sha1=V.dst)
- INNER JOIN location on (location.path=V.path)
- """,
- data,
- )
- data.clear()
+ def _relation_uses_location_table(self, relation: RelationType) -> bool:
+ src, *_ = relation.value.split("_")
+ return src in ("content", "directory")
diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py
--- a/swh/provenance/postgresql/provenancedb_without_path.py
+++ b/swh/provenance/postgresql/provenancedb_without_path.py
@@ -1,18 +1,14 @@
-from typing import Generator, Optional, Set, Tuple
-
-import psycopg2
-import psycopg2.extras
+from typing import Generator, Optional
from swh.model.model import Sha1Git
-from ..provenance import ProvenanceResult
+from ..provenance import ProvenanceResult, RelationType
from .provenancedb_base import ProvenanceDBBase
class ProvenanceWithoutPathDB(ProvenanceDBBase):
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
- self.cursor.execute(
- """
+ sql = """
SELECT C.sha1 AS blob,
R.sha1 AS rev,
R.date AS date,
@@ -24,9 +20,8 @@
LEFT JOIN origin as O ON (R.origin=O.id)
WHERE C.sha1=%s
ORDER BY date, rev, url ASC LIMIT 1
- """,
- (id,),
- )
+ """
+ self.cursor.execute(sql, (id,))
row = self.cursor.fetchone()
if row:
return ProvenanceResult(
@@ -39,8 +34,7 @@
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
early_cut = f"LIMIT {limit}" if limit is not None else ""
- self.cursor.execute(
- f"""
+ sql = f"""
(SELECT C.sha1 AS blob,
R.sha1 AS rev,
R.date AS date,
@@ -64,33 +58,12 @@
LEFT JOIN origin as O ON (R.origin=O.id)
WHERE C.sha1=%s)
ORDER BY date, rev, url {early_cut}
- """,
- (id, id),
- )
+ """
+ self.cursor.execute(sql, (id, id))
for row in self.cursor.fetchall():
yield ProvenanceResult(
content=row[0], revision=row[1], date=row[2], origin=row[3], path=row[4]
)
- def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]):
- if data:
- assert relation in (
- "content_in_revision",
- "content_in_directory",
- "directory_in_revision",
- )
- src, dst = relation.split("_in_")
-
- psycopg2.extras.execute_values(
- self.cursor,
- f"""
- LOCK TABLE ONLY {relation};
- INSERT INTO {relation}
- SELECT {src}.id, {dst}.id
- FROM (VALUES %s) AS V(src, dst)
- INNER JOIN {src} on ({src}.sha1=V.src)
- INNER JOIN {dst} on ({dst}.sha1=V.dst)
- """,
- data,
- )
- data.clear()
+ def _relation_uses_location_table(self, relation: RelationType) -> bool:
+ return False
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,5 +1,6 @@
from datetime import datetime
-from typing import Dict, Generator, Iterable, Optional
+import enum
+from typing import Dict, Generator, Iterable, Optional, Set, Tuple
from typing_extensions import Protocol, TypedDict, runtime_checkable
@@ -18,7 +19,7 @@
@runtime_checkable
class ProvenanceInterface(Protocol):
- raise_on_commit: bool = False
+ storage: "ProvenanceStorageInterface"
def flush(self) -> None:
"""Flush internal cache to the underlying `storage`."""
@@ -151,3 +152,105 @@
provenance model.
"""
...
+
+
+class RelationType(enum.Enum):
+ CNT_EARLY_IN_REV = "content_in_revision"
+ CNT_IN_DIR = "content_in_directory"
+ DIR_IN_REV = "directory_in_revision"
+ REV_IN_ORG = "revision_in_origin"
+ REV_BEFORE_REV = "revision_before_revision"
+
+
+@runtime_checkable
+class ProvenanceStorageInterface(Protocol):
+ raise_on_commit: bool = False
+
+ def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
+ """Retrieve the first occurrence of the blob identified by `id`."""
+ ...
+
+ def content_find_all(
+ self, id: Sha1Git, limit: Optional[int] = None
+ ) -> Generator[ProvenanceResult, None, None]:
+ """Retrieve all the occurrences of the blob identified by `id`."""
+ ...
+
+ def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
+ """Associate dates to blobs identified by sha1 ids, as paired in `dates`. Return
+ a boolean stating whether the information was successfully stored.
+ """
+ ...
+
+ def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+ """Retrieve the associated date for each blob sha1 in `ids`. If some blob has
+ no associated date, it is not present in the resulting dictionary.
+ """
+ ...
+
+ def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
+ """Associate dates to directories identified by sha1 ids, as paired in
+ `dates`. Return a boolean stating whether the information was successfully
+ stored.
+ """
+ ...
+
+ def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+ """Retrieve the associated date for each directory sha1 in `ids`. If some
+ directory has no associated date, it is not present in the resulting dictionary.
+ """
+ ...
+
+ def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool:
+ """Associate urls to origins identified by sha1 ids, as paired in `urls`. Return
+ a boolean stating whether the information was successfully stored.
+ """
+ ...
+
+ def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
+ """Retrieve the associated url for each origin sha1 in `ids`. If some origin has
+ no associated date, it is not present in the resulting dictionary.
+ """
+ ...
+
+ def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
+ """Associate dates to revisions identified by sha1 ids, as paired in `dates`.
+ Return a boolean stating whether the information was successfully stored.
+ """
+ ...
+
+ def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool:
+ """Associate origins to revisions identified by sha1 ids, as paired in
+ `origins` (revision ids are keys and origin ids, values). Return a boolean
+ stating whether the information was successfully stored.
+ """
+ ...
+
+ def revision_get(
+ self, ids: Iterable[Sha1Git]
+ ) -> Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]]:
+ """Retrieve the associated date and origin for each revision sha1 in `ids`. If
+ some revision has no associated date nor origin, it is not present in the
+ resulting dictionary.
+ """
+ ...
+
+ def relation_add(
+ self,
+ relation: RelationType,
+ data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]],
+ ) -> bool:
+ """Add entries in the selected `relation`. Each tuple in `data` is of the form
+ (`src`, `dst`, `path`), where `src` and `dst` are the sha1 ids of the entities
+ being related, and `path` is optional depending on the selected `relation`.
+ """
+ ...
+
+ def relation_get(
+ self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False
+ ) -> Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]]:
+ """Retrieve all tuples in the selected `relation` whose source entities are
+ identified by some sha1 id in `ids`. If `reverse` is set, destination entities
+ are matched instead.
+ """
+ ...
diff --git a/swh/provenance/sql/30-schema.sql b/swh/provenance/sql/30-schema.sql
--- a/swh/provenance/sql/30-schema.sql
+++ b/swh/provenance/sql/30-schema.sql
@@ -27,7 +27,7 @@
(
id bigserial primary key, -- internal identifier of the content blob
sha1 sha1_git unique not null, -- intrinsic identifier of the content blob
- date timestamptz not null -- timestamp of the revision where the blob appears early
+ date timestamptz -- timestamp of the revision where the blob appears early
);
comment on column content.id is 'Content internal identifier';
comment on column content.sha1 is 'Content intrinsic identifier';
@@ -37,7 +37,7 @@
(
id bigserial primary key, -- internal identifier of the directory appearing in an isochrone inner frontier
sha1 sha1_git unique not null, -- intrinsic identifier of the directory
- date timestamptz not null -- max timestamp among those of the directory children's
+ date timestamptz -- max timestamp among those of the directory children's
);
comment on column directory.id is 'Directory internal identifier';
comment on column directory.sha1 is 'Directory intrinsic identifier';
@@ -77,9 +77,9 @@
-- relation tables
create table content_in_revision
(
- content bigint not null, -- internal identifier of the content blob
- revision bigint not null, -- internal identifier of the revision where the blob appears for the first time
- location bigint -- location of the content relative to the revision root directory
+ content bigint not null, -- internal identifier of the content blob
+ revision bigint not null, -- internal identifier of the revision where the blob appears for the first time
+ location bigint -- location of the content relative to the revision root directory
-- foreign key (blob) references content (id),
-- foreign key (rev) references revision (id),
-- foreign key (loc) references location (id)
@@ -90,9 +90,9 @@
create table content_in_directory
(
- content bigint not null, -- internal identifier of the content blob
- directory bigint not null, -- internal identifier of the directory containing the blob
- location bigint -- location of the content relative to its parent directory in the isochrone frontier
+ content bigint not null, -- internal identifier of the content blob
+ directory bigint not null, -- internal identifier of the directory containing the blob
+ location bigint -- location of the content relative to its parent directory in the isochrone frontier
-- foreign key (blob) references content (id),
-- foreign key (dir) references directory (id),
-- foreign key (loc) references location (id)
@@ -103,9 +103,9 @@
create table directory_in_revision
(
- directory bigint not null, -- internal identifier of the directory appearing in the revision
- revision bigint not null, -- internal identifier of the revision containing the directory
- location bigint -- location of the directory relative to the revision root directory
+ directory bigint not null, -- internal identifier of the directory appearing in the revision
+ revision bigint not null, -- internal identifier of the revision containing the directory
+ location bigint -- location of the directory relative to the revision root directory
-- foreign key (dir) references directory (id),
-- foreign key (rev) references revision (id),
-- foreign key (loc) references location (id)
@@ -116,8 +116,8 @@
create table revision_in_origin
(
- revision bigint not null, -- internal identifier of the revision poined by the origin
- origin bigint not null -- internal identifier of the origin that points to the revision
+ revision bigint not null, -- internal identifier of the revision pointed by the origin
+ origin bigint not null -- internal identifier of the origin that points to the revision
-- foreign key (rev) references revision (id),
-- foreign key (org) references origin (id)
);
diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py
--- a/swh/provenance/tests/conftest.py
+++ b/swh/provenance/tests/conftest.py
@@ -16,6 +16,7 @@
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Sha1Git
from swh.model.tests.swh_model_data import TEST_OBJECTS
+from swh.provenance import get_provenance
from swh.provenance.postgresql.archive import ArchivePostgreSQL
from swh.provenance.storage.archive import ArchiveStorage
from swh.storage.replay import process_replay_objects
@@ -29,13 +30,14 @@
flavor = request.param
populate_database_for_package("swh.provenance", postgresql.dsn, flavor=flavor)
- from swh.provenance.backend import ProvenanceBackend
-
BaseDb.adapt_conn(postgresql)
- prov = ProvenanceBackend(postgresql)
+
+ args = dict(tuple(item.split("=")) for item in postgresql.dsn.split())
+ args.pop("options")
+ prov = get_provenance(cls="local", db=args)
assert prov.storage.flavor == flavor
# in test sessions, we DO want to raise any exception occurring at commit time
- prov.raise_on_commit = True
+ prov.storage.raise_on_commit = True
return prov
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jul 3, 12:22 PM (2 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3225213
Attached To
D5947: Add `ProvenanceStorageInterface` as discussed during backend design
Event Timeline
Log In to Comment