diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py --- a/swh/provenance/__init__.py +++ b/swh/provenance/__init__.py @@ -1,47 +1,46 @@ from typing import TYPE_CHECKING -import warnings from .postgresql.db_utils import connect if TYPE_CHECKING: - from swh.provenance.archive import ArchiveInterface - from swh.provenance.provenance import ProvenanceInterface + from .archive import ArchiveInterface + from .provenance import ProvenanceInterface, ProvenanceStorageInterface def get_archive(cls: str, **kwargs) -> "ArchiveInterface": if cls == "api": - from swh.provenance.storage.archive import ArchiveStorage from swh.storage import get_storage + from .storage.archive import ArchiveStorage + return ArchiveStorage(get_storage(**kwargs["storage"])) elif cls == "direct": - from swh.provenance.postgresql.archive import ArchivePostgreSQL + from .postgresql.archive import ArchivePostgreSQL return ArchivePostgreSQL(connect(kwargs["db"])) else: raise NotImplementedError -def get_provenance(cls: str, **kwargs) -> "ProvenanceInterface": +def get_provenance(**kwargs) -> "ProvenanceInterface": + from .backend import ProvenanceBackend + + return ProvenanceBackend(get_provenance_storage(**kwargs)) + + +def get_provenance_storage(cls: str, **kwargs) -> "ProvenanceStorageInterface": if cls == "local": + from .postgresql.provenancedb_base import ProvenanceDBBase + conn = connect(kwargs["db"]) - if "with_path" in kwargs: - warnings.warn( - "Usage of the 'with-path' config option is deprecated. " - "The db flavor is now used instead.", - DeprecationWarning, - ) - - with_path = kwargs.get("with_path") - from swh.provenance.backend import ProvenanceBackend - - prov = ProvenanceBackend(conn) - if with_path is not None: - flavor = "with-path" if with_path else "without-path" - if prov.storage.flavor != flavor: - raise ValueError( - "The given flavor does not match the flavor stored in the backend." - ) - return prov + flavor = ProvenanceDBBase(conn).flavor + if flavor == "with-path": + from .postgresql.provenancedb_with_path import ProvenanceWithPathDB + + return ProvenanceWithPathDB(conn) + else: + from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB + + return ProvenanceWithoutPathDB(conn) else: raise NotImplementedError diff --git a/swh/provenance/backend.py b/swh/provenance/backend.py --- a/swh/provenance/backend.py +++ b/swh/provenance/backend.py @@ -1,15 +1,14 @@ from datetime import datetime import logging import os -from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple +from typing import Dict, Generator, Iterable, Optional, Set, Tuple -import psycopg2 # TODO: remove this dependency from typing_extensions import Literal, TypedDict from swh.model.model import Sha1Git from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry -from .provenance import ProvenanceResult +from .provenance import ProvenanceResult, ProvenanceStorageInterface, RelationType class DatetimeCache(TypedDict): @@ -58,33 +57,168 @@ class ProvenanceBackend: - raise_on_commit: bool = False + def __init__(self, storage: ProvenanceStorageInterface): + self.storage = storage + self.cache = new_cache() - def __init__(self, conn: psycopg2.extensions.connection): - from .postgresql.provenancedb_base import ProvenanceDBBase + def clear_caches(self) -> None: + self.cache = new_cache() - # TODO: this class should not know what the actual used DB is. - self.storage: ProvenanceDBBase - flavor = ProvenanceDBBase(conn).flavor - if flavor == "with-path": - from .postgresql.provenancedb_with_path import ProvenanceWithPathDB + def flush(self) -> None: + # Revision-content layer insertions ############################################ - self.storage = ProvenanceWithPathDB(conn) - else: - from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB + # For this layer, relations need to be inserted first so that, in case of + # failure, reprocessing the input does not generated an inconsistent database. + while not self.storage.relation_add( + RelationType.CNT_EARLY_IN_REV, self.cache["content_in_revision"] + ): + logging.warning( + f"Unable to write {RelationType.CNT_EARLY_IN_REV} rows to the storage. " + f"Data: {self.cache['content_in_revision']}. Retrying..." + ) + print( + "content_in_revision", + self.storage.relation_get( + RelationType.CNT_EARLY_IN_REV, + (src for src, _, _ in self.cache["content_in_revision"]), + ), + ) - self.storage = ProvenanceWithoutPathDB(conn) - self.cache: ProvenanceCache = new_cache() + while not self.storage.relation_add( + RelationType.CNT_IN_DIR, self.cache["content_in_directory"] + ): + logging.warning( + f"Unable to write {RelationType.CNT_IN_DIR} rows to the storage. " + f"Data: {self.cache['content_in_directory']}. Retrying..." + ) + print( + "content_in_directory", + self.storage.relation_get( + RelationType.CNT_IN_DIR, + (src for src, _, _ in self.cache["content_in_directory"]), + ), + ) - def clear_caches(self) -> None: - self.cache = new_cache() + while not self.storage.relation_add( + RelationType.DIR_IN_REV, self.cache["directory_in_revision"] + ): + logging.warning( + f"Unable to write {RelationType.DIR_IN_REV} rows to the storage. " + f"Data: {self.cache['directory_in_revision']}. Retrying..." + ) + print( + "directory_in_revision", + self.storage.relation_get( + RelationType.DIR_IN_REV, + (src for src, _, _ in self.cache["directory_in_revision"]), + ), + ) - def flush(self) -> None: - # TODO: for now we just forward the cache. This should be improved! - while not self.storage.commit(self.cache, raise_on_commit=self.raise_on_commit): + # After relations, dates for the entities can be safely set, acknowledging that + # these entities won't need to be reprocessed in case of failure. + dates = { + sha1: date + for sha1, date in self.cache["content"]["data"].items() + if sha1 in self.cache["content"]["added"] and date is not None + } + while not self.storage.content_set_date(dates): + logging.warning( + f"Unable to write content dates to the storage. " + f"Data: {dates}. Retrying..." + ) + + dates = { + sha1: date + for sha1, date in self.cache["directory"]["data"].items() + if sha1 in self.cache["directory"]["added"] and date is not None + } + while not self.storage.directory_set_date(dates): + logging.warning( + f"Unable to write directory dates to the storage. " + f"Data: {dates}. Retrying..." + ) + + dates = { + sha1: date + for sha1, date in self.cache["revision"]["data"].items() + if sha1 in self.cache["revision"]["added"] and date is not None + } + while not self.storage.revision_set_date(dates): + logging.warning( + f"Unable to write revision dates to the storage. " + f"Data: {dates}. Retrying..." + ) + + # Origin-revision layer insertions ############################################# + + # Origins urls should be inserted first so that internal ids' resolution works + # properly. + urls = { + sha1: date + for sha1, date in self.cache["origin"]["data"].items() + if sha1 in self.cache["origin"]["added"] + } + while not self.storage.origin_set_url(urls): + logging.warning( + f"Unable to write origins urls to the storage. " + f"Data: {urls}. Retrying..." + ) + + # Second, flat models for revisions' histories (ie. revision-before-revision). + rbr_data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]] = sum( + [ + [ + (prev, next, None) + for next in self.cache["revision_before_revision"][prev] + ] + for prev in self.cache["revision_before_revision"] + ], + [], + ) + while not self.storage.relation_add(RelationType.REV_BEFORE_REV, rbr_data): logging.warning( - f"Unable to commit cached information {self.cache}. Retrying..." + f"Unable to write {RelationType.REV_BEFORE_REV} rows to the storage. " + f"Data: {rbr_data}. Retrying..." ) + print( + "revision_before_revision", + self.storage.relation_get( + RelationType.REV_BEFORE_REV, + self.cache["revision_before_revision"], + ), + ) + + # Heads (ie. revision-in-origin entries) should be inserted once flat models for + # their histories were already added. This is to guarantee consistent results if + # something needs to be reprocessed due to a failure: already inserted heads + # won't get reprocessed in such a case. + rio_data = [(rev, org, None) for rev, org in self.cache["revision_in_origin"]] + while not self.storage.relation_add(RelationType.REV_IN_ORG, rio_data): + logging.warning( + f"Unable to write {RelationType.REV_IN_ORG} rows to the storage. " + f"Data: {rio_data}. Retrying..." + ) + print( + "revision_in_origin", + self.storage.relation_get( + RelationType.REV_IN_ORG, + (src for src, _ in self.cache["revision_in_origin"]), + ), + ) + + # Finally, preferred origins for the visited revisions are set (this step can be + # reordered if required). + origins = { + sha1: self.cache["revision_origin"]["data"][sha1] + for sha1 in self.cache["revision_origin"]["added"] + } + while not self.storage.revision_set_origin(origins): + logging.warning( + f"Unable to write preferred origins to the storage. " + f"Data: {origins}. Retrying..." + ) + + # clear local cache ############################################################ self.clear_caches() def content_add_to_directory( @@ -145,12 +279,22 @@ self.cache["directory"]["added"].add(directory.id) def get_dates( - self, entity: Literal["content", "revision", "directory"], ids: List[Sha1Git] + self, + entity: Literal["content", "directory", "revision"], + ids: Iterable[Sha1Git], ) -> Dict[Sha1Git, datetime]: cache = self.cache[entity] missing_ids = set(id for id in ids if id not in cache) if missing_ids: - cache["data"].update(self.storage.get_dates(entity, list(missing_ids))) + if entity == "revision": + updated = { + id: date + for id, (date, _) in self.storage.revision_get(missing_ids).items() + if date is not None + } + else: + updated = getattr(self.storage, f"{entity}_get")(missing_ids) + cache["data"].update(updated) return { sha1: date for sha1, date in cache["data"].items() @@ -183,17 +327,19 @@ def revision_get_preferred_origin( self, revision: RevisionEntry ) -> Optional[Sha1Git]: - cache = self.cache["revision_origin"] + cache = self.cache["revision_origin"]["data"] if revision.id not in cache: - origin = self.storage.revision_get_preferred_origin(revision.id) - if origin is not None: - cache["data"][revision.id] = origin - return cache["data"].get(revision.id) + ret = self.storage.revision_get([revision.id]) + if revision.id in ret: + origin = ret[revision.id][1] # TODO: make this not a tuple + if origin is not None: + cache[revision.id] = origin + return cache.get(revision.id) def revision_in_history(self, revision: RevisionEntry) -> bool: - return revision.id in self.cache[ - "revision_before_revision" - ] or self.storage.revision_in_history(revision.id) + return revision.id in self.cache["revision_before_revision"] or bool( + self.storage.relation_get(RelationType.REV_BEFORE_REV, [revision.id]) + ) def revision_set_preferred_origin( self, origin: OriginEntry, revision: RevisionEntry @@ -202,9 +348,9 @@ self.cache["revision_origin"]["added"].add(revision.id) def revision_visited(self, revision: RevisionEntry) -> bool: - return revision.id in dict( - self.cache["revision_in_origin"] - ) or self.storage.revision_visited(revision.id) + return revision.id in dict(self.cache["revision_in_origin"]) or bool( + self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id]) + ) def normalize(path: bytes) -> bytes: diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -1,30 +1,35 @@ from datetime import datetime import itertools import logging -from typing import Any, Dict, Generator, List, Mapping, Optional, Set, Tuple +from typing import Dict, Generator, Iterable, Optional, Set, Tuple import psycopg2 import psycopg2.extras +from typing_extensions import Literal from swh.model.model import Sha1Git -from ..provenance import ProvenanceResult +from ..provenance import ProvenanceResult, RelationType class ProvenanceDBBase: + raise_on_commit: bool = False + def __init__(self, conn: psycopg2.extensions.connection): conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) conn.set_session(autocommit=True) self.conn = conn self.cursor = self.conn.cursor() # XXX: not sure this is the best place to do it! - self.cursor.execute("SET timezone TO 'UTC'") + sql = "SET timezone TO 'UTC'" + self.cursor.execute(sql) self._flavor: Optional[str] = None @property def flavor(self) -> str: if self._flavor is None: - self.cursor.execute("select swh_get_dbflavor()") + sql = "SELECT swh_get_dbflavor()" + self.cursor.execute(sql) self._flavor = self.cursor.fetchone()[0] assert self._flavor is not None return self._flavor @@ -33,231 +38,246 @@ def with_path(self) -> bool: return self.flavor == "with-path" - def commit(self, data: Mapping[str, Any], raise_on_commit: bool = False) -> bool: - try: - # First insert entities - for entity in ("content", "directory", "revision"): - self.insert_entity( - entity, - { - sha1: data[entity]["data"][sha1] - for sha1 in data[entity]["added"] - }, - ) - data[entity]["data"].clear() - data[entity]["added"].clear() - - # Relations should come after ids for entities were resolved - for relation in ( - "content_in_revision", - "content_in_directory", - "directory_in_revision", - ): - self.insert_relation(relation, data[relation]) - - # Insert origins - self.insert_origin( - { - sha1: data["origin"]["data"][sha1] - for sha1 in data["origin"]["added"] - }, - ) - data["origin"]["data"].clear() - data["origin"]["added"].clear() - - # Insert relations from the origin-revision layer - self.insert_revision_history(data["revision_before_revision"]) - self.insert_origin_head(data["revision_in_origin"]) - - # Update preferred origins - self.update_preferred_origin( - { - sha1: data["revision_origin"]["data"][sha1] - for sha1 in data["revision_origin"]["added"] - } - ) - data["revision_origin"]["data"].clear() - data["revision_origin"]["added"].clear() + def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: + ... - return True + def content_find_all( + self, id: Sha1Git, limit: Optional[int] = None + ) -> Generator[ProvenanceResult, None, None]: + ... + + def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: + return self._entity_set_date("content", dates) + + def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: + return self._entity_get_date("content", ids) + + def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: + return self._entity_set_date("directory", dates) + def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: + return self._entity_get_date("directory", ids) + + def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: + try: + if urls: + sql = """ + LOCK TABLE ONLY origin; + INSERT INTO origin(sha1, url) VALUES %s + ON CONFLICT DO NOTHING + """ + psycopg2.extras.execute_values(self.cursor, sql, urls.items()) + return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") - if raise_on_commit: + if self.raise_on_commit: raise - return False - def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: - ... + def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: + urls = {} + sha1s = tuple(ids) + if sha1s: + values = ", ".join(itertools.repeat("%s", len(sha1s))) + sql = f""" + SELECT sha1, url + FROM origin + WHERE sha1 IN ({values}) + """ + self.cursor.execute(sql, sha1s) + urls.update(self.cursor.fetchall()) + return urls - def content_find_all( - self, id: Sha1Git, limit: Optional[int] = None - ) -> Generator[ProvenanceResult, None, None]: - ... + def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: + return self._entity_set_date("revision", dates) - def get_dates(self, entity: str, ids: List[Sha1Git]) -> Dict[Sha1Git, datetime]: - dates = {} - if ids: - values = ", ".join(itertools.repeat("%s", len(ids))) - self.cursor.execute( - f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""", - tuple(ids), - ) - dates.update(self.cursor.fetchall()) - return dates + def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: + try: + if origins: + sql = """ + LOCK TABLE ONLY revision; + INSERT INTO revision(sha1, origin) + (SELECT V.rev AS sha1, O.id AS origin + FROM (VALUES %s) AS V(rev, org) + JOIN origin AS O ON (O.sha1=V.org)) + ON CONFLICT (sha1) DO + UPDATE SET origin=EXCLUDED.origin + """ + psycopg2.extras.execute_values(self.cursor, sql, origins.items()) + return True + except: # noqa: E722 + # Unexpected error occurred, rollback all changes and log message + logging.exception("Unexpected error") + if self.raise_on_commit: + raise + return False - def insert_entity(self, entity: str, data: Dict[Sha1Git, datetime]): - if data: - psycopg2.extras.execute_values( - self.cursor, - f""" - LOCK TABLE ONLY {entity}; - INSERT INTO {entity}(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) - """, - data.items(), - ) - # XXX: not sure if Python takes a reference or a copy. - # This might be useless! - data.clear() - - def insert_origin(self, data: Dict[Sha1Git, str]): - if data: - psycopg2.extras.execute_values( - self.cursor, - """ - LOCK TABLE ONLY origin; - INSERT INTO origin(sha1, url) VALUES %s - ON CONFLICT DO NOTHING - """, - data.items(), - ) - # XXX: not sure if Python takes a reference or a copy. - # This might be useless! - data.clear() - - def insert_origin_head(self, data: Set[Tuple[Sha1Git, Sha1Git]]): - if data: - # Insert revisions first, to ensure "foreign keys" exist - # Origins are assumed to be already inserted (they require knowing the url) - psycopg2.extras.execute_values( - self.cursor, + def revision_get( + self, ids: Iterable[Sha1Git] + ) -> Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]]: + result: Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]] = {} + sha1s = tuple(ids) + if sha1s: + values = ", ".join(itertools.repeat("%s", len(sha1s))) + sql = f""" + SELECT sha1, date, origin + FROM revision + WHERE sha1 IN ({values}) """ - LOCK TABLE ONLY revision; - INSERT INTO revision(sha1) VALUES %s - ON CONFLICT DO NOTHING - """, - set((rev,) for rev, _ in data), + self.cursor.execute(sql, sha1s) + result.update( + (rev, (date, org)) for rev, date, org in self.cursor.fetchall() ) + return result - psycopg2.extras.execute_values( - self.cursor, - # XXX: not clear how conflicts are handled here! - """ - LOCK TABLE ONLY revision_in_origin; - INSERT INTO revision_in_origin - SELECT R.id, O.id - FROM (VALUES %s) AS V(rev, org) - INNER JOIN revision AS R on (R.sha1=V.rev) - INNER JOIN origin AS O on (O.sha1=V.org) - ON CONFLICT DO NOTHING - """, - data, - ) - data.clear() + def relation_add( + self, + relation: RelationType, + data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]], + ) -> bool: + try: + if data: + table = relation.value + src, *_, dst = table.split("_") - def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]): - ... + if src != "origin": + # Origin entries should be inserted previously as they require extra + # non-null information + srcs = tuple(set((sha1,) for (sha1, _, _) in data)) + sql = f""" + LOCK TABLE ONLY {src}; + INSERT INTO {src}(sha1) VALUES %s + ON CONFLICT DO NOTHING + """ + psycopg2.extras.execute_values(self.cursor, sql, srcs) + if dst != "origin": + # Origin entries should be inserted previously as they require extra + # non-null information + dsts = tuple(set((sha1,) for (_, sha1, _) in data)) + sql = f""" + LOCK TABLE ONLY {dst}; + INSERT INTO {dst}(sha1) VALUES %s + ON CONFLICT DO NOTHING + """ + psycopg2.extras.execute_values(self.cursor, sql, dsts) + joins = [ + f"INNER JOIN {src} AS S ON (S.sha1=V.src)", + f"INNER JOIN {dst} AS D ON (D.sha1=V.dst)", + ] + selected = ["S.id", "D.id"] - def insert_revision_history(self, data: Dict[Sha1Git, Set[Sha1Git]]): - if data: - # print(f"Inserting histories: {data}") - # Insert revisions first, to ensure "foreign keys" exist - revisions = set(data) - for rev in data: - revisions.update(data[rev]) - psycopg2.extras.execute_values( - self.cursor, - """ - LOCK TABLE ONLY revision; - INSERT INTO revision(sha1) VALUES %s - ON CONFLICT DO NOTHING - """, - ((rev,) for rev in revisions), - ) + if self._relation_uses_location_table(relation): + locations = tuple(set((path,) for (_, _, path) in data)) + sql = """ + LOCK TABLE ONLY location; + INSERT INTO location(path) VALUES %s + ON CONFLICT (path) DO NOTHING + """ + psycopg2.extras.execute_values(self.cursor, sql, locations) + + joins.append("INNER JOIN location AS L ON (L.path=V.path)") + selected.append("L.id") + + sql = f""" + INSERT INTO {table} + (SELECT {", ".join(selected)} + FROM (VALUES %s) AS V(src, dst, path) + {''' + '''.join(joins)}) + ON CONFLICT DO NOTHING + """ + psycopg2.extras.execute_values(self.cursor, sql, data) + return True + except: # noqa: E722 + # Unexpected error occurred, rollback all changes and log message + logging.exception("Unexpected error") + if self.raise_on_commit: + raise + return False + + def relation_get( + self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False + ) -> Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]]: + result: Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]] = set() + sha1s = tuple(ids) + if sha1s: + table = relation.value + src, *_, dst = table.split("_") - values = [[(prev, next) for next in data[prev]] for prev in data] - psycopg2.extras.execute_values( - self.cursor, - # XXX: not clear how conflicts are handled here! + # TODO: improve this! + if src == "revision" and dst == "revision": + src_field = "prev" + dst_field = "next" + else: + src_field = src + dst_field = dst + + joins = [ + f"INNER JOIN {src} AS S ON (S.id=R.{src_field})", + f"INNER JOIN {dst} AS D ON (D.id=R.{dst_field})", + ] + selected = ["S.sha1", "D.sha1"] + selector = "S.sha1" if not reverse else "D.sha1" + + if self._relation_uses_location_table(relation): + joins.append("INNER JOIN location AS L ON (L.id=R.location)") + selected.append("L.path") + else: + selected.append("NULL") + + sql = f""" + SELECT {", ".join(selected)} + FROM {table} AS R + {" ".join(joins)} + WHERE {selector} IN %s """ - LOCK TABLE ONLY revision_before_revision; - INSERT INTO revision_before_revision - SELECT P.id, N.id - FROM (VALUES %s) AS V(prev, next) - INNER JOIN revision AS P on (P.sha1=V.prev) - INNER JOIN revision AS N on (N.sha1=V.next) - ON CONFLICT DO NOTHING - """, - sum(values, []), + self.cursor.execute(sql, (sha1s,)) + result.update( + (src_sha1, dst_sha1, path) + for src_sha1, dst_sha1, path in self.cursor.fetchall() ) - data.clear() - - def revision_get_preferred_origin(self, revision: Sha1Git) -> Optional[Sha1Git]: - self.cursor.execute( - """ - SELECT O.sha1 - FROM revision AS R - JOIN origin as O - ON R.origin=O.id - WHERE R.sha1=%s""", - (revision,), - ) - row = self.cursor.fetchone() - return row[0] if row is not None else None - - def revision_in_history(self, revision: Sha1Git) -> bool: - self.cursor.execute( - """ - SELECT 1 - FROM revision_before_revision - JOIN revision - ON revision.id=revision_before_revision.prev - WHERE revision.sha1=%s - """, - (revision,), - ) - return self.cursor.fetchone() is not None - - def revision_visited(self, revision: Sha1Git) -> bool: - self.cursor.execute( - """ - SELECT 1 - FROM revision_in_origin - JOIN revision - ON revision.id=revision_in_origin.revision - WHERE revision.sha1=%s - """, - (revision,), - ) - return self.cursor.fetchone() is not None - - def update_preferred_origin(self, data: Dict[Sha1Git, Sha1Git]): - if data: - # XXX: this is assuming the revision already exists in the db! It should - # be improved by allowing null dates in the revision table. - psycopg2.extras.execute_values( - self.cursor, + return result + + def _entity_get_date( + self, + entity: Literal["content", "directory", "revision"], + ids: Iterable[Sha1Git], + ) -> Dict[Sha1Git, datetime]: + dates = {} + sha1s = tuple(ids) + if sha1s: + values = ", ".join(itertools.repeat("%s", len(sha1s))) + sql = f""" + SELECT sha1, date + FROM {entity} + WHERE sha1 IN ({values}) """ - UPDATE revision R - SET origin=O.id - FROM (VALUES %s) AS V(rev, org) - INNER JOIN origin AS O on (O.sha1=V.org) - WHERE R.sha1=V.rev - """, - data.items(), - ) - data.clear() + self.cursor.execute(sql, sha1s) + dates.update(self.cursor.fetchall()) + return dates + + def _entity_set_date( + self, + entity: Literal["content", "directory", "revision"], + data: Dict[Sha1Git, datetime], + ) -> bool: + try: + if data: + sql = f""" + LOCK TABLE ONLY {entity}; + INSERT INTO {entity}(sha1, date) VALUES %s + ON CONFLICT (sha1) DO + UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) + """ + psycopg2.extras.execute_values(self.cursor, sql, data.items()) + return True + except: # noqa: E722 + # Unexpected error occurred, rollback all changes and log message + logging.exception("Unexpected error") + if self.raise_on_commit: + raise + return False + + def _relation_uses_location_table(self, relation: RelationType) -> bool: + ... diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py --- a/swh/provenance/postgresql/provenancedb_with_path.py +++ b/swh/provenance/postgresql/provenancedb_with_path.py @@ -1,18 +1,14 @@ -from typing import Generator, Optional, Set, Tuple - -import psycopg2 -import psycopg2.extras +from typing import Generator, Optional from swh.model.model import Sha1Git -from ..provenance import ProvenanceResult +from ..provenance import ProvenanceResult, RelationType from .provenancedb_base import ProvenanceDBBase class ProvenanceWithPathDB(ProvenanceDBBase): def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: - self.cursor.execute( - """ + sql = """ SELECT C.sha1 AS blob, R.sha1 AS rev, R.date AS date, @@ -25,9 +21,8 @@ LEFT JOIN origin as O ON (R.origin=O.id) WHERE C.sha1=%s ORDER BY date, rev, url, path ASC LIMIT 1 - """, - (id,), - ) + """ + self.cursor.execute(sql, (id,)) row = self.cursor.fetchone() if row: return ProvenanceResult( @@ -40,8 +35,7 @@ self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" - self.cursor.execute( - f""" + sql = f""" (SELECT C.sha1 AS blob, R.sha1 AS rev, R.date AS date, @@ -72,49 +66,13 @@ LEFT JOIN origin AS O ON (R.origin=O.id) WHERE C.sha1=%s) ORDER BY date, rev, url, path {early_cut} - """, - (id, id), - ) + """ + self.cursor.execute(sql, (id, id)) for row in self.cursor.fetchall(): yield ProvenanceResult( content=row[0], revision=row[1], date=row[2], origin=row[3], path=row[4] ) - def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]): - """Insert entries in `relation` from `data` - - Also insert missing location entries in the 'location' table. - """ - if data: - assert relation in ( - "content_in_revision", - "content_in_directory", - "directory_in_revision", - ) - src, dst = relation.split("_in_") - - # insert missing locations - locations = tuple(set((loc,) for (_, _, loc) in data)) - psycopg2.extras.execute_values( - self.cursor, - """ - LOCK TABLE ONLY location; - INSERT INTO location(path) VALUES %s - ON CONFLICT (path) DO NOTHING - """, - locations, - ) - psycopg2.extras.execute_values( - self.cursor, - f""" - LOCK TABLE ONLY {relation}; - INSERT INTO {relation} - SELECT {src}.id, {dst}.id, location.id - FROM (VALUES %s) AS V(src, dst, path) - INNER JOIN {src} on ({src}.sha1=V.src) - INNER JOIN {dst} on ({dst}.sha1=V.dst) - INNER JOIN location on (location.path=V.path) - """, - data, - ) - data.clear() + def _relation_uses_location_table(self, relation: RelationType) -> bool: + src, *_ = relation.value.split("_") + return src in ("content", "directory") diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py --- a/swh/provenance/postgresql/provenancedb_without_path.py +++ b/swh/provenance/postgresql/provenancedb_without_path.py @@ -1,18 +1,14 @@ -from typing import Generator, Optional, Set, Tuple - -import psycopg2 -import psycopg2.extras +from typing import Generator, Optional from swh.model.model import Sha1Git -from ..provenance import ProvenanceResult +from ..provenance import ProvenanceResult, RelationType from .provenancedb_base import ProvenanceDBBase class ProvenanceWithoutPathDB(ProvenanceDBBase): def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: - self.cursor.execute( - """ + sql = """ SELECT C.sha1 AS blob, R.sha1 AS rev, R.date AS date, @@ -24,9 +20,8 @@ LEFT JOIN origin as O ON (R.origin=O.id) WHERE C.sha1=%s ORDER BY date, rev, url ASC LIMIT 1 - """, - (id,), - ) + """ + self.cursor.execute(sql, (id,)) row = self.cursor.fetchone() if row: return ProvenanceResult( @@ -39,8 +34,7 @@ self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: early_cut = f"LIMIT {limit}" if limit is not None else "" - self.cursor.execute( - f""" + sql = f""" (SELECT C.sha1 AS blob, R.sha1 AS rev, R.date AS date, @@ -64,33 +58,12 @@ LEFT JOIN origin as O ON (R.origin=O.id) WHERE C.sha1=%s) ORDER BY date, rev, url {early_cut} - """, - (id, id), - ) + """ + self.cursor.execute(sql, (id, id)) for row in self.cursor.fetchall(): yield ProvenanceResult( content=row[0], revision=row[1], date=row[2], origin=row[3], path=row[4] ) - def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]): - if data: - assert relation in ( - "content_in_revision", - "content_in_directory", - "directory_in_revision", - ) - src, dst = relation.split("_in_") - - psycopg2.extras.execute_values( - self.cursor, - f""" - LOCK TABLE ONLY {relation}; - INSERT INTO {relation} - SELECT {src}.id, {dst}.id - FROM (VALUES %s) AS V(src, dst) - INNER JOIN {src} on ({src}.sha1=V.src) - INNER JOIN {dst} on ({dst}.sha1=V.dst) - """, - data, - ) - data.clear() + def _relation_uses_location_table(self, relation: RelationType) -> bool: + return False diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -1,5 +1,6 @@ from datetime import datetime -from typing import Dict, Generator, Iterable, Optional +import enum +from typing import Dict, Generator, Iterable, Optional, Set, Tuple from typing_extensions import Protocol, TypedDict, runtime_checkable @@ -18,7 +19,7 @@ @runtime_checkable class ProvenanceInterface(Protocol): - raise_on_commit: bool = False + storage: "ProvenanceStorageInterface" def flush(self) -> None: """Flush internal cache to the underlying `storage`.""" @@ -151,3 +152,105 @@ provenance model. """ ... + + +class RelationType(enum.Enum): + CNT_EARLY_IN_REV = "content_in_revision" + CNT_IN_DIR = "content_in_directory" + DIR_IN_REV = "directory_in_revision" + REV_IN_ORG = "revision_in_origin" + REV_BEFORE_REV = "revision_before_revision" + + +@runtime_checkable +class ProvenanceStorageInterface(Protocol): + raise_on_commit: bool = False + + def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: + """Retrieve the first occurrence of the blob identified by `id`.""" + ... + + def content_find_all( + self, id: Sha1Git, limit: Optional[int] = None + ) -> Generator[ProvenanceResult, None, None]: + """Retrieve all the occurrences of the blob identified by `id`.""" + ... + + def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: + """Associate dates to blobs identified by sha1 ids, as paired in `dates`. Return + a boolean stating whether the information was successfully stored. + """ + ... + + def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: + """Retrieve the associated date for each blob sha1 in `ids`. If some blob has + no associated date, it is not present in the resulting dictionary. + """ + ... + + def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: + """Associate dates to directories identified by sha1 ids, as paired in + `dates`. Return a boolean stating whether the information was successfully + stored. + """ + ... + + def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: + """Retrieve the associated date for each directory sha1 in `ids`. If some + directory has no associated date, it is not present in the resulting dictionary. + """ + ... + + def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: + """Associate urls to origins identified by sha1 ids, as paired in `urls`. Return + a boolean stating whether the information was successfully stored. + """ + ... + + def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: + """Retrieve the associated url for each origin sha1 in `ids`. If some origin has + no associated date, it is not present in the resulting dictionary. + """ + ... + + def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: + """Associate dates to revisions identified by sha1 ids, as paired in `dates`. + Return a boolean stating whether the information was successfully stored. + """ + ... + + def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool: + """Associate origins to revisions identified by sha1 ids, as paired in + `origins` (revision ids are keys and origin ids, values). Return a boolean + stating whether the information was successfully stored. + """ + ... + + def revision_get( + self, ids: Iterable[Sha1Git] + ) -> Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]]: + """Retrieve the associated date and origin for each revision sha1 in `ids`. If + some revision has no associated date nor origin, it is not present in the + resulting dictionary. + """ + ... + + def relation_add( + self, + relation: RelationType, + data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]], + ) -> bool: + """Add entries in the selected `relation`. Each tuple in `data` is of the from + (`src`, `dst`, `path`), where `src` and `dst` are the sha1 ids of the entities + being related, and `path` is optional depending on the selected `relation`. + """ + ... + + def relation_get( + self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False + ) -> Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]]: + """Retrieve all tuples in the selected `relation` whose source entities are + identified by some sha1 id in `ids`. If `reverse` is set, destination entities + are matched instead. + """ + ... diff --git a/swh/provenance/sql/30-schema.sql b/swh/provenance/sql/30-schema.sql --- a/swh/provenance/sql/30-schema.sql +++ b/swh/provenance/sql/30-schema.sql @@ -27,7 +27,7 @@ ( id bigserial primary key, -- internal identifier of the content blob sha1 sha1_git unique not null, -- intrinsic identifier of the content blob - date timestamptz not null -- timestamp of the revision where the blob appears early + date timestamptz -- timestamp of the revision where the blob appears early ); comment on column content.id is 'Content internal identifier'; comment on column content.sha1 is 'Content intrinsic identifier'; @@ -37,7 +37,7 @@ ( id bigserial primary key, -- internal identifier of the directory appearing in an isochrone inner frontier sha1 sha1_git unique not null, -- intrinsic identifier of the directory - date timestamptz not null -- max timestamp among those of the directory children's + date timestamptz -- max timestamp among those of the directory children's ); comment on column directory.id is 'Directory internal identifier'; comment on column directory.sha1 is 'Directory intrinsic identifier'; @@ -77,9 +77,9 @@ -- relation tables create table content_in_revision ( - content bigint not null, -- internal identifier of the content blob - revision bigint not null, -- internal identifier of the revision where the blob appears for the first time - location bigint -- location of the content relative to the revision root directory + content bigint not null, -- internal identifier of the content blob + revision bigint not null, -- internal identifier of the revision where the blob appears for the first time + location bigint -- location of the content relative to the revision root directory -- foreign key (blob) references content (id), -- foreign key (rev) references revision (id), -- foreign key (loc) references location (id) @@ -90,9 +90,9 @@ create table content_in_directory ( - content bigint not null, -- internal identifier of the content blob - directory bigint not null, -- internal identifier of the directory containing the blob - location bigint -- location of the content relative to its parent directory in the isochrone frontier + content bigint not null, -- internal identifier of the content blob + directory bigint not null, -- internal identifier of the directory containing the blob + location bigint -- location of the content relative to its parent directory in the isochrone frontier -- foreign key (blob) references content (id), -- foreign key (dir) references directory (id), -- foreign key (loc) references location (id) @@ -103,9 +103,9 @@ create table directory_in_revision ( - directory bigint not null, -- internal identifier of the directory appearing in the revision - revision bigint not null, -- internal identifier of the revision containing the directory - location bigint -- location of the directory relative to the revision root directory + directory bigint not null, -- internal identifier of the directory appearing in the revision + revision bigint not null, -- internal identifier of the revision containing the directory + location bigint -- location of the directory relative to the revision root directory -- foreign key (dir) references directory (id), -- foreign key (rev) references revision (id), -- foreign key (loc) references location (id) @@ -116,8 +116,8 @@ create table revision_in_origin ( - revision bigint not null, -- internal identifier of the revision poined by the origin - origin bigint not null -- internal identifier of the origin that points to the revision + revision bigint not null, -- internal identifier of the revision poined by the origin + origin bigint not null -- internal identifier of the origin that points to the revision -- foreign key (rev) references revision (id), -- foreign key (org) references origin (id) ); diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py --- a/swh/provenance/tests/conftest.py +++ b/swh/provenance/tests/conftest.py @@ -16,6 +16,7 @@ from swh.model.hashutil import hash_to_bytes from swh.model.model import Sha1Git from swh.model.tests.swh_model_data import TEST_OBJECTS +from swh.provenance import get_provenance from swh.provenance.postgresql.archive import ArchivePostgreSQL from swh.provenance.storage.archive import ArchiveStorage from swh.storage.replay import process_replay_objects @@ -29,13 +30,14 @@ flavor = request.param populate_database_for_package("swh.provenance", postgresql.dsn, flavor=flavor) - from swh.provenance.backend import ProvenanceBackend - BaseDb.adapt_conn(postgresql) - prov = ProvenanceBackend(postgresql) + + args = dict(tuple(item.split("=")) for item in postgresql.dsn.split()) + args.pop("options") + prov = get_provenance(cls="local", db=args) assert prov.storage.flavor == flavor # in test sessions, we DO want to raise any exception occurring at commit time - prov.raise_on_commit = True + prov.storage.raise_on_commit = True return prov