diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py
index 9f167e1..945d8f3 100644
--- a/swh/provenance/__init__.py
+++ b/swh/provenance/__init__.py
@@ -1,47 +1,46 @@
 from typing import TYPE_CHECKING

-import warnings
-
 from .postgresql.db_utils import connect

 if TYPE_CHECKING:
-    from swh.provenance.archive import ArchiveInterface
-    from swh.provenance.provenance import ProvenanceInterface
+    from .archive import ArchiveInterface
+    from .provenance import ProvenanceInterface, ProvenanceStorageInterface


 def get_archive(cls: str, **kwargs) -> "ArchiveInterface":
     if cls == "api":
-        from swh.provenance.storage.archive import ArchiveStorage
         from swh.storage import get_storage

+        from .storage.archive import ArchiveStorage
+
         return ArchiveStorage(get_storage(**kwargs["storage"]))
     elif cls == "direct":
-        from swh.provenance.postgresql.archive import ArchivePostgreSQL
+        from .postgresql.archive import ArchivePostgreSQL

         return ArchivePostgreSQL(connect(kwargs["db"]))
     else:
         raise NotImplementedError


-def get_provenance(cls: str, **kwargs) -> "ProvenanceInterface":
+def get_provenance(**kwargs) -> "ProvenanceInterface":
+    from .backend import ProvenanceBackend
+
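+    # The factory forwards its keyword arguments to get_provenance_storage,
+    # e.g. (connection values below are illustrative):
+    #   get_provenance(cls="local", db={"dbname": "provenance"})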
+    return ProvenanceBackend(get_provenance_storage(**kwargs))
+
+
+def get_provenance_storage(cls: str, **kwargs) -> "ProvenanceStorageInterface":
     if cls == "local":
+        from .postgresql.provenancedb_base import ProvenanceDBBase
+
         conn = connect(kwargs["db"])
-        if "with_path" in kwargs:
-            warnings.warn(
-                "Usage of the 'with-path' config option is deprecated. "
-                "The db flavor is now used instead.",
-                DeprecationWarning,
-            )
-
-        with_path = kwargs.get("with_path")
-        from swh.provenance.backend import ProvenanceBackend
-
-        prov = ProvenanceBackend(conn)
-        if with_path is not None:
-            flavor = "with-path" if with_path else "without-path"
-            if prov.storage.flavor != flavor:
-                raise ValueError(
-                    "The given flavor does not match the flavor stored in the backend."
-                )
-        return prov
+        flavor = ProvenanceDBBase(conn).flavor
+        if flavor == "with-path":
+            from .postgresql.provenancedb_with_path import ProvenanceWithPathDB
+
+            return ProvenanceWithPathDB(conn)
+        else:
+            from .postgresql.provenancedb_without_path import ProvenanceWithoutPathDB
+
+            return ProvenanceWithoutPathDB(conn)
     else:
         raise NotImplementedError
diff --git a/swh/provenance/backend.py b/swh/provenance/backend.py
index 70b2906..fa1b781 100644
--- a/swh/provenance/backend.py
+++ b/swh/provenance/backend.py
@@ -1,213 +1,324 @@
 from datetime import datetime
 import logging
 import os
-from typing import Dict, Generator, Iterable, List, Optional, Set, Tuple
+from typing import Dict, Generator, Iterable, Optional, Set, Tuple

-import psycopg2  # TODO: remove this dependency
 from typing_extensions import Literal, TypedDict

 from swh.model.model import Sha1Git

 from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry
-from .provenance import ProvenanceResult
+from .provenance import ProvenanceResult, ProvenanceStorageInterface, RelationType


 class DatetimeCache(TypedDict):
     data: Dict[Sha1Git, Optional[datetime]]
     added: Set[Sha1Git]


 class OriginCache(TypedDict):
     data: Dict[Sha1Git, str]
     added: Set[Sha1Git]


 class RevisionCache(TypedDict):
     data: Dict[Sha1Git, Sha1Git]
     added: Set[Sha1Git]


 class ProvenanceCache(TypedDict):
     content: DatetimeCache
     directory: DatetimeCache
     revision: DatetimeCache
     # below are insertion caches only
     content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]]
     content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]]
     directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]]
     # these two are for the origin layer
     origin: OriginCache
     revision_origin: RevisionCache
     revision_before_revision: Dict[Sha1Git, Set[Sha1Git]]
     revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]]


 def new_cache() -> ProvenanceCache:
     return ProvenanceCache(
         content=DatetimeCache(data={}, added=set()),
         directory=DatetimeCache(data={}, added=set()),
         revision=DatetimeCache(data={}, added=set()),
         content_in_revision=set(),
         content_in_directory=set(),
         directory_in_revision=set(),
         origin=OriginCache(data={}, added=set()),
         revision_origin=RevisionCache(data={}, added=set()),
         revision_before_revision={},
         revision_in_origin=set(),
     )


 # TODO: maybe move this to a separate file
 class ProvenanceBackend:
-    raise_on_commit: bool = False
+    def __init__(self, storage: ProvenanceStorageInterface):
+        self.storage = storage
+        self.cache = new_cache()

-    def __init__(self, conn: psycopg2.extensions.connection):
-        from .postgresql.provenancedb_base import ProvenanceDBBase
+    def clear_caches(self) -> None:
+        self.cache = new_cache()

-        # TODO: this class should not know what the actual used DB is.
-        self.storage: ProvenanceDBBase
-        flavor = ProvenanceDBBase(conn).flavor
-        if flavor == "with-path":
-            from .postgresql.provenancedb_with_path import ProvenanceWithPathDB
+    def flush(self) -> None:
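+        # Every storage call below returns a boolean, so each step is retried
+        # until it succeeds; the insertion order is chosen so that a failure at
+        # any point leaves the database reprocessable rather than inconsistent.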
Retrying..." + ) + + # Finally, preferred origins for the visited revisions are set (this step can be + # reordered if required). + origins = { + sha1: self.cache["revision_origin"]["data"][sha1] + for sha1 in self.cache["revision_origin"]["added"] + } + while not self.storage.revision_set_origin(origins): + logging.warning( + f"Unable to write preferred origins to the storage. " + f"Data: {origins}. Retrying..." + ) + + # clear local cache ############################################################ self.clear_caches() def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: self.cache["content_in_directory"].add( (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) ) def content_add_to_revision( self, revision: RevisionEntry, blob: FileEntry, prefix: bytes ) -> None: self.cache["content_in_revision"].add( (blob.id, revision.id, normalize(os.path.join(prefix, blob.name))) ) def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: return self.storage.content_find_first(id) def content_find_all( self, id: Sha1Git, limit: Optional[int] = None ) -> Generator[ProvenanceResult, None, None]: yield from self.storage.content_find_all(id, limit=limit) def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]: return self.get_dates("content", [blob.id]).get(blob.id) def content_get_early_dates( self, blobs: Iterable[FileEntry] ) -> Dict[Sha1Git, datetime]: return self.get_dates("content", [blob.id for blob in blobs]) def content_set_early_date(self, blob: FileEntry, date: datetime) -> None: self.cache["content"]["data"][blob.id] = date self.cache["content"]["added"].add(blob.id) def directory_add_to_revision( self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes ) -> None: self.cache["directory_in_revision"].add( (directory.id, revision.id, normalize(path)) ) def directory_get_date_in_isochrone_frontier( self, directory: DirectoryEntry ) -> Optional[datetime]: return self.get_dates("directory", [directory.id]).get(directory.id) def directory_get_dates_in_isochrone_frontier( self, dirs: Iterable[DirectoryEntry] ) -> Dict[Sha1Git, datetime]: return self.get_dates("directory", [directory.id for directory in dirs]) def directory_set_date_in_isochrone_frontier( self, directory: DirectoryEntry, date: datetime ) -> None: self.cache["directory"]["data"][directory.id] = date self.cache["directory"]["added"].add(directory.id) def get_dates( - self, entity: Literal["content", "revision", "directory"], ids: List[Sha1Git] + self, + entity: Literal["content", "directory", "revision"], + ids: Iterable[Sha1Git], ) -> Dict[Sha1Git, datetime]: cache = self.cache[entity] missing_ids = set(id for id in ids if id not in cache) if missing_ids: - cache["data"].update(self.storage.get_dates(entity, list(missing_ids))) + if entity == "revision": + updated = { + id: date + for id, (date, _) in self.storage.revision_get(missing_ids).items() + if date is not None + } + else: + updated = getattr(self.storage, f"{entity}_get")(missing_ids) + cache["data"].update(updated) dates: Dict[Sha1Git, datetime] = {} for sha1 in ids: date = cache["data"].get(sha1) if date is not None: dates[sha1] = date return dates def origin_add(self, origin: OriginEntry) -> None: self.cache["origin"]["data"][origin.id] = origin.url self.cache["origin"]["added"].add(origin.id) def revision_add(self, revision: RevisionEntry) -> None: self.cache["revision"]["data"][revision.id] = revision.date self.cache["revision"]["added"].add(revision.id) def 
     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ) -> None:
         self.cache["content_in_directory"].add(
             (blob.id, directory.id, normalize(os.path.join(prefix, blob.name)))
         )

     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
     ) -> None:
         self.cache["content_in_revision"].add(
             (blob.id, revision.id, normalize(os.path.join(prefix, blob.name)))
         )

     def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
         return self.storage.content_find_first(id)

     def content_find_all(
         self, id: Sha1Git, limit: Optional[int] = None
     ) -> Generator[ProvenanceResult, None, None]:
         yield from self.storage.content_find_all(id, limit=limit)

     def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
         return self.get_dates("content", [blob.id]).get(blob.id)

     def content_get_early_dates(
         self, blobs: Iterable[FileEntry]
     ) -> Dict[Sha1Git, datetime]:
         return self.get_dates("content", [blob.id for blob in blobs])

     def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
         self.cache["content"]["data"][blob.id] = date
         self.cache["content"]["added"].add(blob.id)

     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
     ) -> None:
         self.cache["directory_in_revision"].add(
             (directory.id, revision.id, normalize(path))
         )

     def directory_get_date_in_isochrone_frontier(
         self, directory: DirectoryEntry
     ) -> Optional[datetime]:
         return self.get_dates("directory", [directory.id]).get(directory.id)

     def directory_get_dates_in_isochrone_frontier(
         self, dirs: Iterable[DirectoryEntry]
     ) -> Dict[Sha1Git, datetime]:
         return self.get_dates("directory", [directory.id for directory in dirs])

     def directory_set_date_in_isochrone_frontier(
         self, directory: DirectoryEntry, date: datetime
     ) -> None:
         self.cache["directory"]["data"][directory.id] = date
         self.cache["directory"]["added"].add(directory.id)

     def get_dates(
-        self, entity: Literal["content", "revision", "directory"], ids: List[Sha1Git]
+        self,
+        entity: Literal["content", "directory", "revision"],
+        ids: Iterable[Sha1Git],
     ) -> Dict[Sha1Git, datetime]:
         cache = self.cache[entity]
         missing_ids = set(id for id in ids if id not in cache)
         if missing_ids:
-            cache["data"].update(self.storage.get_dates(entity, list(missing_ids)))
+            if entity == "revision":
+                updated = {
+                    id: date
+                    for id, (date, _) in self.storage.revision_get(missing_ids).items()
+                    if date is not None
+                }
+            else:
+                updated = getattr(self.storage, f"{entity}_get")(missing_ids)
+            cache["data"].update(updated)
         dates: Dict[Sha1Git, datetime] = {}
         for sha1 in ids:
             date = cache["data"].get(sha1)
             if date is not None:
                 dates[sha1] = date
         return dates

     def origin_add(self, origin: OriginEntry) -> None:
         self.cache["origin"]["data"][origin.id] = origin.url
         self.cache["origin"]["added"].add(origin.id)

     def revision_add(self, revision: RevisionEntry) -> None:
         self.cache["revision"]["data"][revision.id] = revision.date
         self.cache["revision"]["added"].add(revision.id)

     def revision_add_before_revision(
         self, head: RevisionEntry, revision: RevisionEntry
     ) -> None:
         self.cache["revision_before_revision"].setdefault(revision.id, set()).add(
             head.id
         )

     def revision_add_to_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ) -> None:
         self.cache["revision_in_origin"].add((revision.id, origin.id))

     def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]:
         return self.get_dates("revision", [revision.id]).get(revision.id)

     def revision_get_preferred_origin(
         self, revision: RevisionEntry
     ) -> Optional[Sha1Git]:
-        cache = self.cache["revision_origin"]
+        cache = self.cache["revision_origin"]["data"]
         if revision.id not in cache:
-            origin = self.storage.revision_get_preferred_origin(revision.id)
-            if origin is not None:
-                cache["data"][revision.id] = origin
-        return cache["data"].get(revision.id)
+            ret = self.storage.revision_get([revision.id])
+            if revision.id in ret:
+                origin = ret[revision.id][1]  # TODO: make this not a tuple
+                if origin is not None:
+                    cache[revision.id] = origin
+        return cache.get(revision.id)

     def revision_in_history(self, revision: RevisionEntry) -> bool:
-        return revision.id in self.cache[
-            "revision_before_revision"
-        ] or self.storage.revision_in_history(revision.id)
+        return revision.id in self.cache["revision_before_revision"] or bool(
+            self.storage.relation_get(RelationType.REV_BEFORE_REV, [revision.id])
+        )

     def revision_set_preferred_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ) -> None:
         self.cache["revision_origin"]["data"][revision.id] = origin.id
         self.cache["revision_origin"]["added"].add(revision.id)

     def revision_visited(self, revision: RevisionEntry) -> bool:
-        return revision.id in dict(
-            self.cache["revision_in_origin"]
-        ) or self.storage.revision_visited(revision.id)
+        return revision.id in dict(self.cache["revision_in_origin"]) or bool(
+            self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id])
+        )


 def normalize(path: bytes) -> bytes:
     return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path
diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py
index 150b593..30b970c 100644
--- a/swh/provenance/postgresql/provenancedb_base.py
+++ b/swh/provenance/postgresql/provenancedb_base.py
@@ -1,263 +1,283 @@
 from datetime import datetime
 import itertools
 import logging
-from typing import Any, Dict, Generator, List, Mapping, Optional, Set, Tuple
+from typing import Dict, Generator, Iterable, Optional, Set, Tuple

 import psycopg2
 import psycopg2.extras
+from typing_extensions import Literal

 from swh.model.model import Sha1Git

-from ..provenance import ProvenanceResult
+from ..provenance import ProvenanceResult, RelationType


 class ProvenanceDBBase:
+    raise_on_commit: bool = False
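+    # When set to True (tests do this), a failed write raises the underlying
+    # exception instead of just returning False to the caller's retry loop.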
- self.cursor.execute("SET timezone TO 'UTC'") + sql = "SET timezone TO 'UTC'" + self.cursor.execute(sql) self._flavor: Optional[str] = None @property def flavor(self) -> str: if self._flavor is None: - self.cursor.execute("SELECT swh_get_dbflavor() AS flavor") + sql = "SELECT swh_get_dbflavor() AS flavor" + self.cursor.execute(sql) self._flavor = self.cursor.fetchone()["flavor"] assert self._flavor is not None return self._flavor @property def with_path(self) -> bool: return self.flavor == "with-path" - def commit(self, data: Mapping[str, Any], raise_on_commit: bool = False) -> bool: - try: - # First insert entities - for entity in ("content", "directory", "revision"): - self.insert_entity( - entity, - { - sha1: data[entity]["data"][sha1] - for sha1 in data[entity]["added"] - }, - ) - data[entity]["data"].clear() - data[entity]["added"].clear() - - # Relations should come after ids for entities were resolved - for relation in ( - "content_in_revision", - "content_in_directory", - "directory_in_revision", - ): - self.insert_relation(relation, data[relation]) - - # Insert origins - self.insert_origin( - { - sha1: data["origin"]["data"][sha1] - for sha1 in data["origin"]["added"] - }, - ) - data["origin"]["data"].clear() - data["origin"]["added"].clear() - - # Insert relations from the origin-revision layer - self.insert_revision_history(data["revision_before_revision"]) - self.insert_origin_head(data["revision_in_origin"]) - - # Update preferred origins - self.update_preferred_origin( - { - sha1: data["revision_origin"]["data"][sha1] - for sha1 in data["revision_origin"]["added"] - } - ) - data["revision_origin"]["data"].clear() - data["revision_origin"]["added"].clear() + def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: + ... - return True + def content_find_all( + self, id: Sha1Git, limit: Optional[int] = None + ) -> Generator[ProvenanceResult, None, None]: + ... + + def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: + return self._entity_set_date("content", dates) + + def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: + return self._entity_get_date("content", ids) + + def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool: + return self._entity_set_date("directory", dates) + def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: + return self._entity_get_date("directory", ids) + + def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool: + try: + if urls: + sql = """ + LOCK TABLE ONLY origin; + INSERT INTO origin(sha1, url) VALUES %s + ON CONFLICT DO NOTHING + """ + psycopg2.extras.execute_values(self.cursor, sql, urls.items()) + return True except: # noqa: E722 # Unexpected error occurred, rollback all changes and log message logging.exception("Unexpected error") - if raise_on_commit: + if self.raise_on_commit: raise - return False - def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: - ... + def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]: + urls: Dict[Sha1Git, str] = {} + sha1s = tuple(ids) + if sha1s: + values = ", ".join(itertools.repeat("%s", len(sha1s))) + sql = f""" + SELECT sha1, url + FROM origin + WHERE sha1 IN ({values}) + """ + self.cursor.execute(sql, sha1s) + urls.update((row["sha1"], row["url"]) for row in self.cursor.fetchall()) + return urls - def content_find_all( - self, id: Sha1Git, limit: Optional[int] = None - ) -> Generator[ProvenanceResult, None, None]: - ... 
+    def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
+        return self._entity_set_date("revision", dates)

-    def get_dates(self, entity: str, ids: List[Sha1Git]) -> Dict[Sha1Git, datetime]:
-        dates: Dict[Sha1Git, datetime] = {}
-        if ids:
-            values = ", ".join(itertools.repeat("%s", len(ids)))
-            self.cursor.execute(
-                f"""SELECT sha1, date FROM {entity} WHERE sha1 IN ({values})""",
-                tuple(ids),
-            )
-            dates.update(((row["sha1"], row["date"]) for row in self.cursor.fetchall()))
-        return dates
+    def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool:
+        try:
+            if origins:
+                sql = """
+                    LOCK TABLE ONLY revision;
+                    INSERT INTO revision(sha1, origin)
+                    (SELECT V.rev AS sha1, O.id AS origin
+                     FROM (VALUES %s) AS V(rev, org)
+                     JOIN origin AS O ON (O.sha1=V.org))
+                    ON CONFLICT (sha1) DO
+                    UPDATE SET origin=EXCLUDED.origin
+                    """
+                psycopg2.extras.execute_values(self.cursor, sql, origins.items())
+            return True
+        except:  # noqa: E722
+            # Unexpected error occurred, rollback all changes and log message
+            logging.exception("Unexpected error")
+            if self.raise_on_commit:
+                raise
+            return False

-    def insert_entity(self, entity: str, data: Dict[Sha1Git, datetime]):
-        if data:
-            psycopg2.extras.execute_values(
-                self.cursor,
-                f"""
-                LOCK TABLE ONLY {entity};
-                INSERT INTO {entity}(sha1, date) VALUES %s
-                ON CONFLICT (sha1) DO
-                UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date)
-                """,
-                data.items(),
-            )
-            # XXX: not sure if Python takes a reference or a copy.
-            # This might be useless!
-            data.clear()
-
-    def insert_origin(self, data: Dict[Sha1Git, str]):
-        if data:
-            psycopg2.extras.execute_values(
-                self.cursor,
-                """
-                LOCK TABLE ONLY origin;
-                INSERT INTO origin(sha1, url) VALUES %s
-                ON CONFLICT DO NOTHING
-                """,
-                data.items(),
-            )
-            # XXX: not sure if Python takes a reference or a copy.
-            # This might be useless!
-            data.clear()
-
-    def insert_origin_head(self, data: Set[Tuple[Sha1Git, Sha1Git]]):
-        if data:
-            # Insert revisions first, to ensure "foreign keys" exist
-            # Origins are assumed to be already inserted (they require knowing the url)
-            psycopg2.extras.execute_values(
-                self.cursor,
+    def revision_get(
+        self, ids: Iterable[Sha1Git]
+    ) -> Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]]:
+        result: Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]] = {}
+        sha1s = tuple(ids)
+        if sha1s:
+            values = ", ".join(itertools.repeat("%s", len(sha1s)))
+            sql = f"""
+                SELECT sha1, date, origin
+                FROM revision
+                WHERE sha1 IN ({values})
                 """
-                LOCK TABLE ONLY revision;
-                INSERT INTO revision(sha1) VALUES %s
-                ON CONFLICT DO NOTHING
-                """,
-                {(rev,) for rev, _ in data},
+            self.cursor.execute(sql, sha1s)
+            result.update(
+                (row["sha1"], (row["date"], row["origin"]))
+                for row in self.cursor.fetchall()
             )
+        return result

-            psycopg2.extras.execute_values(
-                self.cursor,
-                # XXX: not clear how conflicts are handled here!
-                """
-                LOCK TABLE ONLY revision_in_origin;
-                INSERT INTO revision_in_origin
-                SELECT R.id, O.id
-                FROM (VALUES %s) AS V(rev, org)
-                INNER JOIN revision AS R on (R.sha1=V.rev)
-                INNER JOIN origin AS O on (O.sha1=V.org)
-                ON CONFLICT DO NOTHING
-                """,
-                data,
-            )
-            data.clear()
+    def relation_add(
+        self,
+        relation: RelationType,
+        data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]],
+    ) -> bool:
+        try:
+            if data:
+                table = relation.value
+                src, *_, dst = table.split("_")

-    def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]):
-        ...
+                if src != "origin":
+                    # Origin entries should have been inserted beforehand, as they
+                    # require extra non-null information
+                    srcs = tuple(set((sha1,) for (sha1, _, _) in data))
+                    sql = f"""
+                        LOCK TABLE ONLY {src};
+                        INSERT INTO {src}(sha1) VALUES %s
+                        ON CONFLICT DO NOTHING
+                        """
+                    psycopg2.extras.execute_values(self.cursor, sql, srcs)
+                if dst != "origin":
+                    # Origin entries should have been inserted beforehand, as they
+                    # require extra non-null information
+                    dsts = tuple(set((sha1,) for (_, sha1, _) in data))
+                    sql = f"""
+                        LOCK TABLE ONLY {dst};
+                        INSERT INTO {dst}(sha1) VALUES %s
+                        ON CONFLICT DO NOTHING
+                        """
+                    psycopg2.extras.execute_values(self.cursor, sql, dsts)
+                joins = [
+                    f"INNER JOIN {src} AS S ON (S.sha1=V.src)",
+                    f"INNER JOIN {dst} AS D ON (D.sha1=V.dst)",
+                ]
+                selected = ["S.id", "D.id"]

-    def insert_revision_history(self, data: Dict[Sha1Git, Set[Sha1Git]]):
-        if data:
-            # print(f"Inserting histories: {data}")
-            # Insert revisions first, to ensure "foreign keys" exist
-            revisions = set(data)
-            for rev in data:
-                revisions.update(data[rev])
-            psycopg2.extras.execute_values(
-                self.cursor,
-                """
-                LOCK TABLE ONLY revision;
-                INSERT INTO revision(sha1) VALUES %s
-                ON CONFLICT DO NOTHING
-                """,
-                ((rev,) for rev in revisions),
-            )
+                if self._relation_uses_location_table(relation):
+                    locations = tuple(set((path,) for (_, _, path) in data))
+                    sql = """
+                        LOCK TABLE ONLY location;
+                        INSERT INTO location(path) VALUES %s
+                        ON CONFLICT (path) DO NOTHING
+                        """
+                    psycopg2.extras.execute_values(self.cursor, sql, locations)
+
+                    joins.append("INNER JOIN location AS L ON (L.path=V.path)")
+                    selected.append("L.id")
+
+                sql = f"""
+                    INSERT INTO {table}
+                    (SELECT {", ".join(selected)}
+                     FROM (VALUES %s) AS V(src, dst, path)
+                     {'''
+                     '''.join(joins)})
+                    ON CONFLICT DO NOTHING
+                    """
+                psycopg2.extras.execute_values(self.cursor, sql, data)
+            return True
+        except:  # noqa: E722
+            # Unexpected error occurred, rollback all changes and log message
+            logging.exception("Unexpected error")
+            if self.raise_on_commit:
+                raise
+            return False
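+
+    # relation_add consumes (src_sha1, dst_sha1, path) triples; e.g. (sha1
+    # values elided) a with-path content/revision pair is added as:
+    #   relation_add(RelationType.CNT_EARLY_IN_REV, {(blob_sha1, rev_sha1, b"a/b.c")})
+    # while path-less relations such as REV_IN_ORG carry None as third element.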
+ if src != "origin": + # Origin entries should be inserted previously as they require extra + # non-null information + srcs = tuple(set((sha1,) for (sha1, _, _) in data)) + sql = f""" + LOCK TABLE ONLY {src}; + INSERT INTO {src}(sha1) VALUES %s + ON CONFLICT DO NOTHING + """ + psycopg2.extras.execute_values(self.cursor, sql, srcs) + if dst != "origin": + # Origin entries should be inserted previously as they require extra + # non-null information + dsts = tuple(set((sha1,) for (_, sha1, _) in data)) + sql = f""" + LOCK TABLE ONLY {dst}; + INSERT INTO {dst}(sha1) VALUES %s + ON CONFLICT DO NOTHING + """ + psycopg2.extras.execute_values(self.cursor, sql, dsts) + joins = [ + f"INNER JOIN {src} AS S ON (S.sha1=V.src)", + f"INNER JOIN {dst} AS D ON (D.sha1=V.dst)", + ] + selected = ["S.id", "D.id"] - def insert_revision_history(self, data: Dict[Sha1Git, Set[Sha1Git]]): - if data: - # print(f"Inserting histories: {data}") - # Insert revisions first, to ensure "foreign keys" exist - revisions = set(data) - for rev in data: - revisions.update(data[rev]) - psycopg2.extras.execute_values( - self.cursor, - """ - LOCK TABLE ONLY revision; - INSERT INTO revision(sha1) VALUES %s - ON CONFLICT DO NOTHING - """, - ((rev,) for rev in revisions), - ) + if self._relation_uses_location_table(relation): + locations = tuple(set((path,) for (_, _, path) in data)) + sql = """ + LOCK TABLE ONLY location; + INSERT INTO location(path) VALUES %s + ON CONFLICT (path) DO NOTHING + """ + psycopg2.extras.execute_values(self.cursor, sql, locations) + + joins.append("INNER JOIN location AS L ON (L.path=V.path)") + selected.append("L.id") + + sql = f""" + INSERT INTO {table} + (SELECT {", ".join(selected)} + FROM (VALUES %s) AS V(src, dst, path) + {''' + '''.join(joins)}) + ON CONFLICT DO NOTHING + """ + psycopg2.extras.execute_values(self.cursor, sql, data) + return True + except: # noqa: E722 + # Unexpected error occurred, rollback all changes and log message + logging.exception("Unexpected error") + if self.raise_on_commit: + raise + return False + + def relation_get( + self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False + ) -> Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]]: + result: Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]] = set() + sha1s = tuple(ids) + if sha1s: + table = relation.value + src, *_, dst = table.split("_") - values = [[(prev, next) for next in data[prev]] for prev in data] - psycopg2.extras.execute_values( - self.cursor, - # XXX: not clear how conflicts are handled here! + # TODO: improve this! 
+ if src == "revision" and dst == "revision": + src_field = "prev" + dst_field = "next" + else: + src_field = src + dst_field = dst + + joins = [ + f"INNER JOIN {src} AS S ON (S.id=R.{src_field})", + f"INNER JOIN {dst} AS D ON (D.id=R.{dst_field})", + ] + selected = ["S.sha1 AS src", "D.sha1 AS dst"] + selector = "S.sha1" if not reverse else "D.sha1" + + if self._relation_uses_location_table(relation): + joins.append("INNER JOIN location AS L ON (L.id=R.location)") + selected.append("L.path AS path") + else: + selected.append("NULL AS path") + + sql = f""" + SELECT {", ".join(selected)} + FROM {table} AS R + {" ".join(joins)} + WHERE {selector} IN %s """ - LOCK TABLE ONLY revision_before_revision; - INSERT INTO revision_before_revision - SELECT P.id, N.id - FROM (VALUES %s) AS V(prev, next) - INNER JOIN revision AS P on (P.sha1=V.prev) - INNER JOIN revision AS N on (N.sha1=V.next) - ON CONFLICT DO NOTHING - """, - sum(values, []), + self.cursor.execute(sql, (sha1s,)) + result.update( + (row["src"], row["dst"], row["path"]) for row in self.cursor.fetchall() ) - data.clear() - - def revision_get_preferred_origin(self, revision: Sha1Git) -> Optional[Sha1Git]: - self.cursor.execute( - """ - SELECT O.sha1 - FROM revision AS R - JOIN origin as O - ON R.origin=O.id - WHERE R.sha1=%s""", - (revision,), - ) - row = self.cursor.fetchone() - return row["sha1"] if row is not None else None - - def revision_in_history(self, revision: Sha1Git) -> bool: - self.cursor.execute( - """ - SELECT 1 - FROM revision_before_revision - JOIN revision - ON revision.id=revision_before_revision.prev - WHERE revision.sha1=%s - """, - (revision,), - ) - return self.cursor.fetchone() is not None - - def revision_visited(self, revision: Sha1Git) -> bool: - self.cursor.execute( - """ - SELECT 1 - FROM revision_in_origin - JOIN revision - ON revision.id=revision_in_origin.revision - WHERE revision.sha1=%s - """, - (revision,), - ) - return self.cursor.fetchone() is not None - - def update_preferred_origin(self, data: Dict[Sha1Git, Sha1Git]): - if data: - # XXX: this is assuming the revision already exists in the db! It should - # be improved by allowing null dates in the revision table. 
-            psycopg2.extras.execute_values(
-                self.cursor,
+        return result
+
+    def _entity_get_date(
+        self,
+        entity: Literal["content", "directory", "revision"],
+        ids: Iterable[Sha1Git],
+    ) -> Dict[Sha1Git, datetime]:
+        dates: Dict[Sha1Git, datetime] = {}
+        sha1s = tuple(ids)
+        if sha1s:
+            values = ", ".join(itertools.repeat("%s", len(sha1s)))
+            sql = f"""
+                SELECT sha1, date
+                FROM {entity}
+                WHERE sha1 IN ({values})
                """
-                UPDATE revision R
-                SET origin=O.id
-                FROM (VALUES %s) AS V(rev, org)
-                INNER JOIN origin AS O on (O.sha1=V.org)
-                WHERE R.sha1=V.rev
-                """,
-                data.items(),
-            )
-            data.clear()
+            self.cursor.execute(sql, sha1s)
+            dates.update((row["sha1"], row["date"]) for row in self.cursor.fetchall())
+        return dates
+
+    def _entity_set_date(
+        self,
+        entity: Literal["content", "directory", "revision"],
+        data: Dict[Sha1Git, datetime],
+    ) -> bool:
+        try:
+            if data:
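+                # on conflict, keep the earliest known date: LEAST() retains
+                # the stored value whenever it is older than the incoming one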
+                sql = f"""
+                    LOCK TABLE ONLY {entity};
+                    INSERT INTO {entity}(sha1, date) VALUES %s
+                    ON CONFLICT (sha1) DO
+                    UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date)
+                    """
+                psycopg2.extras.execute_values(self.cursor, sql, data.items())
+            return True
+        except:  # noqa: E722
+            # Unexpected error occurred, rollback all changes and log message
+            logging.exception("Unexpected error")
+            if self.raise_on_commit:
+                raise
+            return False
+
+    def _relation_uses_location_table(self, relation: RelationType) -> bool:
+        ...
diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py
index a502b8f..8e27387 100644
--- a/swh/provenance/postgresql/provenancedb_with_path.py
+++ b/swh/provenance/postgresql/provenancedb_with_path.py
@@ -1,108 +1,70 @@
-from typing import Generator, Optional, Set, Tuple
-
-import psycopg2
-import psycopg2.extras
+from typing import Generator, Optional

 from swh.model.model import Sha1Git

-from ..provenance import ProvenanceResult
+from ..provenance import ProvenanceResult, RelationType
 from .provenancedb_base import ProvenanceDBBase


 class ProvenanceWithPathDB(ProvenanceDBBase):
     def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
         sql = """
             SELECT C.sha1 AS content,
                    R.sha1 AS revision,
                    R.date AS date,
                    O.url AS origin,
                    L.path AS path
             FROM content AS C
             INNER JOIN content_in_revision AS CR ON (CR.content=C.id)
             INNER JOIN location as L ON (CR.location=L.id)
             INNER JOIN revision as R ON (CR.revision=R.id)
             LEFT JOIN origin as O ON (R.origin=O.id)
             WHERE C.sha1=%s
             ORDER BY date, revision, origin, path ASC LIMIT 1
             """
         self.cursor.execute(sql, (id,))
         row = self.cursor.fetchone()
         return ProvenanceResult(**row) if row is not None else None

     def content_find_all(
         self, id: Sha1Git, limit: Optional[int] = None
     ) -> Generator[ProvenanceResult, None, None]:
         early_cut = f"LIMIT {limit}" if limit is not None else ""
         sql = f"""
             (SELECT C.sha1 AS content,
                     R.sha1 AS revision,
                     R.date AS date,
                     O.url AS origin,
                     L.path AS path
              FROM content AS C
              INNER JOIN content_in_revision AS CR ON (CR.content=C.id)
              INNER JOIN location AS L ON (CR.location=L.id)
              INNER JOIN revision AS R ON (CR.revision=R.id)
              LEFT JOIN origin as O ON (R.origin=O.id)
              WHERE C.sha1=%s)
             UNION
             (SELECT C.sha1 AS content,
                     R.sha1 AS revision,
                     R.date AS date,
                     O.url AS origin,
                     CASE DL.path
                         WHEN '' THEN CL.path
                         WHEN '.' THEN CL.path
                         ELSE (DL.path || '/' || CL.path)::unix_path
                     END AS path
              FROM content AS C
              INNER JOIN content_in_directory AS CD ON (C.id=CD.content)
              INNER JOIN directory_in_revision AS DR ON (CD.directory=DR.directory)
              INNER JOIN revision AS R ON (DR.revision=R.id)
              INNER JOIN location AS CL ON (CD.location=CL.id)
              INNER JOIN location AS DL ON (DR.location=DL.id)
              LEFT JOIN origin AS O ON (R.origin=O.id)
              WHERE C.sha1=%s)
             ORDER BY date, revision, origin, path {early_cut}
             """
         self.cursor.execute(sql, (id, id))
         yield from (ProvenanceResult(**row) for row in self.cursor.fetchall())

-    def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]):
-        """Insert entries in `relation` from `data`
-
-        Also insert missing location entries in the 'location' table.
-        """
-        if data:
-            assert relation in (
-                "content_in_revision",
-                "content_in_directory",
-                "directory_in_revision",
-            )
-            src, dst = relation.split("_in_")
-
-            # insert missing locations
-            locations = tuple(set((loc,) for (_, _, loc) in data))
-            psycopg2.extras.execute_values(
-                self.cursor,
-                """
-                LOCK TABLE ONLY location;
-                INSERT INTO location(path) VALUES %s
-                ON CONFLICT (path) DO NOTHING
-                """,
-                locations,
-            )
-            psycopg2.extras.execute_values(
-                self.cursor,
-                f"""
-                LOCK TABLE ONLY {relation};
-                INSERT INTO {relation}
-                SELECT {src}.id, {dst}.id, location.id
-                FROM (VALUES %s) AS V(src, dst, path)
-                INNER JOIN {src} on ({src}.sha1=V.src)
-                INNER JOIN {dst} on ({dst}.sha1=V.dst)
-                INNER JOIN location on (location.path=V.path)
-                """,
-                data,
-            )
-            data.clear()
+    def _relation_uses_location_table(self, relation: RelationType) -> bool:
+        src, *_ = relation.value.split("_")
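+        # i.e. only content_in_revision, content_in_directory and
+        # directory_in_revision reference a location row in this flavor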
+        return src in ("content", "directory")
diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py
index 789bc92..11a2a70 100644
--- a/swh/provenance/postgresql/provenancedb_without_path.py
+++ b/swh/provenance/postgresql/provenancedb_without_path.py
@@ -1,84 +1,61 @@
-from typing import Generator, Optional, Set, Tuple
-
-import psycopg2
-import psycopg2.extras
+from typing import Generator, Optional

 from swh.model.model import Sha1Git

-from ..provenance import ProvenanceResult
+from ..provenance import ProvenanceResult, RelationType
 from .provenancedb_base import ProvenanceDBBase


 class ProvenanceWithoutPathDB(ProvenanceDBBase):
     def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
         sql = """
             SELECT C.sha1 AS content,
                    R.sha1 AS revision,
                    R.date AS date,
                    O.url AS origin,
                    '\\x'::bytea as path
             FROM content AS C
             INNER JOIN content_in_revision AS CR ON (CR.content=C.id)
             INNER JOIN revision as R ON (CR.revision=R.id)
             LEFT JOIN origin as O ON (R.origin=O.id)
             WHERE C.sha1=%s
             ORDER BY date, revision, origin ASC LIMIT 1
             """
         self.cursor.execute(sql, (id,))
         row = self.cursor.fetchone()
         return ProvenanceResult(**row) if row is not None else None

     def content_find_all(
         self, id: Sha1Git, limit: Optional[int] = None
     ) -> Generator[ProvenanceResult, None, None]:
         early_cut = f"LIMIT {limit}" if limit is not None else ""
         sql = f"""
             (SELECT C.sha1 AS content,
                     R.sha1 AS revision,
                     R.date AS date,
                     O.url AS origin,
                     '\\x'::bytea as path
              FROM content AS C
              INNER JOIN content_in_revision AS CR ON (CR.content=C.id)
              INNER JOIN revision AS R ON (CR.revision=R.id)
              LEFT JOIN origin as O ON (R.origin=O.id)
              WHERE C.sha1=%s)
             UNION
             (SELECT C.sha1 AS content,
                     R.sha1 AS revision,
                     R.date AS date,
                     O.url AS origin,
                     '\\x'::bytea as path
              FROM content AS C
              INNER JOIN content_in_directory AS CD ON (C.id=CD.content)
              INNER JOIN directory_in_revision AS DR ON (CD.directory=DR.directory)
              INNER JOIN revision AS R ON (DR.revision=R.id)
              LEFT JOIN origin as O ON (R.origin=O.id)
              WHERE C.sha1=%s)
             ORDER BY date, revision, origin {early_cut}
             """
         self.cursor.execute(sql, (id, id))
         yield from (ProvenanceResult(**row) for row in self.cursor.fetchall())

-    def insert_relation(self, relation: str, data: Set[Tuple[Sha1Git, Sha1Git, bytes]]):
-        if data:
-            assert relation in (
-                "content_in_revision",
-                "content_in_directory",
-                "directory_in_revision",
-            )
-            src, dst = relation.split("_in_")
-
-            psycopg2.extras.execute_values(
-                self.cursor,
-                f"""
-                LOCK TABLE ONLY {relation};
-                INSERT INTO {relation}
-                SELECT {src}.id, {dst}.id
-                FROM (VALUES %s) AS V(src, dst)
-                INNER JOIN {src} on ({src}.sha1=V.src)
-                INNER JOIN {dst} on ({dst}.sha1=V.dst)
-                """,
-                data,
-            )
-            data.clear()
+    def _relation_uses_location_table(self, relation: RelationType) -> bool:
+        return False
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
index de95e82..9ce884a 100644
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,161 +1,264 @@
 from datetime import datetime
-from typing import Dict, Generator, Iterable, Optional
+import enum
+from typing import Dict, Generator, Iterable, Optional, Set, Tuple

 from typing_extensions import Protocol, runtime_checkable

 from swh.model.model import Sha1Git

 from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry


 class ProvenanceResult:
     def __init__(
         self,
         content: Sha1Git,
         revision: Sha1Git,
         date: datetime,
         origin: Optional[str],
         path: bytes,
     ) -> None:
         self.content = content
         self.revision = revision
         self.date = date
         self.origin = origin
         self.path = path

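+# Reading a ProvenanceResult: blob `content` occurs in `revision` (dated
+# `date`) at `path`; `origin`, when known, is the revision's preferred origin.
+
+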
 @runtime_checkable
 class ProvenanceInterface(Protocol):
-    raise_on_commit: bool = False
+    storage: "ProvenanceStorageInterface"

     def flush(self) -> None:
         """Flush internal cache to the underlying `storage`."""
         ...

     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ) -> None:
         """Associate `blob` with `directory` in the provenance model. `prefix` is the
         relative path from `directory` to `blob` (excluding `blob`'s name).
         """
         ...

     def content_add_to_revision(
         self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
     ) -> None:
         """Associate `blob` with `revision` in the provenance model. `prefix` is the
         absolute path from `revision`'s root directory to `blob` (excluding `blob`'s
         name).
         """
         ...

     def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
         """Retrieve the first occurrence of the blob identified by `id`."""
         ...

     def content_find_all(
         self, id: Sha1Git, limit: Optional[int] = None
     ) -> Generator[ProvenanceResult, None, None]:
         """Retrieve all the occurrences of the blob identified by `id`."""
         ...

     def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
         """Retrieve the earliest known date of `blob`."""
         ...

     def content_get_early_dates(
         self, blobs: Iterable[FileEntry]
     ) -> Dict[Sha1Git, datetime]:
         """Retrieve the earliest known date for each blob in `blobs`. If some blob has
         no associated date, it is not present in the resulting dictionary.
         """
         ...

     def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
         """Associate `date` to `blob` as its earliest known date."""
         ...

     def directory_add_to_revision(
         self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
     ) -> None:
         """Associate `directory` with `revision` in the provenance model. `path` is
         the absolute path from `revision`'s root directory to `directory` (including
         `directory`'s name).
         """
         ...

     def directory_get_date_in_isochrone_frontier(
         self, directory: DirectoryEntry
     ) -> Optional[datetime]:
         """Retrieve the earliest known date of `directory` as an isochrone frontier
         in the provenance model.
         """
         ...

     def directory_get_dates_in_isochrone_frontier(
         self, dirs: Iterable[DirectoryEntry]
     ) -> Dict[Sha1Git, datetime]:
         """Retrieve the earliest known date for each directory in `dirs` as isochrone
         frontiers in the provenance model. If some directory has no associated date,
         it is not present in the resulting dictionary.
         """
         ...

     def directory_set_date_in_isochrone_frontier(
         self, directory: DirectoryEntry, date: datetime
     ) -> None:
         """Associate `date` to `directory` as its earliest known date as an isochrone
         frontier in the provenance model.
         """
         ...

     def origin_add(self, origin: OriginEntry) -> None:
         """Add `origin` to the provenance model."""
         ...

     def revision_add(self, revision: RevisionEntry) -> None:
         """Add `revision` to the provenance model. This implies storing `revision`'s
         date in the model, thus `revision.date` must be a valid date.
         """
         ...

     def revision_add_before_revision(
         self, head: RevisionEntry, revision: RevisionEntry
     ) -> None:
         """Associate `revision` to `head` as an ancestor of the latter."""
         ...

     def revision_add_to_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ) -> None:
         """Associate `revision` to `origin` as a head revision of the latter (ie. the
         target of a snapshot for `origin` in the archive)."""
         ...

     def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]:
         """Retrieve the date associated to `revision`."""
         ...

     def revision_get_preferred_origin(
         self, revision: RevisionEntry
     ) -> Optional[Sha1Git]:
         """Retrieve the preferred origin associated to `revision`."""
         ...

     def revision_in_history(self, revision: RevisionEntry) -> bool:
         """Check if `revision` is known to be an ancestor of some head revision in the
         provenance model.
         """
         ...

     def revision_set_preferred_origin(
         self, origin: OriginEntry, revision: RevisionEntry
     ) -> None:
         """Associate `origin` as the preferred origin for `revision`."""
         ...

     def revision_visited(self, revision: RevisionEntry) -> bool:
         """Check if `revision` is known to be a head revision for some origin in the
         provenance model.
         """
         ...
+
+
+class RelationType(enum.Enum):
+    CNT_EARLY_IN_REV = "content_in_revision"
+    CNT_IN_DIR = "content_in_directory"
+    DIR_IN_REV = "directory_in_revision"
+    REV_IN_ORG = "revision_in_origin"
+    REV_BEFORE_REV = "revision_before_revision"
+
+
+@runtime_checkable
+class ProvenanceStorageInterface(Protocol):
+    raise_on_commit: bool = False
+
+    def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
+        """Retrieve the first occurrence of the blob identified by `id`."""
+        ...
+
+    def content_find_all(
+        self, id: Sha1Git, limit: Optional[int] = None
+    ) -> Generator[ProvenanceResult, None, None]:
+        """Retrieve all the occurrences of the blob identified by `id`."""
+        ...
+
+    def content_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
+        """Associate dates to blobs identified by sha1 ids, as paired in `dates`.
+        Return a boolean stating whether the information was successfully stored.
+        """
+        ...
+
+    def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+        """Retrieve the associated date for each blob sha1 in `ids`. If some blob has
+        no associated date, it is not present in the resulting dictionary.
+        """
+        ...
+
+    def directory_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
+        """Associate dates to directories identified by sha1 ids, as paired in
+        `dates`. Return a boolean stating whether the information was successfully
+        stored.
+        """
+        ...
+
+    def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
+        """Retrieve the associated date for each directory sha1 in `ids`. If some
+        directory has no associated date, it is not present in the resulting
+        dictionary.
+        """
+        ...
+
+    def origin_set_url(self, urls: Dict[Sha1Git, str]) -> bool:
+        """Associate urls to origins identified by sha1 ids, as paired in `urls`.
+        Return a boolean stating whether the information was successfully stored.
+        """
+        ...
+
+    def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
+        """Retrieve the associated url for each origin sha1 in `ids`. If some origin
+        has no associated url, it is not present in the resulting dictionary.
+        """
+        ...
+
+    def revision_set_date(self, dates: Dict[Sha1Git, datetime]) -> bool:
+        """Associate dates to revisions identified by sha1 ids, as paired in `dates`.
+        Return a boolean stating whether the information was successfully stored.
+        """
+        ...
+
+    def revision_set_origin(self, origins: Dict[Sha1Git, Sha1Git]) -> bool:
+        """Associate origins to revisions identified by sha1 ids, as paired in
+        `origins` (revision ids are keys and origin ids, values). Return a boolean
+        stating whether the information was successfully stored.
+        """
+        ...
+
+    def revision_get(
+        self, ids: Iterable[Sha1Git]
+    ) -> Dict[Sha1Git, Tuple[Optional[datetime], Optional[Sha1Git]]]:
+        """Retrieve the associated date and origin for each revision sha1 in `ids`. If
+        some revision has no associated date nor origin, it is not present in the
+        resulting dictionary.
+        """
+        ...
+
+    def relation_add(
+        self,
+        relation: RelationType,
+        data: Iterable[Tuple[Sha1Git, Sha1Git, Optional[bytes]]],
+    ) -> bool:
+        """Add entries in the selected `relation`. Each tuple in `data` is of the form
+        (`src`, `dst`, `path`), where `src` and `dst` are the sha1 ids of the entities
+        being related, and `path` is optional depending on the selected `relation`.
+        """
+        ...
+
+    def relation_get(
+        self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False
+    ) -> Set[Tuple[Sha1Git, Sha1Git, Optional[bytes]]]:
+        """Retrieve all tuples in the selected `relation` whose source entities are
+        identified by some sha1 id in `ids`. If `reverse` is set, destination entities
+        are matched instead.
+        """
+        ...
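+
+
+# Example (sha1 values elided): every revision recorded as directly containing
+# a given blob can be retrieved with
+#   storage.relation_get(RelationType.CNT_EARLY_IN_REV, [blob_sha1])
+# and passing reverse=True matches the destination side of the relation instead.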
diff --git a/swh/provenance/sql/30-schema.sql b/swh/provenance/sql/30-schema.sql
index 310e004..8bf09a5 100644
--- a/swh/provenance/sql/30-schema.sql
+++ b/swh/provenance/sql/30-schema.sql
@@ -1,135 +1,135 @@
 -- psql variables to get the current database flavor

 create table dbversion
 (
     version     int primary key,
     release     timestamptz,
     description text
 );
 comment on table dbversion is 'Details of current db version';
 comment on column dbversion.version is 'SQL schema version';
 comment on column dbversion.release is 'Version deployment timestamp';
 comment on column dbversion.description is 'Release description';

 -- latest schema version
 insert into dbversion(version, release, description)
     values(1, now(), 'Work In Progress');

 -- a Git object ID, i.e., a Git-style salted SHA1 checksum
 create domain sha1_git as bytea check (length(value) = 20);

 -- UNIX path (absolute, relative, individual path component, etc.)
 create domain unix_path as bytea;

 -- entity tables
 create table content
 (
     id      bigserial primary key,     -- internal identifier of the content blob
     sha1    sha1_git unique not null,  -- intrinsic identifier of the content blob
-    date    timestamptz not null       -- timestamp of the revision where the blob appears early
+    date    timestamptz                -- timestamp of the revision where the blob first appears
 );
 comment on column content.id is 'Content internal identifier';
 comment on column content.sha1 is 'Content intrinsic identifier';
 comment on column content.date is 'Earliest timestamp for the content (first seen time)';

 create table directory
 (
     id      bigserial primary key,     -- internal identifier of the directory appearing in an isochrone inner frontier
     sha1    sha1_git unique not null,  -- intrinsic identifier of the directory
-    date    timestamptz not null       -- max timestamp among those of the directory children's
+    date    timestamptz                -- max timestamp among those of the directory's children
 );
 comment on column directory.id is 'Directory internal identifier';
 comment on column directory.sha1 is 'Directory intrinsic identifier';
 comment on column directory.date is 'Latest timestamp for the content in the directory';

 create table revision
 (
     id      bigserial primary key,     -- internal identifier of the revision
     sha1    sha1_git unique not null,  -- intrinsic identifier of the revision
     date    timestamptz,               -- timestamp of the revision
     origin  bigint                     -- id of the preferred origin
     -- foreign key (org) references origin (id)
 );
 comment on column revision.id is 'Revision internal identifier';
 comment on column revision.sha1 is 'Revision intrinsic identifier';
 comment on column revision.date is 'Revision timestamp';
 comment on column revision.origin is 'preferred origin for the revision';

 create table location
 (
     id      bigserial primary key,     -- internal identifier of the location
     path    unix_path unique not null  -- path to the location
 );
 comment on column location.id is 'Location internal identifier';
 comment on column location.path is 'Path to the location';

 create table origin
 (
     id      bigserial primary key,     -- internal identifier of the origin
     sha1    sha1_git unique not null,  -- intrinsic identifier of the origin
     url     unix_path unique not null  -- url of the origin
 );
 comment on column origin.id is 'Origin internal identifier';
 comment on column origin.sha1 is 'Origin intrinsic identifier';
 comment on column origin.url is 'URL of the origin';

 -- relation tables
 create table content_in_revision
 (
-    content     bigint not null,    -- internal identifier of the content blob
-    revision    bigint not null,    -- internal identifier of the revision where the blob appears for the first time
-    location    bigint              -- location of the content relative to the revision root directory
+    content   bigint not null,  -- internal identifier of the content blob
+    revision  bigint not null,  -- internal identifier of the revision where the blob appears for the first time
+    location  bigint            -- location of the content relative to the revision root directory
     -- foreign key (blob) references content (id),
     -- foreign key (rev) references revision (id),
     -- foreign key (loc) references location (id)
 );
 comment on column content_in_revision.content is 'Content internal identifier';
 comment on column content_in_revision.revision is 'Revision internal identifier';
 comment on column content_in_revision.location is 'Location of content in revision';

 create table content_in_directory
 (
-    content     bigint not null,    -- internal identifier of the content blob
-    directory   bigint not null,    -- internal identifier of the directory containing the blob
-    location    bigint              -- location of the content relative to its parent directory in the isochrone frontier
+    content    bigint not null,  -- internal identifier of the content blob
+    directory  bigint not null,  -- internal identifier of the directory containing the blob
+    location   bigint            -- location of the content relative to its parent directory in the isochrone frontier
     -- foreign key (blob) references content (id),
     -- foreign key (dir) references directory (id),
     -- foreign key (loc) references location (id)
 );
 comment on column content_in_directory.content is 'Content internal identifier';
 comment on column content_in_directory.directory is 'Directory internal identifier';
 comment on column content_in_directory.location is 'Location of content in directory';

 create table directory_in_revision
 (
-    directory   bigint not null,    -- internal identifier of the directory appearing in the revision
-    revision    bigint not null,    -- internal identifier of the revision containing the directory
-    location    bigint              -- location of the directory relative to the revision root directory
+    directory  bigint not null,  -- internal identifier of the directory appearing in the revision
+    revision   bigint not null,  -- internal identifier of the revision containing the directory
+    location   bigint            -- location of the directory relative to the revision root directory
     -- foreign key (dir) references directory (id),
     -- foreign key (rev) references revision (id),
     -- foreign key (loc) references location (id)
 );
 comment on column directory_in_revision.directory is 'Directory internal identifier';
 comment on column directory_in_revision.revision is 'Revision internal identifier';
 comment on column directory_in_revision.location is 'Location of directory in revision';

 create table revision_in_origin
 (
-    revision    bigint not null,    -- internal identifier of the revision poined by the origin
-    origin      bigint not null     -- internal identifier of the origin that points to the revision
+    revision  bigint not null,  -- internal identifier of the revision pointed to by the origin
+    origin    bigint not null   -- internal identifier of the origin that points to the revision
     -- foreign key (rev) references revision (id),
     -- foreign key (org) references origin (id)
 );
 comment on column revision_in_origin.revision is 'Revision internal identifier';
 comment on column revision_in_origin.origin is 'Origin internal identifier';

 create table revision_before_revision
 (
     prev    bigserial not null,  -- internal identifier of the source revision
     next    bigserial not null   -- internal identifier of the destination revision
     -- foreign key (prev) references revision (id),
     -- foreign key (next) references revision (id)
 );
 comment on column revision_before_revision.prev is 'Source revision internal identifier';
 comment on column revision_before_revision.next is 'Destination revision internal identifier';
diff --git a/swh/provenance/tests/conftest.py b/swh/provenance/tests/conftest.py
index 3ce45fc..c0f519b 100644
--- a/swh/provenance/tests/conftest.py
+++ b/swh/provenance/tests/conftest.py
@@ -1,235 +1,237 @@
 # Copyright (C) 2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from os import path
 import re
 from typing import Iterable, Iterator, List, Optional

 import msgpack
 import pytest
 from typing_extensions import TypedDict

 from swh.core.db import BaseDb
 from swh.journal.serializers import msgpack_ext_hook
 from swh.model.hashutil import hash_to_bytes
 from swh.model.model import Sha1Git
 from swh.model.tests.swh_model_data import TEST_OBJECTS
+from swh.provenance import get_provenance
 from swh.provenance.postgresql.archive import ArchivePostgreSQL
 from swh.provenance.storage.archive import ArchiveStorage
 from swh.storage.replay import process_replay_objects


 @pytest.fixture(params=["with-path", "without-path"])
 def provenance(request, postgresql):
     """return a working and initialized provenance db"""
     from swh.core.cli.db import populate_database_for_package

     flavor = request.param
     populate_database_for_package("swh.provenance", postgresql.dsn, flavor=flavor)

-    from swh.provenance.backend import ProvenanceBackend
-
     BaseDb.adapt_conn(postgresql)
-    prov = ProvenanceBackend(postgresql)
+
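+    # postgresql.dsn is a libpq-style string, e.g. "dbname=test user=postgres
+    # ..." (illustrative); turn it into the keyword dict get_provenance
+    # expects, dropping the "options" entry, which is not a plain connection
+    # parameter.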
+    args = dict(tuple(item.split("=")) for item in postgresql.dsn.split())
+    args.pop("options")
+    prov = get_provenance(cls="local", db=args)
     assert prov.storage.flavor == flavor
     # in test sessions, we DO want to raise any exception occurring at commit time
-    prov.raise_on_commit = True
+    prov.storage.raise_on_commit = True
     return prov


 @pytest.fixture
 def swh_storage_with_objects(swh_storage):
     """return a Storage object (postgresql-based by default) with a few of each
     object type in it

     The inserted content comes from swh.model.tests.swh_model_data.
     """
     for obj_type in (
         "content",
         "skipped_content",
         "directory",
         "revision",
         "release",
         "snapshot",
         "origin",
         "origin_visit",
         "origin_visit_status",
     ):
         getattr(swh_storage, f"{obj_type}_add")(TEST_OBJECTS[obj_type])
     return swh_storage


 @pytest.fixture
 def archive_direct(swh_storage_with_objects):
     return ArchivePostgreSQL(swh_storage_with_objects.get_db().conn)


 @pytest.fixture
 def archive_api(swh_storage_with_objects):
     return ArchiveStorage(swh_storage_with_objects)


 @pytest.fixture(params=["archive", "db"])
 def archive(request, swh_storage_with_objects):
     """Return an ArchivePostgreSQL-based StorageInterface object"""
     # this is a workaround to prevent tests from hanging because of an unclosed
     # transaction.
     # TODO: refactor the ArchivePostgreSQL to properly deal with
     # transactions and get rid of this fixture
     if request.param == "db":
         archive = ArchivePostgreSQL(conn=swh_storage_with_objects.get_db().conn)
         yield archive
         archive.conn.rollback()
     else:
         yield ArchiveStorage(swh_storage_with_objects)


 def get_datafile(fname):
     return path.join(path.dirname(__file__), "data", fname)


 def load_repo_data(repo):
     data = {}
     with open(get_datafile(f"{repo}.msgpack"), "rb") as fobj:
         unpacker = msgpack.Unpacker(
             fobj,
             raw=False,
             ext_hook=msgpack_ext_hook,
             strict_map_key=False,
             timestamp=3,  # convert Timestamp in datetime objects (tz UTC)
         )
         for objtype, objd in unpacker:
             data.setdefault(objtype, []).append(objd)
     return data


 def filter_dict(d, keys):
     return {k: v for (k, v) in d.items() if k in keys}


 def fill_storage(storage, data):
     process_replay_objects(data, storage=storage)


 class SynthRelation(TypedDict):
     prefix: Optional[str]
     path: str
     src: Sha1Git
     dst: Sha1Git
     rel_ts: float


 class SynthRevision(TypedDict):
     sha1: Sha1Git
     date: float
     msg: str
     R_C: List[SynthRelation]
     R_D: List[SynthRelation]
     D_C: List[SynthRelation]


 def synthetic_result(filename: str) -> Iterator[SynthRevision]:
     """Generates dict representations of synthetic revisions found in the synthetic
     file (from the data/ directory) given as argument to the generator.

     Each generated SynthRevision (typed dict) has the following elements:

       "sha1": (Sha1Git) sha1 of the revision,
       "date": (float) timestamp of the revision,
       "msg": (str) commit message of the revision,
       "R_C": (list) new R---C relations added by this revision
       "R_D": (list) new R-D relations added by this revision
       "D_C": (list) new D-C relations added by this revision

     Each relation above is a SynthRelation typed dict with:

       "path": (str) location
       "src": (Sha1Git) sha1 of the source of the relation
       "dst": (Sha1Git) sha1 of the destination of the relation
       "rel_ts": (float) timestamp of the target of the relation (related to the
                 timestamp of the revision)

     """
     with open(get_datafile(filename), "r") as fobj:
         yield from _parse_synthetic_file(fobj)


 def _parse_synthetic_file(fobj: Iterable[str]) -> Iterator[SynthRevision]:
     """Read a 'synthetic' file and generate a dict representation of the synthetic
     revision for each revision listed in the synthetic file.
     """
     regs = [
         "(?P<revname>R[0-9]{2,4})?",
         "(?P<reltype>[^| ]*)",
         "([+] )?(?P<path>[^| +]*?)[/]?",
         "(?P<type>[RDC]) (?P<sha1>[0-9a-z]{40})",
         "(?P<ts>-?[0-9]+(.[0-9]+)?)",
     ]
     regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$")
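+    # A matching synthetic line thus looks like (illustrative values):
+    #   R00 | R---C | + a/b/c.txt | C 0123456789abcdef0123456789abcdef01234567 | -1.0
+    # i.e. optional revision name, relation type, path, entry type plus sha1,
+    # and a relative timestamp, separated by pipes.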
     current_rev: List[dict] = []
     for m in (regex.match(line) for line in fobj):
         if m:
             d = m.groupdict()
             if d["revname"]:
                 if current_rev:
                     yield _mk_synth_rev(current_rev)
                 current_rev.clear()
             current_rev.append(d)
     if current_rev:
         yield _mk_synth_rev(current_rev)


 def _mk_synth_rev(synth_rev) -> SynthRevision:
     assert synth_rev[0]["type"] == "R"
     rev = SynthRevision(
         sha1=hash_to_bytes(synth_rev[0]["sha1"]),
         date=float(synth_rev[0]["ts"]),
         msg=synth_rev[0]["revname"],
         R_C=[],
         R_D=[],
         D_C=[],
     )
     current_path = None
     # path of the last R-D relation we parsed, used as a prefix for the next
     # D-C relations
     for row in synth_rev[1:]:
         if row["reltype"] == "R---C":
             assert row["type"] == "C"
             rev["R_C"].append(
                 SynthRelation(
                     prefix=None,
                     path=row["path"],
                     src=rev["sha1"],
                     dst=hash_to_bytes(row["sha1"]),
                     rel_ts=float(row["ts"]),
                 )
             )
             current_path = None
         elif row["reltype"] == "R-D":
             assert row["type"] == "D"
             rev["R_D"].append(
                 SynthRelation(
                     prefix=None,
                     path=row["path"],
                     src=rev["sha1"],
                     dst=hash_to_bytes(row["sha1"]),
                     rel_ts=float(row["ts"]),
                 )
             )
             current_path = row["path"]
         elif row["reltype"] == "D-C":
             assert row["type"] == "C"
             rev["D_C"].append(
                 SynthRelation(
                     prefix=current_path,
                     path=row["path"],
                     src=rev["R_D"][-1]["dst"],
                     dst=hash_to_bytes(row["sha1"]),
                     rel_ts=float(row["ts"]),
                 )
             )
     return rev